Merged
5 changes: 5 additions & 0 deletions .changeset/eager-fiber-recovery.md
@@ -0,0 +1,5 @@
---
"agents": patch
---

Run fiber recovery eagerly in `onStart()` instead of deferring to the next alarm. Interrupted fibers are now detected immediately on the first request after DO wake, with the alarm path as a fallback. A re-entrancy guard prevents double recovery.
1 change: 1 addition & 0 deletions experimental/forever-chat/src/server.ts
@@ -237,6 +237,7 @@ export class ForeverChatAgent extends AIChatAgent<Env, AgentState> {
}),
tools: chatTools,
stopWhen: stepCountIs(5),
abortSignal: options?.abortSignal,
// oxlint-disable-next-line @typescript-eslint/no-explicit-any -- provider-specific options
providerOptions: providerConfig.providerOptions as any,
includeRawChunks: providerConfig.includeRawChunks,
4 changes: 2 additions & 2 deletions experimental/forever-fibers/README.md
@@ -1,6 +1,6 @@
# Forever Fibers — Durable Execution Demo

Demonstrates durable long-running execution with `Agent.runFiber()` — work that survives Durable Object eviction via SQLite checkpointing and alarm-based recovery.
Demonstrates durable long-running execution with `Agent.runFiber()` — work that survives Durable Object eviction via SQLite checkpointing and automatic recovery on wake.

See [forever.md](../forever.md) for the full design doc.

@@ -9,7 +9,7 @@ See [forever.md](../forever.md) for the full design doc.
- `runFiber()` — start a multi-step research task that runs in the background
- `ctx.stash()` — checkpoint progress after each step (persisted in SQLite)
- `onFiberRecovered()` — automatically resume from the last checkpoint after eviction
- Real eviction testing — kill the wrangler process externally and restart; alarms persist to disk, so recovery happens automatically (same as production)
- Real eviction testing — kill the wrangler process externally and restart; recovery runs eagerly on the first request (same as production)
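The hooks above fit together roughly like this (a minimal sketch — `Checkpoint`, `FiberCtx`, and the step names are illustrative stand-ins, not the demo's actual types; only `ctx.stash()` comes from the list above):

```typescript
// Hypothetical fiber body: checkpoint after each step via ctx.stash(),
// so recovery can resume from the last completed step.
type Checkpoint = { step: number; notes: string[] };

interface FiberCtx {
  // stands in for the real ctx.stash() — persisted to SQLite per this README
  stash(data: Checkpoint): Promise<void>;
}

async function researchTask(
  ctx: FiberCtx,
  resumeFrom?: Checkpoint
): Promise<Checkpoint> {
  const checkpoint: Checkpoint = resumeFrom ?? { step: 0, notes: [] };
  const steps = ["search", "summarize", "synthesize"];
  for (let i = checkpoint.step; i < steps.length; i++) {
    checkpoint.notes.push(`finished ${steps[i]}`);
    checkpoint.step = i + 1;
    await ctx.stash(checkpoint); // progress survives eviction
  }
  return checkpoint;
}
```

On recovery, `onFiberRecovered()` would hand the last stashed `Checkpoint` back in as `resumeFrom`, and the loop skips the already-completed steps.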

## Run it

20 changes: 11 additions & 9 deletions experimental/forever.md
@@ -53,7 +53,7 @@ No schedule rows are created in `cf_agents_schedules`. The heartbeat is invisibl

### Alarm persistence

`ctx.storage.setAlarm()` persists to disk. If the DO is evicted or the process dies, the alarm fires on restart, triggering `_onAlarmHousekeeping()` — which is where fiber recovery runs.
`ctx.storage.setAlarm()` persists to disk. If the DO is evicted or the process dies, recovery runs eagerly in `onStart()` on the first request after wake. The persisted alarm serves as a fallback — `_onAlarmHousekeeping()` also calls `_checkRunFibers()`, with a re-entrancy guard preventing double recovery.
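The primary/fallback split plus guard reduces to a small pattern (a sketch under assumed names — the real `Agent` internals are not shown in this diff):

```typescript
// Whichever path runs first performs the recovery; the re-entrancy
// guard turns the second caller into a no-op.
class RecoveryCoordinator {
  private recovered = false;
  recoveryRuns = 0;

  async checkRunFibers(): Promise<void> {
    if (this.recovered) return; // guard against double recovery
    this.recovered = true;
    this.recoveryRuns++;
    // ...scan cf_agents_runs and resume interrupted fibers here...
  }

  async onStart(): Promise<void> {
    await this.checkRunFibers(); // eager, primary path
  }

  async onAlarmHousekeeping(): Promise<void> {
    await this.checkRunFibers(); // persisted-alarm fallback
  }
}
```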

### Configurable interval

@@ -174,9 +174,11 @@ runFiber("work", fn)
```
[DO evicted — all in-memory state lost]
├─ Heartbeat alarm fires → DO restarts → constructor runs
├─ First request/connection → onStart() → _checkRunFibers() [primary]
│ OR
├─ Heartbeat alarm fires → _onAlarmHousekeeping() → _checkRunFibers() [fallback]
├─ alarm() → _onAlarmHousekeeping() → _checkRunFibers()
├─ _checkRunFibers() (runs once — re-entrancy guard prevents double recovery)
│ │
│ ├─ SELECT * FROM cf_agents_runs
│ ├─ For each row NOT in _runFiberActiveFibers (in-memory set):
@@ -429,7 +431,7 @@ Key behaviors during hibernation:

- **`cf_agents_runs` rows persist** — SQLite survives hibernation
- **`_runFiberActiveFibers` (in-memory Set) is empty** — reconstructed from scratch
- **`_checkRunFibers()` runs on the first alarm after wake** — any row NOT in the (empty) in-memory set is treated as interrupted
- **`_checkRunFibers()` runs eagerly in `onStart()` on first wake** — any row NOT in the (empty) in-memory set is treated as interrupted. The alarm path also calls it as a fallback, with a re-entrancy guard preventing double recovery
- **`_lastBody` and `_lastClientTools` are restored from SQLite** — `cf_ai_chat_request_context` table, restored in the constructor

The `cf_agents_runs` table is created with `CREATE TABLE IF NOT EXISTS` in the Agent constructor — cheap DDL that runs every wake. No `_fibersTableCreated` flag needed (that would reset on hibernation and miss recovery).
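The wake-time DDL amounts to running the same `CREATE TABLE IF NOT EXISTS` statement unconditionally (sketch — the column list and the `exec` callback are assumptions; the real schema is not shown in this diff):

```typescript
// Idempotent DDL on every wake: repeating it is cheap and safe, so no
// in-memory "already created" flag (which hibernation would wipe).
const CREATE_RUNS_TABLE = `
  CREATE TABLE IF NOT EXISTS cf_agents_runs (
    id TEXT PRIMARY KEY,
    checkpoint TEXT
  )`;

function onConstruct(exec: (sql: string) => void): void {
  exec(CREATE_RUNS_TABLE); // runs on every constructor call
}
```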
@@ -438,20 +440,20 @@ The `cf_agents_runs` table is created with `CREATE TABLE IF NOT EXISTS` in the A

1. `runFiber()` calls `keepAlive()`, which increments `_keepAliveRefs` and calls `_scheduleNextAlarm()`
2. `_scheduleNextAlarm()` sets `ctx.storage.setAlarm(now + keepAliveIntervalMs)` — persists to disk
3. When the alarm fires, `alarm()` processes schedules, then calls `_onAlarmHousekeeping()`
4. `_onAlarmHousekeeping()` calls `_checkRunFibers()` — finds orphaned rows and triggers recovery
3. On wake, `onStart()` calls `_checkRunFibers()` eagerly — recovery fires immediately on the first request
4. The alarm path also calls `_checkRunFibers()` via `_onAlarmHousekeeping()` as a fallback (the re-entrancy guard prevents double recovery)
5. `_scheduleNextAlarm()` sets the next alarm if refs are still held

The alarm handler itself runs for milliseconds (a few SQL queries + setting the next alarm). The actual fiber work runs in the method execution context, not the alarm handler. The 15-minute alarm timeout is a non-issue.
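The orphan detection inside `_checkRunFibers()` is just a set difference between persisted rows and live fibers (the row shape here is an assumption based on the recovery tree earlier in this doc):

```typescript
// Rows persisted in cf_agents_runs with no matching entry in the
// in-memory active-fiber set are treated as interrupted.
interface RunRow {
  id: string;
  checkpoint: string | null;
}

function findInterrupted(
  rows: RunRow[],
  activeFibers: Set<string>
): RunRow[] {
  return rows.filter((row) => !activeFibers.has(row.id));
}
```

After hibernation the in-memory set is empty, so every persisted row comes back as interrupted and gets recovered.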

## Local development

Workerd persists alarm state to disk. Local development and production behave identically for fiber recovery:
Workerd persists both SQLite and alarm state to disk. Local development and production behave identically for fiber recovery:

1. Fiber is running, `keepAlive()` sets an alarm
2. Process is killed (SIGKILL, code update, `Ctrl-C`)
3. Process restarts — workerd reads persisted alarm state from disk
4. Alarm fires → `_onAlarmHousekeeping()` → `_checkRunFibers()` → recovery
3. Process restarts — first request triggers `onStart()` → `_checkRunFibers()` → recovery
4. Persisted alarm also fires as fallback → `_onAlarmHousekeeping()` → `_checkRunFibers()` (no-op, already recovered)

The E2E test in `packages/agents/src/e2e-tests/` validates this: it starts wrangler, spawns a fiber, kills the process with SIGKILL, restarts with the same persist directory, and verifies the fiber recovers automatically.

1 change: 1 addition & 0 deletions packages/agents/src/index.ts
@@ -1464,6 +1464,7 @@ export class Agent<
this.broadcastMcpServers();

this._checkOrphanedWorkflows();
await this._checkRunFibers();

this._insideOnStart = true;
this._warnedScheduleInOnStart.clear();
49 changes: 49 additions & 0 deletions packages/ai-chat/src/tests/durable-chat-recovery.test.ts
@@ -287,4 +287,53 @@ describe("onChatRecovery", () => {
expect(fiberCtx.partialText).toBe("Fiber recovery text");
expect(fiberCtx.recoveryData).toEqual({ someUserData: true });
});

it("should not double-recover when _checkRunFibers runs from both onStart and alarm", async () => {
const room = crypto.randomUUID();
const agentStub = await getTestAgent(room);
await agentStub.setRecoveryOverride({ continue: false });

await agentStub.persistMessages([
{
id: "user-1",
role: "user",
parts: [{ type: "text", text: "Hello" }]
}
] as ChatMessage[]);

await agentStub.insertInterruptedStream(
"stream-double",
"req-double",
makeChunks(["Double recovery text"], "assistant-double")
);
await agentStub.insertInterruptedFiber(
"__cf_internal_chat_turn:req-double"
);

// First call (simulates onStart path)
await agentStub.triggerFiberRecovery();

// Second call (simulates alarm path — should be a no-op since
// the fiber row was deleted after the first recovery)
await agentStub.triggerFiberRecovery();

const contexts = (await agentStub.getRecoveryContexts()) as Array<{
streamId: string;
partialText: string;
}>;

// Recovery should have fired exactly once, not twice
const doubleContexts = contexts.filter(
(c) => c.streamId === "stream-double"
);
expect(doubleContexts).toHaveLength(1);
expect(doubleContexts[0].partialText).toBe("Double recovery text");

// Message should be persisted once (not duplicated)
const messages = (await agentStub.getPersistedMessages()) as ChatMessage[];
const assistantMessages = messages.filter(
(m: ChatMessage) => m.role === "assistant"
);
expect(assistantMessages).toHaveLength(1);
});
});
7 changes: 4 additions & 3 deletions packages/ai-chat/src/tests/message-concurrency.test.ts
@@ -589,17 +589,18 @@ describe("AIChatAgent messageConcurrency", () => {

sendChatRequest(ws, "req-clear-stale-1", [firstUserMessage], {
format: "plaintext",
responseDelayMs: 300,
responseDelayMs: 500,
chunkCount: 1,
chunkDelayMs: 10
});
await delay(20);
await delay(50);

ws.send(
JSON.stringify({
type: MessageType.CF_AGENT_CHAT_CLEAR
})
);
await delay(20);

sendChatRequest(ws, "req-clear-stale-2", [secondUserMessage], {
format: "plaintext",
@@ -619,7 +620,7 @@
);

return userTexts.includes("Second");
}, 5000);
}, 8000);
await expect(
agentStub.waitUntilStableForTest({ timeout: 15_000 })
).resolves.toBe(true);