diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index fa1948ef..320e704b 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -3,6 +3,7 @@ import { pipeline } from 'node:stream/promises' import { setTimeout } from 'node:timers/promises' import { InternalError, + resolveGlobalErrorLogObject, stringValueSerializer, type TransactionObservabilityManager, } from '@lokalise/node-core' @@ -85,6 +86,7 @@ TODO: Proper retry mechanism + DLQ -> https://lokalise.atlassian.net/browse/EDEX In the meantime, we will retry in memory up to 3 times */ const MAX_IN_MEMORY_RETRIES = 3 +const MAX_RECONNECT_ATTEMPTS = 5 export abstract class AbstractKafkaConsumer< TopicsConfig extends TopicConfig[], @@ -94,11 +96,12 @@ export abstract class AbstractKafkaConsumer< TopicsConfig, KafkaConsumerOptions > { - private readonly consumer: Consumer + private consumer?: Consumer private consumerStream?: MessagesStream private messageBatchStream?: KafkaMessageBatchStream< DeserializedMessage> > + private isReconnecting: boolean private readonly transactionObservabilityManager: TransactionObservabilityManager private readonly executionContext: ExecutionContext @@ -109,45 +112,18 @@ export abstract class AbstractKafkaConsumer< executionContext: ExecutionContext, ) { super(dependencies, options) - this.transactionObservabilityManager = dependencies.transactionObservabilityManager this.executionContext = executionContext - this.consumer = new Consumer({ - ...this.options.kafka, - ...this.options, - autocommit: false, // Handling commits manually - deserializers: { - key: stringDeserializer, - value: safeJsonDeserializer, - headerKey: stringDeserializer, - headerValue: stringDeserializer, - }, - }) - - const logDetails = { origin: this.constructor.name, groupId: this.options.groupId } - /* v8 ignore start */ - this.consumer.on('consumer:group:join', (_) => - this.logger.debug(logDetails, 'Consumer is joining a group'), - ) - this.consumer.on('consumer:group:rejoin', () => - this.logger.debug(logDetails, 'Consumer is re-joining a group after a rebalance'), - ) - this.consumer.on('consumer:group:leave', (_) => - this.logger.debug(logDetails, 'Consumer is leaving the group'), - ) - this.consumer.on('consumer:group:rebalance', (_) => - this.logger.debug(logDetails, 'Group is rebalancing'), - ) - /* v8 ignore stop */ + this.isReconnecting = false } /** - * Returns true if all client's connections are currently connected and the client is connected to at least one broker. + * Returns `true` if all client connections are currently active and the client is connected to at least one broker. + * During a reconnect attempt, returns `true` until all reconnect attempts are exhausted. */ get isConnected(): boolean { - // Streams are created only when init method was called - if (!this.consumerStream && !this.messageBatchStream) return false + if (!this.consumer) return this.isReconnecting try { return this.consumer.isConnected() /* v8 ignore start */ @@ -159,12 +135,12 @@ export abstract class AbstractKafkaConsumer< } /** - * Returns `true` if the consumer is not closed, and it is currently an active member of a consumer group. - * This method will return `false` during consumer group rebalancing. + * Returns `true` if the consumer is not closed and is an active member of a consumer group. + * Returns `false` during consumer group rebalancing. + * During a reconnect attempt, returns `true` until all reconnect attempts are exhausted. */ get isActive(): boolean { - // Streams are created only when init method was called - if (!this.consumerStream && !this.messageBatchStream) return false + if (!this.consumer) return this.isReconnecting try { return this.consumer.isActive() /* v8 ignore start */ @@ -176,10 +152,24 @@ export abstract class AbstractKafkaConsumer< } async init(): Promise { - if (this.consumerStream) return Promise.resolve() + if (this.consumer) return Promise.resolve() + const topics = Object.keys(this.options.handlers) if (topics.length === 0) throw new Error('At least one topic must be defined') + // Consumer needs to be recreated; once you call close, it ends in a final state, so we need to start from scratch + this.consumer = new Consumer({ + ...this.options.kafka, + ...this.options, + autocommit: false, // Handling commits manually + deserializers: { + key: stringDeserializer, + value: safeJsonDeserializer, + headerKey: stringDeserializer, + headerValue: stringDeserializer, + }, + }) + try { const { handlers: _, ...consumeOptions } = this.options // Handlers cannot be passed to consume method @@ -202,12 +192,13 @@ export abstract class AbstractKafkaConsumer< }) // Use pipeline for better error handling and backpressure management. - // pipeline() internally listens for errors on all streams + // pipeline() internally listens for errors on all streams and rejects if any stream errors. + // The .catch() here reports the error; reconnection is handled by handleStream's .catch() below. pipeline(this.consumerStream, this.messageBatchStream).catch((error) => - this.handlerError(error), + this.handleError(error), ) } else { - this.consumerStream.on('error', (error) => this.handlerError(error)) + this.consumerStream.on('error', (error) => this.handleError(error)) } } catch (error) { throw new InternalError({ @@ -217,63 +208,84 @@ export abstract class AbstractKafkaConsumer< }) } - if (this.messageBatchStream) { - this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) - } else { - this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) - } + this.handleStream( + this.messageBatchStream ? this.messageBatchStream : this.consumerStream, + ).catch((error) => this.reconnect(error)) } - private async handleSyncStream( - stream: MessagesStream, - ): Promise { - for await (const message of stream) { + private async handleStream( + stream: + | MessagesStream + | KafkaMessageBatchStream>>, + ) { + for await (const messageOrBatch of stream) { await this.consume( - message.topic, - message as DeserializedMessage>, + Array.isArray(messageOrBatch) ? messageOrBatch[0].topic : messageOrBatch.topic, + messageOrBatch, ) } } - private async handleSyncStreamBatch( - stream: KafkaMessageBatchStream>>, - ): Promise { - for await (const messageBatch of stream) { - await this.consume(messageBatch[0].topic, messageBatch) - } - } async close(): Promise { - if (!this.consumerStream && !this.messageBatchStream) { - // Leaving the group in case consumer joined but streams were not created - if (this.isActive) this.consumer.leaveGroup() - return - } + if (!this.consumer) return Promise.resolve() - if (this.consumerStream) { - await this.consumerStream.close() - this.consumerStream = undefined - } + await this.consumerStream?.close() + this.consumerStream = undefined - if (this.messageBatchStream) { - await new Promise((resolve) => this.messageBatchStream?.end(resolve)) - this.messageBatchStream = undefined - } + await new Promise((resolve) => + this.messageBatchStream ? this.messageBatchStream?.end(resolve) : resolve(undefined), + ) + this.messageBatchStream = undefined - await this.consumer.close() + try { + await this.consumer.close() + } catch { + // Ignoring errors at this stage + } + this.consumer = undefined } - private resolveHandler(topic: SupportedTopics) { - return this.options.handlers[topic] + private async reconnect(error: unknown): Promise { + this.isReconnecting = true + this.logger.info( + { error: resolveGlobalErrorLogObject(error) }, + 'Stream error detected, attempting to reconnect', + ) + + for (let attempt = 0; attempt < MAX_RECONNECT_ATTEMPTS; attempt++) { + try { + await this.close() + await setTimeout(Math.pow(2, attempt) * 1000) // Backoff delay starting with 1s + await this.init() + this.isReconnecting = false + return + } catch (error) { + this.logger.warn( + { + attempt, + maxAttempts: MAX_RECONNECT_ATTEMPTS, + error: resolveGlobalErrorLogObject(error), + }, + 'Reconnect attempt failed', + ) + } + } + + await this.close() // closing in case something is open after last init call + this.isReconnecting = false + this.handleError(new Error('Consumer failed to reconnect after max attempts'), { + maxAttempts: MAX_RECONNECT_ATTEMPTS, + }) } private async consume( - topic: string, + topic: SupportedTopics, messageOrBatch: MessageOrBatch>, ): Promise { const messageProcessingStartTimestamp = Date.now() - this.logger.debug({ origin: this.constructor.name, topic }, 'Consuming message(s)') + this.logger.debug({ topic }, 'Consuming message(s)') - const handlerConfig = this.resolveHandler(topic) + const handlerConfig = this.options.handlers[topic] // if there is no handler for the message, we ignore it (simulating subscription) if (!handlerConfig) return this.commit(messageOrBatch) @@ -285,11 +297,11 @@ export abstract class AbstractKafkaConsumer< ) if (!validMessages.length) { - this.logger.debug({ origin: this.constructor.name, topic }, 'Received not valid message(s)') + this.logger.debug({ topic }, 'Received not valid message(s)') return this.commit(messageOrBatch) } else { this.logger.debug( - { origin: this.constructor.name, topic, validMessagesCount: validMessages.length }, + { topic, validMessagesCount: validMessages.length }, 'Received valid message(s) to process', ) } @@ -339,7 +351,7 @@ export abstract class AbstractKafkaConsumer< const parseResult = handlerConfig.schema.safeParse(message.value) if (!parseResult.success) { - this.handlerError(parseResult.error, { + this.handleError(parseResult.error, { topic: message.topic, message: stringValueSerializer(message.value), }) @@ -415,7 +427,7 @@ export abstract class AbstractKafkaConsumer< const errorContext = Array.isArray(messageOrBatch) ? { batchSize: messageOrBatch.length } : { message: stringValueSerializer(messageOrBatch.value) } - this.handlerError(error, { topic, ...errorContext }) + this.handleError(error, { topic, ...errorContext }) } return { status: 'error', errorReason: 'handlerError' } @@ -435,32 +447,32 @@ export abstract class AbstractKafkaConsumer< } } - private commit(messageOrBatch: MessageOrBatch>) { + private async commit(messageOrBatch: MessageOrBatch>) { + let messageToCommit: DeserializedMessage> if (Array.isArray(messageOrBatch)) { if (messageOrBatch.length === 0) return Promise.resolve() // biome-ignore lint/style/noNonNullAssertion: we check the length above - return this.commitMessage(messageOrBatch[messageOrBatch.length - 1]!) + messageToCommit = messageOrBatch[messageOrBatch.length - 1]! } else { - return this.commitMessage(messageOrBatch) + messageToCommit = messageOrBatch } - } - private async commitMessage(message: DeserializedMessage>) { const logDetails = { - topic: message.topic, - offset: message.offset, - timestamp: message.timestamp, + topic: messageToCommit.topic, + offset: messageToCommit.offset, + timestamp: messageToCommit.timestamp, } this.logger.debug(logDetails, 'Trying to commit message') try { - await message.commit() + await messageToCommit.commit() this.logger.debug(logDetails, 'Message committed successfully') } catch (error) { this.logger.debug(logDetails, 'Message commit failed') - if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error) - this.handlerError(error) + return error instanceof ResponseError + ? this.handleResponseErrorOnCommit(error) + : this.handleError(error) } } @@ -483,7 +495,7 @@ export abstract class AbstractKafkaConsumer< `Failed to commit message: ${error.message}`, ) } else { - this.handlerError(error) + this.handleError(error) } } } diff --git a/packages/kafka/lib/AbstractKafkaPublisher.ts b/packages/kafka/lib/AbstractKafkaPublisher.ts index fdb76f1a..378b2798 100644 --- a/packages/kafka/lib/AbstractKafkaPublisher.ts +++ b/packages/kafka/lib/AbstractKafkaPublisher.ts @@ -122,10 +122,9 @@ export abstract class AbstractKafkaPublisher< } catch (error) { const errorDetails = { topic, - publisher: this.constructor.name, message: stringValueSerializer(message), } - this.handlerError(error, errorDetails) + this.handleError(error, errorDetails) throw new InternalError({ message: `Error while publishing to Kafka: ${(error as Error).message}`, errorCode: 'KAFKA_PUBLISH_ERROR', diff --git a/packages/kafka/lib/AbstractKafkaService.ts b/packages/kafka/lib/AbstractKafkaService.ts index bfaf3164..a08d6f88 100644 --- a/packages/kafka/lib/AbstractKafkaService.ts +++ b/packages/kafka/lib/AbstractKafkaService.ts @@ -56,7 +56,7 @@ export abstract class AbstractKafkaService< protected readonly _handlerSpy?: HandlerSpy> constructor(dependencies: KafkaDependencies, options: KafkaOptions) { - this.logger = dependencies.logger + this.logger = dependencies.logger.child({ origin: this.constructor.name }) this.errorReporter = dependencies.errorReporter this.messageMetricsManager = dependencies.messageMetricsManager this.options = { ...options, messageIdField: options.messageIdField ?? 'id' } @@ -125,7 +125,7 @@ export abstract class AbstractKafkaService< } } - protected handlerError(error: unknown, context: Record = {}): void { + protected handleError(error: unknown, context: Record = {}): void { const resolvedErrorLog = resolveGlobalErrorLogObject(error) this.logger.error({ ...resolvedErrorLog, ...context }) if (isError(error)) diff --git a/packages/kafka/load-tests/package.json b/packages/kafka/load-tests/package.json index a5993325..9b8bb410 100644 --- a/packages/kafka/load-tests/package.json +++ b/packages/kafka/load-tests/package.json @@ -33,7 +33,7 @@ "@message-queue-toolkit/schemas": "file:../../schemas", "@lokalise/node-core": "^14.2.0", "@platformatic/dynamic-buffer": "^0.3.1", - "@platformatic/kafka": "1.30.0", + "@platformatic/kafka": "1.31.0", "pg": "^8.19.0", "zod": "^4.0.17" }, diff --git a/packages/kafka/package.json b/packages/kafka/package.json index 711a7c6c..359405f7 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -53,7 +53,7 @@ "dependencies": { "@lokalise/node-core": "^14.2.0", "@lokalise/universal-ts-utils": "^4.5.1", - "@platformatic/kafka": "^1.30.0" + "@platformatic/kafka": "^1.31.0" }, "peerDependencies": { "@message-queue-toolkit/core": ">=23.0.0", diff --git a/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts b/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts index 11450848..fa001dd2 100644 --- a/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts +++ b/packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts @@ -64,7 +64,7 @@ describe('PermissionBatchConsumer', () => { // Given consumer = new PermissionBatchConsumer(testContext.cradle, { handlers: {} }) // When - Then - await expect(consumer.close()).resolves.not.toThrowError() + await expect(consumer.close()).resolves.not.toThrow() }) it('should not fail on init if it is already initiated', async () => { @@ -74,7 +74,7 @@ describe('PermissionBatchConsumer', () => { await consumer.init() // Then - await expect(consumer.init()).resolves.not.toThrowError() + await expect(consumer.init()).resolves.not.toThrow() }) it('should fail if kafka is not available', async () => { @@ -109,7 +109,7 @@ describe('PermissionBatchConsumer', () => { consumer = new PermissionBatchConsumer(testContext.cradle) // When - Then - await expect(consumer.init()).resolves.not.toThrowError() + await expect(consumer.init()).resolves.not.toThrow() }) }) diff --git a/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts new file mode 100644 index 00000000..c99a4741 --- /dev/null +++ b/packages/kafka/test/consumer/PermissionConsumer.reconnect.spec.ts @@ -0,0 +1,83 @@ +import { waitAndRetry } from '@lokalise/universal-ts-utils/node' +import { afterAll, afterEach, beforeAll, expect, vi } from 'vitest' +import { createTestContext, type TestContext } from '../utils/testContext.ts' +import { PermissionConsumer } from './PermissionConsumer.ts' + +describe('PermissionConsumer - reconnect', () => { + let testContext: TestContext + let consumer: PermissionConsumer + + beforeAll(async () => { + testContext = await createTestContext() + consumer = new PermissionConsumer(testContext.cradle) + }) + + beforeEach(() => { + vi.restoreAllMocks() + }) + + afterEach(async () => { + await consumer.close() + }) + + afterAll(async () => { + await testContext.dispose() + }) + + const simulateStreamError = () => { + // Simulate error on pipeline + ;(consumer as any).consumerStream.destroy() + } + + it('should try to reconnect', async () => { + // Given + await consumer.init() + + const closeSpy = vi.spyOn(consumer, 'close') + const initSpy = vi.spyOn(consumer, 'init') + + // When + simulateStreamError() + + // Then - reconnect should trigger close and init + await waitAndRetry(() => closeSpy.mock.calls.length === 1) + expect(consumer.isActive).toBe(true) + expect(consumer.isConnected).toBe(true) + await waitAndRetry(() => initSpy.mock.calls.length === 1, 100, 15) + + expect(closeSpy).toHaveBeenCalledTimes(1) + expect(initSpy).toHaveBeenCalledTimes(1) + expect(consumer.isActive).toBe(true) + expect(consumer.isConnected).toBe(true) + }) + + it('should handle errors on reconnection', { timeout: 40_000 }, async () => { + // Given + await consumer.init() + + const closeSpy = vi.spyOn(consumer, 'close') + const initSpy = vi.spyOn(consumer, 'init').mockRejectedValue(new Error('Kafka unavailable')) + const errorReporterSpy = vi.spyOn(testContext.cradle.errorReporter, 'report') + + // When - trigger stream error which starts the reconnect loop + simulateStreamError() + + // Wait for all 5 attempts to exhaust (1+2+4+8+16 = 31s of backoff) + await waitAndRetry(() => errorReporterSpy.mock.calls.length > 0, 500, 65) + + // Then + expect(errorReporterSpy).toHaveBeenCalledOnce() + expect(errorReporterSpy).toHaveBeenCalledWith( + expect.objectContaining({ + error: expect.objectContaining({ + message: 'Consumer failed to reconnect after max attempts', + }), + }), + ) + + expect(initSpy).toHaveBeenCalledTimes(5) + expect(closeSpy).toHaveBeenCalledTimes(6) // Retries + final clean-up + expect(consumer.isConnected).toBe(false) + expect(consumer.isActive).toBe(false) + }) +}) diff --git a/packages/kafka/test/consumer/PermissionConsumer.spec.ts b/packages/kafka/test/consumer/PermissionConsumer.spec.ts index 85025121..8e41a3d7 100644 --- a/packages/kafka/test/consumer/PermissionConsumer.spec.ts +++ b/packages/kafka/test/consumer/PermissionConsumer.spec.ts @@ -65,7 +65,7 @@ describe('PermissionConsumer', () => { // Given consumer = new PermissionConsumer(testContext.cradle, { handlers: {} }) // When - Then - await expect(consumer.close()).resolves.not.toThrowError() + await expect(consumer.close()).resolves.not.toThrow() }) it('should not fail on init if it is already initiated', async () => { @@ -75,7 +75,7 @@ describe('PermissionConsumer', () => { await consumer.init() // Then - await expect(consumer.init()).resolves.not.toThrowError() + await expect(consumer.init()).resolves.not.toThrow() }) it('should fail if kafka is not available', async () => { @@ -110,7 +110,7 @@ describe('PermissionConsumer', () => { consumer = new PermissionConsumer(testContext.cradle) // When - Then - await expect(consumer.init()).resolves.not.toThrowError() + await expect(consumer.init()).resolves.not.toThrow() }) }) diff --git a/packages/kafka/test/publisher/PermissionPublisher.spec.ts b/packages/kafka/test/publisher/PermissionPublisher.spec.ts index 20a3eea6..790cf51a 100644 --- a/packages/kafka/test/publisher/PermissionPublisher.spec.ts +++ b/packages/kafka/test/publisher/PermissionPublisher.spec.ts @@ -1,5 +1,4 @@ import { randomUUID } from 'node:crypto' -import { InternalError } from '@lokalise/node-core' import type { MockInstance } from 'vitest' import { PERMISSION_ADDED_SCHEMA, @@ -97,11 +96,8 @@ describe('PermissionPublisher', () => { // Then expect(error).toBeDefined() expect(error).toMatchInlineSnapshot( - '[InternalError: Error while publishing to Kafka: metadata failed 4 times.]', + `[InternalError: Error while publishing to Kafka: Unknown topic permission-added.]`, ) - expect(error).toBeInstanceOf(InternalError) - expect(error.cause).toBeInstanceOf(AggregateError) - expect(error.cause.errors[0].errors[0].apiId).toBe('UNKNOWN_TOPIC_OR_PARTITION') }) it.each([