Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 105 additions & 93 deletions packages/kafka/lib/AbstractKafkaConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { pipeline } from 'node:stream/promises'
import { setTimeout } from 'node:timers/promises'
import {
InternalError,
resolveGlobalErrorLogObject,
stringValueSerializer,
type TransactionObservabilityManager,
} from '@lokalise/node-core'
Expand Down Expand Up @@ -85,6 +86,7 @@ TODO: Proper retry mechanism + DLQ -> https://lokalise.atlassian.net/browse/EDEX
In the meantime, we will retry in memory up to 3 times
*/
const MAX_IN_MEMORY_RETRIES = 3
const MAX_RECONNECT_ATTEMPTS = 5

export abstract class AbstractKafkaConsumer<
TopicsConfig extends TopicConfig[],
Expand All @@ -94,11 +96,12 @@ export abstract class AbstractKafkaConsumer<
TopicsConfig,
KafkaConsumerOptions<TopicsConfig, ExecutionContext, BatchProcessingEnabled>
> {
private readonly consumer: Consumer<string, object, string, string>
private consumer?: Consumer<string, object, string, string>
private consumerStream?: MessagesStream<string, object, string, string>
private messageBatchStream?: KafkaMessageBatchStream<
DeserializedMessage<SupportedMessageValues<TopicsConfig>>
>
private isReconnecting: boolean

private readonly transactionObservabilityManager: TransactionObservabilityManager
private readonly executionContext: ExecutionContext
Expand All @@ -109,45 +112,18 @@ export abstract class AbstractKafkaConsumer<
executionContext: ExecutionContext,
) {
super(dependencies, options)

this.transactionObservabilityManager = dependencies.transactionObservabilityManager
this.executionContext = executionContext

this.consumer = new Consumer({
...this.options.kafka,
...this.options,
autocommit: false, // Handling commits manually
deserializers: {
key: stringDeserializer,
value: safeJsonDeserializer,
headerKey: stringDeserializer,
headerValue: stringDeserializer,
},
})

const logDetails = { origin: this.constructor.name, groupId: this.options.groupId }
/* v8 ignore start */
this.consumer.on('consumer:group:join', (_) =>
this.logger.debug(logDetails, 'Consumer is joining a group'),
)
this.consumer.on('consumer:group:rejoin', () =>
this.logger.debug(logDetails, 'Consumer is re-joining a group after a rebalance'),
)
this.consumer.on('consumer:group:leave', (_) =>
this.logger.debug(logDetails, 'Consumer is leaving the group'),
)
this.consumer.on('consumer:group:rebalance', (_) =>
this.logger.debug(logDetails, 'Group is rebalancing'),
)
/* v8 ignore stop */
this.isReconnecting = false
}

/**
* Returns true if all client's connections are currently connected and the client is connected to at least one broker.
* Returns `true` if all client connections are currently active and the client is connected to at least one broker.
* During a reconnect attempt, returns `true` until all reconnect attempts are exhausted.
*/
get isConnected(): boolean {
// Streams are created only when init method was called
if (!this.consumerStream && !this.messageBatchStream) return false
if (!this.consumer) return this.isReconnecting
try {
return this.consumer.isConnected()
/* v8 ignore start */
Expand All @@ -159,12 +135,12 @@ export abstract class AbstractKafkaConsumer<
}

/**
* Returns `true` if the consumer is not closed, and it is currently an active member of a consumer group.
* This method will return `false` during consumer group rebalancing.
* Returns `true` if the consumer is not closed and is an active member of a consumer group.
* Returns `false` during consumer group rebalancing.
* During a reconnect attempt, returns `true` until all reconnect attempts are exhausted.
*/
get isActive(): boolean {
// Streams are created only when init method was called
if (!this.consumerStream && !this.messageBatchStream) return false
if (!this.consumer) return this.isReconnecting
try {
return this.consumer.isActive()
/* v8 ignore start */
Expand All @@ -176,10 +152,24 @@ export abstract class AbstractKafkaConsumer<
}

async init(): Promise<void> {
if (this.consumerStream) return Promise.resolve()
if (this.consumer) return Promise.resolve()

const topics = Object.keys(this.options.handlers)
if (topics.length === 0) throw new Error('At least one topic must be defined')

// Consumer needs to be recreated; once you call close, it ends in a final state, so we need to start from scratch
this.consumer = new Consumer({
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consumer needs to be recreated, once you call close it ends in a final state so we need to start from scratch

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add this as a comment?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! added :D

...this.options.kafka,
...this.options,
autocommit: false, // Handling commits manually
deserializers: {
key: stringDeserializer,
value: safeJsonDeserializer,
headerKey: stringDeserializer,
headerValue: stringDeserializer,
},
})

try {
const { handlers: _, ...consumeOptions } = this.options // Handlers cannot be passed to consume method

Expand All @@ -202,12 +192,13 @@ export abstract class AbstractKafkaConsumer<
})

// Use pipeline for better error handling and backpressure management.
// pipeline() internally listens for errors on all streams
// pipeline() internally listens for errors on all streams and rejects if any stream errors.
// The .catch() here reports the error; reconnection is handled by handleStream's .catch() below.
pipeline(this.consumerStream, this.messageBatchStream).catch((error) =>
this.handlerError(error),
this.handleError(error),
)
} else {
this.consumerStream.on('error', (error) => this.handlerError(error))
this.consumerStream.on('error', (error) => this.handleError(error))
}
} catch (error) {
throw new InternalError({
Expand All @@ -217,63 +208,84 @@ export abstract class AbstractKafkaConsumer<
})
}

if (this.messageBatchStream) {
this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error))
} else {
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
}
this.handleStream(
this.messageBatchStream ? this.messageBatchStream : this.consumerStream,
).catch((error) => this.reconnect(error))
}

private async handleSyncStream(
stream: MessagesStream<string, object, string, string>,
): Promise<void> {
for await (const message of stream) {
private async handleStream(
stream:
| MessagesStream<string, object, string, string>
| KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
) {
for await (const messageOrBatch of stream) {
await this.consume(
message.topic,
message as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
Array.isArray(messageOrBatch) ? messageOrBatch[0].topic : messageOrBatch.topic,
messageOrBatch,
)
}
}
private async handleSyncStreamBatch(
stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
): Promise<void> {
for await (const messageBatch of stream) {
await this.consume(messageBatch[0].topic, messageBatch)
}
}

async close(): Promise<void> {
if (!this.consumerStream && !this.messageBatchStream) {
// Leaving the group in case consumer joined but streams were not created
if (this.isActive) this.consumer.leaveGroup()
return
}
if (!this.consumer) return Promise.resolve()

if (this.consumerStream) {
await this.consumerStream.close()
this.consumerStream = undefined
}
await this.consumerStream?.close()
this.consumerStream = undefined

if (this.messageBatchStream) {
await new Promise((resolve) => this.messageBatchStream?.end(resolve))
this.messageBatchStream = undefined
}
await new Promise((resolve) =>
this.messageBatchStream ? this.messageBatchStream?.end(resolve) : resolve(undefined),
)
this.messageBatchStream = undefined

await this.consumer.close()
try {
await this.consumer.close()
} catch {
// Ignoring errors at this stage
}
this.consumer = undefined
}

private resolveHandler(topic: SupportedTopics<TopicsConfig>) {
return this.options.handlers[topic]
private async reconnect(error: unknown): Promise<void> {
this.isReconnecting = true
this.logger.info(
{ error: resolveGlobalErrorLogObject(error) },
'Stream error detected, attempting to reconnect',
)

for (let attempt = 0; attempt < MAX_RECONNECT_ATTEMPTS; attempt++) {
try {
await this.close()
await setTimeout(Math.pow(2, attempt) * 1000) // Backoff delay starting with 1s
await this.init()
this.isReconnecting = false
return
} catch (error) {
this.logger.warn(
{
attempt,
maxAttempts: MAX_RECONNECT_ATTEMPTS,
error: resolveGlobalErrorLogObject(error),
},
'Reconnect attempt failed',
)
}
}

await this.close() // closing in case something is open after last init call
this.isReconnecting = false
this.handleError(new Error('Consumer failed to reconnect after max attempts'), {
maxAttempts: MAX_RECONNECT_ATTEMPTS,
})
}

private async consume(
topic: string,
topic: SupportedTopics<TopicsConfig>,
messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>,
): Promise<void> {
const messageProcessingStartTimestamp = Date.now()
this.logger.debug({ origin: this.constructor.name, topic }, 'Consuming message(s)')
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Origin is now added with logger.child to avoid repetition

this.logger.debug({ topic }, 'Consuming message(s)')

const handlerConfig = this.resolveHandler(topic)
const handlerConfig = this.options.handlers[topic]

// if there is no handler for the message, we ignore it (simulating subscription)
if (!handlerConfig) return this.commit(messageOrBatch)
Expand All @@ -285,11 +297,11 @@ export abstract class AbstractKafkaConsumer<
)

if (!validMessages.length) {
this.logger.debug({ origin: this.constructor.name, topic }, 'Received not valid message(s)')
this.logger.debug({ topic }, 'Received not valid message(s)')
return this.commit(messageOrBatch)
} else {
this.logger.debug(
{ origin: this.constructor.name, topic, validMessagesCount: validMessages.length },
{ topic, validMessagesCount: validMessages.length },
'Received valid message(s) to process',
)
}
Expand Down Expand Up @@ -339,7 +351,7 @@ export abstract class AbstractKafkaConsumer<
const parseResult = handlerConfig.schema.safeParse(message.value)

if (!parseResult.success) {
this.handlerError(parseResult.error, {
this.handleError(parseResult.error, {
topic: message.topic,
message: stringValueSerializer(message.value),
})
Expand Down Expand Up @@ -415,7 +427,7 @@ export abstract class AbstractKafkaConsumer<
const errorContext = Array.isArray(messageOrBatch)
? { batchSize: messageOrBatch.length }
: { message: stringValueSerializer(messageOrBatch.value) }
this.handlerError(error, { topic, ...errorContext })
this.handleError(error, { topic, ...errorContext })
}

return { status: 'error', errorReason: 'handlerError' }
Expand All @@ -435,32 +447,32 @@ export abstract class AbstractKafkaConsumer<
}
}

private commit(messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>) {
private async commit(messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>) {
let messageToCommit: DeserializedMessage<SupportedMessageValues<TopicsConfig>>
if (Array.isArray(messageOrBatch)) {
if (messageOrBatch.length === 0) return Promise.resolve()

// biome-ignore lint/style/noNonNullAssertion: we check the length above
return this.commitMessage(messageOrBatch[messageOrBatch.length - 1]!)
messageToCommit = messageOrBatch[messageOrBatch.length - 1]!
} else {
return this.commitMessage(messageOrBatch)
messageToCommit = messageOrBatch
}
}

private async commitMessage(message: DeserializedMessage<SupportedMessageValues<TopicsConfig>>) {
const logDetails = {
topic: message.topic,
offset: message.offset,
timestamp: message.timestamp,
topic: messageToCommit.topic,
offset: messageToCommit.offset,
timestamp: messageToCommit.timestamp,
}
this.logger.debug(logDetails, 'Trying to commit message')

try {
await message.commit()
await messageToCommit.commit()
this.logger.debug(logDetails, 'Message committed successfully')
} catch (error) {
this.logger.debug(logDetails, 'Message commit failed')
if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error)
this.handlerError(error)
return error instanceof ResponseError
? this.handleResponseErrorOnCommit(error)
: this.handleError(error)
}
}

Expand All @@ -483,7 +495,7 @@ export abstract class AbstractKafkaConsumer<
`Failed to commit message: ${error.message}`,
)
} else {
this.handlerError(error)
this.handleError(error)
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions packages/kafka/lib/AbstractKafkaPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,9 @@ export abstract class AbstractKafkaPublisher<
} catch (error) {
const errorDetails = {
topic,
publisher: this.constructor.name,
message: stringValueSerializer(message),
}
this.handlerError(error, errorDetails)
this.handleError(error, errorDetails)
throw new InternalError({
message: `Error while publishing to Kafka: ${(error as Error).message}`,
errorCode: 'KAFKA_PUBLISH_ERROR',
Expand Down
4 changes: 2 additions & 2 deletions packages/kafka/lib/AbstractKafkaService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export abstract class AbstractKafkaService<
protected readonly _handlerSpy?: HandlerSpy<SupportedMessageValues<TopicsConfig>>

constructor(dependencies: KafkaDependencies, options: KafkaOptions) {
this.logger = dependencies.logger
this.logger = dependencies.logger.child({ origin: this.constructor.name })
this.errorReporter = dependencies.errorReporter
this.messageMetricsManager = dependencies.messageMetricsManager
this.options = { ...options, messageIdField: options.messageIdField ?? 'id' }
Expand Down Expand Up @@ -125,7 +125,7 @@ export abstract class AbstractKafkaService<
}
}

protected handlerError(error: unknown, context: Record<string, unknown> = {}): void {
protected handleError(error: unknown, context: Record<string, unknown> = {}): void {
const resolvedErrorLog = resolveGlobalErrorLogObject(error)
this.logger.error({ ...resolvedErrorLog, ...context })
if (isError(error))
Expand Down
2 changes: 1 addition & 1 deletion packages/kafka/load-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"@message-queue-toolkit/schemas": "file:../../schemas",
"@lokalise/node-core": "^14.2.0",
"@platformatic/dynamic-buffer": "^0.3.1",
"@platformatic/kafka": "1.30.0",
"@platformatic/kafka": "1.31.0",
"pg": "^8.19.0",
"zod": "^4.0.17"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/kafka/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"dependencies": {
"@lokalise/node-core": "^14.2.0",
"@lokalise/universal-ts-utils": "^4.5.1",
"@platformatic/kafka": "^1.30.0"
"@platformatic/kafka": "^1.31.0"
},
"peerDependencies": {
"@message-queue-toolkit/core": ">=23.0.0",
Expand Down
Loading
Loading