
Unified fiber architecture: durable execution baked into Agent #1256

Merged: threepointone merged 8 commits into main from fiber-refactor on Apr 4, 2026

@threepointone commented Apr 4, 2026

What this unlocks

Every agent can now survive eviction. runFiber() on the base Agent class gives any agent — chat, orchestrator, background worker — a single primitive for durable execution that persists through DO restarts, code deploys, and alarm timeouts. No mixins, no opt-in class hierarchy changes, no experimental imports.

For chat agents, this means LLM streams that survive eviction out of the box. Set durableStreaming = true, and each chat turn is automatically wrapped in a fiber. If the DO dies mid-stream, the partial response is persisted, onChatRecovery fires on restart, and continueLastTurn() seamlessly appends to the interrupted message. The user sees the response just keep growing — no error, no reload, no lost context.

The recovery is provider-aware. Workers AI gets inline continuation. OpenAI gets server-side response retrieval (zero wasted tokens via store: true). Anthropic gets synthetic user message continuation. All three strategies are demonstrated in forever-chat/.


Summary

  • Agent.runFiber(name, fn) — durable execution primitive on the base class. Row in cf_agents_runs, keepAlive ref, ctx.stash() for checkpointing, onFiberRecovered hook for recovery.
  • this.stash() — convenience checkpoint via AsyncLocalStorage. Works correctly with concurrent fibers.
  • AIChatAgent.durableStreaming — wraps chat turns in fibers. Enables onChatRecovery + continueLastTurn().
  • Deleted withFibers mixin (agents/experimental/forever) — 610 lines removed
  • Deleted withDurableChat mixin (@cloudflare/ai-chat/experimental/forever) — 233 lines removed
  • Think extends Agent directly — no mixin wrapper, fiber support inherited
  • Net: −1,321 lines (2,181 added, 3,502 removed)

Commit walkthrough

1. Add durable runFiber framework and tests — the core primitive

Adds runFiber(), stash(), onFiberRecovered, _handleInternalFiberRecovery, and _checkRunFibers to the Agent base class. New cf_agents_runs table (4 columns — minimal by design). Schema version bumped to 3.

16 unit tests covering: basic execution, row cleanup, keepAlive refs, ctx.stash(), this.stash(), stash outside fiber (throws), recovery detection, recovery with snapshot, recovery cleanup, concurrent fibers, error propagation.

E2E test: start wrangler → spawn fiber → SIGKILL → restart → verify automatic recovery via persisted alarm.

Reviewer note: The cf_agents_runs schema is deliberately minimal (id, name, snapshot, created_at). The old cf_agents_fibers had 12 columns — status, retry_count, max_retries, result, error, timestamps. In practice, none of that was needed. Fibers either complete (row deleted) or get interrupted (recovery hook fires). Status tracking and retry logic belong in the developer's hook, not framework-managed columns.
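To make the "complete or interrupted" lifecycle concrete, here is a minimal in-memory sketch. It is not the real Agent implementation: `MiniFiberHost`, its `Map`-backed table, and the `recover()` method are hypothetical stand-ins for `cf_agents_runs` and the alarm-driven check, and cleaning up the row when the function throws is an assumption of this model.

```typescript
// Hypothetical in-memory model of the fiber lifecycle; not the real Agent class.
type RunRow = { id: string; name: string; snapshot: unknown; createdAt: number };
type FiberCtx = { id: string; stash: (data: unknown) => void };
type RecoveryCtx = { id: string; name: string; snapshot: unknown };

class MiniFiberHost {
  // Stands in for the four-column cf_agents_runs table.
  readonly runs = new Map<string, RunRow>();
  private nextId = 0;

  async runFiber<T>(name: string, fn: (ctx: FiberCtx) => Promise<T>): Promise<T> {
    const id = `run-${++this.nextId}`;
    // A row exists only while the fiber is running.
    this.runs.set(id, { id, name, snapshot: null, createdAt: Date.now() });
    const ctx: FiberCtx = {
      id,
      // Checkpoint: overwrite the snapshot column of this fiber's row.
      stash: (data) => {
        const row = this.runs.get(id);
        if (row) row.snapshot = data;
      },
    };
    try {
      return await fn(ctx);
    } finally {
      // Assumption: the row is removed whether fn resolved or threw.
      this.runs.delete(id);
    }
  }

  // Models the restart path: any row still present belongs to an interrupted fiber.
  async recover(onRecovered: (ctx: RecoveryCtx) => Promise<void>): Promise<void> {
    for (const row of [...this.runs.values()]) {
      await onRecovered({ id: row.id, name: row.name, snapshot: row.snapshot });
      this.runs.delete(row.id);
    }
  }
}
```

In this model there is no status column to inspect: the presence of a row is the only signal, which is the point the reviewer note makes about the four-column schema.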

2. Support fiber-based durable chat recovery — chat integration

Wraps each AIChatAgent chat turn in runFiber("__cf_internal_chat_turn:{requestId}", ...) when durableStreaming is enabled. Adds _handleInternalFiberRecovery override to intercept chat fibers before the user's onFiberRecovered. Maps to onChatRecovery(ctx) which returns { persist?, continue? }.

Adds continueLastTurn() — re-calls onChatMessage with saved _lastBody/_lastClientTools, streams as a continuation (appended to existing assistant message, no new user message).

Simplifies ResumableStream — removes lifecycle tracking (_preserveStaleStreams, STREAM_STALE_THRESHOLD_MS). Stream lifecycle is now managed by the fiber system, not the stream class.

Reviewer note: The requestId is encoded in the fiber name (__cf_internal_chat_turn:{requestId}), NOT in the snapshot. This is intentional — the snapshot is the user's domain. If someone calls this.stash({ responseId }) inside onChatMessage, it won't overwrite framework data. On recovery, recoveryData in ChatRecoveryContext contains whatever the user stashed.
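As an illustration of keeping framework data in the name rather than the snapshot, the helpers below build and parse a chat-turn fiber name. The prefix string comes from the description above; the constant and function names are hypothetical and not taken from the package.

```typescript
// Prefix taken from the fiber name format described in this PR.
// The identifiers below are illustrative, not the package's exports.
const CHAT_FIBER_PREFIX = "__cf_internal_chat_turn:";

// Build the fiber name for one chat turn.
function chatFiberName(requestId: string): string {
  return `${CHAT_FIBER_PREFIX}${requestId}`;
}

// Return the requestId for a chat-turn fiber, or null for any other fiber,
// so a recovery handler can tell framework fibers from user fibers.
function parseChatFiberName(name: string): string | null {
  return name.startsWith(CHAT_FIBER_PREFIX)
    ? name.slice(CHAT_FIBER_PREFIX.length)
    : null;
}
```

Because the requestId round-trips through the name, whatever the user passes to this.stash() can occupy the whole snapshot without colliding with it.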

3. Remove experimental forever mixin, migrate to runFiber — the big delete

Deletes withFibers (610 lines) and withDurableChat (233 lines). Removes ./experimental/forever exports from both packages. Removes build entries. Updates Think to extend Agent directly (removes fibers flag and checkFibers() call). Rewrites both examples (forever-chat, forever-fibers). Rewrites all fiber tests to use the new API.

Reviewer note: Think's fibers = false flag is gone. Recovery is automatic via _onAlarmHousekeeping → _checkRunFibers() in the base class. No opt-in needed — if a fiber row exists in cf_agents_runs when the alarm fires, recovery runs. This is safe because rows only exist while fibers are actively running.

4. Remove cancelled research UI, add recovery state — edge cases + docs

Fixes: stale JSDoc on onFiberRecovered, wrong method name in JSDoc (_handleFiberRecovery → _handleInternalFiberRecovery), missing setState in forever-fibers recovery path, dead research:cancelled type. Adds ALS-based this.stash() (replaces fragile _runFiberLastId tracking). Rewrites forever.md and design/think.md from scratch.

Reviewer note: this.stash() now uses AsyncLocalStorage to find the correct fiber, not _runFiberLastId. The old approach broke with concurrent fibers (tracked the most recently started fiber, not the currently executing one). ALS is correct in all cases. Test added: two concurrent fibers both using this.stash() — each writes to its own row.
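The sketch below shows why AsyncLocalStorage resolves the right fiber under concurrency. It uses Node's `node:async_hooks` API; the module-level `stash`, `runFiberSketch`, and the `snapshots` map are hypothetical simplifications and not the Agent code.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Each fiber runs inside its own ALS scope that carries the fiber id.
const currentFiber = new AsyncLocalStorage<string>();
// Stands in for the snapshot column, keyed by fiber id.
const snapshots = new Map<string, unknown>();

function stash(data: unknown): void {
  const id = currentFiber.getStore();
  if (id === undefined) throw new Error("stash() called outside a fiber");
  snapshots.set(id, data);
}

function runFiberSketch<T>(id: string, fn: () => Promise<T>): Promise<T> {
  // Everything awaited inside fn sees `id` as the current store.
  return currentFiber.run(id, fn);
}

const tick = () => new Promise<void>((resolve) => setTimeout(resolve, 0));

async function demo(): Promise<Map<string, unknown>> {
  await Promise.all([
    // "a" starts first but stashes after "b" has already started.
    runFiberSketch("a", async () => {
      await tick();
      stash({ owner: "a" });
    }),
    runFiberSketch("b", async () => {
      stash({ owner: "b" });
      await tick();
    }),
  ]);
  return snapshots;
}
```

With "most recently started" tracking, fiber "a" would have written into the row for "b" here, since "b" started last; with ALS each stash lands under its own id.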

5. Unify fiber API; add durable chat e2e tests — public API polish

Renames _onFiberRecovered → onFiberRecovered (public API). Renames _durableStreaming → durableStreaming (public property). Creates changeset. Adds chat recovery E2E test: start wrangler → send chat via WebSocket → SIGKILL mid-stream → restart → verify onChatRecovery fired automatically.

Reviewer note: The E2E test proves the full lifecycle: WebSocket chat → fiber row created → process killed → alarm persists to disk → wrangler restarts → alarm fires → _handleInternalFiberRecovery → onChatRecovery → recovery recorded. Runs in ~10 seconds.

Test coverage

| Suite | Files | Tests |
|---|---|---|
| agents (unit) | 63 | 1,119 |
| ai-chat (unit) | 40 | 437 |
| think (unit) | 7 | 115 |
| agents fiber (e2e) | 1 | 1 |
| ai-chat chat recovery (e2e) | 1 | 1 |
| Total | 112 | 1,673 |

Breaking changes (experimental only)

  • agents/experimental/forever export removed — withFibers, spawnFiber, stashFiber, cancelFiber, getFiber, onFiberComplete, onFibersRecovered no longer exist
  • @cloudflare/ai-chat/experimental/forever export removed — withDurableChat, checkInterruptedStream, getPartialStreamText no longer exist
  • @cloudflare/think: fibers property removed, FiberState/FiberCompleteContext/FiberMethods type exports removed
  • cf_agents_fibers table is no longer created (replaced by cf_agents_runs)

Test plan

  • npm run check — all 72 projects typecheck, oxlint clean, oxfmt clean
  • npm run test — all unit tests pass (agents, ai-chat, think, + 5 other packages)
  • packages/agents/src/e2e-tests/ — fiber eviction recovery via real kill/restart
  • packages/ai-chat/src/e2e-tests/ — chat recovery via real kill/restart
  • experimental/forever-chat — typechecks cleanly
  • experimental/forever-fibers — typechecks cleanly
  • Manual test: run forever-chat with Workers AI, send a message, kill wrangler, restart, verify continuation


Implement durable runFiber support: add Agent.runFiber, stash(), and recovery plumbing (FiberContext and FiberRecoveryContext), plus internal tracking and recovery logic (_checkRunFibers, _onFiberRecovered). Bump schema version to 3 and add cf_agents_runs table to persist active fibers and snapshots. Add test agents and suites (unit tests and e2e) to exercise checkpointing, eviction recovery, concurrency, and keep-alive behavior; update wrangler manifests to register new agents. Also import FiberRecoveryContext where needed in ai-chat types.
Introduce fiber-based durable chat recovery and simplify resumable stream handling. Remove stale-stream deletion and the preserveStaleStreams option from ResumableStream (constructor now only takes sql). Add durable streaming flag, ChatRecoveryContext/ChatRecoveryOptions types, and fiber integration: chat turns can run inside fibers, a CHAT_FIBER_NAME constant, _handleInternalFiberRecovery hook, onChatRecovery hook, _durableChatContinue, and helpers to reconstruct partial stream text. Update AIChatAgent to initialize ResumableStream with the new signature and wrap chat turns in runFiber when durable streaming is enabled. Update tests to expect restored stale streams (lifecycle handled by fibers), add a fiber-based recovery test, and add test helpers to insert/trigger interrupted fibers.
Delete the experimental "forever" mixin and migrate code/examples/tests to the runFiber/_durableStreaming APIs. Removed packages/agents/src/experimental/forever.ts and related exports; updated package.json and build script to no longer build or export the experimental forever module. Adapted examples: forever-chat now enables durable streaming via AIChatAgent._durableStreaming = true and updated imports/types; forever-fibers now uses Agent.runFiber, ctx.stash, and _onFiberRecovered (rewrote server logic and simplified the UI to remove fiberId/eviction demo controls). Cleaned up e2e tests and worker fixtures to remove the old fiber mixin tests and DurableObject bindings related to the removed mixin. Overall: remove deprecated experimental API surface and update callers to the newer durable fiber/streaming patterns.
Remove the "cancelled" research state and its UI/handling in the experimental forever-fibers app and server to simplify recovery flow. Remove the "research:cancelled" ProgressMessage variant and corresponding UI branch. Ensure ResearchAgent marks an active fiber by setting activeFiberId during snapshot recovery. Update agent docs to refer to `_handleInternalFiberRecovery`. In tests, include `lastBody` and `lastClientTools` in DurableChatTestAgent's persisted recovery data so recovery tests capture recent message artifacts.
Consolidate durable fiber behavior into the core Agent/AIChatAgent APIs and update code/docs/tests accordingly. Key changes:

- Rename recovery hook: _onFiberRecovered -> onFiberRecovered (Agent, Think, tests, examples).
- Rename durable chat flag: _durableStreaming -> durableStreaming and use it where chat turns are wrapped in fibers.
- Built-in durable fiber support on Agent (runFiber, stash, keepAlive); Think now extends Agent and inherits fiber behavior.
- Remove experimental mixin exports (breaking change to experimental APIs): withFibers / withDurableChat and ./experimental/forever export removed (noted in changeset).
- Update experimental examples (forever, forever-fibers, forever-chat) and docs/READMEs to reflect new API names and behavior.
- Add AI chat e2e test suite for chat recovery (tests, worker, wrangler.jsonc, vitest config) and wire up test:e2e to run via Vitest config.
- Update various tests and package code to use the new hooks/flags.

This change unifies fiber recovery semantics, enables durable chat streaming/recovery, and adds end-to-end coverage for chat recovery after process eviction.

changeset-bot bot commented Apr 4, 2026

🦋 Changeset detected

Latest commit: 18c69ad

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 3 packages
| Name | Type |
|---|---|
| agents | Minor |
| @cloudflare/ai-chat | Minor |
| @cloudflare/think | Minor |


pkg-pr-new bot commented Apr 4, 2026


agents

npm i https://pkg.pr.new/agents@1256

@cloudflare/ai-chat

npm i https://pkg.pr.new/@cloudflare/ai-chat@1256

@cloudflare/codemode

npm i https://pkg.pr.new/@cloudflare/codemode@1256

hono-agents

npm i https://pkg.pr.new/hono-agents@1256

@cloudflare/shell

npm i https://pkg.pr.new/@cloudflare/shell@1256

@cloudflare/think

npm i https://pkg.pr.new/@cloudflare/think@1256

@cloudflare/voice

npm i https://pkg.pr.new/@cloudflare/voice@1256

@cloudflare/worker-bundler

npm i https://pkg.pr.new/@cloudflare/worker-bundler@1256

commit: 18c69ad

Add support for resuming interrupted assistant streams by merging continued output into existing message parts instead of creating new blocks. For Workers AI, schedule an idempotent _continueWorkersAI task that marks recovery and calls continueLastTurn(), and add a RECOVERY_SUFFIX to hint the model not to re-reason. Prevent double-persist/complete by only persisting/completing when the resumable stream is still active. Implement merging logic to suppress text-start/reasoning-start events when resuming, and reuse an interrupted streaming text part when present. Update OpenAI options (reasoningEffort -> "high"). Add tests covering text and reasoning merge behavior and update the test worker to optionally include reasoning in responses.
Wrap chat continuations in fibers when durableStreaming is enabled and add cleanup for abandoned streaming rows. ResumableStream.restore now restores streams regardless of age and defers stale-removal to _maybeCleanupOldStreams; it also deletes orphaned cf_ai_chat_stream_chunks and cf_ai_chat_stream_metadata rows for 'streaming' streams older than the cutoff. AIChatAgent now runs continuation logic inside a named fiber (using runFiber) when durableStreaming is true. Tests updated/added: durable continuation fiber wrapping, recovery-from-interrupted-continuation via fiber recovery, convergence/no infinite recursion of recovery, and cleanup of abandoned streaming rows after 24 hours. Added getActiveFibers test helper to DurableChatTestAgent to inspect running fibers.
Introduce a DurableChatTestStub interface and a getTestAgent helper in continue-last-turn.test.ts to centralize typing for the test agent returned by getAgentByName. Replace repeated direct calls to getAgentByName(env.DurableChatTestAgent, room) with getTestAgent(room) across the tests to improve type safety and reduce duplication.
@threepointone threepointone merged commit dfab937 into main Apr 4, 2026
2 checks passed
@threepointone threepointone deleted the fiber-refactor branch April 4, 2026 12:07
@github-actions github-actions bot mentioned this pull request Apr 4, 2026
);
}

this.sql`DELETE FROM cf_agents_runs WHERE id = ${row.id}`;

🔴 Fiber row unconditionally deleted after failed recovery, causing permanent loss of checkpoint data

In _checkRunFibers(), the DELETE FROM cf_agents_runs statement at line 3040 executes unconditionally after the recovery try/catch block. If onFiberRecovered (or _handleInternalFiberRecovery) throws due to a transient error (e.g., network timeout, rate limit during re-invocation), the fiber's snapshot data — which the developer explicitly checkpointed via ctx.stash() for recovery purposes — is permanently destroyed. There is no mechanism to retry recovery on the next alarm cycle.

Code flow showing unconditional deletion
try {
  const handled = await this._handleInternalFiberRecovery(ctx);
  if (!handled) {
    await this.onFiberRecovered(ctx);
  }
} catch (e) {
  console.error(...);
}
// Row deleted regardless of whether recovery succeeded or failed:
this.sql`DELETE FROM cf_agents_runs WHERE id = ${row.id}`;

The old withFibers mixin preserved rows on recovery failure (status stayed 'interrupted') and would retry on the next alarm cycle. The new code silently deletes the row, violating the implicit contract that stashed data survives until the developer successfully handles it.

Prompt for agents
In _checkRunFibers() in packages/agents/src/index.ts, the DELETE statement at line 3040 runs unconditionally, even if the recovery hook threw an error. This means a transient failure in onFiberRecovered permanently destroys the fiber's snapshot data.

The fix should only delete the row when recovery succeeds (either _handleInternalFiberRecovery returned true, or onFiberRecovered completed without throwing). When recovery throws, the row should be preserved so the next alarm cycle retries recovery.

To prevent infinite retry loops for permanent errors, consider adding a retry limit: either add a retry_count column to cf_agents_runs, or track retry attempts in memory (accepting that the count resets on eviction, which is acceptable since eviction is the scenario that creates orphaned fibers in the first place).

Simplest fix: move the DELETE inside the try block, after the successful hook call. On error, log the warning and leave the row for the next alarm.
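A minimal sketch of the suggested fix, under stated assumptions: the `Map`-backed table, the in-memory `attempts` counter, the `MAX_RECOVERY_ATTEMPTS` value, and the function name are all hypothetical, and the real code would issue the SQL DELETE where this sketch calls `rows.delete`.

```typescript
type FiberRow = { id: string; name: string; snapshot: unknown };
type RecoveryHook = (row: FiberRow) => Promise<void>;

// Hypothetical cap so a permanently failing hook cannot retry forever.
const MAX_RECOVERY_ATTEMPTS = 3;

// rows stands in for cf_agents_runs; attempts is in-memory and resets on eviction.
async function checkRunFibersSketch(
  rows: Map<string, FiberRow>,
  attempts: Map<string, number>,
  hook: RecoveryHook,
): Promise<void> {
  for (const row of [...rows.values()]) {
    try {
      await hook(row);
      // Delete only after the hook completed without throwing.
      rows.delete(row.id);
      attempts.delete(row.id);
    } catch (e) {
      const n = (attempts.get(row.id) ?? 0) + 1;
      attempts.set(row.id, n);
      if (n >= MAX_RECOVERY_ATTEMPTS) {
        // Give up: drop the row so the next alarm does not retry again.
        console.error(`fiber ${row.id}: giving up after ${n} failed recoveries`, e);
        rows.delete(row.id);
        attempts.delete(row.id);
      } else {
        // Keep the row and its snapshot for the next alarm cycle.
        console.error(`fiber ${row.id}: recovery failed (attempt ${n}), will retry`, e);
      }
    }
  }
}
```

Calling this once per alarm cycle means a transient failure leaves the snapshot in place for the next cycle, while a hook that fails on every attempt is abandoned at the cap instead of looping indefinitely.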