-
Notifications
You must be signed in to change notification settings - Fork 7
feat: Reconnect feature on stream error #419
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
471f00b
Small simplification
CarlosGamero 258bff8
Reduce duplication
CarlosGamero 138aec7
Replacing deprecates
CarlosGamero d4fa780
Improve init and close flows to allow restarts
CarlosGamero 4becc11
Restart PoC
CarlosGamero e181872
Proper reconnect flow
CarlosGamero 0cf53f0
Merge branch 'main' into feat/reconnect_on_error
CarlosGamero 533d0dc
Improve error report
CarlosGamero e9a720b
Reconnect logs fixes
CarlosGamero 3f6cea3
Remove duplication by using logger child
CarlosGamero a324be7
AI fix
CarlosGamero 6de55c9
lint
CarlosGamero d7fab3c
Adding test
CarlosGamero d8f4e88
Handling healtcheck during reconnect
CarlosGamero bc9e20c
Fixing small issue
CarlosGamero 2e1de62
Adding test to cover reconnect error
CarlosGamero 86c8732
Test fix
CarlosGamero 783978e
Test fix
CarlosGamero c54eba2
Minor improvement
CarlosGamero 08a6975
Trying to fix tests
CarlosGamero 8659a3e
AI comment
CarlosGamero a14d4b3
test fix
CarlosGamero d006089
plt kafka update
CarlosGamero 92f224d
Adding comment
CarlosGamero 2b2b9af
test fix
CarlosGamero File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ import { pipeline } from 'node:stream/promises' | |
| import { setTimeout } from 'node:timers/promises' | ||
| import { | ||
| InternalError, | ||
| resolveGlobalErrorLogObject, | ||
| stringValueSerializer, | ||
| type TransactionObservabilityManager, | ||
| } from '@lokalise/node-core' | ||
|
|
@@ -85,6 +86,7 @@ TODO: Proper retry mechanism + DLQ -> https://lokalise.atlassian.net/browse/EDEX | |
| In the meantime, we will retry in memory up to 3 times | ||
| */ | ||
| const MAX_IN_MEMORY_RETRIES = 3 | ||
| const MAX_RECONNECT_ATTEMPTS = 5 | ||
|
|
||
| export abstract class AbstractKafkaConsumer< | ||
| TopicsConfig extends TopicConfig[], | ||
|
|
@@ -94,11 +96,12 @@ export abstract class AbstractKafkaConsumer< | |
| TopicsConfig, | ||
| KafkaConsumerOptions<TopicsConfig, ExecutionContext, BatchProcessingEnabled> | ||
| > { | ||
| private readonly consumer: Consumer<string, object, string, string> | ||
| private consumer?: Consumer<string, object, string, string> | ||
| private consumerStream?: MessagesStream<string, object, string, string> | ||
| private messageBatchStream?: KafkaMessageBatchStream< | ||
| DeserializedMessage<SupportedMessageValues<TopicsConfig>> | ||
| > | ||
| private isReconnecting: boolean | ||
|
|
||
| private readonly transactionObservabilityManager: TransactionObservabilityManager | ||
| private readonly executionContext: ExecutionContext | ||
|
|
@@ -109,45 +112,18 @@ export abstract class AbstractKafkaConsumer< | |
| executionContext: ExecutionContext, | ||
| ) { | ||
| super(dependencies, options) | ||
|
|
||
| this.transactionObservabilityManager = dependencies.transactionObservabilityManager | ||
| this.executionContext = executionContext | ||
|
|
||
| this.consumer = new Consumer({ | ||
| ...this.options.kafka, | ||
| ...this.options, | ||
| autocommit: false, // Handling commits manually | ||
| deserializers: { | ||
| key: stringDeserializer, | ||
| value: safeJsonDeserializer, | ||
| headerKey: stringDeserializer, | ||
| headerValue: stringDeserializer, | ||
| }, | ||
| }) | ||
|
|
||
| const logDetails = { origin: this.constructor.name, groupId: this.options.groupId } | ||
| /* v8 ignore start */ | ||
| this.consumer.on('consumer:group:join', (_) => | ||
| this.logger.debug(logDetails, 'Consumer is joining a group'), | ||
| ) | ||
| this.consumer.on('consumer:group:rejoin', () => | ||
| this.logger.debug(logDetails, 'Consumer is re-joining a group after a rebalance'), | ||
| ) | ||
| this.consumer.on('consumer:group:leave', (_) => | ||
| this.logger.debug(logDetails, 'Consumer is leaving the group'), | ||
| ) | ||
| this.consumer.on('consumer:group:rebalance', (_) => | ||
| this.logger.debug(logDetails, 'Group is rebalancing'), | ||
| ) | ||
| /* v8 ignore stop */ | ||
| this.isReconnecting = false | ||
| } | ||
|
|
||
| /** | ||
| * Returns true if all client's connections are currently connected and the client is connected to at least one broker. | ||
| * Returns `true` if all client connections are currently active and the client is connected to at least one broker. | ||
| * During a reconnect attempt, returns `true` until all reconnect attempts are exhausted. | ||
| */ | ||
| get isConnected(): boolean { | ||
| // Streams are created only when init method was called | ||
| if (!this.consumerStream && !this.messageBatchStream) return false | ||
| if (!this.consumer) return this.isReconnecting | ||
| try { | ||
| return this.consumer.isConnected() | ||
| /* v8 ignore start */ | ||
|
|
@@ -159,12 +135,12 @@ export abstract class AbstractKafkaConsumer< | |
| } | ||
|
|
||
| /** | ||
| * Returns `true` if the consumer is not closed, and it is currently an active member of a consumer group. | ||
| * This method will return `false` during consumer group rebalancing. | ||
| * Returns `true` if the consumer is not closed and is an active member of a consumer group. | ||
| * Returns `false` during consumer group rebalancing. | ||
| * During a reconnect attempt, returns `true` until all reconnect attempts are exhausted. | ||
| */ | ||
| get isActive(): boolean { | ||
| // Streams are created only when init method was called | ||
| if (!this.consumerStream && !this.messageBatchStream) return false | ||
| if (!this.consumer) return this.isReconnecting | ||
| try { | ||
| return this.consumer.isActive() | ||
| /* v8 ignore start */ | ||
|
|
@@ -176,10 +152,24 @@ export abstract class AbstractKafkaConsumer< | |
| } | ||
|
|
||
| async init(): Promise<void> { | ||
| if (this.consumerStream) return Promise.resolve() | ||
| if (this.consumer) return Promise.resolve() | ||
|
|
||
| const topics = Object.keys(this.options.handlers) | ||
| if (topics.length === 0) throw new Error('At least one topic must be defined') | ||
|
|
||
| // Consumer needs to be recreated; once you call close, it ends in a final state, so we need to start from scratch | ||
| this.consumer = new Consumer({ | ||
| ...this.options.kafka, | ||
| ...this.options, | ||
| autocommit: false, // Handling commits manually | ||
| deserializers: { | ||
| key: stringDeserializer, | ||
| value: safeJsonDeserializer, | ||
| headerKey: stringDeserializer, | ||
| headerValue: stringDeserializer, | ||
| }, | ||
| }) | ||
|
|
||
| try { | ||
| const { handlers: _, ...consumeOptions } = this.options // Handlers cannot be passed to consume method | ||
|
|
||
|
|
@@ -202,12 +192,13 @@ export abstract class AbstractKafkaConsumer< | |
| }) | ||
|
|
||
| // Use pipeline for better error handling and backpressure management. | ||
| // pipeline() internally listens for errors on all streams | ||
| // pipeline() internally listens for errors on all streams and rejects if any stream errors. | ||
| // The .catch() here reports the error; reconnection is handled by handleStream's .catch() below. | ||
| pipeline(this.consumerStream, this.messageBatchStream).catch((error) => | ||
| this.handlerError(error), | ||
| this.handleError(error), | ||
| ) | ||
| } else { | ||
| this.consumerStream.on('error', (error) => this.handlerError(error)) | ||
| this.consumerStream.on('error', (error) => this.handleError(error)) | ||
| } | ||
| } catch (error) { | ||
| throw new InternalError({ | ||
|
|
@@ -217,63 +208,84 @@ export abstract class AbstractKafkaConsumer< | |
| }) | ||
| } | ||
|
|
||
| if (this.messageBatchStream) { | ||
| this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) | ||
| } else { | ||
| this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) | ||
| } | ||
| this.handleStream( | ||
| this.messageBatchStream ? this.messageBatchStream : this.consumerStream, | ||
| ).catch((error) => this.reconnect(error)) | ||
| } | ||
|
|
||
| private async handleSyncStream( | ||
| stream: MessagesStream<string, object, string, string>, | ||
| ): Promise<void> { | ||
| for await (const message of stream) { | ||
| private async handleStream( | ||
| stream: | ||
| | MessagesStream<string, object, string, string> | ||
| | KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>, | ||
| ) { | ||
| for await (const messageOrBatch of stream) { | ||
| await this.consume( | ||
| message.topic, | ||
| message as DeserializedMessage<SupportedMessageValues<TopicsConfig>>, | ||
| Array.isArray(messageOrBatch) ? messageOrBatch[0].topic : messageOrBatch.topic, | ||
| messageOrBatch, | ||
| ) | ||
| } | ||
| } | ||
| private async handleSyncStreamBatch( | ||
| stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>, | ||
| ): Promise<void> { | ||
| for await (const messageBatch of stream) { | ||
| await this.consume(messageBatch[0].topic, messageBatch) | ||
| } | ||
| } | ||
|
|
||
| async close(): Promise<void> { | ||
| if (!this.consumerStream && !this.messageBatchStream) { | ||
| // Leaving the group in case consumer joined but streams were not created | ||
| if (this.isActive) this.consumer.leaveGroup() | ||
| return | ||
| } | ||
| if (!this.consumer) return Promise.resolve() | ||
|
|
||
| if (this.consumerStream) { | ||
| await this.consumerStream.close() | ||
| this.consumerStream = undefined | ||
| } | ||
| await this.consumerStream?.close() | ||
| this.consumerStream = undefined | ||
|
|
||
| if (this.messageBatchStream) { | ||
| await new Promise((resolve) => this.messageBatchStream?.end(resolve)) | ||
| this.messageBatchStream = undefined | ||
| } | ||
| await new Promise((resolve) => | ||
| this.messageBatchStream ? this.messageBatchStream?.end(resolve) : resolve(undefined), | ||
| ) | ||
| this.messageBatchStream = undefined | ||
|
|
||
| await this.consumer.close() | ||
| try { | ||
| await this.consumer.close() | ||
| } catch { | ||
| // Ignoring errors at this stage | ||
| } | ||
| this.consumer = undefined | ||
| } | ||
CarlosGamero marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| private resolveHandler(topic: SupportedTopics<TopicsConfig>) { | ||
| return this.options.handlers[topic] | ||
| private async reconnect(error: unknown): Promise<void> { | ||
| this.isReconnecting = true | ||
| this.logger.info( | ||
| { error: resolveGlobalErrorLogObject(error) }, | ||
| 'Stream error detected, attempting to reconnect', | ||
| ) | ||
|
|
||
| for (let attempt = 0; attempt < MAX_RECONNECT_ATTEMPTS; attempt++) { | ||
| try { | ||
| await this.close() | ||
| await setTimeout(Math.pow(2, attempt) * 1000) // Backoff delay starting with 1s | ||
| await this.init() | ||
| this.isReconnecting = false | ||
| return | ||
| } catch (error) { | ||
| this.logger.warn( | ||
| { | ||
| attempt, | ||
| maxAttempts: MAX_RECONNECT_ATTEMPTS, | ||
| error: resolveGlobalErrorLogObject(error), | ||
| }, | ||
| 'Reconnect attempt failed', | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| await this.close() // closing in case something is open after last init call | ||
| this.isReconnecting = false | ||
| this.handleError(new Error('Consumer failed to reconnect after max attempts'), { | ||
| maxAttempts: MAX_RECONNECT_ATTEMPTS, | ||
| }) | ||
| } | ||
|
|
||
| private async consume( | ||
| topic: string, | ||
| topic: SupportedTopics<TopicsConfig>, | ||
| messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>, | ||
| ): Promise<void> { | ||
| const messageProcessingStartTimestamp = Date.now() | ||
| this.logger.debug({ origin: this.constructor.name, topic }, 'Consuming message(s)') | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Origin is now added with logger.child to avoid repetition |
||
| this.logger.debug({ topic }, 'Consuming message(s)') | ||
|
|
||
| const handlerConfig = this.resolveHandler(topic) | ||
| const handlerConfig = this.options.handlers[topic] | ||
|
|
||
| // if there is no handler for the message, we ignore it (simulating subscription) | ||
| if (!handlerConfig) return this.commit(messageOrBatch) | ||
|
|
@@ -285,11 +297,11 @@ export abstract class AbstractKafkaConsumer< | |
| ) | ||
|
|
||
| if (!validMessages.length) { | ||
| this.logger.debug({ origin: this.constructor.name, topic }, 'Received not valid message(s)') | ||
| this.logger.debug({ topic }, 'Received not valid message(s)') | ||
| return this.commit(messageOrBatch) | ||
| } else { | ||
| this.logger.debug( | ||
| { origin: this.constructor.name, topic, validMessagesCount: validMessages.length }, | ||
| { topic, validMessagesCount: validMessages.length }, | ||
| 'Received valid message(s) to process', | ||
| ) | ||
| } | ||
|
|
@@ -339,7 +351,7 @@ export abstract class AbstractKafkaConsumer< | |
| const parseResult = handlerConfig.schema.safeParse(message.value) | ||
|
|
||
| if (!parseResult.success) { | ||
| this.handlerError(parseResult.error, { | ||
| this.handleError(parseResult.error, { | ||
| topic: message.topic, | ||
| message: stringValueSerializer(message.value), | ||
| }) | ||
|
|
@@ -415,7 +427,7 @@ export abstract class AbstractKafkaConsumer< | |
| const errorContext = Array.isArray(messageOrBatch) | ||
| ? { batchSize: messageOrBatch.length } | ||
| : { message: stringValueSerializer(messageOrBatch.value) } | ||
| this.handlerError(error, { topic, ...errorContext }) | ||
| this.handleError(error, { topic, ...errorContext }) | ||
| } | ||
|
|
||
| return { status: 'error', errorReason: 'handlerError' } | ||
|
|
@@ -435,32 +447,32 @@ export abstract class AbstractKafkaConsumer< | |
| } | ||
| } | ||
|
|
||
| private commit(messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>) { | ||
| private async commit(messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>) { | ||
| let messageToCommit: DeserializedMessage<SupportedMessageValues<TopicsConfig>> | ||
| if (Array.isArray(messageOrBatch)) { | ||
| if (messageOrBatch.length === 0) return Promise.resolve() | ||
|
|
||
| // biome-ignore lint/style/noNonNullAssertion: we check the length above | ||
| return this.commitMessage(messageOrBatch[messageOrBatch.length - 1]!) | ||
| messageToCommit = messageOrBatch[messageOrBatch.length - 1]! | ||
| } else { | ||
| return this.commitMessage(messageOrBatch) | ||
| messageToCommit = messageOrBatch | ||
| } | ||
| } | ||
|
|
||
| private async commitMessage(message: DeserializedMessage<SupportedMessageValues<TopicsConfig>>) { | ||
| const logDetails = { | ||
| topic: message.topic, | ||
| offset: message.offset, | ||
| timestamp: message.timestamp, | ||
| topic: messageToCommit.topic, | ||
| offset: messageToCommit.offset, | ||
| timestamp: messageToCommit.timestamp, | ||
| } | ||
| this.logger.debug(logDetails, 'Trying to commit message') | ||
|
|
||
| try { | ||
| await message.commit() | ||
| await messageToCommit.commit() | ||
| this.logger.debug(logDetails, 'Message committed successfully') | ||
| } catch (error) { | ||
| this.logger.debug(logDetails, 'Message commit failed') | ||
| if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error) | ||
| this.handlerError(error) | ||
| return error instanceof ResponseError | ||
| ? this.handleResponseErrorOnCommit(error) | ||
| : this.handleError(error) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -483,7 +495,7 @@ export abstract class AbstractKafkaConsumer< | |
| `Failed to commit message: ${error.message}`, | ||
| ) | ||
| } else { | ||
| this.handlerError(error) | ||
| this.handleError(error) | ||
| } | ||
| } | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consumer needs to be recreated; once you call
`close`, it ends in a final state, so we need to start from scratch. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add this as a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! added :D