Kafka Triggers: Do less to do more!
Let's smooth the sharp edges and achieve O(n) complexity by limiting the scope of this feature. Beta was great, we learned a lot, and now we've got a target architecture for general availability.
Problem
The Kafka trigger implementation has two interrelated problems:
- The consumer commits offsets regardless of whether the work order was created. This single gap is the root cause of every sharp edge in the current implementation — silent message loss, invisible errors, no replay — and has spawned compensating infrastructure (alternate file storage, manual recovery, dedup tracking, failure emails) that adds complexity without fully solving the problem.
- Kafka-specific logic has spread beyond the consumer layer. The compensating infrastructure above, plus ambiguity about ordering, backpressure, and bulk operations, creates a growing surface area where every new platform feature must ask "but does this work for Kafka?" This is O(n²) complexity that should be O(n): the consumer should be a thin ingestion layer, and everything downstream should be trigger-type-agnostic.
Sharp Edges in the Current Implementation
1. Silent message loss
If a message can't be persisted as a work order (due to a database error or hitting the hard run limit), the consumer marks it as failed internally but commits the offset anyway — this is Broadway's default behavior. Broadway commits offsets for all messages that complete the pipeline, including those marked failed via Broadway.Message.failed/2. The message is gone from Kafka's perspective, but Lightning never created a work order. The user has no idea this happened.
There is an "alternate storage" mechanism (KAFKA_ALTERNATE_STORAGE_ENABLED) that writes failed messages to disk as JSON files, but:
- It's off by default
- It only covers :persistence failures, not :work_order_creation_blocked or other error types
- Recovery requires manual intervention via MessageRecovery.recover_messages/1
- It's a second persistence layer that exists solely because the primary path lacks atomicity
2. Errors are invisible to users
When message processing fails, errors go to:
- Elixir Logger (server logs only)
- Sentry (ops team only)
- Email notifications (throttled to once per KAFKA_NOTIFICATION_EMBARGO_SECONDS, default 1 hour)
Nothing surfaces in the Lightning UI. There's no failed work order, no error state on the trigger, no history entry. Compare this with webhooks, where a failed persistence would return an HTTP error and the caller would retry.
3. No practical replay
Because failed messages don't produce work orders, there's nothing in the database to replay. The alternate storage recovery mechanism is a manual, CLI-only process that re-calls Pipeline.handle_message/3 directly. Users can't initiate replay from the UI, and the standard "rerun from start" feature doesn't help because there was never a run to rerun.
4. Duplicate risk after long disconnections
Deduplication relies on the trigger_kafka_message_records table, which is cleaned up every 10 minutes (retaining records for KAFKA_DUPLICATE_TRACKING_RETENTION_SECONDS, default 1 hour). If a consumer group is disconnected long enough for both:
- The Kafka cluster to expire the consumer group's committed offsets, AND
- The dedup records to be cleaned up
...then redelivered messages will be processed as new, creating duplicate work orders.
5. Instance-level tuning only
Performance settings (number_of_consumers, number_of_processors, number_of_messages_per_second) are configured via environment variables and apply identically to every Kafka trigger on the instance. In multi-tenant deployments, different users need different settings to match their Kafka cluster configurations and throughput requirements.
6. Ordering vs. concurrency tradeoff is opaque
Kafka guarantees ordering within a partition by message key. Broadway preserves this ordering during run creation (with the default single-processor config), but the downstream run execution pipeline does not: multiple workers claim runs via FOR UPDATE SKIP LOCKED with no per-trigger or per-partition serialization. Users must set workflow concurrency to 1 to preserve execution sequence, but this isn't documented or enforced — it's a silent correctness trap.
Root Cause
The core issue is that BroadwayKafka commits offsets for all messages that complete the pipeline, regardless of success or failure. The consumer calls WorkOrders.create_for/3 inside handle_message/3, but whether that call succeeds or fails, Broadway considers the message handled and commits the offset.
This means there is a window between "Kafka thinks the message was consumed" and "Lightning actually persisted a work order" where data can be lost. Every piece of compensating infrastructure in the current implementation — alternate file storage, manual recovery, dedup tracking, failure emails — exists to paper over this gap. None of them fully close it.
Proposed Solution: Manual Offset Control
Replace Broadway's automatic offset commits with manual offset management. Only commit an offset after the corresponding work order has been successfully persisted to the database. If persistence fails, the offset is not committed, and Kafka redelivers the message.
Approach: Replace Broadway with direct :brod usage
Drop Broadway and use :brod's brod_group_subscriber_v2 directly. This gives full control over offset commits, consumer group management, and message processing flow.
brod_group_subscriber_v2 spawns one worker process per assigned partition. Each worker receives messages via a handle_message/2 callback and controls offset commits through its return value:
- Worker receives a message for its assigned partition
- Attempts work order creation
- On success: returns {:ok, :ack, state} — the group coordinator batches the offset and commits it on the next offset_commit_interval_seconds tick
- On failure: returns {:ok, state} (no ack) and retries with backoff
Note: offsets are not committed directly via :brod.commit_offsets/2. Instead, the :ack atom in the return tuple signals the group coordinator to include that offset in its next periodic commit. This is a cleaner model — the coordinator batches acks and commits them efficiently, while the worker only decides "did this message succeed?"
This eliminates the impedance mismatch between Broadway's "message pipeline" model and our need for transactional offset control.
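The callback flow above can be sketched as a brod_group_subscriber_v2 callback module. This is a sketch, not the final implementation: persist_work_order/2 is an illustrative helper that would wrap WorkOrders.create_for/3 plus the dedup check, and the backoff_ms field on state is assumed.

```elixir
defmodule Lightning.KafkaTriggers.Subscriber do
  @behaviour :brod_group_subscriber_v2

  @impl true
  def init(_init_info, init_data), do: {:ok, init_data}

  @impl true
  def handle_message(message, state) do
    case persist_work_order(message, state) do
      {:ok, _work_order} ->
        # :ack — the group coordinator includes this offset in its
        # next periodic commit (offset_commit_interval_seconds)
        {:ok, :ack, state}

      {:error, _reason} ->
        # No ack — the offset stays uncommitted; back off before retrying
        Process.sleep(state.backoff_ms)
        {:ok, state}
    end
  end
end
```

The worker never touches offsets directly; its only decision is whether to return :ack.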
Backup option: Crash on failure
If the :brod migration proves too costly, an interim alternative is to keep Broadway but let the consumer process crash when work order creation fails. Broadway restarts the process, BroadwayKafka re-joins the consumer group, and Kafka redelivers from the last committed offset. This is crude but atomic — offsets are never committed for messages that weren't persisted.
Downsides: noisy restarts, potential thundering herd on transient DB issues, and all in-flight messages for that consumer are redelivered (not just the failed one).
How Each Sharp Edge Resolves
Gating offset commits on successful persistence resolves the sharp edges:
| Current Problem | Resolution |
| --- | --- |
| Silent message loss | Offset is only committed after the work order is persisted. If persistence fails, the offset stays uncommitted and Kafka redelivers. No data loss window. |
| Errors invisible to users | The consumer retries until the work order is created. Transient errors (DB blip) self-heal. Persistent errors (hard run limit) cause the consumer to stall on that partition with backoff, which is observable via a health/status indicator. The problem shifts from "we lost your data and didn't tell you" to "ingestion is paused and here's why." |
| No practical replay | Every successfully consumed message has a work order in the database. Standard replay/rerun features work because there's always a run to rerun. |
| Duplicate risk | Deduplication is still needed for at-least-once delivery (Kafka's standard guarantee). The existing topic_partition_offset idempotency check stays, but the duplicate tracking retention window becomes less critical because the primary failure mode (lost messages that get replayed much later) is eliminated. |
| Instance-level tuning only | Resolved by the :brod switch. Broadway's number_of_consumers and number_of_processors ENV vars become obsolete — brod_group_subscriber_v2 automatically spawns one worker per partition. Fetch tuning (prefetch_count, prefetch_bytes) and rate limiting move to per-trigger kafka_configuration fields, passed directly to each trigger's :brod client/subscriber config. See Per-Trigger Tuning section below. |
| Ordering vs. concurrency | Separate concern — not caused by the offset commit issue. Can be addressed independently via documentation and/or workflow concurrency enforcement for Kafka triggers. |
What Can Be Removed
Once offset commits are gated on successful persistence, the following compensating infrastructure becomes unnecessary:
- MessageRecovery module and alternate file storage (KAFKA_ALTERNATE_STORAGE_* config) — messages that fail to persist are retried by Kafka, not written to disk for manual recovery
- Kafka-specific user failure email notifications and the KAFKA_NOTIFICATION_EMBARGO_SECONDS throttle — replaced by a health/status indicator on the trigger
- Several instance-level ENV variables (KAFKA_ALTERNATE_STORAGE_ENABLED, KAFKA_ALTERNATE_STORAGE_FILE_PATH, KAFKA_NOTIFICATION_EMBARGO_SECONDS)
The dedup table (trigger_kafka_message_records) and its cleanup worker should be retained — they serve a different purpose (preventing duplicates from at-least-once delivery) that isn't solved by offset control alone.
What Stays the Same
The consumer still:
- Runs as a supervised OTP process per enabled Kafka trigger
- Manages connection, authentication, and consumer group membership
- Deserializes message payloads and validates they're JSON objects
- Calls WorkOrders.create_for/3 with type: :kafka and Kafka-specific request metadata
- Applies rate limiting / backpressure
Resource model change: Today, each trigger runs one Broadway pipeline. With brod_group_subscriber_v2, each trigger runs one subscriber that spawns one worker process per assigned partition (plus a coordinator process). If a trigger subscribes to a topic with 12 partitions, that trigger has 12 worker processes and 1 coordinator — not 1 process. This is actually better: a stalled partition blocks only its own worker, not the entire trigger's consumption.
Supervisor tree change: BroadwayKafka manages :brod clients internally. With direct :brod usage, each trigger needs an explicit :brod client started via :brod.start_client/3 (with that trigger's brokers, SSL, and SASL config) before brod_group_subscriber_v2.start_link/1 can reference it. The supervisor tree for each trigger should start the client first, then the subscriber — and tear down both when the trigger is disabled. A per-trigger supervisor (:rest_for_one) is a natural fit:
PipelineSupervisor (one_for_one, dynamic children)
├── TriggerSupervisor for trigger A (rest_for_one)
│ ├── :brod client (named :"client-<trigger_id>")
│ └── brod_group_subscriber_v2 (references client above)
├── TriggerSupervisor for trigger B (rest_for_one)
│ ├── :brod client
│ └── brod_group_subscriber_v2
└── ...
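The per-trigger supervisor in the tree above could look like the following sketch. The module name and the client_config/1 and subscriber_config/2 helpers are illustrative; :brod.start_link_client/3 is used so the client is linked under this supervisor rather than :brod's own.

```elixir
defmodule Lightning.KafkaTriggers.TriggerSupervisor do
  use Supervisor

  def start_link(trigger), do: Supervisor.start_link(__MODULE__, trigger)

  @impl true
  def init(trigger) do
    client_id = :"client-#{trigger.id}"

    children = [
      %{
        id: :brod_client,
        start:
          {:brod, :start_link_client,
           [trigger.brokers, client_id, client_config(trigger)]}
      },
      %{
        id: :subscriber,
        start:
          {:brod_group_subscriber_v2, :start_link,
           [subscriber_config(trigger, client_id)]}
      }
    ]

    # :rest_for_one — if the client dies, the subscriber (which references
    # it) restarts too; if only the subscriber dies, the client stays up.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end
```

Disabling the trigger tears down the whole subtree, stopping both processes in reverse order.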
The trigger configuration UI, health monitoring, and per-trigger config schema are unchanged.
Implementation Considerations
- Backpressure on persistent failure: If work order creation is blocked (e.g., hard run limit), the consumer will retry indefinitely on that partition. Need a backoff strategy that doesn't overwhelm the DB but also doesn't silently stop consuming. A health/status indicator should surface this state to users.
- Partial partition stalls: With manual offset control, a single failing message can block consumption of an entire partition. Need to decide: retry forever (preserves ordering, risks stalling), or skip after N retries and log/alert (loses ordering guarantee but unblocks the partition).
- Metadata passthrough: Kafka headers, topic, partition, offset, key, and timestamp should continue to be passed through as dataclip.request metadata so they're available for user inspection.
- Graceful migration: The consumer group IDs (lightning-<uuid>) must be preserved when switching from Broadway to :brod so Kafka resumes from the last committed offset rather than replaying from initial_offset_reset_policy. The group_id is already stored in kafka_configuration in the database — pass the same string to brod_group_subscriber_v2's config.
- :brod client lifecycle: Each trigger needs a :brod client started via :brod.start_client/3 before its subscriber can reference it. The per-trigger supervisor tree (:rest_for_one with client then subscriber as children) handles startup ordering and teardown. See the supervisor tree diagram in "What Stays the Same" above.
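The graceful-migration point can be made concrete with a sketch of the subscriber config, reusing the trigger's stored group_id. Key names follow brod_group_subscriber_v2's config map; the trigger field paths are illustrative.

```elixir
# Hypothetical wiring: the group_id string already stored in
# kafka_configuration is passed through unchanged, so Kafka resumes
# from the last offset committed by the Broadway-based consumer.
def start_subscriber(trigger, client_id) do
  :brod_group_subscriber_v2.start_link(%{
    client: client_id,
    group_id: trigger.kafka_configuration.group_id, # existing "lightning-<uuid>"
    topics: [trigger.kafka_configuration.topic],
    cb_module: Lightning.KafkaTriggers.Subscriber,
    init_data: %{trigger_id: trigger.id},
    consumer_config: [],
    group_config: []
  })
end
```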
Achieving O(n) Complexity
The offset commit fix above eliminates the largest source of Kafka-specific complexity — the compensating infrastructure for lost messages. But fixing the atomicity gap alone isn't enough; three areas need explicit treatment to ensure the integration stays O(n) as the platform grows.
Principle: The Consumer is a Thin Ingestion Layer
After the offset commit fix, the Kafka consumer's job is:
1. Connect to Kafka, authenticate, manage consumer group
2. Receive a message, deserialize it, check for duplicates
3. Call WorkOrders.create_for/3 — the same function webhooks use
4. On success: return {:ok, :ack, state}. On failure: return {:ok, state} and retry.
From step 3 onward, there is no Kafka-specific code path. The work order, its runs, steps, dataclips, history, replay, UI, rate limiting, sandboxes, lost runs handling, and worker execution all follow the identical path as webhook-triggered work orders. The only differences are metadata: the trigger has type: :kafka, the dataclip has type: :kafka with Kafka-specific request fields (topic, partition, offset, key, headers).
This means the answer to "does feature X also work for Kafka?" is always yes for any feature that operates on work orders, runs, or downstream. The only features that need Kafka awareness are those in the consumer layer itself.
Bounded Kafka-specific surface
| Layer | Kafka-specific? | Scope |
| --- | --- | --- |
| Consumer connection, auth, consumer groups | Yes | Lightning.KafkaTriggers only |
| Offset commit management | Yes | Lightning.KafkaTriggers only |
| Message dedup (trigger_kafka_message_records) | Yes | Lightning.KafkaTriggers only |
| Trigger configuration UI (broker, topic, etc.) | Yes | Trigger form component only |
| Trigger health/status indicator | Yes | Trigger status component only |
| Work order creation | No — uses WorkOrders.create_for/3 | Common path |
| Runs, steps, dataclips | No | Common path |
| History, replay, rerun | No | Common path |
| Worker execution | No | Common path |
| Rate limiting | No | Common path |
| Sandboxes | No | Common path |
| Lost runs handling | No | Common path |
| Bulk operations / cursors | No | Common path |
If a future feature operates on work orders or runs, it works for Kafka automatically. If a future feature operates on the consumer layer, it's scoped to Lightning.KafkaTriggers — a single module boundary. This is O(n), not O(n²).
1. Message ordering: 🌟 Not our problem to solve at the platform level 🌟
Kafka guarantees ordering within a partition by key. Webhooks make no ordering guarantee. The question is whether the platform needs to enforce or preserve Kafka's ordering semantics downstream.
Answer: No. Ordering is a property of the source, not the platform. Once a Kafka message becomes a work order, it enters the same queue as every other work order. If a user needs ordered processing, they use workflow concurrency = 1 — the same mechanism they'd use if they needed ordered processing of webhooks arriving in rapid succession.
This is already how it works today. The consumer creates work orders; the worker processes them respecting workflow concurrency settings. No Kafka-specific branching is needed. What is needed:
- Documentation: Explain that Kafka partition ordering is preserved into work order creation order, but parallel worker execution may reorder runs. Users who need strict ordering should set workflow concurrency to 1.
- No enforcement: We do not auto-set concurrency for Kafka triggers. The user makes this choice based on whether their use case requires ordering (stateful event streams) or not (independent records).
This keeps ordering out of the platform's conditional logic entirely. It's a user configuration choice, same as for any trigger type.
2. Blocked persistence: Pause and surface, don't invent new failure modes
When work order creation is blocked (hard run limit, DB down), the consumer cannot commit the offset, so it retries. The question is what the retry/backoff strategy looks like and whether it creates Kafka-specific operational complexity.
Strategy: Exponential backoff with a cap, surfaced via existing trigger status.
- On failed persistence, wait 1s, 2s, 4s, ... up to a cap of 60s, then retry at 60s intervals.
- The consumer's partition assignment stays active — brod_group_coordinator runs in a separate process from the worker and continues heartbeating independently, even while the worker is blocked in a retry loop.
- Messages queued behind the blocked one on the same partition wait — this preserves ordering and is standard Kafka consumer behavior. Other partitions are unaffected (each has its own worker process).
- The trigger's existing enabled / error status field (already used for connection failures) reflects the stall state. No new status model needed.
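The backoff schedule above (1s doubling to a 60s cap) is a one-liner; the module and function names here are illustrative.

```elixir
defmodule Lightning.KafkaTriggers.Backoff do
  @cap_ms 60_000

  # Exponential backoff: 1s, 2s, 4s, ... capped at 60s.
  # delay_ms(0) → 1_000; delay_ms(3) → 8_000; delay_ms(10) → 60_000
  def delay_ms(attempt) when attempt >= 0 do
    min(@cap_ms, 1_000 * Integer.pow(2, attempt))
  end
end
```

Once the cap is reached, the worker simply retries every 60 seconds until persistence succeeds or the trigger is disabled.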
Caveat: broker-side max.poll.interval.ms. While :brod's coordinator heartbeats independently (so session.timeout.ms isn't a concern), some Kafka deployments configure max.poll.interval.ms on the broker. If the worker doesn't fetch new messages within this interval, the broker kicks the consumer from the group. The default is 5 minutes, well above our 60s backoff cap, but deployments with aggressive settings could trigger a rebalance during extended stalls. This should be documented as a known interaction.
Why this isn't O(n²): The backoff/retry lives entirely inside the consumer (Lightning.KafkaTriggers). No downstream feature needs to know about it. The trigger status indicator already exists for connection errors — stalled persistence is just another error state in the same field. Monitoring, dashboards, and admin UIs that already read trigger status get this for free.
Escape hatch: If a partition is stalled for longer than a configurable threshold (default: 1 hour), log a warning and emit a Sentry alert. Do not skip the message — skipping would reintroduce the "lost message" problem this spec exists to fix. The correct resolution for a persistent stall is operational: increase the run limit, fix the DB, or disable the trigger.
3. Bulk operations and high-volume throughput
If a Kafka topic produces thousands of messages per second, does the 1:1 message-to-work-order model hold?
Answer: Yes, because it's the same model webhooks use. A webhook endpoint receiving 1,000 requests/second creates 1,000 work orders. A Kafka consumer receiving 1,000 messages/second creates 1,000 work orders via the same function. The throughput bottleneck is WorkOrders.create_for/3 and the database, not the trigger type.
If we ever need batch work order creation for throughput, it would be a common-path optimization (batch WorkOrders.create_for/3 calls) that benefits all trigger types equally. The Kafka consumer would call a batch variant the same way a hypothetical bulk webhook endpoint would. No Kafka-specific branching.
Cursors are a user-space concern — a job that processes a batch and tracks its position via collections or state. This works identically regardless of whether the triggering work order came from Kafka or a webhook.
4. Per-trigger tuning: No longer instance-level
The current Broadway-based implementation takes three ENV vars — KAFKA_NUMBER_OF_CONSUMERS, KAFKA_NUMBER_OF_PROCESSORS, KAFKA_NUMBER_OF_MESSAGES_PER_SECOND — and applies them identically to every Kafka trigger on the instance. The switch to :brod eliminates two of these entirely and makes the third per-trigger.
What the Broadway ENV vars mapped to, and why they go away
| Broadway ENV var | What it controlled | :brod equivalent |
| --- | --- | --- |
| KAFKA_NUMBER_OF_CONSUMERS | Number of concurrent BroadwayKafka consumer connections per pipeline | Gone. brod_group_subscriber_v2 automatically spawns one worker per assigned partition — parallelism matches partition count, which is the correct model. No knob needed. |
| KAFKA_NUMBER_OF_PROCESSORS | Concurrent message processing stages in the Broadway pipeline | Gone. Each worker processes messages sequentially in its handle_message/2 callback. This is what we want — sequential processing per partition preserves ordering and ensures we don't ack before persistence completes. |
| KAFKA_NUMBER_OF_MESSAGES_PER_SECOND | Broadway rate limiter (converted to floor(per_second * 10) messages per 10s window) | Moves to per-trigger config as a rate_limit field on kafka_configuration. Implemented in the worker's handle_message/2 callback (e.g., token bucket or simple sleep). |
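One way the per-trigger rate_limit field could be enforced inside handle_message/2 is a simple sleep-based throttle rather than a full token bucket. This is a sketch; the field name matches the proposal above and the module is illustrative.

```elixir
defmodule Lightning.KafkaTriggers.Throttle do
  # nil means unlimited — no throttling applied.
  def wait(nil), do: :ok

  # Spacing messages evenly caps throughput at roughly per_second msgs/s
  # for this partition worker.
  def wait(per_second) when per_second > 0 do
    Process.sleep(div(1_000, per_second))
  end
end
```

Because each partition has its own worker, a per-trigger limit would need to be divided across the trigger's partition workers (or tracked in shared state) to be exact; the sketch above limits each worker independently.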
What :brod exposes for per-trigger tuning
Each brod_group_subscriber_v2 instance accepts its own consumer_config and group_config maps at startup. Since each trigger starts its own subscriber, these are naturally per-trigger — no global state, no shared config.
consumer_config (per-partition fetch behavior, from brod_consumer):
| Parameter | Default | What it controls |
| --- | --- | --- |
| prefetch_count | 10 | Max unacked messages before the consumer pauses fetching |
| prefetch_bytes | 102,400 (100KB) | Max unacked bytes before fetch pauses (both count AND bytes must be exceeded) |
| max_bytes | 1,048,576 (1MB) | Max payload per fetch request to the broker |
| min_bytes | 0 | Minimum bytes broker accumulates before responding |
| max_wait_time | 10,000ms | How long broker waits to fill min_bytes |
| sleep_timeout | 1,000ms | Consumer sleep when broker returns empty |
group_config (consumer group coordination, from brod_group_coordinator):
| Parameter | Default | What it controls |
| --- | --- | --- |
| session_timeout_seconds | 30 | How long before broker considers consumer dead if no heartbeat |
| heartbeat_rate_seconds | 5 | Heartbeat interval |
| offset_commit_interval_seconds | 5 | How often acked offsets are committed to Kafka |
| max_rejoin_attempts | 5 | Max attempts to rejoin after rebalance |
| rejoin_delay_seconds | 1 | Delay between rejoin attempts |
Which parameters to expose in kafka_configuration
Not all of these should be user-facing. Most users don't need to tune brod_group_coordinator session timeouts. The parameters worth exposing in the trigger's kafka_configuration schema are those that affect throughput and resource usage:
| Field | Type | Default | Why expose it |
| --- | --- | --- | --- |
| prefetch_count | integer | 10 | Controls how aggressively the consumer fetches ahead. Higher = more throughput, more memory. Users with high-volume topics may want 100+. |
| prefetch_bytes | integer | 102,400 | Pairs with prefetch_count. Users with large messages (e.g., 1MB payloads) need to increase this. |
| max_bytes_per_fetch | integer | 1,048,576 | Cap on fetch response size. Users with very large messages may need to increase. |
| rate_limit_per_second | integer | nil (unlimited) | Replaces the global KAFKA_NUMBER_OF_MESSAGES_PER_SECOND. Users who want to throttle ingestion set this per-trigger. |
The remaining parameters (session_timeout_seconds, heartbeat_rate_seconds, offset_commit_interval_seconds, etc.) stay at :brod defaults. They can be exposed later if operational experience shows a need, but they're coordinator-level settings that rarely need per-trigger tuning.
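Translating the exposed fields into :brod's consumer_config could look like the sketch below. The field names match the table above; the function itself is hypothetical, and the fallbacks mirror :brod's documented defaults.

```elixir
# Build the per-trigger consumer_config keyword list from the
# (proposed) kafka_configuration fields, falling back to :brod defaults.
def consumer_config(%{kafka_configuration: cfg}) do
  [
    prefetch_count: cfg.prefetch_count || 10,
    prefetch_bytes: cfg.prefetch_bytes || 102_400,
    max_bytes: cfg.max_bytes_per_fetch || 1_048_576
  ]
end
```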
Migration path
- Add the new fields to KafkaConfiguration (embedded schema) with defaults matching current behavior.
- Pass them into the consumer_config map when calling brod_group_subscriber_v2.start_link/1.
- Remove the Broadway ENV vars (KAFKA_NUMBER_OF_CONSUMERS, KAFKA_NUMBER_OF_PROCESSORS, KAFKA_NUMBER_OF_MESSAGES_PER_SECOND) and the Lightning.Config functions that read them.
- Existing triggers get the defaults automatically — no data migration needed since KafkaConfiguration is an embedded JSON schema with default values.
Success Criteria
- A Kafka message that fails to persist as a work order is never lost — the offset is not committed, and Kafka redelivers.
- Kafka-originated work orders are created via WorkOrders.create_for/3, the same function webhooks use. They are distinguishable by their trigger association (the work order's trigger has type: :kafka), dataclip.type (:kafka), and Kafka-specific request metadata — but all downstream behavior (runs, replay, history, UI) is identical.
- The compensating infrastructure (alternate storage, manual recovery, failure emails) is removed.
- The consumer's only responsibilities are: connect to Kafka, receive messages, create work orders, commit offsets on success.
- Kafka-specific code is confined to Lightning.KafkaTriggers and trigger UI components. All features operating on work orders, runs, or downstream work identically for Kafka and webhook triggers.
- Adding a new platform feature (e.g., bulk operations, new history views, sandboxes) requires zero Kafka-specific branching.
Kafka Triggers: Do less to do more!
Let's smooth the sharp edges and achieve 0(n) complexity for this issue by limiting the scope of this feature. Beta was great, we learned a lot, and now we've got a target architecture for general availability.
Problem
The Kafka trigger implementation has two interrelated problems:
The consumer commits offsets regardless of whether the work order was created. This single gap is the root cause of every sharp edge in the current implementation — silent message loss, invisible errors, no replay — and has spawned compensating infrastructure (alternate file storage, manual recovery, dedup tracking, failure emails) that adds complexity without fully solving the problem.
Kafka-specific logic has spread beyond the consumer layer. The compensating infrastructure above, plus ambiguity about ordering, backpressure, and bulk operations, creates a growing surface area where every new platform feature must ask "but does this work for Kafka?" This is O(n²) complexity that should be O(n): the consumer should be a thin ingestion layer, and everything downstream should be trigger-type-agnostic.
Sharp Edges in the Current Implementation
1. Silent message loss
If a message can't be persisted as a work order (due to a database error or hitting the hard run limit), the consumer marks it as failed internally but commits the offset anyway — this is Broadway's default behavior. Broadway commits offsets for all messages that complete the pipeline, including those marked failed via
Broadway.Message.failed/2. The message is gone from Kafka's perspective, but Lightning never created a work order. The user has no idea this happened.There is an "alternate storage" mechanism (
KAFKA_ALTERNATE_STORAGE_ENABLED) that writes failed messages to disk as JSON files, but::persistencefailures, not:work_order_creation_blockedor other error typesMessageRecovery.recover_messages/12. Errors are invisible to users
When message processing fails, errors go to:
Logger(server logs only)KAFKA_NOTIFICATION_EMBARGO_SECONDS, default 1 hour)Nothing surfaces in the Lightning UI. There's no failed work order, no error state on the trigger, no history entry. Compare this with webhooks, where a failed persistence would return an HTTP error and the caller would retry.
3. No practical replay
Because failed messages don't produce work orders, there's nothing in the database to replay. The alternate storage recovery mechanism is a manual, CLI-only process that re-calls
Pipeline.handle_message/3directly. Users can't initiate replay from the UI, and the standard "rerun from start" feature doesn't help because there was never a run to rerun.4. Duplicate risk after long disconnections
Deduplication relies on the
trigger_kafka_message_recordstable, which is cleaned up every 10 minutes (retaining records forKAFKA_DUPLICATE_TRACKING_RETENTION_SECONDS, default 1 hour). If a consumer group is disconnected long enough for both:...then redelivered messages will be processed as new, creating duplicate work orders.
5. Instance-level tuning only
Performance settings (
number_of_consumers,number_of_processors,number_of_messages_per_second) are configured via environment variables and apply identically to every Kafka trigger on the instance. In multi-tenant deployments, different users need different settings to match their Kafka cluster configurations and throughput requirements.6. Ordering vs. concurrency tradeoff is opaque
Kafka guarantees ordering within a partition by message key. Broadway preserves this ordering during run creation (with the default single-processor config), but the downstream run execution pipeline does not: multiple workers claim runs via
FOR UPDATE SKIP LOCKEDwith no per-trigger or per-partition serialization. Users must set workflow concurrency to 1 to preserve execution sequence, but this isn't documented or enforced — it's a silent correctness trap.Root Cause
The core issue is that BroadwayKafka commits offsets for all messages that complete the pipeline, regardless of success or failure. The consumer calls
WorkOrders.create_for/3insidehandle_message/3, but whether that call succeeds or fails, Broadway considers the message handled and commits the offset.This means there is a window between "Kafka thinks the message was consumed" and "Lightning actually persisted a work order" where data can be lost. Every piece of compensating infrastructure in the current implementation — alternate file storage, manual recovery, dedup tracking, failure emails — exists to paper over this gap. None of them fully close it.
Proposed Solution: Manual Offset Control
Replace Broadway's automatic offset commits with manual offset management. Only commit an offset after the corresponding work order has been successfully persisted to the database. If persistence fails, the offset is not committed, and Kafka redelivers the message.
Approach: Replace Broadway with direct
:brodusageDrop Broadway and use
:brod'sbrod_group_subscriber_v2directly. This gives full control over offset commits, consumer group management, and message processing flow.brod_group_subscriber_v2spawns one worker process per assigned partition. Each worker receives messages via ahandle_message/2callback and controls offset commits through its return value:{:ok, :ack, state}— the group coordinator batches the offset and commits it on the nextoffset_commit_interval_secondstick{:ok, state}(no ack) and retries with backoffNote: offsets are not committed directly via
:brod.commit_offsets/2. Instead, the:ackatom in the return tuple signals the group coordinator to include that offset in its next periodic commit. This is a cleaner model — the coordinator batches acks and commits them efficiently, while the worker only decides "did this message succeed?"This eliminates the impedance mismatch between Broadway's "message pipeline" model and our need for transactional offset control.
Backup option: Crash on failure
If the
:brodmigration proves too costly, an interim alternative is to keep Broadway but let the consumer process crash when work order creation fails. Broadway restarts the process, BroadwayKafka re-joins the consumer group, and Kafka redelivers from the last committed offset. This is crude but atomic — offsets are never committed for messages that weren't persisted.Downsides: noisy restarts, potential thundering herd on transient DB issues, and all in-flight messages for that consumer are redelivered (not just the failed one).
How Each Sharp Edge Resolves
Gating offset commits on successful persistence resolves the sharp edges:
topic_partition_offsetidempotency check stays, but the duplicate tracking retention window becomes less critical because the primary failure mode (lost messages that get replayed much later) is eliminated.:brodswitch. Broadway'snumber_of_consumersandnumber_of_processorsENV vars become obsolete —brod_group_subscriber_v2automatically spawns one worker per partition. Fetch tuning (prefetch_count,prefetch_bytes) and rate limiting move to per-triggerkafka_configurationfields, passed directly to each trigger's:brodclient/subscriber config. See Per-Trigger Tuning section below.What Can Be Removed
Once offset commits are gated on successful persistence, the following compensating infrastructure becomes unnecessary:
- The `MessageRecovery` module and alternate file storage (`KAFKA_ALTERNATE_STORAGE_*` config) — messages that fail to persist are retried by Kafka, not written to disk for manual recovery
- The `KAFKA_NOTIFICATION_EMBARGO_SECONDS` failure-email throttle — replaced by a health/status indicator on the trigger
- The associated ENV vars (`KAFKA_ALTERNATE_STORAGE_ENABLED`, `KAFKA_ALTERNATE_STORAGE_FILE_PATH`, `KAFKA_NOTIFICATION_EMBARGO_SECONDS`)

The dedup table (`trigger_kafka_message_records`) and its cleanup worker should be retained — they serve a different purpose (preventing duplicates from at-least-once delivery) that isn't solved by offset control alone.

What Stays the Same
The consumer still:
- Calls `WorkOrders.create_for/3` with `type: :kafka` and Kafka-specific `request` metadata

Resource model change: Today, each trigger runs one Broadway pipeline. With `brod_group_subscriber_v2`, each trigger runs one subscriber that spawns one worker process per assigned partition (plus a coordinator process). If a trigger subscribes to a topic with 12 partitions, that trigger has 12 worker processes and 1 coordinator — not 1 process. This is actually better: a stalled partition blocks only its own worker, not the entire trigger's consumption.

Supervisor tree change: BroadwayKafka manages `:brod` clients internally. With direct `:brod` usage, each trigger needs an explicit `:brod` client started via `:brod.start_client/3` (with that trigger's brokers, SSL, and SASL config) before `brod_group_subscriber_v2.start_link/1` can reference it. The supervisor tree for each trigger should start the client first, then the subscriber — and tear down both when the trigger is disabled. A per-trigger supervisor (`:rest_for_one`) is a natural fit.

The trigger configuration UI, health monitoring, and per-trigger config schema are unchanged.
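A sketch of that per-trigger tree, with illustrative module and helper names (the client child uses `:brod.start_link_client/3`, the linked variant of `start_client/3`, so it fits a child spec):

```elixir
defmodule Lightning.KafkaTriggers.TriggerSupervisor do
  # Illustrative per-trigger supervisor. :rest_for_one means that if the
  # :brod client (child 1) dies, the subscriber (child 2), which holds a
  # reference to it, is restarted as well, in order.
  use Supervisor

  def start_link(trigger), do: Supervisor.start_link(__MODULE__, trigger)

  @impl true
  def init(trigger) do
    client_id = :"kafka_client_#{trigger.id}"

    children = [
      %{
        id: :client,
        start:
          {:brod, :start_link_client,
           [brokers(trigger), client_id, client_config(trigger)]}
      },
      %{
        id: :subscriber,
        start:
          {:brod_group_subscriber_v2, :start_link,
           [
             %{
               client: client_id,
               # Reuse the group_id stored in kafka_configuration so
               # offsets carry over from the Broadway implementation.
               group_id: trigger.kafka_configuration.group_id,
               topics: topics(trigger),
               cb_module: Lightning.KafkaTriggers.PartitionWorker
             }
           ]}
      }
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end

  # Placeholders: derive these from the trigger's kafka_configuration
  # (brokers, SSL, SASL, topics).
  defp brokers(_trigger), do: [{~c"localhost", 9092}]
  defp client_config(_trigger), do: []
  defp topics(_trigger), do: ["example-topic"]
end
```

Disabling the trigger then reduces to terminating this one supervisor.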
Implementation Considerations
- Kafka message fields (topic, partition, offset, key, headers) are stored in the `dataclip.request` metadata so they're available for user inspection.
- The consumer group ID (`lightning-<uuid>`) must be preserved when switching from Broadway to `:brod` so Kafka resumes from the last committed offset rather than replaying from `initial_offset_reset_policy`. The `group_id` is already stored in `kafka_configuration` in the database — pass the same string to `brod_group_subscriber_v2`'s config.
- `:brod` client lifecycle: Each trigger needs a `:brod` client started via `:brod.start_client/3` before its subscriber can reference it. The per-trigger supervisor tree (`:rest_for_one` with client then subscriber as children) handles startup ordering and teardown. See the supervisor tree diagram in "What Stays the Same" above.

Achieving O(n) Complexity
The offset commit fix above eliminates the largest source of Kafka-specific complexity — the compensating infrastructure for lost messages. But fixing the atomicity gap alone isn't enough; three areas need explicit treatment to ensure the integration stays O(n) as the platform grows.
Principle: The Consumer is a Thin Ingestion Layer
After the offset commit fix, the Kafka consumer's job is:
1. Receive a message from its assigned partition
2. Check `topic_partition_offset` for duplicates
3. Call `WorkOrders.create_for/3` — the same function webhooks use
4. On success: return `{:ok, :ack, state}`. On failure: return `{:ok, state}` and retry.

From step 3 onward, there is no Kafka-specific code path. The work order, its runs, steps, dataclips, history, replay, UI, rate limiting, sandboxes, lost runs handling, and worker execution all follow the identical path as webhook-triggered work orders. The only differences are metadata: the trigger has `type: :kafka`, the dataclip has `type: :kafka` with Kafka-specific `request` fields (topic, partition, offset, key, headers).

This means the answer to "does feature X also work for Kafka?" is always yes for any feature that operates on work orders, runs, or downstream. The only features that need Kafka awareness are those in the consumer layer itself.
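For illustration, the Kafka-specific part of a created work order is just the dataclip's `request` map. The argument shape of `WorkOrders.create_for/3` below is a sketch, not quoted from the codebase, and the `message` fields are shown as struct access for readability:

```elixir
# Everything Kafka-specific ends at the dataclip's request metadata;
# the create_for/3 call itself is the shared path webhooks use.
request = %{
  "topic" => message.topic,
  "partition" => message.partition,
  "offset" => message.offset,
  "key" => message.key,
  "headers" => message.headers
}

WorkOrders.create_for(trigger, workflow,
  dataclip: %{type: :kafka, body: decoded_body, request: request}
)
```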
Bounded Kafka-specific surface
| Concern | Where it lives |
| --- | --- |
| Consumer group and connection management | `Lightning.KafkaTriggers` only |
| Offset commits and retry/backoff | `Lightning.KafkaTriggers` only |
| Duplicate tracking (`trigger_kafka_message_records`) | `Lightning.KafkaTriggers` only |
| Work order creation | Shared path: `WorkOrders.create_for/3` |

If a future feature operates on work orders or runs, it works for Kafka automatically. If a future feature operates on the consumer layer, it's scoped to `Lightning.KafkaTriggers` — a single module boundary. This is O(n), not O(n²).

1. Message ordering: 🌟 Not our problem to solve at the platform level 🌟
Kafka guarantees ordering within a partition; messages with the same key land in the same partition. Webhooks make no ordering guarantee. The question is whether the platform needs to enforce or preserve Kafka's ordering semantics downstream.
Answer: No. Ordering is a property of the source, not the platform. Once a Kafka message becomes a work order, it enters the same queue as every other work order. If a user needs ordered processing, they use workflow concurrency = 1 — the same mechanism they'd use if they needed ordered processing of webhooks arriving in rapid succession.
This is already how it works today. The consumer creates work orders; the worker processes them respecting workflow concurrency settings. No Kafka-specific branching is needed. What is needed: documentation making clear that users who want Kafka's per-partition ordering preserved downstream should set workflow concurrency = 1.
This keeps ordering out of the platform's conditional logic entirely. It's a user configuration choice, same as for any trigger type.
2. Blocked persistence: Pause and surface, don't invent new failure modes
When work order creation is blocked (hard run limit, DB down), the consumer cannot commit the offset, so it retries. The question is what the retry/backoff strategy looks like and whether it creates Kafka-specific operational complexity.
Strategy: Exponential backoff with a cap, surfaced via existing trigger status.
- The worker retries with exponential backoff, capped at 60 seconds between attempts.
- The `brod_group_coordinator` runs in a separate process from the worker and continues heartbeating independently, even while the worker is blocked in a retry loop.
- The trigger's `enabled`/`error` status field (already used for connection failures) reflects the stall state. No new status model needed.

Caveat: `max.poll.interval.ms`. `:brod`'s coordinator heartbeats independently, so `session.timeout.ms` isn't a concern while the worker is blocked. `max.poll.interval.ms` deserves a note: in the Java client it evicts a consumer that stops polling within the interval (default 5 minutes, well above our 60s backoff cap), but it is enforced client-side rather than by the broker, so `:brod` does not implement it. This difference from Java-client behavior during extended stalls should be documented as a known interaction.

Why this isn't O(n²): The backoff/retry lives entirely inside the consumer (`Lightning.KafkaTriggers`). No downstream feature needs to know about it. The trigger status indicator already exists for connection errors — stalled persistence is just another error state in the same field. Monitoring, dashboards, and admin UIs that already read trigger status get this for free.

Escape hatch: If a partition is stalled for longer than a configurable threshold (default: 1 hour), log a warning and emit a Sentry alert. Do not skip the message — skipping would reintroduce the "lost message" problem this spec exists to fix. The correct resolution for a persistent stall is operational: increase the run limit, fix the DB, or disable the trigger.
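A minimal sketch of the capped backoff inside the worker (helper names are illustrative; the 60s cap matches the figure used in the caveat above):

```elixir
# Illustrative retry loop: block the partition worker on persistence
# failure with exponential backoff capped at 60s. Blocking here is safe
# because the group coordinator heartbeats from its own process.
defp persist_with_backoff(message, state, attempt \\ 0) do
  case persist_work_order(message, state) do
    {:ok, _work_order} ->
      {:ok, :ack, state}

    {:error, _reason} ->
      # 1s, 2s, 4s, ... capped at 60s between attempts.
      delay_ms = min(1_000 * Integer.pow(2, attempt), 60_000)
      Process.sleep(delay_ms)
      # Never skip the message; a persistent stall is resolved
      # operationally (and alerted on past the stall threshold).
      persist_with_backoff(message, state, attempt + 1)
  end
end
```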
3. Bulk operations and high-volume throughput
If a Kafka topic produces thousands of messages per second, does the 1:1 message-to-work-order model hold?
Answer: Yes, because it's the same model webhooks use. A webhook endpoint receiving 1,000 requests/second creates 1,000 work orders. A Kafka consumer receiving 1,000 messages/second creates 1,000 work orders via the same function. The throughput bottleneck is `WorkOrders.create_for/3` and the database, not the trigger type.

If we ever need batch work order creation for throughput, it would be a common-path optimization (batch `WorkOrders.create_for/3` calls) that benefits all trigger types equally. The Kafka consumer would call a batch variant the same way a hypothetical bulk webhook endpoint would. No Kafka-specific branching.

Cursors are a user-space concern — a job that processes a batch and tracks its position via `collections` or state. This works identically regardless of whether the triggering work order came from Kafka or a webhook.

4. Per-trigger tuning: No longer instance-level
The current Broadway-based implementation takes three ENV vars — `KAFKA_NUMBER_OF_CONSUMERS`, `KAFKA_NUMBER_OF_PROCESSORS`, `KAFKA_NUMBER_OF_MESSAGES_PER_SECOND` — and applies them identically to every Kafka trigger on the instance. The switch to `:brod` eliminates two of these entirely and makes the third per-trigger.

What the Broadway ENV vars mapped to, and why they go away
| Broadway ENV var | What it controlled | `:brod` equivalent |
| --- | --- | --- |
| `KAFKA_NUMBER_OF_CONSUMERS` | Instance-wide consumer count | `brod_group_subscriber_v2` automatically spawns one worker per assigned partition — parallelism matches partition count, which is the correct model. No knob needed. |
| `KAFKA_NUMBER_OF_PROCESSORS` | Instance-wide processor concurrency | Each partition worker processes messages sequentially in its `handle_message/2` callback. This is what we want — sequential processing per partition preserves ordering and ensures we don't ack before persistence completes. |
| `KAFKA_NUMBER_OF_MESSAGES_PER_SECOND` | Instance-wide rate limit (`floor(per_second * 10)` messages per 10s window) | `rate_limit` field on `kafka_configuration`. Implemented in the worker's `handle_message/2` callback (e.g., token bucket or simple sleep). |

What `:brod` exposes for per-trigger tuning

Each `brod_group_subscriber_v2` instance accepts its own `consumer_config` and `group_config` maps at startup. Since each trigger starts its own subscriber, these are naturally per-trigger — no global state, no shared config.

`consumer_config` (per-partition fetch behavior, from `brod_consumer`):

- `prefetch_count` — max number of fetched-ahead, unacked messages per partition
- `prefetch_bytes` — byte-based fetch-ahead cap (works alongside `prefetch_count`)
- `max_bytes` — max bytes returned per fetch request
- `min_bytes` — min bytes the broker should accumulate before responding
- `max_wait_time` — how long the broker may wait if `min_bytes` isn't met
- `sleep_timeout` — pause before retrying when there are no new messages

`group_config` (consumer group coordination, from `brod_group_coordinator`):

- `session_timeout_seconds`
- `heartbeat_rate_seconds`
- `offset_commit_interval_seconds`
- `max_rejoin_attempts`
- `rejoin_delay_seconds`

Which parameters to expose in `kafka_configuration`
brod_group_coordinatorsession timeouts. The parameters worth exposing in the trigger'skafka_configurationschema are those that affect throughput and resource usage:prefetch_countprefetch_bytesprefetch_count. Users with large messages (e.g., 1MB payloads) need to increase this.max_bytes_per_fetchrate_limit_per_secondKAFKA_NUMBER_OF_MESSAGES_PER_SECOND. Users who want to throttle ingestion set this per-trigger.The remaining parameters (
session_timeout_seconds,heartbeat_rate_seconds,offset_commit_interval_seconds, etc.) stay at:broddefaults. They can be exposed later if operational experience shows a need, but they're coordinator-level settings that rarely need per-trigger tuning.Migration path
1. Add the tuning fields to `KafkaConfiguration` (embedded schema) with defaults matching current behavior.
2. Pass them through as the `consumer_config` map when calling `brod_group_subscriber_v2.start_link/1`.
3. Remove the instance-level ENV vars (`KAFKA_NUMBER_OF_CONSUMERS`, `KAFKA_NUMBER_OF_PROCESSORS`, `KAFKA_NUMBER_OF_MESSAGES_PER_SECOND`) and the `Lightning.Config` functions that read them.
4. No database migration is needed — `KafkaConfiguration` is an embedded JSON schema with default values.

Success Criteria
- Kafka work orders are created via `WorkOrders.create_for/3`, the same function webhooks use. They are distinguishable by their trigger association (the work order's trigger has `type: :kafka`), `dataclip.type` (`:kafka`), and Kafka-specific `request` metadata — but all downstream behavior (runs, replay, history, UI) is identical.
- Kafka-specific code is confined to `Lightning.KafkaTriggers` and trigger UI components. All features operating on work orders, runs, or downstream work identically for Kafka and webhook triggers.