Skip to content

[FLINK-39410] The flink-cdc-pipeline-connector-elasticsearch module offers better compatibility with Flink 1.20 and Flink 2.x#4368

Open
sd4324530 wants to merge 3 commits intoapache:masterfrom
sd4324530:flink2-es
Open

[FLINK-39410] The flink-cdc-pipeline-connector-elasticsearch module offers better compatibility with Flink 1.20 and Flink 2.x#4368
sd4324530 wants to merge 3 commits intoapache:masterfrom
sd4324530:flink2-es

Conversation

@sd4324530
Copy link
Copy Markdown
Contributor

Currently, the flink-sql-connector-elasticsearchx component, which the flink-cdc-pipeline-connector-elasticsearch module depends on, is still compatible with Flink 1.17. This is unnecessary; compatibility with Flink 1.20 and Flink 2.2+ is sufficient.

4 new adapters:
AsyncSinkBaseAdapter
AsyncSinkWriterAdapter
StatefulSinkWriterAdapter
WriterInitContextAdapter

… compatibility with Flink 1.20 and Flink 2.x.

Signed-off-by: Pei Yu <125331682@qq.com>
@sd4324530
Copy link
Copy Markdown
Contributor Author

The CI error doesn't seem to be related to my PR, I ran the error use case locally and it works fine.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the Elasticsearch pipeline connector to target newer Flink versions (Flink 1.20 by default, and Flink 2.x via profile) by introducing a small compatibility layer around Flink’s async sink APIs and aligning the connector implementation with the newer flink-sql-connector-elasticsearchx artifacts.

Changes:

  • Add Flink 1.20 and Flink 2.2 compatibility adapters for async sink base/writer and init-context bridging.
  • Update the Elasticsearch 8 async sink/writer implementation to use the new adapter types and newer async sink callback API (ResultHandler).
  • Bump flink-sql-connector-elasticsearch6/7 dependency version for Flink 1.20 and add a flink2 Maven profile to switch to the Flink 2.x connector artifact line.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
flink-cdc-flink2-compat/.../AsyncSinkWriterAdapter.java Adds Flink 2.2 adapter to bridge Sink.InitContext to WriterInitContext for AsyncSinkWriter.
flink-cdc-flink2-compat/.../AsyncSinkBaseAdapter.java Adds Flink 2.2 adapter surface for restoring writers via restoreWriterAdapter.
flink-cdc-flink2-compat/.../WriterInitContextAdapter.java Implements a WriterInitContext wrapper around Sink.InitContext.
flink-cdc-flink2-compat/.../StatefulSinkWriterAdapter.java Adds a compat interface to represent a stateful writer in Flink 2.2.
flink-cdc-flink1-compat/.../AsyncSinkWriterAdapter.java Adds Flink 1.20 adapter wrapper for AsyncSinkWriter.
flink-cdc-flink1-compat/.../AsyncSinkBaseAdapter.java Adds Flink 1.20 adapter surface for writer restoration (currently has a compile issue).
flink-cdc-flink1-compat/.../WriterInitContextAdapter.java Implements WriterInitContext wrapper around Sink.InitContext for Flink 1.20.
flink-cdc-flink1-compat/.../StatefulSinkWriterAdapter.java Adds a Flink 1.20 compat interface for a stateful writer type.
.../Elasticsearch8AsyncWriter.java Migrates writer to adapter base + new ResultHandler callback style.
.../Elasticsearch8AsyncSink.java Migrates sink to adapter base and updates writer creation/restore paths for compatibility.
.../ElasticsearchEventSerializer.java Removes now-unneeded open(Sink.InitContext) override.
.../pom.xml Bumps Flink ES SQL connector version and adds a flink2 profile to switch artifact line.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +56 to +59
public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> restoreWriter(
WriterInitContext context,
Collection<BufferedRequestState<RequestEntryT>> recoveredState)
throws IOException {
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StatefulSinkWriter is referenced in the restoreWriter(...) override but isn’t imported or qualified anywhere in this file. Given this module targets Flink 1.20 where the writer type may be StatefulSink.StatefulSinkWriter, this currently won’t compile. Fix by using the correct fully-qualified type (or importing the proper Flink 1.20 type) so the override matches AsyncSinkBase#restoreWriter’s signature.

Suggested change
public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> restoreWriter(
WriterInitContext context,
Collection<BufferedRequestState<RequestEntryT>> recoveredState)
throws IOException {
public org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter<
InputT, BufferedRequestState<RequestEntryT>>
restoreWriter(
WriterInitContext context,
Collection<BufferedRequestState<RequestEntryT>> recoveredState)
throws IOException {

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class will only compile with flink1.x, and the method signatures match.

@@ -166,13 +167,13 @@ private void handleFailedRequest(
numRecordsOutErrorsCounter.inc(requestEntries.size());

if (isRetryable(error.getCause())) {
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleFailedRequest(...) calls isRetryable(error.getCause()). If error.getCause() is null (which is possible for many exception types), this will lead to a NullPointerException inside isRetryable/FatalExceptionClassifier, masking the real failure. Pass a non-null throwable (e.g., error.getCause() != null ? error.getCause() : error) or update isRetryable to handle null safely.

Suggested change
if (isRetryable(error.getCause())) {
Throwable retryableError = error.getCause() != null ? error.getCause() : error;
if (isRetryable(retryableError)) {

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 81 to +88
/**
* Creates a new {@link StatefulSinkWriter} for writing elements to Elasticsearch.
*
* @param context the initialization context.
* @return a new instance of {@link Elasticsearch8AsyncWriter}.
*/
@Override
public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
InitContext context) {
public SinkWriter<InputT> createWriter(InitContext context) {
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadocs for createWriter/restoreWriterAdapter still refer to returning/creating a StatefulSinkWriter, but the method now returns SinkWriter<InputT> and the restore method returns StatefulSinkWriterAdapter. Please update the links/wording to match the new signatures to avoid misleading API documentation.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Signed-off-by: Pei Yu <125331682@qq.com>
Signed-off-by: Pei Yu <125331682@qq.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants