Add stream recovery and continuation to withDurableChat mixin#1254
Add stream recovery and continuation to withDurableChat mixin#1254threepointone merged 5 commits intomainfrom
Conversation
Implement interrupted-stream recovery for the experimental durable-chat mixin and wire it through client, server, agent infrastructure, and tests. Key changes: - Client: show a recovery banner and handle a `stream_recovered` WS message. - Server/agent: ForeverChatAgent overrides onStreamInterrupted to persist partial responses, queue a continuation (continueAfterRecovery), and notify clients on connect; updated system prompt to instruct assistants to continue without repeating text. - withDurableChat mixin: detect interrupted streams on start, expose StreamInterruptedContext, provide a default onStreamInterrupted (persist partial response), and add getPartialStreamText to reconstruct partial responses from stored chunks. ResumableStream is re-initialized with preserveStaleStreams to allow recovery handling. - ResumableStream: add preserveStaleStreams option and avoid deleting stale streams when preservation is enabled. - API visibility: make _lastBody, _lastClientTools and _persistOrphanedStream protected so mixins/tests can access them. - Tests: add durable-chat-recovery.test.ts and DurableChatTestAgent test helpers; update wrangler test config. Reason: allow Durable Object agents to detect and recover partially-streamed LLM responses after eviction, notify connected clients, and resume generation safely; includes tests to validate behavior.
Introduce a continuation API and more flexible recovery for durable chat. - Add AIChatAgent.continueLastTurn to append LLM output to the last assistant message (no synthetic user message) and return status. - Replace onStreamInterrupted/StreamInterruptedContext with onChatRecovery/ChatRecoveryContext and ChatRecoveryOptions (persist?, continue?) in withDurableChat. Default behavior persists partial streams and schedules a continuation. - Update withDurableChat to preserve stale streams, call onChatRecovery, persist orphaned streams by default, and schedule a _durableChatContinue that calls continueLastTurn. - Export updated DurableChatMethods signatures and new types (ChatRecoveryContext, ChatRecoveryOptions). - Update experimental/forever example: remove client-side recovery banner and server-side manual continuation/notification logic; document zero-config recovery and refined system prompt. - Add tests: new continue-last-turn.test.ts and updates to durable-chat-recovery.test.ts and worker test harness to reflect new hooks, options, and continuation behavior. These changes simplify automatic recovery after eviction and provide an explicit API for continuing interrupted assistant responses inline.
Add OpenAI and Anthropic support and provider-specific recovery strategies for the experimental forever-chat example. Changes include: - Add .env.example and update env types (env.d.ts) to include OPENAI_API_KEY and ANTHROPIC_API_KEY, plus NodeJS.ProcessEnv typing. - Add @ai-sdk/openai and @ai-sdk/anthropic dependencies to package.json. - Update README with provider recovery strategies and run instructions (copy .env.example to .env). - Client: add provider dropdown, persist provider in agent state, pass provider to agent body, and refactor message rendering to handle text/reasoning/tool UI parts and synthetic user messages. - Server: extend DurableChatAgent with AgentState, move tools and system prompt into shared constants, add multi-provider model selection, provider-specific providerOptions (OpenAI: store responses, Anthropic: disable thinking when recovering), onChatRecovery implementation (Anthropic: schedule synthetic user continuation; OpenAI: fetch the completed response via Responses API using stored response ID), includeRawChunks/onChunk for capturing OpenAI response IDs, and functions to store/retrieve response ID in sqlite. - Wrangler config: reorder fields, set name/main, add AI binding entry and declare required secrets for OPENAI_API_KEY and ANTHROPIC_API_KEY. Overall this enables streaming that survives DO eviction across Workers AI, OpenAI, and Anthropic by using different recovery approaches and wiring the UI and types to select providers.
|
agents
@cloudflare/ai-chat
@cloudflare/codemode
hono-agents
@cloudflare/shell
@cloudflare/think
@cloudflare/voice
@cloudflare/worker-bundler
commit: |
The base AIChatAgent constructor creates a ResumableStream (without preserveStaleStreams) whose restore() deletes stale streams from SQLite before the mixin constructor can re-create it with preserveStaleStreams. Fix by adding a protected _resumableStreamOptions() hook that the mixin overrides via virtual dispatch — called during the base constructor, before restore() runs. Made-with: Cursor
Stream chunks are buffered in batches of 10 before writing to SQLite. If the DO is evicted before the buffer flushes, _persistOrphanedStream finds no chunks and is a no-op — silently discarding the complete response retrieved from OpenAI's API. If a prior assistant message exists, the old code also corrupts it by overwriting with new text. Fix by creating the assistant message directly from the retrieved text instead of relying on _persistOrphanedStream to establish it. Made-with: Cursor
| * | ||
| * Returns early if there is no assistant message to continue from. | ||
| */ | ||
| protected async continueLastTurn( |
There was a problem hiding this comment.
🟡 Missing changeset for public API changes to published packages
The PR modifies published packages (packages/agents and packages/ai-chat) with new public/protected APIs but has no changeset in .changeset/. AGENTS.md explicitly requires: "Changes to packages/ that affect the public API or fix bugs need a changeset." The affected changes include: new continueLastTurn() protected method, _resumableStreamOptions() hook, visibility changes from private to protected for _lastBody, _lastClientTools, and _persistOrphanedStream in packages/ai-chat/src/index.ts:298-305,1017-1032, new preserveStaleStreams constructor option in packages/agents/src/chat/resumable-stream.ts:85-91, and new exports (ChatRecoveryContext, ChatRecoveryOptions, DurableChatMethods) from packages/ai-chat/src/experimental/forever.ts.
Prompt for agents
The PR adds new protected methods and changes member visibility in packages/ai-chat and packages/agents, but no changeset file exists in .changeset/. Per the AGENTS.md rule "Changes to packages/ that affect the public API or fix bugs need a changeset", run `npx changeset` at the repo root and create entries for both `agents` (ResumableStream constructor option) and `@cloudflare/ai-chat` (continueLastTurn, _resumableStreamOptions, visibility changes, new forever.ts exports). Since these are additive/non-breaking, a minor or patch bump is appropriate.
Was this helpful? React with 👍 or 👎 to provide feedback.
Summary
Adds crash-recovery and continuation support to the experimental
withDurableChatmixin so that a chat agent can survive Durable Object eviction mid-stream and seamlessly resume for the end user.What changed
packages/ai-chat/src/experimental/forever.ts—withDurableChatmixinonStart→checkInterruptedStream()— on every DO restart, detects an orphaned or stale stream (active but not live) and drives the recovery flow.onChatRecovery(ctx)hook — overridable method called with aChatRecoveryContext(stream ID, request ID, partial text, partial parts, full message history, last body/client tools). ReturnChatRecoveryOptionsto control behavior:{ persist: true, continue: true }(default) — save the partial response, then schedule a continuation.{ continue: false }— save partial only; don't continue.{ persist: false, continue: false }— caller handles everything (e.g. OpenAI Responses API retrieval).getPartialStreamText(streamId?)— replays stored stream chunks throughapplyChunkToPartsto reconstruct the assistant message's text and parts.DurableChatMethodsinterface — exported for consumers that need to type the mixin's added surface.DurableChatAgentClassreturn type — properly typed constructor soextends DurableChatBase<Env>works with generic env/state parameters.packages/ai-chat/src/index.ts—AIChatAgentcorecontinueLastTurn(body?)— triggers a newonChatMessagecall using the existing conversation (no new user message). The response is streamed as acontinuation: truereply, appending new parts to the last assistant message. Returns early withstatus: "skipped"if there's nothing to continue._lastBody,_lastClientTools, and_persistOrphanedStreamfromprivatetoprotectedso the mixin can access them.packages/agents/src/chat/resumable-stream.tspreserveStaleStreamsconstructor option — whentrue, stale streams (> 5 min) are kept in the DB instead of being deleted inrestore(), letting the recovery hook handle them.Tests (new)
packages/ai-chat/src/tests/durable-chat-recovery.test.ts— 8 test cases:onChatRecoveryfor orphaned streamsonChatRecoveryfor stale streams (> 5 min)persist: falseUses a
DurableChatTestStubinterface to avoid TS2589 (excessively deep type instantiation fromDurableObjectStub<DurableChatTestAgent>resolving the fullAIChatAgenttype tree through RPC serialization types).packages/ai-chat/src/tests/continue-last-turn.test.ts— 5 test cases:packages/ai-chat/src/tests/worker.ts—DurableChatTestAgentclass usingwithDurableChat(AIChatAgent)with test helpers for inserting interrupted streams, overriding recovery options, and inspecting state.Demo (experimental)
experimental/forever-chat/— multi-provider recovery demo:continueLastTurn()store: true) — zero wasted tokensonChatRecoveryoverride dispatches to provider-specific strategiesTest plan
npx tsc --noEmit -p packages/ai-chat/src/tests/tsconfig.jsonpasses cleannpm run test— newdurable-chat-recoveryandcontinue-last-turntest suites passnpm run check— formatting, linting, exports, typecheck all greencd experimental/forever-chat && npm start, send a message with each provider, verify streaming worksMade with Cursor