
Get Kafka from "beta" to "GA" → fully supported #4564

@taylordowns2000

Description


Kafka Triggers: Do less to do more!

Let's smooth the sharp edges and achieve O(n) complexity for this issue by limiting the scope of this feature. Beta was great, we learned a lot, and now we've got a target architecture for general availability.

Problem

The Kafka trigger implementation has two interrelated problems:

  1. The consumer commits offsets regardless of whether the work order was created. This single gap is the root cause of every sharp edge in the current implementation — silent message loss, invisible errors, no replay — and has spawned compensating infrastructure (alternate file storage, manual recovery, dedup tracking, failure emails) that adds complexity without fully solving the problem.

  2. Kafka-specific logic has spread beyond the consumer layer. The compensating infrastructure above, plus ambiguity about ordering, backpressure, and bulk operations, creates a growing surface area where every new platform feature must ask "but does this work for Kafka?" This is O(n²) complexity that should be O(n): the consumer should be a thin ingestion layer, and everything downstream should be trigger-type-agnostic.

Sharp Edges in the Current Implementation

1. Silent message loss

If a message can't be persisted as a work order (due to a database error or hitting the hard run limit), the consumer marks it as failed internally but commits the offset anyway — this is Broadway's default behavior. Broadway commits offsets for all messages that complete the pipeline, including those marked failed via Broadway.Message.failed/2. The message is gone from Kafka's perspective, but Lightning never created a work order. The user has no idea this happened.

There is an "alternate storage" mechanism (KAFKA_ALTERNATE_STORAGE_ENABLED) that writes failed messages to disk as JSON files, but:

  • It's off by default
  • It only covers :persistence failures, not :work_order_creation_blocked or other error types
  • Recovery requires manual intervention via MessageRecovery.recover_messages/1
  • It's a second persistence layer that exists solely because the primary path lacks atomicity

2. Errors are invisible to users

When message processing fails, errors go to:

  • Elixir Logger (server logs only)
  • Sentry (ops team only)
  • Email notifications (throttled to once per KAFKA_NOTIFICATION_EMBARGO_SECONDS, default 1 hour)

Nothing surfaces in the Lightning UI. There's no failed work order, no error state on the trigger, no history entry. Compare this with webhooks, where a failed persistence would return an HTTP error and the caller would retry.

3. No practical replay

Because failed messages don't produce work orders, there's nothing in the database to replay. The alternate storage recovery mechanism is a manual, CLI-only process that re-calls Pipeline.handle_message/3 directly. Users can't initiate replay from the UI, and the standard "rerun from start" feature doesn't help because there was never a run to rerun.

4. Duplicate risk after long disconnections

Deduplication relies on the trigger_kafka_message_records table, which is cleaned up every 10 minutes (retaining records for KAFKA_DUPLICATE_TRACKING_RETENTION_SECONDS, default 1 hour). If a consumer group is disconnected long enough for both:

  • The Kafka cluster to expire the consumer group's committed offsets, AND
  • The dedup records to be cleaned up

...then redelivered messages will be processed as new, creating duplicate work orders.

5. Instance-level tuning only

Performance settings (number_of_consumers, number_of_processors, number_of_messages_per_second) are configured via environment variables and apply identically to every Kafka trigger on the instance. In multi-tenant deployments, different users need different settings to match their Kafka cluster configurations and throughput requirements.

6. Ordering vs. concurrency tradeoff is opaque

Kafka guarantees ordering within a partition by message key. Broadway preserves this ordering during run creation (with the default single-processor config), but the downstream run execution pipeline does not: multiple workers claim runs via FOR UPDATE SKIP LOCKED with no per-trigger or per-partition serialization. Users must set workflow concurrency to 1 to preserve execution sequence, but this isn't documented or enforced — it's a silent correctness trap.

Root Cause

The core issue is that BroadwayKafka commits offsets for all messages that complete the pipeline, regardless of success or failure. The consumer calls WorkOrders.create_for/3 inside handle_message/3, but whether that call succeeds or fails, Broadway considers the message handled and commits the offset.

This means there is a window between "Kafka thinks the message was consumed" and "Lightning actually persisted a work order" where data can be lost. Every piece of compensating infrastructure in the current implementation — alternate file storage, manual recovery, dedup tracking, failure emails — exists to paper over this gap. None of them fully close it.

Proposed Solution: Manual Offset Control

Replace Broadway's automatic offset commits with manual offset management. Only commit an offset after the corresponding work order has been successfully persisted to the database. If persistence fails, the offset is not committed, and Kafka redelivers the message.

Approach: Replace Broadway with direct :brod usage

Drop Broadway and use :brod's brod_group_subscriber_v2 directly. This gives full control over offset commits, consumer group management, and message processing flow.

brod_group_subscriber_v2 spawns one worker process per assigned partition. Each worker receives messages via a handle_message/2 callback and controls offset commits through its return value:

  1. Worker receives a message for its assigned partition
  2. Attempts work order creation
  3. On success: returns {:ok, :ack, state} — the group coordinator batches the offset and commits it on the next offset_commit_interval_seconds tick
  4. On failure: returns {:ok, state} (no ack) and retries with backoff

Note: offsets are not committed directly via :brod.commit_offsets/2. Instead, the :ack atom in the return tuple signals the group coordinator to include that offset in its next periodic commit. This is a cleaner model — the coordinator batches acks and commits them efficiently, while the worker only decides "did this message succeed?"
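A minimal callback-module sketch of the four-step flow above. The :brod behaviour, record extraction, and return tuples are real; the module names and the work-order helper are illustrative assumptions, not Lightning's actual code:

```elixir
defmodule Lightning.KafkaTriggers.MessageHandler do
  # Sketch only: module names and persist_work_order/2 are assumptions.
  @behaviour :brod_group_subscriber_v2

  require Record

  # Pull the #kafka_message{} record definition out of :brod's header file.
  Record.defrecord(
    :kafka_message,
    Record.extract(:kafka_message, from_lib: "brod/include/brod.hrl")
  )

  @impl true
  def init(_init_info, trigger), do: {:ok, %{trigger: trigger}}

  @impl true
  def handle_message(kafka_message(value: value), %{trigger: trigger} = state) do
    case persist_work_order(trigger, value) do
      {:ok, _work_order} ->
        # :ack tells the group coordinator to fold this offset into its
        # next periodic commit (offset_commit_interval_seconds).
        {:ok, :ack, state}

      {:error, _reason} ->
        # No :ack: the offset stays uncommitted. Retry/backoff happens
        # here (elided); Kafka redelivers after a restart or rebalance.
        {:ok, state}
    end
  end

  # Hypothetical wrapper around WorkOrders.create_for/3 (type: :kafka);
  # the exact argument shape is not shown in this issue.
  defp persist_work_order(trigger, value) do
    with {:ok, body} <- Jason.decode(value) do
      Lightning.KafkaTriggers.create_work_order(trigger, body)
    end
  end
end
```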

This eliminates the impedance mismatch between Broadway's "message pipeline" model and our need for transactional offset control.

Backup option: Crash on failure

If the :brod migration proves too costly, an interim alternative is to keep Broadway but let the consumer process crash when work order creation fails. Broadway restarts the process, BroadwayKafka re-joins the consumer group, and Kafka redelivers from the last committed offset. This is crude but atomic — offsets are never committed for messages that weren't persisted.

Downsides: noisy restarts, potential thundering herd on transient DB issues, and all in-flight messages for that consumer are redelivered (not just the failed one).
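A sketch of the crash-on-failure variant. One subtlety worth noting (an observation from Broadway's documented behavior, not stated in this issue): Broadway rescues exceptions raised inside handle_message/3 and simply marks the message as failed, which would still commit the offset, so an ordinary raise is not enough and the process has to be killed outright. Names and argument shapes are illustrative:

```elixir
def handle_message(_processor, message, %{trigger: trigger}) do
  # persist_work_order/2 is a hypothetical wrapper around WorkOrders.create_for/3.
  case persist_work_order(trigger, message.data) do
    {:ok, _work_order} ->
      message

    {:error, reason} ->
      require Logger
      Logger.error("work order persistence failed: #{inspect(reason)}")

      # A plain raise would be rescued by Broadway and the offset committed
      # anyway; a brutal kill forces a pipeline restart, a consumer-group
      # rejoin, and redelivery from the last committed offset.
      Process.exit(self(), :kill)
  end
end
```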

How Each Sharp Edge Resolves

Gating offset commits on successful persistence resolves the sharp edges:

| Current problem | Resolution |
| --- | --- |
| Silent message loss | Offset is only committed after the work order is persisted. If persistence fails, the offset stays uncommitted and Kafka redelivers. No data loss window. |
| Errors invisible to users | The consumer retries until the work order is created. Transient errors (DB blip) self-heal. Persistent errors (hard run limit) cause the consumer to stall on that partition with backoff, which is observable via a health/status indicator. The problem shifts from "we lost your data and didn't tell you" to "ingestion is paused and here's why." |
| No practical replay | Every successfully consumed message has a work order in the database. Standard replay/rerun features work because there's always a run to rerun. |
| Duplicate risk | Deduplication is still needed for at-least-once delivery (Kafka's standard guarantee). The existing topic_partition_offset idempotency check stays, but the duplicate tracking retention window becomes less critical because the primary failure mode (lost messages that get replayed much later) is eliminated. |
| Instance-level tuning only | Resolved by the :brod switch. Broadway's number_of_consumers and number_of_processors ENV vars become obsolete — brod_group_subscriber_v2 automatically spawns one worker per partition. Fetch tuning (prefetch_count, prefetch_bytes) and rate limiting move to per-trigger kafka_configuration fields, passed directly to each trigger's :brod client/subscriber config. See Per-Trigger Tuning section below. |
| Ordering vs. concurrency | Separate concern — not caused by the offset commit issue. Can be addressed independently via documentation and/or workflow concurrency enforcement for Kafka triggers. |

What Can Be Removed

Once offset commits are gated on successful persistence, the following compensating infrastructure becomes unnecessary:

  • MessageRecovery module and alternate file storage (KAFKA_ALTERNATE_STORAGE_* config) — messages that fail to persist are retried by Kafka, not written to disk for manual recovery
  • Kafka-specific user failure email notifications and the KAFKA_NOTIFICATION_EMBARGO_SECONDS throttle — replaced by a health/status indicator on the trigger
  • Several instance-level ENV variables (KAFKA_ALTERNATE_STORAGE_ENABLED, KAFKA_ALTERNATE_STORAGE_FILE_PATH, KAFKA_NOTIFICATION_EMBARGO_SECONDS)

The dedup table (trigger_kafka_message_records) and its cleanup worker should be retained — they serve a different purpose (preventing duplicates from at-least-once delivery) that isn't solved by offset control alone.

What Stays the Same

The consumer still:

  • Runs as a supervised OTP process per enabled Kafka trigger
  • Manages connection, authentication, and consumer group membership
  • Deserializes message payloads and validates they're JSON objects
  • Calls WorkOrders.create_for/3 with type: :kafka and Kafka-specific request metadata
  • Applies rate limiting / backpressure

Resource model change: Today, each trigger runs one Broadway pipeline. With brod_group_subscriber_v2, each trigger runs one subscriber that spawns one worker process per assigned partition (plus a coordinator process). If a trigger subscribes to a topic with 12 partitions, that trigger has 12 worker processes and 1 coordinator — not 1 process. This is actually better: a stalled partition blocks only its own worker, not the entire trigger's consumption.

Supervisor tree change: BroadwayKafka manages :brod clients internally. With direct :brod usage, each trigger needs an explicit :brod client started via :brod.start_client/3 (with that trigger's brokers, SSL, and SASL config) before brod_group_subscriber_v2.start_link/1 can reference it. The supervisor tree for each trigger should start the client first, then the subscriber — and tear down both when the trigger is disabled. A per-trigger supervisor (:rest_for_one) is a natural fit:

PipelineSupervisor (one_for_one, dynamic children)
  ├── TriggerSupervisor for trigger A (rest_for_one)
  │     ├── :brod client (named :"client-<trigger_id>")
  │     └── brod_group_subscriber_v2 (references client above)
  ├── TriggerSupervisor for trigger B (rest_for_one)
  │     ├── :brod client
  │     └── brod_group_subscriber_v2
  └── ...
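The tree above could be sketched as a per-trigger Supervisor module. Because :brod.start_client/3 registers the client under :brod's own supervision tree, a thin wrapper process is assumed here to tie the client's lifecycle to the trigger's supervisor; all module names other than the :brod API are assumptions:

```elixir
defmodule Lightning.KafkaTriggers.TriggerSupervisor do
  # Sketch only: Lightning.KafkaTriggers.Client is a hypothetical wrapper
  # that calls :brod.start_client(endpoints, client_id, ssl/sasl config)
  # on start and :brod.stop_client/1 on termination.
  use Supervisor

  def start_link(trigger), do: Supervisor.start_link(__MODULE__, trigger)

  @impl true
  def init(trigger) do
    client_id = :"client-#{trigger.id}"

    children = [
      {Lightning.KafkaTriggers.Client, {client_id, trigger.kafka_configuration}},
      %{
        id: :subscriber,
        start:
          {:brod_group_subscriber_v2, :start_link,
           [
             %{
               client: client_id,
               group_id: trigger.kafka_configuration.group_id,
               topics: trigger.kafka_configuration.topics,
               cb_module: Lightning.KafkaTriggers.MessageHandler,
               message_type: :message,
               init_data: trigger
             }
           ]}
      }
    ]

    # :rest_for_one - if the client dies, the subscriber restarts with it.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end
```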

The trigger configuration UI, health monitoring, and per-trigger config schema are unchanged.

Implementation Considerations

  • Backpressure on persistent failure: If work order creation is blocked (e.g., hard run limit), the consumer will retry indefinitely on that partition. Need a backoff strategy that doesn't overwhelm the DB but also doesn't silently stop consuming. A health/status indicator should surface this state to users.
  • Partial partition stalls: With manual offset control, a single failing message can block consumption of an entire partition. Need to decide: retry forever (preserves ordering, risks stalling), or skip after N retries and log/alert (loses ordering guarantee but unblocks the partition).
  • Metadata passthrough: Kafka headers, topic, partition, offset, key, and timestamp should continue to be passed through as dataclip.request metadata so they're available for user inspection.
  • Graceful migration: The consumer group IDs (lightning-<uuid>) must be preserved when switching from Broadway to :brod so Kafka resumes from the last committed offset rather than replaying from initial_offset_reset_policy. The group_id is already stored in kafka_configuration in the database — pass the same string to brod_group_subscriber_v2's config.
  • :brod client lifecycle: Each trigger needs a :brod client started via :brod.start_client/3 before its subscriber can reference it. The per-trigger supervisor tree (:rest_for_one with client then subscriber as children) handles startup ordering and teardown. See the supervisor tree diagram in "What Stays the Same" above.

Achieving O(n) Complexity

The offset commit fix above eliminates the largest source of Kafka-specific complexity — the compensating infrastructure for lost messages. But fixing the atomicity gap alone isn't enough; three areas need explicit treatment to ensure the integration stays O(n) as the platform grows.

Principle: The Consumer is a Thin Ingestion Layer

After the offset commit fix, the Kafka consumer's job is:

  1. Connect to Kafka, authenticate, manage consumer group
  2. Receive a message, deserialize it, check for duplicates
  3. Call WorkOrders.create_for/3 — the same function webhooks use
  4. On success: return {:ok, :ack, state}. On failure: return {:ok, state} and retry.

From step 3 onward, there is no Kafka-specific code path. The work order, its runs, steps, dataclips, history, replay, UI, rate limiting, sandboxes, lost runs handling, and worker execution all follow the same path as webhook-triggered work orders. The only differences are metadata: the trigger has type: :kafka, the dataclip has type: :kafka with Kafka-specific request fields (topic, partition, offset, key, headers).

This means the answer to "does feature X also work for Kafka?" is always yes for any feature that operates on work orders, runs, or downstream. The only features that need Kafka awareness are those in the consumer layer itself.

Bounded Kafka-specific surface

| Layer | Kafka-specific? | Scope |
| --- | --- | --- |
| Consumer connection, auth, consumer groups | Yes | Lightning.KafkaTriggers only |
| Offset commit management | Yes | Lightning.KafkaTriggers only |
| Message dedup (trigger_kafka_message_records) | Yes | Lightning.KafkaTriggers only |
| Trigger configuration UI (broker, topic, etc.) | Yes | Trigger form component only |
| Trigger health/status indicator | Yes | Trigger status component only |
| Work order creation | No — uses WorkOrders.create_for/3 | Common path |
| Runs, steps, dataclips | No | Common path |
| History, replay, rerun | No | Common path |
| Worker execution | No | Common path |
| Rate limiting | No | Common path |
| Sandboxes | No | Common path |
| Lost runs handling | No | Common path |
| Bulk operations / cursors | No | Common path |

If a future feature operates on work orders or runs, it works for Kafka automatically. If a future feature operates on the consumer layer, it's scoped to Lightning.KafkaTriggers — a single module boundary. This is O(n), not O(n²).

1. Message ordering: 🌟 Not our problem to solve at the platform level 🌟

Kafka guarantees ordering within a partition by key. Webhooks make no ordering guarantee. The question is whether the platform needs to enforce or preserve Kafka's ordering semantics downstream.

Answer: No. Ordering is a property of the source, not the platform. Once a Kafka message becomes a work order, it enters the same queue as every other work order. If a user needs ordered processing, they use workflow concurrency = 1 — the same mechanism they'd use if they needed ordered processing of webhooks arriving in rapid succession.

This is already how it works today. The consumer creates work orders; the worker processes them respecting workflow concurrency settings. No Kafka-specific branching is needed. What is needed:

  • Documentation: Explain that Kafka partition ordering is preserved into work order creation order, but parallel worker execution may reorder runs. Users who need strict ordering should set workflow concurrency to 1.
  • No enforcement: We do not auto-set concurrency for Kafka triggers. The user makes this choice based on whether their use case requires ordering (stateful event streams) or not (independent records).

This keeps ordering out of the platform's conditional logic entirely. It's a user configuration choice, same as for any trigger type.

2. Blocked persistence: Pause and surface, don't invent new failure modes

When work order creation is blocked (hard run limit, DB down), the consumer cannot commit the offset, so it retries. The question is what the retry/backoff strategy looks like and whether it creates Kafka-specific operational complexity.

Strategy: Exponential backoff with a cap, surfaced via existing trigger status.

  • On failed persistence, wait 1s, 2s, 4s, ... up to a cap of 60s, then retry at 60s intervals.
  • The consumer's partition assignment stays active — brod_group_coordinator runs in a separate process from the worker and continues heartbeating independently, even while the worker is blocked in a retry loop.
  • Messages queued behind the blocked one on the same partition wait — this preserves ordering and is standard Kafka consumer behavior. Other partitions are unaffected (each has its own worker process).
  • The trigger's existing enabled / error status field (already used for connection failures) reflects the stall state. No new status model needed.

Caveat: broker-side max.poll.interval.ms. While :brod's coordinator heartbeats independently (so session.timeout.ms isn't a concern), some Kafka deployments configure max.poll.interval.ms on the broker. If the worker doesn't fetch new messages within this interval, the broker kicks the consumer from the group. The default is 5 minutes, well above our 60s backoff cap, but deployments with aggressive settings could trigger a rebalance during extended stalls. This should be documented as a known interaction.
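The retry loop described above could be sketched as follows. Blocking the worker is safe for group membership because the brod_group_coordinator heartbeats from its own process; function names here are illustrative assumptions:

```elixir
# Capped exponential backoff: 1s, 2s, 4s, ... up to 60s, then steady
# 60s retries. persist_work_order/2 is a hypothetical wrapper around
# WorkOrders.create_for/3.
defp persist_with_backoff(trigger, message, attempt \\ 0) do
  case persist_work_order(trigger, message) do
    {:ok, work_order} ->
      {:ok, work_order}

    {:error, _reason} ->
      delay_ms = min(60_000, 1_000 * Integer.pow(2, min(attempt, 6)))
      # Sleeping here stalls only this partition's worker; other
      # partitions and the coordinator's heartbeats are unaffected.
      Process.sleep(delay_ms)
      persist_with_backoff(trigger, message, attempt + 1)
  end
end
```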

Why this isn't O(n²): The backoff/retry lives entirely inside the consumer (Lightning.KafkaTriggers). No downstream feature needs to know about it. The trigger status indicator already exists for connection errors — stalled persistence is just another error state in the same field. Monitoring, dashboards, and admin UIs that already read trigger status get this for free.

Escape hatch: If a partition is stalled for longer than a configurable threshold (default: 1 hour), log a warning and emit a Sentry alert. Do not skip the message — skipping would reintroduce the "lost message" problem this spec exists to fix. The correct resolution for a persistent stall is operational: increase the run limit, fix the DB, or disable the trigger.

3. Bulk operations and high-volume throughput

If a Kafka topic produces thousands of messages per second, does the 1:1 message-to-work-order model hold?

Answer: Yes, because it's the same model webhooks use. A webhook endpoint receiving 1,000 requests/second creates 1,000 work orders. A Kafka consumer receiving 1,000 messages/second creates 1,000 work orders via the same function. The throughput bottleneck is WorkOrders.create_for/3 and the database, not the trigger type.

If we ever need batch work order creation for throughput, it would be a common-path optimization (batch WorkOrders.create_for/3 calls) that benefits all trigger types equally. The Kafka consumer would call a batch variant the same way a hypothetical bulk webhook endpoint would. No Kafka-specific branching.

Cursors are a user-space concern — a job that processes a batch and tracks its position via collections or state. This works identically regardless of whether the triggering work order came from Kafka or a webhook.

4. Per-trigger tuning: No longer instance-level

The current Broadway-based implementation takes three ENV vars — KAFKA_NUMBER_OF_CONSUMERS, KAFKA_NUMBER_OF_PROCESSORS, KAFKA_NUMBER_OF_MESSAGES_PER_SECOND — and applies them identically to every Kafka trigger on the instance. The switch to :brod eliminates two of these entirely and makes the third per-trigger.

What the Broadway ENV vars mapped to, and why they go away

| Broadway ENV var | What it controlled | :brod equivalent |
| --- | --- | --- |
| KAFKA_NUMBER_OF_CONSUMERS | Number of concurrent BroadwayKafka consumer connections per pipeline | Gone. brod_group_subscriber_v2 automatically spawns one worker per assigned partition — parallelism matches partition count, which is the correct model. No knob needed. |
| KAFKA_NUMBER_OF_PROCESSORS | Concurrent message processing stages in the Broadway pipeline | Gone. Each worker processes messages sequentially in its handle_message/2 callback. This is what we want — sequential processing per partition preserves ordering and ensures we don't ack before persistence completes. |
| KAFKA_NUMBER_OF_MESSAGES_PER_SECOND | Broadway rate limiter (converted to floor(per_second * 10) messages per 10s window) | Moves to per-trigger config as a rate_limit field on kafka_configuration. Implemented in the worker's handle_message/2 callback (e.g., token bucket or simple sleep). |
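The per-trigger rate limit could be as simple as a sleep in the worker callback. One caveat worth calling out: with one worker per partition, a naive per-worker sleep caps each partition individually, so the effective trigger-wide rate is roughly the configured rate times the partition count unless workers coordinate. A minimal sketch, with the field name taken from this proposal:

```elixir
# Evenly space messages when rate_limit_per_second is configured; nil
# means unlimited. Runs per partition worker, so the trigger-wide rate
# is approximately rate * number_of_partitions without coordination.
defp throttle(%{rate_limit_per_second: nil}), do: :ok

defp throttle(%{rate_limit_per_second: rate}) when is_integer(rate) and rate > 0 do
  Process.sleep(div(1_000, rate))
end
```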

What :brod exposes for per-trigger tuning

Each brod_group_subscriber_v2 instance accepts its own consumer_config and group_config maps at startup. Since each trigger starts its own subscriber, these are naturally per-trigger — no global state, no shared config.

consumer_config (per-partition fetch behavior, from brod_consumer):

| Parameter | Default | What it controls |
| --- | --- | --- |
| prefetch_count | 10 | Max unacked messages before the consumer pauses fetching |
| prefetch_bytes | 102,400 (100KB) | Max unacked bytes before fetching pauses (both count AND bytes must be exceeded) |
| max_bytes | 1,048,576 (1MB) | Max payload per fetch request to the broker |
| min_bytes | 0 | Minimum bytes the broker accumulates before responding |
| max_wait_time | 10,000ms | How long the broker waits to fill min_bytes |
| sleep_timeout | 1,000ms | Consumer sleep when the broker returns empty |

group_config (consumer group coordination, from brod_group_coordinator):

| Parameter | Default | What it controls |
| --- | --- | --- |
| session_timeout_seconds | 30 | How long before the broker considers the consumer dead if no heartbeat |
| heartbeat_rate_seconds | 5 | Heartbeat interval |
| offset_commit_interval_seconds | 5 | How often acked offsets are committed to Kafka |
| max_rejoin_attempts | 5 | Max attempts to rejoin after a rebalance |
| rejoin_delay_seconds | 1 | Delay between rejoin attempts |

Which parameters to expose in kafka_configuration

Not all of these should be user-facing. Most users don't need to tune brod_group_coordinator session timeouts. The parameters worth exposing in the trigger's kafka_configuration schema are those that affect throughput and resource usage:

| Field | Type | Default | Why expose it |
| --- | --- | --- | --- |
| prefetch_count | integer | 10 | Controls how aggressively the consumer fetches ahead. Higher = more throughput, more memory. Users with high-volume topics may want 100+. |
| prefetch_bytes | integer | 102,400 | Pairs with prefetch_count. Users with large messages (e.g., 1MB payloads) need to increase this. |
| max_bytes_per_fetch | integer | 1,048,576 | Cap on fetch response size. Users with very large messages may need to increase it. |
| rate_limit_per_second | integer | nil (unlimited) | Replaces the global KAFKA_NUMBER_OF_MESSAGES_PER_SECOND. Users who want to throttle ingestion set this per-trigger. |

The remaining parameters (session_timeout_seconds, heartbeat_rate_seconds, offset_commit_interval_seconds, etc.) stay at :brod defaults. They can be exposed later if operational experience shows a need, but they're coordinator-level settings that rarely need per-trigger tuning.

Migration path

  1. Add the new fields to KafkaConfiguration (embedded schema) with defaults matching current behavior.
  2. Pass them into the consumer_config map when calling brod_group_subscriber_v2.start_link/1.
  3. Remove the Broadway ENV vars (KAFKA_NUMBER_OF_CONSUMERS, KAFKA_NUMBER_OF_PROCESSORS, KAFKA_NUMBER_OF_MESSAGES_PER_SECOND) and the Lightning.Config functions that read them.
  4. Existing triggers get the defaults automatically — no data migration needed since KafkaConfiguration is an embedded JSON schema with default values.
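Step 2 could look roughly like this. Field names follow the table above; the function name and surrounding shapes are assumptions:

```elixir
# Build the per-trigger subscriber config from the embedded
# KafkaConfiguration. Illustrative sketch, not Lightning's actual code.
def subscriber_config(trigger) do
  kc = trigger.kafka_configuration

  %{
    client: :"client-#{trigger.id}",
    group_id: kc.group_id,
    topics: kc.topics,
    cb_module: Lightning.KafkaTriggers.MessageHandler,
    message_type: :message,
    init_data: trigger,
    consumer_config: [
      prefetch_count: kc.prefetch_count || 10,
      prefetch_bytes: kc.prefetch_bytes || 102_400,
      max_bytes: kc.max_bytes_per_fetch || 1_048_576
    ],
    # Coordinator settings stay at :brod defaults per this proposal.
    group_config: []
  }
end

# Passed straight into the subscriber at trigger startup:
# :brod_group_subscriber_v2.start_link(subscriber_config(trigger))
```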

Success Criteria

  • A Kafka message that fails to persist as a work order is never lost — the offset is not committed, and Kafka redelivers.
  • Kafka-originated work orders are created via WorkOrders.create_for/3, the same function webhooks use. They are distinguishable by their trigger association (the work order's trigger has type: :kafka), dataclip.type (:kafka), and Kafka-specific request metadata — but all downstream behavior (runs, replay, history, UI) is identical.
  • The compensating infrastructure (alternate storage, manual recovery, failure emails) is removed.
  • The consumer's only responsibilities are: connect to Kafka, receive messages, create work orders, commit offsets on success.
  • Kafka-specific code is confined to Lightning.KafkaTriggers and trigger UI components. All features operating on work orders, runs, or downstream work identically for Kafka and webhook triggers.
  • Adding a new platform feature (e.g., bulk operations, new history views, sandboxes) requires zero Kafka-specific branching.
