[FLINK-39410] The flink-cdc-pipeline-connector-elasticsearch module offers better compatibility with Flink 1.20 and Flink 2.x#4368
Conversation
… compatibility with Flink 1.20 and Flink 2.x. Signed-off-by: Pei Yu <125331682@qq.com>
|
The CI error doesn't seem to be related to my PR, I ran the error use case locally and it works fine. |
There was a problem hiding this comment.
Pull request overview
This PR updates the Elasticsearch pipeline connector to target newer Flink versions (Flink 1.20 by default, and Flink 2.x via profile) by introducing a small compatibility layer around Flink’s async sink APIs and aligning the connector implementation with the newer flink-sql-connector-elasticsearchx artifacts.
Changes:
- Add Flink 1.20 and Flink 2.2 compatibility adapters for async sink base/writer and init-context bridging.
- Update the Elasticsearch 8 async sink/writer implementation to use the new adapter types and newer async sink callback API (
ResultHandler). - Bump
flink-sql-connector-elasticsearch6/7dependency version for Flink 1.20 and add aflink2Maven profile to switch to the Flink 2.x connector artifact line.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-cdc-flink2-compat/.../AsyncSinkWriterAdapter.java | Adds Flink 2.2 adapter to bridge Sink.InitContext to WriterInitContext for AsyncSinkWriter. |
| flink-cdc-flink2-compat/.../AsyncSinkBaseAdapter.java | Adds Flink 2.2 adapter surface for restoring writers via restoreWriterAdapter. |
| flink-cdc-flink2-compat/.../WriterInitContextAdapter.java | Implements a WriterInitContext wrapper around Sink.InitContext. |
| flink-cdc-flink2-compat/.../StatefulSinkWriterAdapter.java | Adds a compat interface to represent a stateful writer in Flink 2.2. |
| flink-cdc-flink1-compat/.../AsyncSinkWriterAdapter.java | Adds Flink 1.20 adapter wrapper for AsyncSinkWriter. |
| flink-cdc-flink1-compat/.../AsyncSinkBaseAdapter.java | Adds Flink 1.20 adapter surface for writer restoration (currently has a compile issue). |
| flink-cdc-flink1-compat/.../WriterInitContextAdapter.java | Implements WriterInitContext wrapper around Sink.InitContext for Flink 1.20. |
| flink-cdc-flink1-compat/.../StatefulSinkWriterAdapter.java | Adds a Flink 1.20 compat interface for a stateful writer type. |
| .../Elasticsearch8AsyncWriter.java | Migrates writer to adapter base + new ResultHandler callback style. |
| .../Elasticsearch8AsyncSink.java | Migrates sink to adapter base and updates writer creation/restore paths for compatibility. |
| .../ElasticsearchEventSerializer.java | Removes now-unneeded open(Sink.InitContext) override. |
| .../pom.xml | Bumps Flink ES SQL connector version and adds a flink2 profile to switch artifact line. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> restoreWriter( | ||
| WriterInitContext context, | ||
| Collection<BufferedRequestState<RequestEntryT>> recoveredState) | ||
| throws IOException { |
There was a problem hiding this comment.
StatefulSinkWriter is referenced in the restoreWriter(...) override but isn’t imported or qualified anywhere in this file. Given this module targets Flink 1.20 where the writer type may be StatefulSink.StatefulSinkWriter, this currently won’t compile. Fix by using the correct fully-qualified type (or importing the proper Flink 1.20 type) so the override matches AsyncSinkBase#restoreWriter’s signature.
| public StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> restoreWriter( | |
| WriterInitContext context, | |
| Collection<BufferedRequestState<RequestEntryT>> recoveredState) | |
| throws IOException { | |
| public org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter< | |
| InputT, BufferedRequestState<RequestEntryT>> | |
| restoreWriter( | |
| WriterInitContext context, | |
| Collection<BufferedRequestState<RequestEntryT>> recoveredState) | |
| throws IOException { |
There was a problem hiding this comment.
This class will only compile with flink1.x, and the method signatures match.
| @@ -166,13 +167,13 @@ private void handleFailedRequest( | |||
| numRecordsOutErrorsCounter.inc(requestEntries.size()); | |||
|
|
|||
| if (isRetryable(error.getCause())) { | |||
There was a problem hiding this comment.
handleFailedRequest(...) calls isRetryable(error.getCause()). If error.getCause() is null (which is possible for many exception types), this will lead to a NullPointerException inside isRetryable/FatalExceptionClassifier, masking the real failure. Pass a non-null throwable (e.g., error.getCause() != null ? error.getCause() : error) or update isRetryable to handle null safely.
| if (isRetryable(error.getCause())) { | |
| Throwable retryableError = error.getCause() != null ? error.getCause() : error; | |
| if (isRetryable(retryableError)) { |
| /** | ||
| * Creates a new {@link StatefulSinkWriter} for writing elements to Elasticsearch. | ||
| * | ||
| * @param context the initialization context. | ||
| * @return a new instance of {@link Elasticsearch8AsyncWriter}. | ||
| */ | ||
| @Override | ||
| public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter( | ||
| InitContext context) { | ||
| public SinkWriter<InputT> createWriter(InitContext context) { |
There was a problem hiding this comment.
The Javadocs for createWriter/restoreWriterAdapter still refer to returning/creating a StatefulSinkWriter, but the method now returns SinkWriter<InputT> and the restore method returns StatefulSinkWriterAdapter. Please update the links/wording to match the new signatures to avoid misleading API documentation.
Signed-off-by: Pei Yu <125331682@qq.com>
Signed-off-by: Pei Yu <125331682@qq.com>
Currently, the
flink-sql-connector-elasticsearchxcomponent, which theflink-cdc-pipeline-connector-elasticsearchmodule depends on, is still compatible with Flink 1.17. This is unnecessary; compatibility with Flink 1.20 and Flink 2.2+ is sufficient.4 new adapters:
AsyncSinkBaseAdapter
AsyncSinkWriterAdapter
StatefulSinkWriterAdapter
WriterInitContextAdapter