
Add stream recovery and continuation to withDurableChat mixin#1254

Merged
threepointone merged 5 commits into main from even-more-fibers
Apr 3, 2026

Conversation


@threepointone threepointone commented Apr 3, 2026

Summary

Adds crash-recovery and continuation support to the experimental withDurableChat mixin so that a chat agent can survive Durable Object eviction mid-stream and seamlessly resume for the end user.

What changed

packages/ai-chat/src/experimental/forever.ts: withDurableChat mixin

  • onStart → checkInterruptedStream(): on every DO restart, detects an orphaned or stale stream (active but not live) and drives the recovery flow.
  • onChatRecovery(ctx) hook — overridable method called with a ChatRecoveryContext (stream ID, request ID, partial text, partial parts, full message history, last body/client tools). Return ChatRecoveryOptions to control behavior:
    • { persist: true, continue: true } (default) — save the partial response, then schedule a continuation.
    • { continue: false } — save partial only; don't continue.
    • { persist: false, continue: false } — caller handles everything (e.g. OpenAI Responses API retrieval).
  • getPartialStreamText(streamId?) — replays stored stream chunks through applyChunkToParts to reconstruct the assistant message's text and parts.
  • DurableChatMethods interface — exported for consumers that need to type the mixin's added surface.
  • DurableChatAgentClass return type — properly typed constructor so extends DurableChatBase<Env> works with generic env/state parameters.
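The three option combinations listed for onChatRecovery can be read as a small decision table. The following is a minimal sketch of how such options might resolve to recovery steps, assuming both flags default to true as described above. The names `RecoveryOptionsSketch`, `resolveRecoveryPlan`, and `recoverySteps` are hypothetical and are not part of the mixin's API.

```typescript
// Hypothetical stand-in for ChatRecoveryOptions; the real type lives in the mixin.
type RecoveryOptionsSketch = { persist?: boolean; continue?: boolean };

type RecoveryPlanSketch = { persist: boolean; continue: boolean };

// Fill in the documented defaults: a flag is true unless explicitly set to false.
function resolveRecoveryPlan(options: RecoveryOptionsSketch = {}): RecoveryPlanSketch {
  return {
    persist: options.persist ?? true,
    continue: options.continue ?? true,
  };
}

// Translate the resolved plan into ordered steps: persist first, then continue.
// The step names are illustrative labels only.
function recoverySteps(options: RecoveryOptionsSketch = {}): string[] {
  const plan = resolveRecoveryPlan(options);
  const steps: string[] = [];
  if (plan.persist) steps.push("persist-partial");
  if (plan.continue) steps.push("schedule-continuation");
  return steps;
}
```

Under these assumptions, an empty options object yields both steps, `{ continue: false }` yields only the persist step, and `{ persist: false, continue: false }` yields no steps, leaving everything to the caller.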

packages/ai-chat/src/index.ts: AIChatAgent core

  • continueLastTurn(body?) — triggers a new onChatMessage call using the existing conversation (no new user message). The response is streamed as a continuation: true reply, appending new parts to the last assistant message. Returns early with status: "skipped" if there's nothing to continue.
  • Promoted _lastBody, _lastClientTools, and _persistOrphanedStream from private to protected so the mixin can access them.
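The continuation semantics described for continueLastTurn can be illustrated with an in-memory model. This is only a sketch: `continueLastTurnSketch`, the message and part shapes, and the `"completed"` status are assumptions made for illustration (the source only names `status: "skipped"`), and the real method is asynchronous and streams its output. The sketch also assumes that "nothing to continue" means the final message is not an assistant message.

```typescript
// Hypothetical, simplified message shapes (not the real UI message types).
type SketchPart = { type: "text"; text: string };
type SketchMessage = { id: string; role: "user" | "assistant"; parts: SketchPart[] };
type ContinueResultSketch = { status: "skipped" | "completed" };

// Append newly generated parts to the last assistant message, in place.
// No user message is created and the assistant message keeps its ID.
function continueLastTurnSketch(
  messages: SketchMessage[],
  generate: (history: SketchMessage[]) => SketchPart[]
): ContinueResultSketch {
  const last = messages[messages.length - 1];
  // Nothing to continue: empty history, or the last message is not from the assistant.
  if (!last || last.role !== "assistant") {
    return { status: "skipped" };
  }
  const newParts = generate(messages);
  last.parts.push(...newParts);
  return { status: "completed" };
}
```

The point of the design is that the conversation length does not change: the continuation extends the existing assistant message rather than adding a new turn.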

packages/agents/src/chat/resumable-stream.ts

  • Added preserveStaleStreams constructor option — when true, stale streams (> 5 min) are kept in the DB instead of being deleted in restore(), letting the recovery hook handle them.
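The effect of preserveStaleStreams on restore() can be sketched with plain arrays standing in for the stream table. The row shape, `restoreSketch`, and the use of a creation timestamp to measure staleness are assumptions for illustration; only the 5-minute threshold and the keep-or-delete behavior come from the description above.

```typescript
// Streams older than this are considered stale (5 minutes, per the description).
const STALE_AFTER_MS = 5 * 60 * 1000;

// Hypothetical row shape; the real table schema is not shown in this PR text.
type StreamRowSketch = { id: string; active: boolean; createdAt: number };

type RestoreOutcomeSketch = { kept: string[]; deleted: string[] };

// Decide which active streams survive a restore.
// Without preservation, stale streams are deleted; with it, they are kept
// so a recovery hook can still inspect them.
function restoreSketch(
  rows: StreamRowSketch[],
  now: number,
  preserveStaleStreams = false
): RestoreOutcomeSketch {
  const kept: string[] = [];
  const deleted: string[] = [];
  for (const row of rows) {
    if (!row.active) continue; // only active streams are candidates
    const stale = now - row.createdAt > STALE_AFTER_MS;
    if (stale && !preserveStaleStreams) {
      deleted.push(row.id);
    } else {
      kept.push(row.id);
    }
  }
  return { kept, deleted };
}
```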

Tests (new)

packages/ai-chat/src/tests/durable-chat-recovery.test.ts — 8 test cases:

  • Fires onChatRecovery for orphaned streams
  • Fires onChatRecovery for stale streams (> 5 min)
  • Persists partial response by default
  • Skips persistence when persist: false
  • Doesn't fire hook again after cleanup
  • Extracts partial text from stored chunks
  • Returns empty when no stream exists
  • Default options: persist + continue end-to-end
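The partial-text cases in this list correspond to replaying stored chunks into message parts. Below is a minimal sketch of that replay under simplified assumptions: the chunk and part shapes, `applyChunkSketch`, and `partialFromChunks` are hypothetical, and the real applyChunkToParts handles more chunk types than shown here.

```typescript
// Hypothetical chunk and part shapes, reduced to two kinds for illustration.
type ChunkSketch =
  | { type: "text-delta"; delta: string }
  | { type: "tool-call"; toolName: string };
type PartSketch =
  | { type: "text"; text: string }
  | { type: "tool"; toolName: string };

// Fold one chunk into the parts list: consecutive text deltas extend the
// trailing text part, anything else starts a new part.
function applyChunkSketch(parts: PartSketch[], chunk: ChunkSketch): void {
  if (chunk.type === "text-delta") {
    const tail = parts[parts.length - 1];
    if (tail && tail.type === "text") {
      tail.text += chunk.delta;
    } else {
      parts.push({ type: "text", text: chunk.delta });
    }
  } else {
    parts.push({ type: "tool", toolName: chunk.toolName });
  }
}

// Replay all stored chunks and collect both the parts and the plain text.
function partialFromChunks(chunks: ChunkSketch[]): { text: string; parts: PartSketch[] } {
  const parts: PartSketch[] = [];
  for (const chunk of chunks) applyChunkSketch(parts, chunk);
  let text = "";
  for (const part of parts) {
    if (part.type === "text") text += part.text;
  }
  return { text, parts };
}
```

With no stored chunks the sketch returns an empty string and an empty parts list, which mirrors the "returns empty when no stream exists" case.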

Uses a DurableChatTestStub interface to avoid TS2589 (excessively deep type instantiation from DurableObjectStub<DurableChatTestAgent> resolving the full AIChatAgent type tree through RPC serialization types).

packages/ai-chat/src/tests/continue-last-turn.test.ts — 5 test cases:

  • Appends to the last assistant message without creating a user message
  • Skips when there's no assistant message
  • Skips when messages are empty
  • Preserves the original assistant message ID
  • End-to-end: interrupted stream → recovery → continuation appends inline

packages/ai-chat/src/tests/worker.ts: DurableChatTestAgent class using withDurableChat(AIChatAgent), with test helpers for inserting interrupted streams, overriding recovery options, and inspecting state.

Demo (experimental)

experimental/forever-chat/ — multi-provider recovery demo:

| Provider | Model | Recovery strategy |
| --- | --- | --- |
| Workers AI | kimi-k2.5 | Persist partial + inline continuation via continueLastTurn() |
| OpenAI | gpt-5.4 | Retrieve completed response via Responses API (store: true); zero wasted tokens |
| Anthropic | claude-sonnet-4.6 | Persist partial + continue via synthetic user message (reasoning disabled for recovery) |
  • Provider selector dropdown in the client UI
  • onChatRecovery override dispatches to provider-specific strategies
  • OpenAI: stores response ID from raw chunks, retrieves completed response on recovery
  • Anthropic: falls back to a synthetic user message since it doesn't support assistant prefill; disables extended thinking for the recovery turn
  • Renders reasoning parts (thinking blocks) in the UI
  • Hides synthetic continuation messages from the user
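The provider-specific dispatch in the demo can be pictured as a lookup from provider to recovery strategy. This is a sketch, not the demo's code: the provider identifiers, `strategyFor`, and the `followUp` labels are hypothetical. The Anthropic entry assumes the demo persists the partial response and turns off the built-in continuation so it can schedule its own synthetic user message, which the description implies but does not state outright.

```typescript
// Hypothetical provider identifiers; the demo's actual values are not shown here.
type ProviderSketch = "workers-ai" | "openai" | "anthropic";
type ProviderRecoveryOptions = { persist?: boolean; continue?: boolean };

type StrategySketch = {
  options: ProviderRecoveryOptions;
  followUp: "inline-continuation" | "retrieve-stored-response" | "synthetic-user-message";
};

// Map each provider to the options an onChatRecovery override might return,
// plus a label for the provider-specific follow-up work.
function strategyFor(provider: ProviderSketch): StrategySketch {
  switch (provider) {
    case "workers-ai":
      // Default behavior: persist, then continue inline.
      return { options: { persist: true, continue: true }, followUp: "inline-continuation" };
    case "openai":
      // Caller handles everything by fetching the stored response.
      return { options: { persist: false, continue: false }, followUp: "retrieve-stored-response" };
    case "anthropic":
      // Assumption: persist the partial, skip the built-in continuation,
      // and continue through a synthetic user message instead.
      return { options: { persist: true, continue: false }, followUp: "synthetic-user-message" };
  }
}
```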

Test plan

  • npx tsc --noEmit -p packages/ai-chat/src/tests/tsconfig.json passes clean
  • npm run test — new durable-chat-recovery and continue-last-turn test suites pass
  • npm run check — formatting, linting, exports, typecheck all green
  • Manual: run cd experimental/forever-chat && npm start, send a message with each provider, verify streaming works
  • Manual: simulate eviction during a stream (e.g. restart wrangler mid-response), verify recovery fires and the response continues

Made with Cursor



Implement interrupted-stream recovery for the experimental durable-chat mixin and wire it through client, server, agent infrastructure, and tests.

Key changes:
- Client: show a recovery banner and handle a `stream_recovered` WS message.
- Server/agent: ForeverChatAgent overrides onStreamInterrupted to persist partial responses, queue a continuation (continueAfterRecovery), and notify clients on connect; updated system prompt to instruct assistants to continue without repeating text.
- withDurableChat mixin: detect interrupted streams on start, expose StreamInterruptedContext, provide a default onStreamInterrupted (persist partial response), and add getPartialStreamText to reconstruct partial responses from stored chunks. ResumableStream is re-initialized with preserveStaleStreams to allow recovery handling.
- ResumableStream: add preserveStaleStreams option and avoid deleting stale streams when preservation is enabled.
- API visibility: make _lastBody, _lastClientTools and _persistOrphanedStream protected so mixins/tests can access them.
- Tests: add durable-chat-recovery.test.ts and DurableChatTestAgent test helpers; update wrangler test config.

Reason: allow Durable Object agents to detect and recover partially-streamed LLM responses after eviction, notify connected clients, and resume generation safely; includes tests to validate behavior.

Introduce a continuation API and more flexible recovery for durable chat.

- Add AIChatAgent.continueLastTurn to append LLM output to the last assistant message (no synthetic user message) and return status.
- Replace onStreamInterrupted/StreamInterruptedContext with onChatRecovery/ChatRecoveryContext and ChatRecoveryOptions (persist?, continue?) in withDurableChat. Default behavior persists partial streams and schedules a continuation.
- Update withDurableChat to preserve stale streams, call onChatRecovery, persist orphaned streams by default, and schedule a _durableChatContinue that calls continueLastTurn.
- Export updated DurableChatMethods signatures and new types (ChatRecoveryContext, ChatRecoveryOptions).
- Update experimental/forever example: remove client-side recovery banner and server-side manual continuation/notification logic; document zero-config recovery and refined system prompt.
- Add tests: new continue-last-turn.test.ts and updates to durable-chat-recovery.test.ts and worker test harness to reflect new hooks, options, and continuation behavior.

These changes simplify automatic recovery after eviction and provide an explicit API for continuing interrupted assistant responses inline.

Add OpenAI and Anthropic support and provider-specific recovery strategies for the experimental forever-chat example. Changes include:

- Add .env.example and update env types (env.d.ts) to include OPENAI_API_KEY and ANTHROPIC_API_KEY, plus NodeJS.ProcessEnv typing.
- Add @ai-sdk/openai and @ai-sdk/anthropic dependencies to package.json.
- Update README with provider recovery strategies and run instructions (copy .env.example to .env).
- Client: add provider dropdown, persist provider in agent state, pass provider to agent body, and refactor message rendering to handle text/reasoning/tool UI parts and synthetic user messages.
- Server: extend DurableChatAgent with AgentState, move tools and system prompt into shared constants, add multi-provider model selection, provider-specific providerOptions (OpenAI: store responses, Anthropic: disable thinking when recovering), onChatRecovery implementation (Anthropic: schedule synthetic user continuation; OpenAI: fetch the completed response via Responses API using stored response ID), includeRawChunks/onChunk for capturing OpenAI response IDs, and functions to store/retrieve response ID in sqlite.
- Wrangler config: reorder fields, set name/main, add AI binding entry and declare required secrets for OPENAI_API_KEY and ANTHROPIC_API_KEY.

Overall this enables streaming that survives DO eviction across Workers AI, OpenAI, and Anthropic by using different recovery approaches and wiring the UI and types to select providers.

changeset-bot bot commented Apr 3, 2026

⚠️ No Changeset found

Latest commit: 34a6140

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types



pkg-pr-new bot commented Apr 3, 2026


agents

npm i https://pkg.pr.new/agents@1254

@cloudflare/ai-chat

npm i https://pkg.pr.new/@cloudflare/ai-chat@1254

@cloudflare/codemode

npm i https://pkg.pr.new/@cloudflare/codemode@1254

hono-agents

npm i https://pkg.pr.new/hono-agents@1254

@cloudflare/shell

npm i https://pkg.pr.new/@cloudflare/shell@1254

@cloudflare/think

npm i https://pkg.pr.new/@cloudflare/think@1254

@cloudflare/voice

npm i https://pkg.pr.new/@cloudflare/voice@1254

@cloudflare/worker-bundler

npm i https://pkg.pr.new/@cloudflare/worker-bundler@1254

commit: 34a6140

devin-ai-integration[bot]

This comment was marked as resolved.

The base AIChatAgent constructor creates a ResumableStream (without
preserveStaleStreams) whose restore() deletes stale streams from SQLite
before the mixin constructor can re-create it with preserveStaleStreams.

Fix by adding a protected _resumableStreamOptions() hook that the mixin
overrides via virtual dispatch — called during the base constructor,
before restore() runs.
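The ordering problem and the hook-based fix can be shown with two small classes. This is a sketch with hypothetical class names (`BaseAgentSketch`, `DurableAgentSketch`) and a simplified options type; only the `_resumableStreamOptions()` hook name and the `preserveStaleStreams` option come from the commit message.

```typescript
type StreamOptionsSketch = { preserveStaleStreams?: boolean };

class BaseAgentSketch {
  // Stands in for the options the base constructor passes to ResumableStream.
  readonly streamOptions: StreamOptionsSketch;

  constructor() {
    // The method call dispatches to the most-derived override, even though
    // the subclass constructor body has not run yet.
    this.streamOptions = this._resumableStreamOptions();
  }

  protected _resumableStreamOptions(): StreamOptionsSketch {
    return {};
  }
}

class DurableAgentSketch extends BaseAgentSketch {
  // Caveat: this runs during the base constructor, so it must not read
  // subclass instance fields, which are not initialized at that point.
  protected _resumableStreamOptions(): StreamOptionsSketch {
    return { preserveStaleStreams: true };
  }
}
```

Because the options are resolved before the stream is created, there is no window in which a stream built without preservation could delete stale rows first.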

Made-with: Cursor

Stream chunks are buffered in batches of 10 before writing to SQLite.
If the DO is evicted before the buffer flushes, _persistOrphanedStream
finds no chunks and is a no-op — silently discarding the complete
response retrieved from OpenAI's API. If a prior assistant message
exists, the old code also corrupts it by overwriting with new text.

Fix by creating the assistant message directly from the retrieved text
instead of relying on _persistOrphanedStream to establish it.

Made-with: Cursor
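The failure mode in this commit message comes down to chunks that never left the in-memory buffer. The sketch below models that with a hypothetical `BufferedChunkWriter`, where an array stands in for SQLite, and shows the shape of the fix: building a new assistant message from the retrieved text instead of depending on stored chunks. All names and message shapes here are assumptions for illustration.

```typescript
// Hypothetical writer: holds chunks in memory and flushes in fixed-size batches.
class BufferedChunkWriter {
  private pending: string[] = [];
  readonly stored: string[] = []; // stands in for rows written to SQLite
  private readonly batchSize: number;

  constructor(batchSize = 10) {
    this.batchSize = batchSize;
  }

  write(chunk: string): void {
    this.pending.push(chunk);
    if (this.pending.length >= this.batchSize) {
      this.stored.push(...this.pending);
      this.pending = [];
    }
  }
}

// After an eviction only flushed chunks survive; the pending buffer is lost.
function chunksSurvivingEviction(totalChunks: number, batchSize = 10): number {
  const writer = new BufferedChunkWriter(batchSize);
  for (let i = 0; i < totalChunks; i++) writer.write(`chunk-${i}`);
  return writer.stored.length;
}

type RecoveredMessageSketch = { id: string; role: "assistant"; text: string };

// Shape of the fix: add a new assistant message from the retrieved text,
// leaving any earlier assistant message untouched.
function appendRecoveredMessage(
  history: RecoveredMessageSketch[],
  id: string,
  retrievedText: string
): RecoveredMessageSketch[] {
  return [...history, { id, role: "assistant", text: retrievedText }];
}
```

In this model a stream evicted after 7 chunks leaves nothing in storage, which is why a recovery path that depends only on stored chunks has nothing to persist.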

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 1 new potential issue.

View 10 additional findings in Devin Review.


*
* Returns early if there is no assistant message to continue from.
*/
protected async continueLastTurn(

🟡 Missing changeset for public API changes to published packages

The PR modifies published packages (packages/agents and packages/ai-chat) with new public/protected APIs but has no changeset in .changeset/. AGENTS.md explicitly requires: "Changes to packages/ that affect the public API or fix bugs need a changeset." The affected changes include: new continueLastTurn() protected method, _resumableStreamOptions() hook, visibility changes from private to protected for _lastBody, _lastClientTools, and _persistOrphanedStream in packages/ai-chat/src/index.ts:298-305,1017-1032, new preserveStaleStreams constructor option in packages/agents/src/chat/resumable-stream.ts:85-91, and new exports (ChatRecoveryContext, ChatRecoveryOptions, DurableChatMethods) from packages/ai-chat/src/experimental/forever.ts.

Prompt for agents
The PR adds new protected methods and changes member visibility in packages/ai-chat and packages/agents, but no changeset file exists in .changeset/. Per the AGENTS.md rule "Changes to packages/ that affect the public API or fix bugs need a changeset", run `npx changeset` at the repo root and create entries for both `agents` (ResumableStream constructor option) and `@cloudflare/ai-chat` (continueLastTurn, _resumableStreamOptions, visibility changes, new forever.ts exports). Since these are additive/non-breaking, a minor or patch bump is appropriate.

@threepointone threepointone merged commit 8db4ef1 into main Apr 3, 2026
3 checks passed
@threepointone threepointone deleted the even-more-fibers branch April 3, 2026 19:31