From 471f00b5185b99a553d0dc2fb398c1ea4de74c90 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 13:39:39 +0100 Subject: [PATCH 01/24] Small simplification --- packages/kafka/lib/AbstractKafkaConsumer.ts | 25 +++++++++------------ 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index fa1948ef..ee9ace1e 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -262,18 +262,14 @@ export abstract class AbstractKafkaConsumer< await this.consumer.close() } - private resolveHandler(topic: SupportedTopics) { - return this.options.handlers[topic] - } - private async consume( - topic: string, + topic: SupportedTopics, messageOrBatch: MessageOrBatch>, ): Promise { const messageProcessingStartTimestamp = Date.now() this.logger.debug({ origin: this.constructor.name, topic }, 'Consuming message(s)') - const handlerConfig = this.resolveHandler(topic) + const handlerConfig = this.options.handlers[topic] // if there is no handler for the message, we ignore it (simulating subscription) if (!handlerConfig) return this.commit(messageOrBatch) @@ -435,27 +431,26 @@ export abstract class AbstractKafkaConsumer< } } - private commit(messageOrBatch: MessageOrBatch>) { + private async commit(messageOrBatch: MessageOrBatch>) { + let messageToCommit: DeserializedMessage> if (Array.isArray(messageOrBatch)) { if (messageOrBatch.length === 0) return Promise.resolve() // biome-ignore lint/style/noNonNullAssertion: we check the length above - return this.commitMessage(messageOrBatch[messageOrBatch.length - 1]!) + messageToCommit = messageOrBatch[messageOrBatch.length - 1]! 
} else { - return this.commitMessage(messageOrBatch) + messageToCommit = messageOrBatch } - } - private async commitMessage(message: DeserializedMessage>) { const logDetails = { - topic: message.topic, - offset: message.offset, - timestamp: message.timestamp, + topic: messageToCommit.topic, + offset: messageToCommit.offset, + timestamp: messageToCommit.timestamp, } this.logger.debug(logDetails, 'Trying to commit message') try { - await message.commit() + await messageToCommit.commit() this.logger.debug(logDetails, 'Message committed successfully') } catch (error) { this.logger.debug(logDetails, 'Message commit failed') From 258bff8983145fb7efdfc0ccabfeae99f1b79e94 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 13:48:16 +0100 Subject: [PATCH 02/24] Reduce duplication --- packages/kafka/lib/AbstractKafkaConsumer.ts | 29 ++++++++------------- 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index ee9ace1e..efa5c5c0 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -217,30 +217,23 @@ export abstract class AbstractKafkaConsumer< }) } - if (this.messageBatchStream) { - this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) - } else { - this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) - } + this.handleStream( + this.messageBatchStream ? this.messageBatchStream : this.consumerStream, + ).catch((error) => this.handlerError(error)) } - private async handleSyncStream( - stream: MessagesStream, - ): Promise { - for await (const message of stream) { + private async handleStream( + stream: + | MessagesStream + | KafkaMessageBatchStream>>, + ) { + for await (const messageOrBatch of stream) { await this.consume( - message.topic, - message as DeserializedMessage>, + Array.isArray(messageOrBatch) ? 
messageOrBatch[0].topic : messageOrBatch.topic, + messageOrBatch, ) } } - private async handleSyncStreamBatch( - stream: KafkaMessageBatchStream>>, - ): Promise { - for await (const messageBatch of stream) { - await this.consume(messageBatch[0].topic, messageBatch) - } - } async close(): Promise { if (!this.consumerStream && !this.messageBatchStream) { From 138aec75ac1ad58810e058de0e3b20c993105a58 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 14:34:49 +0100 Subject: [PATCH 03/24] Replacing deprecates --- .../kafka/test/consumer/PermissionBatchConsumer.spec.ts | 6 +++--- packages/kafka/test/consumer/PermissionConsumer.spec.ts | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts b/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts index 11450848..fa001dd2 100644 --- a/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts +++ b/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts @@ -64,7 +64,7 @@ describe('PermissionBatchConsumer', () => { // Given consumer = new PermissionBatchConsumer(testContext.cradle, { handlers: {} }) // When - Then - await expect(consumer.close()).resolves.not.toThrowError() + await expect(consumer.close()).resolves.not.toThrow() }) it('should not fail on init if it is already initiated', async () => { @@ -74,7 +74,7 @@ describe('PermissionBatchConsumer', () => { await consumer.init() // Then - await expect(consumer.init()).resolves.not.toThrowError() + await expect(consumer.init()).resolves.not.toThrow() }) it('should fail if kafka is not available', async () => { @@ -109,7 +109,7 @@ describe('PermissionBatchConsumer', () => { consumer = new PermissionBatchConsumer(testContext.cradle) // When - Then - await expect(consumer.init()).resolves.not.toThrowError() + await expect(consumer.init()).resolves.not.toThrow() }) }) diff --git a/packages/kafka/test/consumer/PermissionConsumer.spec.ts 
b/packages/kafka/test/consumer/PermissionConsumer.spec.ts index 85025121..8e41a3d7 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.spec.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.spec.ts @@ -65,7 +65,7 @@ describe('PermissionConsumer', () => { // Given consumer = new PermissionConsumer(testContext.cradle, { handlers: {} }) // When - Then - await expect(consumer.close()).resolves.not.toThrowError() + await expect(consumer.close()).resolves.not.toThrow() }) it('should not fail on init if it is already initiated', async () => { @@ -75,7 +75,7 @@ describe('PermissionConsumer', () => { await consumer.init() // Then - await expect(consumer.init()).resolves.not.toThrowError() + await expect(consumer.init()).resolves.not.toThrow() }) it('should fail if kafka is not available', async () => { @@ -110,7 +110,7 @@ describe('PermissionConsumer', () => { consumer = new PermissionConsumer(testContext.cradle) // When - Then - await expect(consumer.init()).resolves.not.toThrowError() + await expect(consumer.init()).resolves.not.toThrow() }) }) From d4fa780405e4d0779973eb8d296e068287232635 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 15:45:05 +0100 Subject: [PATCH 04/24] Improve init and close flows to allow restarts --- packages/kafka/lib/AbstractKafkaConsumer.ts | 76 +++++++-------------- 1 file changed, 26 insertions(+), 50 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index efa5c5c0..a2a61d57 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -94,7 +94,7 @@ export abstract class AbstractKafkaConsumer< TopicsConfig, KafkaConsumerOptions > { - private readonly consumer: Consumer + private consumer?: Consumer private consumerStream?: MessagesStream private messageBatchStream?: KafkaMessageBatchStream< DeserializedMessage> @@ -109,45 +109,15 @@ export abstract class AbstractKafkaConsumer< executionContext: 
ExecutionContext, ) { super(dependencies, options) - this.transactionObservabilityManager = dependencies.transactionObservabilityManager this.executionContext = executionContext - - this.consumer = new Consumer({ - ...this.options.kafka, - ...this.options, - autocommit: false, // Handling commits manually - deserializers: { - key: stringDeserializer, - value: safeJsonDeserializer, - headerKey: stringDeserializer, - headerValue: stringDeserializer, - }, - }) - - const logDetails = { origin: this.constructor.name, groupId: this.options.groupId } - /* v8 ignore start */ - this.consumer.on('consumer:group:join', (_) => - this.logger.debug(logDetails, 'Consumer is joining a group'), - ) - this.consumer.on('consumer:group:rejoin', () => - this.logger.debug(logDetails, 'Consumer is re-joining a group after a rebalance'), - ) - this.consumer.on('consumer:group:leave', (_) => - this.logger.debug(logDetails, 'Consumer is leaving the group'), - ) - this.consumer.on('consumer:group:rebalance', (_) => - this.logger.debug(logDetails, 'Group is rebalancing'), - ) - /* v8 ignore stop */ } /** * Returns true if all client's connections are currently connected and the client is connected to at least one broker. */ get isConnected(): boolean { - // Streams are created only when init method was called - if (!this.consumerStream && !this.messageBatchStream) return false + if (!this.consumer) return false try { return this.consumer.isConnected() /* v8 ignore start */ @@ -163,8 +133,7 @@ export abstract class AbstractKafkaConsumer< * This method will return `false` during consumer group rebalancing. 
*/ get isActive(): boolean { - // Streams are created only when init method was called - if (!this.consumerStream && !this.messageBatchStream) return false + if (!this.consumer) return false try { return this.consumer.isActive() /* v8 ignore start */ @@ -176,10 +145,23 @@ export abstract class AbstractKafkaConsumer< } async init(): Promise { - if (this.consumerStream) return Promise.resolve() + if (this.consumer) return Promise.resolve() + const topics = Object.keys(this.options.handlers) if (topics.length === 0) throw new Error('At least one topic must be defined') + this.consumer = new Consumer({ + ...this.options.kafka, + ...this.options, + autocommit: false, // Handling commits manually + deserializers: { + key: stringDeserializer, + value: safeJsonDeserializer, + headerKey: stringDeserializer, + headerValue: stringDeserializer, + }, + }) + try { const { handlers: _, ...consumeOptions } = this.options // Handlers cannot be passed to consume method @@ -236,23 +218,17 @@ export abstract class AbstractKafkaConsumer< } async close(): Promise { - if (!this.consumerStream && !this.messageBatchStream) { - // Leaving the group in case consumer joined but streams were not created - if (this.isActive) this.consumer.leaveGroup() - return - } + await this.consumerStream?.close() + this.consumerStream = undefined - if (this.consumerStream) { - await this.consumerStream.close() - this.consumerStream = undefined - } - - if (this.messageBatchStream) { - await new Promise((resolve) => this.messageBatchStream?.end(resolve)) - this.messageBatchStream = undefined - } + await new Promise((resolve) => + this.messageBatchStream ? 
this.messageBatchStream?.end(resolve) : resolve(undefined), + ) + this.messageBatchStream = undefined - await this.consumer.close() + this.consumer?.leaveGroup() + await this.consumer?.close() + this.consumer = undefined } private async consume( From 4becc11267cf3e539f82533764269f7c5b88f03e Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 15:49:43 +0100 Subject: [PATCH 05/24] Restart PoC --- packages/kafka/lib/AbstractKafkaConsumer.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index a2a61d57..2b8c78c9 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -201,7 +201,11 @@ export abstract class AbstractKafkaConsumer< this.handleStream( this.messageBatchStream ? this.messageBatchStream : this.consumerStream, - ).catch((error) => this.handlerError(error)) + ).catch(async () => { + // TODO: PoC for testing -> we will refine it once we validate this works + add tests + await this.close() + await this.init() + }) } private async handleStream( From e181872899fc8092bb0e5e23da7ddd45aff2d13a Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 16:06:21 +0100 Subject: [PATCH 06/24] Proper reconnect flow --- packages/kafka/lib/AbstractKafkaConsumer.ts | 36 +++++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 2b8c78c9..ee659179 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -3,6 +3,7 @@ import { pipeline } from 'node:stream/promises' import { setTimeout } from 'node:timers/promises' import { InternalError, + resolveGlobalErrorLogObject, stringValueSerializer, type TransactionObservabilityManager, } from '@lokalise/node-core' @@ -85,6 +86,7 @@ TODO: Proper retry mechanism + DLQ -> 
https://lokalise.atlassian.net/browse/EDEX In the meantime, we will retry in memory up to 3 times */ const MAX_IN_MEMORY_RETRIES = 3 +const MAX_RECONNECT_ATTEMPTS = 5 export abstract class AbstractKafkaConsumer< TopicsConfig extends TopicConfig[], @@ -184,7 +186,8 @@ export abstract class AbstractKafkaConsumer< }) // Use pipeline for better error handling and backpressure management. - // pipeline() internally listens for errors on all streams + // pipeline() internally listens for errors on all streams and rejects if any stream errors. + // The .catch() here reports the error; reconnection is handled by handleStream's .catch() below. pipeline(this.consumerStream, this.messageBatchStream).catch((error) => this.handlerError(error), ) @@ -201,11 +204,7 @@ export abstract class AbstractKafkaConsumer< this.handleStream( this.messageBatchStream ? this.messageBatchStream : this.consumerStream, - ).catch(async () => { - // TODO: PoC for testing -> we will refine it once we validate this works + add tests - await this.close() - await this.init() - }) + ).catch((error) => this.reconnect(error)) } private async handleStream( @@ -235,6 +234,31 @@ export abstract class AbstractKafkaConsumer< this.consumer = undefined } + private async reconnect(error: unknown): Promise { + this.logger.warn( + { origin: this.constructor.name, error: resolveGlobalErrorLogObject(error) }, + 'Stream error detected, attempting to reconnect', + ) + + for (let attempt = 0; attempt < MAX_RECONNECT_ATTEMPTS; attempt++) { + try { + await this.close() + await setTimeout(Math.pow(2, attempt) * 1000) // Backoff delay starting with 1s + await this.init() + return + } catch (error) { + this.logger.warn( + { origin: this.constructor.name, attempt, error: resolveGlobalErrorLogObject(error) }, + 'Reconnect attempt failed', + ) + } + } + + this.handlerError(new Error('Consumer failed to reconnect after max attempts'), { + maxAttempts: MAX_RECONNECT_ATTEMPTS, + }) + } + private async consume( topic: 
SupportedTopics, messageOrBatch: MessageOrBatch>, From 533d0dc0ae5bb42a4fe6544883f49787b05b8c53 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 17:21:25 +0100 Subject: [PATCH 07/24] Improve error report --- packages/kafka/lib/AbstractKafkaConsumer.ts | 17 +++++++++-------- packages/kafka/lib/AbstractKafkaPublisher.ts | 3 +-- packages/kafka/lib/AbstractKafkaService.ts | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index ee659179..9b2b7211 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -189,10 +189,10 @@ export abstract class AbstractKafkaConsumer< // pipeline() internally listens for errors on all streams and rejects if any stream errors. // The .catch() here reports the error; reconnection is handled by handleStream's .catch() below. pipeline(this.consumerStream, this.messageBatchStream).catch((error) => - this.handlerError(error), + this.handleError(error), ) } else { - this.consumerStream.on('error', (error) => this.handlerError(error)) + this.consumerStream.on('error', (error) => this.handleError(error)) } } catch (error) { throw new InternalError({ @@ -254,7 +254,7 @@ export abstract class AbstractKafkaConsumer< } } - this.handlerError(new Error('Consumer failed to reconnect after max attempts'), { + this.handleError(new Error('Consumer failed to reconnect after max attempts'), { maxAttempts: MAX_RECONNECT_ATTEMPTS, }) } @@ -332,7 +332,7 @@ export abstract class AbstractKafkaConsumer< const parseResult = handlerConfig.schema.safeParse(message.value) if (!parseResult.success) { - this.handlerError(parseResult.error, { + this.handleError(parseResult.error, { topic: message.topic, message: stringValueSerializer(message.value), }) @@ -408,7 +408,7 @@ export abstract class AbstractKafkaConsumer< const errorContext = Array.isArray(messageOrBatch) ? 
{ batchSize: messageOrBatch.length } : { message: stringValueSerializer(messageOrBatch.value) } - this.handlerError(error, { topic, ...errorContext }) + this.handleError(error, { topic, ...errorContext }) } return { status: 'error', errorReason: 'handlerError' } @@ -451,8 +451,9 @@ export abstract class AbstractKafkaConsumer< this.logger.debug(logDetails, 'Message committed successfully') } catch (error) { this.logger.debug(logDetails, 'Message commit failed') - if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error) - this.handlerError(error) + return error instanceof ResponseError + ? this.handleResponseErrorOnCommit(error) + : this.handleError(error) } } @@ -475,7 +476,7 @@ export abstract class AbstractKafkaConsumer< `Failed to commit message: ${error.message}`, ) } else { - this.handlerError(error) + this.handleError(error) } } } diff --git a/packages/kafka/lib/AbstractKafkaPublisher.ts b/packages/kafka/lib/AbstractKafkaPublisher.ts index fdb76f1a..378b2798 100644 --- a/packages/kafka/lib/AbstractKafkaPublisher.ts +++ b/packages/kafka/lib/AbstractKafkaPublisher.ts @@ -122,10 +122,9 @@ export abstract class AbstractKafkaPublisher< } catch (error) { const errorDetails = { topic, - publisher: this.constructor.name, message: stringValueSerializer(message), } - this.handlerError(error, errorDetails) + this.handleError(error, errorDetails) throw new InternalError({ message: `Error while publishing to Kafka: ${(error as Error).message}`, errorCode: 'KAFKA_PUBLISH_ERROR', diff --git a/packages/kafka/lib/AbstractKafkaService.ts b/packages/kafka/lib/AbstractKafkaService.ts index bfaf3164..0f759d6a 100644 --- a/packages/kafka/lib/AbstractKafkaService.ts +++ b/packages/kafka/lib/AbstractKafkaService.ts @@ -125,9 +125,9 @@ export abstract class AbstractKafkaService< } } - protected handlerError(error: unknown, context: Record = {}): void { + protected handleError(error: unknown, context: Record = {}): void { const resolvedErrorLog = 
resolveGlobalErrorLogObject(error) - this.logger.error({ ...resolvedErrorLog, ...context }) + this.logger.error({ ...resolvedErrorLog, ...context, origin: this.constructor.name }) if (isError(error)) this.errorReporter.report({ error, From e9a720b2e8ba04c5adad84c2130eeacc58fb54c8 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 17:24:53 +0100 Subject: [PATCH 08/24] Reconnect logs fixes --- packages/kafka/lib/AbstractKafkaConsumer.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 9b2b7211..9e0a8d5b 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -235,7 +235,7 @@ export abstract class AbstractKafkaConsumer< } private async reconnect(error: unknown): Promise { - this.logger.warn( + this.logger.info( { origin: this.constructor.name, error: resolveGlobalErrorLogObject(error) }, 'Stream error detected, attempting to reconnect', ) @@ -248,7 +248,7 @@ export abstract class AbstractKafkaConsumer< return } catch (error) { this.logger.warn( - { origin: this.constructor.name, attempt, error: resolveGlobalErrorLogObject(error) }, + { origin: this.constructor.name, attempt, maxAttempts: MAX_RECONNECT_ATTEMPTS, error: resolveGlobalErrorLogObject(error) }, 'Reconnect attempt failed', ) } From 3f6cea3966ff85c5e9829e1cc49f527b47637471 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 17:30:35 +0100 Subject: [PATCH 09/24] Remove duplication by using logger child --- packages/kafka/lib/AbstractKafkaConsumer.ts | 10 +++++----- packages/kafka/lib/AbstractKafkaService.ts | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 9e0a8d5b..f3c540ec 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -236,7 +236,7 
@@ export abstract class AbstractKafkaConsumer< private async reconnect(error: unknown): Promise { this.logger.info( - { origin: this.constructor.name, error: resolveGlobalErrorLogObject(error) }, + { error: resolveGlobalErrorLogObject(error) }, 'Stream error detected, attempting to reconnect', ) @@ -248,7 +248,7 @@ export abstract class AbstractKafkaConsumer< return } catch (error) { this.logger.warn( - { origin: this.constructor.name, attempt, maxAttempts: MAX_RECONNECT_ATTEMPTS, error: resolveGlobalErrorLogObject(error) }, + { attempt, maxAttempts: MAX_RECONNECT_ATTEMPTS, error: resolveGlobalErrorLogObject(error) }, 'Reconnect attempt failed', ) } @@ -264,7 +264,7 @@ export abstract class AbstractKafkaConsumer< messageOrBatch: MessageOrBatch>, ): Promise { const messageProcessingStartTimestamp = Date.now() - this.logger.debug({ origin: this.constructor.name, topic }, 'Consuming message(s)') + this.logger.debug({ topic }, 'Consuming message(s)') const handlerConfig = this.options.handlers[topic] @@ -278,11 +278,11 @@ export abstract class AbstractKafkaConsumer< ) if (!validMessages.length) { - this.logger.debug({ origin: this.constructor.name, topic }, 'Received not valid message(s)') + this.logger.debug({ topic }, 'Received not valid message(s)') return this.commit(messageOrBatch) } else { this.logger.debug( - { origin: this.constructor.name, topic, validMessagesCount: validMessages.length }, + { topic, validMessagesCount: validMessages.length }, 'Received valid message(s) to process', ) } diff --git a/packages/kafka/lib/AbstractKafkaService.ts b/packages/kafka/lib/AbstractKafkaService.ts index 0f759d6a..a08d6f88 100644 --- a/packages/kafka/lib/AbstractKafkaService.ts +++ b/packages/kafka/lib/AbstractKafkaService.ts @@ -56,7 +56,7 @@ export abstract class AbstractKafkaService< protected readonly _handlerSpy?: HandlerSpy> constructor(dependencies: KafkaDependencies, options: KafkaOptions) { - this.logger = dependencies.logger + this.logger = 
dependencies.logger.child({ origin: this.constructor.name }) this.errorReporter = dependencies.errorReporter this.messageMetricsManager = dependencies.messageMetricsManager this.options = { ...options, messageIdField: options.messageIdField ?? 'id' } @@ -127,7 +127,7 @@ export abstract class AbstractKafkaService< protected handleError(error: unknown, context: Record = {}): void { const resolvedErrorLog = resolveGlobalErrorLogObject(error) - this.logger.error({ ...resolvedErrorLog, ...context, origin: this.constructor.name }) + this.logger.error({ ...resolvedErrorLog, ...context }) if (isError(error)) this.errorReporter.report({ error, From a324be74a4bd57a013f857869080e03b3a5ed9a4 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 18 Mar 2026 17:38:31 +0100 Subject: [PATCH 10/24] AI fix --- packages/kafka/lib/AbstractKafkaConsumer.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index f3c540ec..c22d7506 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -229,7 +229,6 @@ export abstract class AbstractKafkaConsumer< ) this.messageBatchStream = undefined - this.consumer?.leaveGroup() await this.consumer?.close() this.consumer = undefined } From 6de55c92736bbdfce356a3caca15c66484e66dd2 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 11:42:38 +0100 Subject: [PATCH 11/24] lint --- packages/kafka/lib/AbstractKafkaConsumer.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index c22d7506..266f77ee 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -247,7 +247,11 @@ export abstract class AbstractKafkaConsumer< return } catch (error) { this.logger.warn( - { attempt, maxAttempts: MAX_RECONNECT_ATTEMPTS, error: 
resolveGlobalErrorLogObject(error) }, + { + attempt, + maxAttempts: MAX_RECONNECT_ATTEMPTS, + error: resolveGlobalErrorLogObject(error), + }, 'Reconnect attempt failed', ) } From d7fab3c82be5bb6e69e965f9f10bef564605f527 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 12:08:01 +0100 Subject: [PATCH 12/24] Adding test --- .../PermissionConsumer.reconnect.spec.ts | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts diff --git a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts new file mode 100644 index 00000000..58ab9107 --- /dev/null +++ b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts @@ -0,0 +1,47 @@ +import { waitAndRetry } from '@lokalise/universal-ts-utils/node' +import { afterAll, afterEach, beforeAll, expect, vi } from 'vitest' +import { createTestContext, type TestContext } from '../utils/testContext.ts' +import { PermissionConsumer } from './PermissionConsumer.ts' + +describe('PermissionConsumer - reconnect', () => { + let testContext: TestContext + let consumer: PermissionConsumer + + beforeAll(async () => { + testContext = await createTestContext() + consumer = new PermissionConsumer(testContext.cradle) + }) + + afterEach(async () => { + await consumer.close() + }) + + afterAll(async () => { + await testContext.dispose() + }) + + const simulateStreamError = () => { + // Simulate error on pipeline + ;(consumer as any).consumerStream.destroy() + } + + it('should call close and init again after stream error', async () => { + // Given + await consumer.init() + + const closeSpy = vi.spyOn(consumer, 'close') + const initSpy = vi.spyOn(consumer, 'init') + + // When + simulateStreamError() + + // Then - reconnect should trigger close and init + await waitAndRetry(() => closeSpy.mock.calls.length === 1) + expect(consumer.isActive).toBe(false) + await 
waitAndRetry(() => initSpy.mock.calls.length === 1, 100, 15) + + expect(closeSpy).toHaveBeenCalledTimes(1) + expect(initSpy).toHaveBeenCalledTimes(1) + expect(consumer.isActive).toBe(true) + }) +}) From d8f4e88d7c22c4584064879121653757057a3642 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 12:19:27 +0100 Subject: [PATCH 13/24] Handling healthcheck during reconnect --- packages/kafka/lib/AbstractKafkaConsumer.ts | 18 +++++++++++++----- .../PermissionConsumer.reconnect.spec.ts | 3 ++- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 266f77ee..6359f70c 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -101,6 +101,7 @@ export abstract class AbstractKafkaConsumer< private messageBatchStream?: KafkaMessageBatchStream< DeserializedMessage> > + private isReconnecting: boolean private readonly transactionObservabilityManager: TransactionObservabilityManager private readonly executionContext: ExecutionContext @@ -113,13 +114,16 @@ export abstract class AbstractKafkaConsumer< super(dependencies, options) this.transactionObservabilityManager = dependencies.transactionObservabilityManager this.executionContext = executionContext + + this.isReconnecting = false } /** - * Returns true if all client's connections are currently connected and the client is connected to at least one broker. + * Returns `true` if all client connections are currently active and the client is connected to at least one broker. + * During a reconnect attempt, returns `true` until all reconnect attempts are exhausted.
*/ get isConnected(): boolean { - if (!this.consumer) return false + if (!this.consumer) return this.isReconnecting try { return this.consumer.isConnected() /* v8 ignore start */ @@ -131,11 +135,12 @@ export abstract class AbstractKafkaConsumer< } /** - * Returns `true` if the consumer is not closed, and it is currently an active member of a consumer group. - * This method will return `false` during consumer group rebalancing. + * Returns `true` if the consumer is not closed and is an active member of a consumer group. + * Returns `false` during consumer group rebalancing. + * During a reconnect attempt, returns `true` until all reconnect attempts are exhausted. */ get isActive(): boolean { - if (!this.consumer) return false + if (!this.consumer) return this.isReconnecting try { return this.consumer.isActive() /* v8 ignore start */ @@ -234,6 +239,7 @@ export abstract class AbstractKafkaConsumer< } private async reconnect(error: unknown): Promise { + this.isReconnecting = true this.logger.info( { error: resolveGlobalErrorLogObject(error) }, 'Stream error detected, attempting to reconnect', @@ -244,6 +250,7 @@ export abstract class AbstractKafkaConsumer< await this.close() await setTimeout(Math.pow(2, attempt) * 1000) // Backoff delay starting with 1s await this.init() + this.isReconnecting = false return } catch (error) { this.logger.warn( @@ -257,6 +264,7 @@ export abstract class AbstractKafkaConsumer< } } + this.isReconnecting = false this.handleError(new Error('Consumer failed to reconnect after max attempts'), { maxAttempts: MAX_RECONNECT_ATTEMPTS, }) diff --git a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts index 58ab9107..6b7c0725 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts @@ -37,7 +37,8 @@ describe('PermissionConsumer - reconnect', () => { // Then - reconnect should 
trigger close and init await waitAndRetry(() => closeSpy.mock.calls.length === 1) - expect(consumer.isActive).toBe(false) + expect(consumer.isActive).toBe(true) + expect(consumer.isConnected).toBe(true) await waitAndRetry(() => initSpy.mock.calls.length === 1, 100, 15) expect(closeSpy).toHaveBeenCalledTimes(1) From bc9e20c3a4bf474ff84b04b4d1c4a5e35079ed78 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 12:49:47 +0100 Subject: [PATCH 14/24] Fixing small issue --- packages/kafka/lib/AbstractKafkaConsumer.ts | 1 + .../kafka/test/consumer/PermissionConsumer.reconnect.spec.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 6359f70c..f819f83c 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -264,6 +264,7 @@ export abstract class AbstractKafkaConsumer< } } + await this.close() // closing in case something is open after last init call this.isReconnecting = false this.handleError(new Error('Consumer failed to reconnect after max attempts'), { maxAttempts: MAX_RECONNECT_ATTEMPTS, diff --git a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts index 6b7c0725..041b833f 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts @@ -25,7 +25,7 @@ describe('PermissionConsumer - reconnect', () => { ;(consumer as any).consumerStream.destroy() } - it('should call close and init again after stream error', async () => { + it('should try to reconnect', async () => { // Given await consumer.init() From 2e1de6228139cf5a5b8014edba1d9cb25d1b1fe0 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 12:50:09 +0100 Subject: [PATCH 15/24] Adding test to cover reconnect error --- 
.../PermissionConsumer.reconnect.spec.ts | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts index 041b833f..cfc02116 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts @@ -44,5 +44,36 @@ describe('PermissionConsumer - reconnect', () => { expect(closeSpy).toHaveBeenCalledTimes(1) expect(initSpy).toHaveBeenCalledTimes(1) expect(consumer.isActive).toBe(true) + expect(consumer.isConnected).toBe(true) + }) + + it('should handle errors on reconnection', { timeout: 40_000 }, async () => { + // Given + await consumer.init() + + const closeSpy = vi.spyOn(consumer, 'close') + const initSpy = vi.spyOn(consumer, 'init').mockRejectedValue(new Error('Kafka unavailable')) + const errorReporterSpy = vi.spyOn(testContext.cradle.errorReporter, 'report') + + // When - trigger stream error which starts the reconnect loop + simulateStreamError() + + // Wait for all 5 attempts to exhaust (1+2+4+8+16 = 31s of backoff) + await waitAndRetry(() => errorReporterSpy.mock.calls.length > 0, 500, 65) + + // Then + expect(errorReporterSpy).toHaveBeenCalledOnce() + expect(errorReporterSpy).toHaveBeenCalledWith( + expect.objectContaining({ + error: expect.objectContaining({ + message: 'Consumer failed to reconnect after max attempts', + }), + }), + ) + + expect(initSpy).toHaveBeenCalledTimes(5) + expect(closeSpy).toHaveBeenCalledTimes(6) // Retries + final clean-up + expect(consumer.isConnected).toBe(false) + expect(consumer.isActive).toBe(false) }) }) From 86c873252567442e4608d57395facac03d61f573 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 13:21:43 +0100 Subject: [PATCH 16/24] Test fix --- .../kafka/test/consumer/PermissionConsumer.reconnect.spec.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git 
a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts index cfc02116..c99a4741 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts @@ -12,6 +12,10 @@ describe('PermissionConsumer - reconnect', () => { consumer = new PermissionConsumer(testContext.cradle) }) + beforeEach(() => { + vi.restoreAllMocks() + }) + afterEach(async () => { await consumer.close() }) From 783978e8a7bd3bfd7573f5a0e9552d5b62f06187 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 14:42:20 +0100 Subject: [PATCH 17/24] Test fix --- packages/kafka/lib/AbstractKafkaConsumer.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index f819f83c..5357a4dc 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -234,6 +234,7 @@ export abstract class AbstractKafkaConsumer< ) this.messageBatchStream = undefined + this.consumer?.leaveGroup() await this.consumer?.close() this.consumer = undefined } From c54eba283028136f36a3da97953ace967f2bb506 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 15:17:26 +0100 Subject: [PATCH 18/24] Minor improvement --- packages/kafka/lib/AbstractKafkaConsumer.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 5357a4dc..2969905f 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -226,6 +226,8 @@ export abstract class AbstractKafkaConsumer< } async close(): Promise { + if (!this.consumer) return Promise.resolve() + await this.consumerStream?.close() this.consumerStream = undefined @@ -234,8 +236,8 @@ export abstract class AbstractKafkaConsumer< ) 
this.messageBatchStream = undefined - this.consumer?.leaveGroup() - await this.consumer?.close() + this.consumer.leaveGroup() + await this.consumer.close() this.consumer = undefined } From 08a69753b77ab2c04302292b7d824f1d7172769c Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 15:23:51 +0100 Subject: [PATCH 19/24] Trying to fix tests --- packages/kafka/lib/AbstractKafkaConsumer.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 2969905f..22c97f94 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -236,8 +236,11 @@ export abstract class AbstractKafkaConsumer< ) this.messageBatchStream = undefined - this.consumer.leaveGroup() - await this.consumer.close() + try { + await this.consumer.close() + } catch { + // Ignoring errors at this stage + } this.consumer = undefined } From 8659a3ef85d772148fb9e77a341ad9bd4f1c018d Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 16:35:19 +0100 Subject: [PATCH 20/24] AI comment --- packages/kafka/lib/AbstractKafkaConsumer.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 22c97f94..caf1ed3b 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -200,6 +200,7 @@ export abstract class AbstractKafkaConsumer< this.consumerStream.on('error', (error) => this.handleError(error)) } } catch (error) { + await this.close() // clean-up throw new InternalError({ message: 'Consumer init failed', errorCode: 'KAFKA_CONSUMER_INIT_ERROR', From a14d4b3a661f925ecd134b7f51fef64f7a1cc4a4 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 17:13:10 +0100 Subject: [PATCH 21/24] test fix --- packages/kafka/lib/AbstractKafkaConsumer.ts | 1 - 1 file changed, 1 deletion(-) diff --git 
a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index caf1ed3b..22c97f94 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -200,7 +200,6 @@ export abstract class AbstractKafkaConsumer< this.consumerStream.on('error', (error) => this.handleError(error)) } } catch (error) { - await this.close() // clean-up throw new InternalError({ message: 'Consumer init failed', errorCode: 'KAFKA_CONSUMER_INIT_ERROR', From d006089dbdab01ad99c4069feac59824a33cf03c Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 18:10:25 +0100 Subject: [PATCH 22/24] plt kafka update --- packages/kafka/load-tests/package.json | 2 +- packages/kafka/package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/kafka/load-tests/package.json b/packages/kafka/load-tests/package.json index a5993325..9b8bb410 100644 --- a/packages/kafka/load-tests/package.json +++ b/packages/kafka/load-tests/package.json @@ -33,7 +33,7 @@ "@message-queue-toolkit/schemas": "file:../../schemas", "@lokalise/node-core": "^14.2.0", "@platformatic/dynamic-buffer": "^0.3.1", - "@platformatic/kafka": "1.30.0", + "@platformatic/kafka": "1.31.0", "pg": "^8.19.0", "zod": "^4.0.17" }, diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 711a7c6c..359405f7 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -53,7 +53,7 @@ "dependencies": { "@lokalise/node-core": "^14.2.0", "@lokalise/universal-ts-utils": "^4.5.1", - "@platformatic/kafka": "^1.30.0" + "@platformatic/kafka": "^1.31.0" }, "peerDependencies": { "@message-queue-toolkit/core": ">=23.0.0", From 92f224dd6a3121aafaaf5e119933487d81023c65 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 18:32:42 +0100 Subject: [PATCH 23/24] Adding comment --- packages/kafka/lib/AbstractKafkaConsumer.ts | 1 + 1 file changed, 1 insertion(+) diff --git 
a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 22c97f94..320e704b 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -157,6 +157,7 @@ export abstract class AbstractKafkaConsumer< const topics = Object.keys(this.options.handlers) if (topics.length === 0) throw new Error('At least one topic must be defined') + // Consumer needs to be recreated; once you call close, it ends in a final state, so we need to start from scratch this.consumer = new Consumer({ ...this.options.kafka, ...this.options, From 2b2b9af1ed536e1c255572db3c63fab8efd3a4d9 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 19 Mar 2026 18:37:46 +0100 Subject: [PATCH 24/24] test fix --- packages/kafka/test/publisher/PermissionPublisher.spec.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/kafka/test/publisher/PermissionPublisher.spec.ts b/packages/kafka/test/publisher/PermissionPublisher.spec.ts index 20a3eea6..790cf51a 100644 --- a/packages/kafka/test/publisher/PermissionPublisher.spec.ts +++ b/packages/kafka/test/publisher/PermissionPublisher.spec.ts @@ -1,5 +1,4 @@ import { randomUUID } from 'node:crypto' -import { InternalError } from '@lokalise/node-core' import type { MockInstance } from 'vitest' import { PERMISSION_ADDED_SCHEMA, @@ -97,11 +96,8 @@ describe('PermissionPublisher', () => { // Then expect(error).toBeDefined() expect(error).toMatchInlineSnapshot( - '[InternalError: Error while publishing to Kafka: metadata failed 4 times.]', + `[InternalError: Error while publishing to Kafka: Unknown topic permission-added.]`, ) - expect(error).toBeInstanceOf(InternalError) - expect(error.cause).toBeInstanceOf(AggregateError) - expect(error.cause.errors[0].errors[0].apiId).toBe('UNKNOWN_TOPIC_OR_PARTITION') }) it.each([