Skip to content

Commit d5d4cbe

Browse files
authored
feat: Reconnect feature on stream error (#419)
* Small simplification * Reduce duplication * Replacing deprecates * Improve init and close flows to allow restarts * Restart PoC * Proper reconnect flow * Improve error report * Reconnect logs fixes * Remove duplication by using logger child * AI fix * lint * Adding test * Handling healthcheck during reconnect * Fixing small issue * Adding test to cover reconnect error * Test fix * Test fix * Minor improvement * Trying to fix tests * AI comment * test fix * plt kafka update * Adding comment * test fix
1 parent f21d661 commit d5d4cbe

9 files changed

Lines changed: 200 additions & 110 deletions

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 105 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { pipeline } from 'node:stream/promises'
33
import { setTimeout } from 'node:timers/promises'
44
import {
55
InternalError,
6+
resolveGlobalErrorLogObject,
67
stringValueSerializer,
78
type TransactionObservabilityManager,
89
} from '@lokalise/node-core'
@@ -85,6 +86,7 @@ TODO: Proper retry mechanism + DLQ -> https://lokalise.atlassian.net/browse/EDEX
8586
In the meantime, we will retry in memory up to 3 times
8687
*/
8788
const MAX_IN_MEMORY_RETRIES = 3
89+
const MAX_RECONNECT_ATTEMPTS = 5
8890

8991
export abstract class AbstractKafkaConsumer<
9092
TopicsConfig extends TopicConfig[],
@@ -94,11 +96,12 @@ export abstract class AbstractKafkaConsumer<
9496
TopicsConfig,
9597
KafkaConsumerOptions<TopicsConfig, ExecutionContext, BatchProcessingEnabled>
9698
> {
97-
private readonly consumer: Consumer<string, object, string, string>
99+
private consumer?: Consumer<string, object, string, string>
98100
private consumerStream?: MessagesStream<string, object, string, string>
99101
private messageBatchStream?: KafkaMessageBatchStream<
100102
DeserializedMessage<SupportedMessageValues<TopicsConfig>>
101103
>
104+
private isReconnecting: boolean
102105

103106
private readonly transactionObservabilityManager: TransactionObservabilityManager
104107
private readonly executionContext: ExecutionContext
@@ -109,45 +112,18 @@ export abstract class AbstractKafkaConsumer<
109112
executionContext: ExecutionContext,
110113
) {
111114
super(dependencies, options)
112-
113115
this.transactionObservabilityManager = dependencies.transactionObservabilityManager
114116
this.executionContext = executionContext
115117

116-
this.consumer = new Consumer({
117-
...this.options.kafka,
118-
...this.options,
119-
autocommit: false, // Handling commits manually
120-
deserializers: {
121-
key: stringDeserializer,
122-
value: safeJsonDeserializer,
123-
headerKey: stringDeserializer,
124-
headerValue: stringDeserializer,
125-
},
126-
})
127-
128-
const logDetails = { origin: this.constructor.name, groupId: this.options.groupId }
129-
/* v8 ignore start */
130-
this.consumer.on('consumer:group:join', (_) =>
131-
this.logger.debug(logDetails, 'Consumer is joining a group'),
132-
)
133-
this.consumer.on('consumer:group:rejoin', () =>
134-
this.logger.debug(logDetails, 'Consumer is re-joining a group after a rebalance'),
135-
)
136-
this.consumer.on('consumer:group:leave', (_) =>
137-
this.logger.debug(logDetails, 'Consumer is leaving the group'),
138-
)
139-
this.consumer.on('consumer:group:rebalance', (_) =>
140-
this.logger.debug(logDetails, 'Group is rebalancing'),
141-
)
142-
/* v8 ignore stop */
118+
this.isReconnecting = false
143119
}
144120

145121
/**
146-
* Returns true if all client's connections are currently connected and the client is connected to at least one broker.
122+
* Returns `true` if all client connections are currently active and the client is connected to at least one broker.
123+
* During a reconnect attempt, returns `true` until all reconnect attempts are exhausted.
147124
*/
148125
get isConnected(): boolean {
149-
// Streams are created only when init method was called
150-
if (!this.consumerStream && !this.messageBatchStream) return false
126+
if (!this.consumer) return this.isReconnecting
151127
try {
152128
return this.consumer.isConnected()
153129
/* v8 ignore start */
@@ -159,12 +135,12 @@ export abstract class AbstractKafkaConsumer<
159135
}
160136

161137
/**
162-
* Returns `true` if the consumer is not closed, and it is currently an active member of a consumer group.
163-
* This method will return `false` during consumer group rebalancing.
138+
* Returns `true` if the consumer is not closed and is an active member of a consumer group.
139+
* Returns `false` during consumer group rebalancing.
140+
* During a reconnect attempt, returns `true` until all reconnect attempts are exhausted.
164141
*/
165142
get isActive(): boolean {
166-
// Streams are created only when init method was called
167-
if (!this.consumerStream && !this.messageBatchStream) return false
143+
if (!this.consumer) return this.isReconnecting
168144
try {
169145
return this.consumer.isActive()
170146
/* v8 ignore start */
@@ -176,10 +152,24 @@ export abstract class AbstractKafkaConsumer<
176152
}
177153

178154
async init(): Promise<void> {
179-
if (this.consumerStream) return Promise.resolve()
155+
if (this.consumer) return Promise.resolve()
156+
180157
const topics = Object.keys(this.options.handlers)
181158
if (topics.length === 0) throw new Error('At least one topic must be defined')
182159

160+
// Consumer needs to be recreated; once you call close, it ends in a final state, so we need to start from scratch
161+
this.consumer = new Consumer({
162+
...this.options.kafka,
163+
...this.options,
164+
autocommit: false, // Handling commits manually
165+
deserializers: {
166+
key: stringDeserializer,
167+
value: safeJsonDeserializer,
168+
headerKey: stringDeserializer,
169+
headerValue: stringDeserializer,
170+
},
171+
})
172+
183173
try {
184174
const { handlers: _, ...consumeOptions } = this.options // Handlers cannot be passed to consume method
185175

@@ -202,12 +192,13 @@ export abstract class AbstractKafkaConsumer<
202192
})
203193

204194
// Use pipeline for better error handling and backpressure management.
205-
// pipeline() internally listens for errors on all streams
195+
// pipeline() internally listens for errors on all streams and rejects if any stream errors.
196+
// The .catch() here reports the error; reconnection is handled by handleStream's .catch() below.
206197
pipeline(this.consumerStream, this.messageBatchStream).catch((error) =>
207-
this.handlerError(error),
198+
this.handleError(error),
208199
)
209200
} else {
210-
this.consumerStream.on('error', (error) => this.handlerError(error))
201+
this.consumerStream.on('error', (error) => this.handleError(error))
211202
}
212203
} catch (error) {
213204
throw new InternalError({
@@ -217,63 +208,84 @@ export abstract class AbstractKafkaConsumer<
217208
})
218209
}
219210

220-
if (this.messageBatchStream) {
221-
this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error))
222-
} else {
223-
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
224-
}
211+
this.handleStream(
212+
this.messageBatchStream ? this.messageBatchStream : this.consumerStream,
213+
).catch((error) => this.reconnect(error))
225214
}
226215

227-
private async handleSyncStream(
228-
stream: MessagesStream<string, object, string, string>,
229-
): Promise<void> {
230-
for await (const message of stream) {
216+
private async handleStream(
217+
stream:
218+
| MessagesStream<string, object, string, string>
219+
| KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
220+
) {
221+
for await (const messageOrBatch of stream) {
231222
await this.consume(
232-
message.topic,
233-
message as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
223+
Array.isArray(messageOrBatch) ? messageOrBatch[0].topic : messageOrBatch.topic,
224+
messageOrBatch,
234225
)
235226
}
236227
}
237-
private async handleSyncStreamBatch(
238-
stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
239-
): Promise<void> {
240-
for await (const messageBatch of stream) {
241-
await this.consume(messageBatch[0].topic, messageBatch)
242-
}
243-
}
244228

245229
async close(): Promise<void> {
246-
if (!this.consumerStream && !this.messageBatchStream) {
247-
// Leaving the group in case consumer joined but streams were not created
248-
if (this.isActive) this.consumer.leaveGroup()
249-
return
250-
}
230+
if (!this.consumer) return Promise.resolve()
251231

252-
if (this.consumerStream) {
253-
await this.consumerStream.close()
254-
this.consumerStream = undefined
255-
}
232+
await this.consumerStream?.close()
233+
this.consumerStream = undefined
256234

257-
if (this.messageBatchStream) {
258-
await new Promise((resolve) => this.messageBatchStream?.end(resolve))
259-
this.messageBatchStream = undefined
260-
}
235+
await new Promise((resolve) =>
236+
this.messageBatchStream ? this.messageBatchStream?.end(resolve) : resolve(undefined),
237+
)
238+
this.messageBatchStream = undefined
261239

262-
await this.consumer.close()
240+
try {
241+
await this.consumer.close()
242+
} catch {
243+
// Ignoring errors at this stage
244+
}
245+
this.consumer = undefined
263246
}
264247

265-
private resolveHandler(topic: SupportedTopics<TopicsConfig>) {
266-
return this.options.handlers[topic]
248+
private async reconnect(error: unknown): Promise<void> {
249+
this.isReconnecting = true
250+
this.logger.info(
251+
{ error: resolveGlobalErrorLogObject(error) },
252+
'Stream error detected, attempting to reconnect',
253+
)
254+
255+
for (let attempt = 0; attempt < MAX_RECONNECT_ATTEMPTS; attempt++) {
256+
try {
257+
await this.close()
258+
await setTimeout(Math.pow(2, attempt) * 1000) // Backoff delay starting with 1s
259+
await this.init()
260+
this.isReconnecting = false
261+
return
262+
} catch (error) {
263+
this.logger.warn(
264+
{
265+
attempt,
266+
maxAttempts: MAX_RECONNECT_ATTEMPTS,
267+
error: resolveGlobalErrorLogObject(error),
268+
},
269+
'Reconnect attempt failed',
270+
)
271+
}
272+
}
273+
274+
await this.close() // closing in case something is open after last init call
275+
this.isReconnecting = false
276+
this.handleError(new Error('Consumer failed to reconnect after max attempts'), {
277+
maxAttempts: MAX_RECONNECT_ATTEMPTS,
278+
})
267279
}
268280

269281
private async consume(
270-
topic: string,
282+
topic: SupportedTopics<TopicsConfig>,
271283
messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>,
272284
): Promise<void> {
273285
const messageProcessingStartTimestamp = Date.now()
274-
this.logger.debug({ origin: this.constructor.name, topic }, 'Consuming message(s)')
286+
this.logger.debug({ topic }, 'Consuming message(s)')
275287

276-
const handlerConfig = this.resolveHandler(topic)
288+
const handlerConfig = this.options.handlers[topic]
277289

278290
// if there is no handler for the message, we ignore it (simulating subscription)
279291
if (!handlerConfig) return this.commit(messageOrBatch)
@@ -285,11 +297,11 @@ export abstract class AbstractKafkaConsumer<
285297
)
286298

287299
if (!validMessages.length) {
288-
this.logger.debug({ origin: this.constructor.name, topic }, 'Received not valid message(s)')
300+
this.logger.debug({ topic }, 'Received not valid message(s)')
289301
return this.commit(messageOrBatch)
290302
} else {
291303
this.logger.debug(
292-
{ origin: this.constructor.name, topic, validMessagesCount: validMessages.length },
304+
{ topic, validMessagesCount: validMessages.length },
293305
'Received valid message(s) to process',
294306
)
295307
}
@@ -339,7 +351,7 @@ export abstract class AbstractKafkaConsumer<
339351
const parseResult = handlerConfig.schema.safeParse(message.value)
340352

341353
if (!parseResult.success) {
342-
this.handlerError(parseResult.error, {
354+
this.handleError(parseResult.error, {
343355
topic: message.topic,
344356
message: stringValueSerializer(message.value),
345357
})
@@ -415,7 +427,7 @@ export abstract class AbstractKafkaConsumer<
415427
const errorContext = Array.isArray(messageOrBatch)
416428
? { batchSize: messageOrBatch.length }
417429
: { message: stringValueSerializer(messageOrBatch.value) }
418-
this.handlerError(error, { topic, ...errorContext })
430+
this.handleError(error, { topic, ...errorContext })
419431
}
420432

421433
return { status: 'error', errorReason: 'handlerError' }
@@ -435,32 +447,32 @@ export abstract class AbstractKafkaConsumer<
435447
}
436448
}
437449

438-
private commit(messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>) {
450+
private async commit(messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>) {
451+
let messageToCommit: DeserializedMessage<SupportedMessageValues<TopicsConfig>>
439452
if (Array.isArray(messageOrBatch)) {
440453
if (messageOrBatch.length === 0) return Promise.resolve()
441454

442455
// biome-ignore lint/style/noNonNullAssertion: we check the length above
443-
return this.commitMessage(messageOrBatch[messageOrBatch.length - 1]!)
456+
messageToCommit = messageOrBatch[messageOrBatch.length - 1]!
444457
} else {
445-
return this.commitMessage(messageOrBatch)
458+
messageToCommit = messageOrBatch
446459
}
447-
}
448460

449-
private async commitMessage(message: DeserializedMessage<SupportedMessageValues<TopicsConfig>>) {
450461
const logDetails = {
451-
topic: message.topic,
452-
offset: message.offset,
453-
timestamp: message.timestamp,
462+
topic: messageToCommit.topic,
463+
offset: messageToCommit.offset,
464+
timestamp: messageToCommit.timestamp,
454465
}
455466
this.logger.debug(logDetails, 'Trying to commit message')
456467

457468
try {
458-
await message.commit()
469+
await messageToCommit.commit()
459470
this.logger.debug(logDetails, 'Message committed successfully')
460471
} catch (error) {
461472
this.logger.debug(logDetails, 'Message commit failed')
462-
if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error)
463-
this.handlerError(error)
473+
return error instanceof ResponseError
474+
? this.handleResponseErrorOnCommit(error)
475+
: this.handleError(error)
464476
}
465477
}
466478

@@ -483,7 +495,7 @@ export abstract class AbstractKafkaConsumer<
483495
`Failed to commit message: ${error.message}`,
484496
)
485497
} else {
486-
this.handlerError(error)
498+
this.handleError(error)
487499
}
488500
}
489501
}

packages/kafka/lib/AbstractKafkaPublisher.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,9 @@ export abstract class AbstractKafkaPublisher<
122122
} catch (error) {
123123
const errorDetails = {
124124
topic,
125-
publisher: this.constructor.name,
126125
message: stringValueSerializer(message),
127126
}
128-
this.handlerError(error, errorDetails)
127+
this.handleError(error, errorDetails)
129128
throw new InternalError({
130129
message: `Error while publishing to Kafka: ${(error as Error).message}`,
131130
errorCode: 'KAFKA_PUBLISH_ERROR',

packages/kafka/lib/AbstractKafkaService.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export abstract class AbstractKafkaService<
5656
protected readonly _handlerSpy?: HandlerSpy<SupportedMessageValues<TopicsConfig>>
5757

5858
constructor(dependencies: KafkaDependencies, options: KafkaOptions) {
59-
this.logger = dependencies.logger
59+
this.logger = dependencies.logger.child({ origin: this.constructor.name })
6060
this.errorReporter = dependencies.errorReporter
6161
this.messageMetricsManager = dependencies.messageMetricsManager
6262
this.options = { ...options, messageIdField: options.messageIdField ?? 'id' }
@@ -125,7 +125,7 @@ export abstract class AbstractKafkaService<
125125
}
126126
}
127127

128-
protected handlerError(error: unknown, context: Record<string, unknown> = {}): void {
128+
protected handleError(error: unknown, context: Record<string, unknown> = {}): void {
129129
const resolvedErrorLog = resolveGlobalErrorLogObject(error)
130130
this.logger.error({ ...resolvedErrorLog, ...context })
131131
if (isError(error))

packages/kafka/load-tests/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
"@message-queue-toolkit/schemas": "file:../../schemas",
3434
"@lokalise/node-core": "^14.2.0",
3535
"@platformatic/dynamic-buffer": "^0.3.1",
36-
"@platformatic/kafka": "1.30.0",
36+
"@platformatic/kafka": "1.31.0",
3737
"pg": "^8.19.0",
3838
"zod": "^4.0.17"
3939
},

packages/kafka/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
"dependencies": {
5454
"@lokalise/node-core": "^14.2.0",
5555
"@lokalise/universal-ts-utils": "^4.5.1",
56-
"@platformatic/kafka": "^1.30.0"
56+
"@platformatic/kafka": "^1.31.0"
5757
},
5858
"peerDependencies": {
5959
"@message-queue-toolkit/core": ">=23.0.0",

0 commit comments

Comments
 (0)