Unified fiber architecture: durable execution baked into Agent #1256
threepointone merged 8 commits into main
Implement durable runFiber support: add Agent.runFiber, stash(), and recovery plumbing (FiberContext and FiberRecoveryContext), plus internal tracking and recovery logic (_checkRunFibers, _onFiberRecovered). Bump schema version to 3 and add cf_agents_runs table to persist active fibers and snapshots. Add test agents and suites (unit tests and e2e) to exercise checkpointing, eviction recovery, concurrency, and keep-alive behavior; update wrangler manifests to register new agents. Also import FiberRecoveryContext where needed in ai-chat types.
Introduce fiber-based durable chat recovery and simplify resumable stream handling. Remove stale-stream deletion and the preserveStaleStreams option from ResumableStream (constructor now only takes sql). Add durable streaming flag, ChatRecoveryContext/ChatRecoveryOptions types, and fiber integration: chat turns can run inside fibers, a CHAT_FIBER_NAME constant, _handleInternalFiberRecovery hook, onChatRecovery hook, _durableChatContinue, and helpers to reconstruct partial stream text. Update AIChatAgent to initialize ResumableStream with the new signature and wrap chat turns in runFiber when durable streaming is enabled. Update tests to expect restored stale streams (lifecycle handled by fibers), add a fiber-based recovery test, and add test helpers to insert/trigger interrupted fibers.
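As a rough illustration of what "helpers to reconstruct partial stream text" could look like, here is a minimal sketch. The `StoredChunk` shape, the `text-delta` event name, and the function name `getPartialText` are assumptions made for this example; they are not taken from the PR's code.

```typescript
// Hypothetical chunk shape: each stored row holds one JSON-encoded stream event.
type StoredChunk = { body: string };

// Concatenates the text deltas that were persisted before the interruption.
// Malformed rows and non-text events are skipped.
function getPartialText(chunks: StoredChunk[]): string {
  let text = "";
  for (const chunk of chunks) {
    try {
      const event = JSON.parse(chunk.body) as { type?: string; delta?: unknown };
      if (event.type === "text-delta" && typeof event.delta === "string") {
        text += event.delta;
      }
    } catch {
      // Ignore a chunk that was only partially written or is not an object.
    }
  }
  return text;
}
```

A recovery hook could pass the result to the model as the already-produced prefix, or persist it as the partial assistant message before continuing.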
Delete the experimental "forever" mixin and migrate code/examples/tests to the runFiber/_durableStreaming APIs. Removed packages/agents/src/experimental/forever.ts and related exports; updated package.json and build script to no longer build or export the experimental forever module. Adapted examples: forever-chat now enables durable streaming via AIChatAgent._durableStreaming = true and updated imports/types; forever-fibers now uses Agent.runFiber, ctx.stash, and _onFiberRecovered (rewrote server logic and simplified the UI to remove fiberId/eviction demo controls). Cleaned up e2e tests and worker fixtures to remove the old fiber mixin tests and DurableObject bindings related to the removed mixin. Overall: remove deprecated experimental API surface and update callers to the newer durable fiber/streaming patterns.
Remove the "cancelled" research state and its UI/handling in the experimental forever-fibers app and server to simplify recovery flow. Remove the "research:cancelled" ProgressMessage variant and corresponding UI branch. Ensure ResearchAgent marks an active fiber by setting activeFiberId during snapshot recovery. Update agent docs to refer to `_handleInternalFiberRecovery`. In tests, include `lastBody` and `lastClientTools` in DurableChatTestAgent's persisted recovery data so recovery tests capture recent message artifacts.
Consolidate durable fiber behavior into the core Agent/AIChatAgent APIs and update code/docs/tests accordingly. Key changes:
- Rename recovery hook: _onFiberRecovered -> onFiberRecovered (Agent, Think, tests, examples).
- Rename durable chat flag: _durableStreaming -> durableStreaming and use it where chat turns are wrapped in fibers.
- Built-in durable fiber support on Agent (runFiber, stash, keepAlive); Think now extends Agent and inherits fiber behavior.
- Remove experimental mixin exports (breaking change to experimental APIs): withFibers / withDurableChat and ./experimental/forever export removed (noted in changeset).
- Update experimental examples (forever, forever-fibers, forever-chat) and docs/READMEs to reflect new API names and behavior.
- Add AI chat e2e test suite for chat recovery (tests, worker, wrangler.jsonc, vitest config) and wire up test:e2e to run via Vitest config.
- Update various tests and package code to use the new hooks/flags.
This change unifies fiber recovery semantics, enables durable chat streaming/recovery, and adds end-to-end coverage for chat recovery after process eviction.
🦋 Changeset detected. Latest commit: 18c69ad. The changes in this PR will be included in the next version bump. This PR includes changesets to release 3 packages.
agents
@cloudflare/ai-chat
@cloudflare/codemode
hono-agents
@cloudflare/shell
@cloudflare/think
@cloudflare/voice
@cloudflare/worker-bundler
Add support for resuming interrupted assistant streams by merging continued output into existing message parts instead of creating new blocks. For Workers AI, schedule an idempotent _continueWorkersAI task that marks recovery and calls continueLastTurn(), and add a RECOVERY_SUFFIX to hint the model not to re-reason. Prevent double-persist/complete by only persisting/completing when the resumable stream is still active. Implement merging logic to suppress text-start/reasoning-start events when resuming, and reuse an interrupted streaming text part when present. Update OpenAI options (reasoningEffort -> "high"). Add tests covering text and reasoning merge behavior and update the test worker to optionally include reasoning in responses.
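The merge behavior described above can be sketched as a small reducer over stream events. This is an illustration only: the `Part` and `StreamEvent` shapes and the `applyEvent` name are hypothetical, and the real message-part types in `@cloudflare/ai-chat` may differ.

```typescript
// Hypothetical shapes; the real message-part and stream-event types may differ.
type Part = { type: "text" | "reasoning"; text: string; state: "streaming" | "done" };
type StreamEvent =
  | { type: "text-start" | "reasoning-start" }
  | { type: "text-delta" | "reasoning-delta"; delta: string }
  | { type: "text-end" | "reasoning-end" };

// Applies one event to the message parts. When `resuming` is true and the last
// part of the matching kind is still "streaming", the start event is suppressed
// so that later deltas extend that part instead of opening a new block.
function applyEvent(parts: Part[], event: StreamEvent, resuming: boolean): void {
  const kind = event.type.startsWith("text") ? "text" : "reasoning";
  const last = parts[parts.length - 1];
  const open =
    last !== undefined && last.type === kind && last.state === "streaming"
      ? last
      : undefined;

  if ("delta" in event) {
    if (open) open.text += event.delta; // extend the open part
  } else if (event.type === "text-start" || event.type === "reasoning-start") {
    if (resuming && open) return; // reuse the interrupted part
    parts.push({ type: kind, text: "", state: "streaming" });
  } else if (open) {
    open.state = "done"; // text-end / reasoning-end closes the part
  }
}
```

With `resuming` set, a continuation that emits `text-start`, a delta, and `text-end` leaves a single text part whose content is the interrupted prefix plus the new delta.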
Wrap chat continuations in fibers when durableStreaming is enabled and add cleanup for abandoned streaming rows. ResumableStream.restore now restores streams regardless of age and defers stale-removal to _maybeCleanupOldStreams; it also deletes orphaned cf_ai_chat_stream_chunks and cf_ai_chat_stream_metadata rows for 'streaming' streams older than the cutoff. AIChatAgent now runs continuation logic inside a named fiber (using runFiber) when durableStreaming is true. Tests updated/added: durable continuation fiber wrapping, recovery-from-interrupted-continuation via fiber recovery, convergence/no infinite recursion of recovery, and cleanup of abandoned streaming rows after 24 hours. Added getActiveFibers test helper to DurableChatTestAgent to inspect running fibers.
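The cleanup rule for abandoned streaming rows can be modeled with plain arrays. This is a sketch under assumptions: the row shapes and the function name are invented for illustration, the real code runs SQL against `cf_ai_chat_stream_chunks` and `cf_ai_chat_stream_metadata`, and only the 24-hour cutoff comes from the test description above.

```typescript
// Hypothetical row shapes standing in for the two stream tables.
type StreamMeta = { id: string; status: "streaming" | "completed"; createdAt: number };
type StreamChunk = { streamId: string; body: string };

const CLEANUP_AGE_MS = 24 * 60 * 60 * 1000; // 24 hours, per the test description

// Drops metadata rows still marked "streaming" that are older than the cutoff,
// together with every chunk that belongs to them. Other rows are left alone.
function cleanupAbandonedStreams(
  meta: StreamMeta[],
  chunks: StreamChunk[],
  now: number
): { meta: StreamMeta[]; chunks: StreamChunk[] } {
  const cutoff = now - CLEANUP_AGE_MS;
  const abandoned = new Set(
    meta
      .filter((m) => m.status === "streaming" && m.createdAt < cutoff)
      .map((m) => m.id)
  );
  return {
    meta: meta.filter((m) => !abandoned.has(m.id)),
    chunks: chunks.filter((c) => !abandoned.has(c.streamId)),
  };
}
```

Keeping restore and cleanup separate in this way means a stream of any age can still be restored, while garbage collection is a distinct, time-based pass.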
Introduce a DurableChatTestStub interface and a getTestAgent helper in continue-last-turn.test.ts to centralize typing for the test agent returned by getAgentByName. Replace repeated direct calls to getAgentByName(env.DurableChatTestAgent, room) with getTestAgent(room) across the tests to improve type safety and reduce duplication.
);
}

this.sql`DELETE FROM cf_agents_runs WHERE id = ${row.id}`;
🔴 Fiber row unconditionally deleted after failed recovery, causing permanent loss of checkpoint data
In _checkRunFibers(), the DELETE FROM cf_agents_runs statement at line 3040 executes unconditionally after the recovery try/catch block. If onFiberRecovered (or _handleInternalFiberRecovery) throws due to a transient error (e.g., network timeout, rate limit during re-invocation), the fiber's snapshot data — which the developer explicitly checkpointed via ctx.stash() for recovery purposes — is permanently destroyed. There is no mechanism to retry recovery on the next alarm cycle.
Code flow showing unconditional deletion
try {
  const handled = await this._handleInternalFiberRecovery(ctx);
  if (!handled) {
    await this.onFiberRecovered(ctx);
  }
} catch (e) {
  console.error(...);
}
// Row deleted regardless of whether recovery succeeded or failed:
this.sql`DELETE FROM cf_agents_runs WHERE id = ${row.id}`;
The old withFibers mixin preserved rows on recovery failure (status stayed 'interrupted') and would retry on the next alarm cycle. The new code silently deletes the row, violating the implicit contract that stashed data survives until the developer successfully handles it.
Prompt for agents
In _checkRunFibers() in packages/agents/src/index.ts, the DELETE statement at line 3040 runs unconditionally, even if the recovery hook threw an error. This means a transient failure in onFiberRecovered permanently destroys the fiber's snapshot data.
The fix should only delete the row when recovery succeeds (either _handleInternalFiberRecovery returned true, or onFiberRecovered completed without throwing). When recovery throws, the row should be preserved so the next alarm cycle retries recovery.
To prevent infinite retry loops for permanent errors, consider adding a retry limit: either add a retry_count column to cf_agents_runs, or track retry attempts in memory (accepting that the count resets on eviction, which is acceptable since eviction is the scenario that creates orphaned fibers in the first place).
Simplest fix: move the DELETE inside the try block, after the successful hook call. On error, log the warning and leave the row for the next alarm.
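One way the suggested fix could look, as a self-contained sketch rather than the actual patch: the `Map` stands in for the `cf_agents_runs` table, and `FiberRow`, `RecoveryHook`, `recoverFibers`, the `retries` field, and `MAX_RETRIES` are all hypothetical names chosen for this example.

```typescript
// Hypothetical in-memory stand-in for the cf_agents_runs table.
type FiberRow = { id: string; name: string; snapshot: unknown; retries: number };
type RecoveryHook = (row: FiberRow) => Promise<void>;

const MAX_RETRIES = 3; // assumed limit, not from the PR

// Runs the recovery hook for each orphaned row. A row is deleted only after
// the hook succeeds, or after MAX_RETRIES failed attempts.
async function recoverFibers(
  rows: Map<string, FiberRow>,
  hook: RecoveryHook
): Promise<void> {
  for (const row of Array.from(rows.values())) {
    try {
      await hook(row);
      rows.delete(row.id); // success: safe to drop the checkpoint
    } catch {
      row.retries += 1;
      if (row.retries >= MAX_RETRIES) {
        rows.delete(row.id); // give up on a fiber that keeps failing
      }
      // Otherwise the row stays so the next alarm cycle can retry.
    }
  }
}
```

The retry counter lives on the row here, which corresponds to the `retry_count` column option; an in-memory counter would behave the same within one process lifetime but reset on eviction.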
What this unlocks
Every agent can now survive eviction. `runFiber()` on the base `Agent` class gives any agent (chat, orchestrator, background worker) a single primitive for durable execution that persists through DO restarts, code deploys, and alarm timeouts. No mixins, no opt-in class hierarchy changes, no experimental imports.

For chat agents, this means LLM streams that survive eviction out of the box. Set `durableStreaming = true`, and each chat turn is automatically wrapped in a fiber. If the DO dies mid-stream, the partial response is persisted, `onChatRecovery` fires on restart, and `continueLastTurn()` seamlessly appends to the interrupted message. The user sees the response just keep growing: no error, no reload, no lost context.

The recovery is provider-aware. Workers AI gets inline continuation. OpenAI gets server-side response retrieval (zero wasted tokens via `store: true`). Anthropic gets synthetic user message continuation. All three strategies are demonstrated in `forever-chat/`.

Summary
- `Agent.runFiber(name, fn)`: durable execution primitive on the base class. Row in `cf_agents_runs`, `keepAlive` ref, `ctx.stash()` for checkpointing, `onFiberRecovered` hook for recovery.
- `this.stash()`: convenience checkpoint via AsyncLocalStorage. Works correctly with concurrent fibers.
- `AIChatAgent.durableStreaming`: wraps chat turns in fibers. Enables `onChatRecovery` + `continueLastTurn()`.
- `withFibers` mixin (`agents/experimental/forever`): 610 lines removed
- `withDurableChat` mixin (`@cloudflare/ai-chat/experimental/forever`): 233 lines removed

Commit walkthrough
1.
`Add durable runFiber framework and tests`: the core primitive

Adds `runFiber()`, `stash()`, `onFiberRecovered`, `_handleInternalFiberRecovery`, and `_checkRunFibers` to the `Agent` base class. New `cf_agents_runs` table (4 columns, minimal by design). Schema version bumped to 3.

16 unit tests covering: basic execution, row cleanup, keepAlive refs, `ctx.stash()`, `this.stash()`, stash outside fiber (throws), recovery detection, recovery with snapshot, recovery cleanup, concurrent fibers, error propagation.

E2E test: start wrangler → spawn fiber → SIGKILL → restart → verify automatic recovery via persisted alarm.
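To make the lifecycle concrete, here is a toy in-memory model of the run/stash/recover cycle. It is a sketch, not the SDK's implementation: `MiniAgent`, `Evicted`, and the context shapes are invented for illustration, the `Map` stands in for the `cf_agents_runs` table, and everything is synchronous for brevity whereas the real `runFiber` takes an async function.

```typescript
// Hypothetical in-memory model; the method names mirror the PR, the bodies do not.
type Snapshot = unknown;
type FiberCtx = { id: string; stash(data: Snapshot): void };
type RecoveryCtx = { id: string; name: string; snapshot: Snapshot | null };

// Thrown by the demo to simulate the process dying mid-fiber.
class Evicted extends Error {}

class MiniAgent {
  // Stand-in for the cf_agents_runs table; it survives a simulated eviction.
  runs = new Map<string, { name: string; snapshot: Snapshot | null }>();
  recovered: RecoveryCtx[] = [];
  private nextId = 0;

  runFiber<T>(name: string, fn: (ctx: FiberCtx) => T): T {
    const id = `fiber-${this.nextId++}`;
    const row: { name: string; snapshot: Snapshot | null } = { name, snapshot: null };
    this.runs.set(id, row);
    const ctx: FiberCtx = { id, stash: (data) => { row.snapshot = data; } };
    try {
      const result = fn(ctx);
      this.runs.delete(id); // normal completion: row cleaned up
      return result;
    } catch (err) {
      // A real eviction never reaches this handler; the row simply stays behind.
      if (!(err instanceof Evicted)) this.runs.delete(id);
      throw err;
    }
  }

  // On restart, hand every leftover row to the recovery hook, then drop it.
  checkRunFibers(): void {
    for (const [id, row] of Array.from(this.runs.entries())) {
      this.onFiberRecovered({ id, name: row.name, snapshot: row.snapshot });
      this.runs.delete(id);
    }
  }

  onFiberRecovered(ctx: RecoveryCtx): void {
    this.recovered.push(ctx);
  }
}
```

In this model a fiber that stashes `{ step: 2 }` and is then "evicted" leaves one row behind; calling `checkRunFibers()` afterwards delivers that snapshot to `onFiberRecovered` and clears the row.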
2.
`Support fiber-based durable chat recovery`: chat integration

Wraps each AIChatAgent chat turn in `runFiber("__cf_internal_chat_turn:{requestId}", ...)` when `durableStreaming` is enabled. Adds `_handleInternalFiberRecovery` override to intercept chat fibers before the user's `onFiberRecovered`. Maps to `onChatRecovery(ctx)` which returns `{ persist?, continue? }`.

Adds `continueLastTurn()`: re-calls `onChatMessage` with saved `_lastBody`/`_lastClientTools`, streams as a continuation (appended to existing assistant message, no new user message).

Simplifies `ResumableStream`: removes lifecycle tracking (`_preserveStaleStreams`, `STREAM_STALE_THRESHOLD_MS`). Stream lifecycle is now managed by the fiber system, not the stream class.

3.
`Remove experimental forever mixin, migrate to runFiber`: the big delete

Deletes `withFibers` (610 lines) and `withDurableChat` (233 lines). Removes `./experimental/forever` exports from both packages. Removes build entries. Updates Think to extend Agent directly (removes `fibers` flag and `checkFibers()` call). Rewrites both examples (`forever-chat`, `forever-fibers`). Rewrites all fiber tests to use the new API.

4.
`Remove cancelled research UI, add recovery state`: edge cases + docs

Fixes: stale JSDoc on `onFiberRecovered`, wrong method name in JSDoc (`_handleFiberRecovery` → `_handleInternalFiberRecovery`), missing `setState` in forever-fibers recovery path, dead `research:cancelled` type. Adds ALS-based `this.stash()` (replaces fragile `_runFiberLastId` tracking). Rewrites `forever.md` and `design/think.md` from scratch.

5.
`Unify fiber API; add durable chat e2e tests`: public API polish

Renames `_onFiberRecovered` → `onFiberRecovered` (public API). Renames `_durableStreaming` → `durableStreaming` (public property). Creates changeset. Adds chat recovery E2E test: start wrangler → send chat via WebSocket → SIGKILL mid-stream → restart → verify `onChatRecovery` fired automatically.

Test coverage
Breaking changes (experimental only)
- `agents/experimental/forever` export removed: `withFibers`, `spawnFiber`, `stashFiber`, `cancelFiber`, `getFiber`, `onFiberComplete`, `onFibersRecovered` no longer exist
- `@cloudflare/ai-chat/experimental/forever` export removed: `withDurableChat`, `checkInterruptedStream`, `getPartialStreamText` no longer exist
- `@cloudflare/think`: `fibers` property removed, `FiberState` / `FiberCompleteContext` / `FiberMethods` type exports removed
- `cf_agents_fibers` table is no longer created (replaced by `cf_agents_runs`)

Test plan
- `npm run check`: all 72 projects typecheck, oxlint clean, oxfmt clean
- `npm run test`: all unit tests pass (agents, ai-chat, think, + 5 other packages)
- `packages/agents/src/e2e-tests/`: fiber eviction recovery via real kill/restart
- `packages/ai-chat/src/e2e-tests/`: chat recovery via real kill/restart
- `experimental/forever-chat`: typechecks cleanly
- `experimental/forever-fibers`: typechecks cleanly
- `forever-chat` with Workers AI, send a message, kill wrangler, restart, verify continuation

Made with Cursor