diff --git a/cloudflare-gastown/src/dos/Town.do.ts b/cloudflare-gastown/src/dos/Town.do.ts index 0cf7c974a..92c4c0363 100644 --- a/cloudflare-gastown/src/dos/Town.do.ts +++ b/cloudflare-gastown/src/dos/Town.do.ts @@ -240,7 +240,11 @@ export class TownDO extends DurableObject { } private emitEvent(data: Omit): void { - writeEvent(this.env, { ...data, delivery: 'internal', userId: this._ownerUserId }); + writeEvent(this.env, { + ...data, + delivery: 'internal', + userId: this._ownerUserId, + }); } /** Build the context object used by the scheduling sub-module. */ @@ -290,7 +294,9 @@ export class TownDO extends DurableObject { }); } - return scheduling.dispatchAgent(schedulingCtx, agent, bead, { systemPromptOverride }); + return scheduling.dispatchAgent(schedulingCtx, agent, bead, { + systemPromptOverride, + }); }, stopAgent: async agentId => { await dispatch.stopAgentInContainer(this.env, this.townId, agentId); @@ -677,7 +683,9 @@ export class TownDO extends DurableObject { const townConfig = await this.getTownConfig(); if (!townConfig.kilocode_token || townConfig.kilocode_token !== rigConfig.kilocodeToken) { console.log(`${TOWN_LOG} configureRig: propagating kilocodeToken to town config`); - await this.updateTownConfig({ kilocode_token: rigConfig.kilocodeToken }); + await this.updateTownConfig({ + kilocode_token: rigConfig.kilocodeToken, + }); } } @@ -1214,10 +1222,14 @@ export class TownDO extends DurableObject { * Return undelivered, non-expired nudges for an agent. * Urgent nudges are returned first, then FIFO within same priority. */ - async getPendingNudges( - agentId: string - ): Promise< - { nudge_id: string; message: string; mode: string; priority: string; source: string }[] + async getPendingNudges(agentId: string): Promise< + { + nudge_id: string; + message: string; + mode: string; + priority: string; + source: string; + }[] > { const rows = [ ...query( @@ -1768,7 +1780,12 @@ export class TownDO extends DurableObject { /** Build the rig list for mayor agent startup (browse worktree setup on fresh containers). */ private async rigListForMayor(): Promise< - Array<{ rigId: string; gitUrl: string; defaultBranch: string; platformIntegrationId?: string }> + Array<{ + rigId: string; + gitUrl: string; + defaultBranch: string; + platformIntegrationId?: string; + }> > { const rigRecords = rigs.listRigs(this.sql); return Promise.all( @@ -1792,7 +1809,10 @@ export class TownDO extends DurableObject { message: string, _model?: string, uiContext?: string - ): Promise<{ agentId: string; sessionStatus: 'idle' | 'active' | 'starting' }> { + ): Promise<{ + agentId: string; + sessionStatus: 'idle' | 'active' | 'starting'; + }> { const townId = this.townId; let mayor = agents.listAgents(this.sql, { role: 'mayor' })[0] ?? null; @@ -1876,7 +1896,10 @@ export class TownDO extends DurableObject { * Called eagerly on page load so the terminal is available immediately * without requiring the user to send a message first. */ - async ensureMayor(): Promise<{ agentId: string; sessionStatus: 'idle' | 'active' | 'starting' }> { + async ensureMayor(): Promise<{ + agentId: string; + sessionStatus: 'idle' | 'active' | 'starting'; + }> { const townId = this.townId; let mayor = agents.listAgents(this.sql, { role: 'mayor' })[0] ?? null; @@ -2250,7 +2273,10 @@ export class TownDO extends DurableObject { tasks: Array<{ title: string; body?: string; depends_on?: number[] }>; merge_mode?: 'review-then-land' | 'review-and-merge'; staged?: boolean; - }): Promise<{ convoy: ConvoyEntry; beads: Array<{ bead: Bead; agent: Agent | null }> }> { + }): Promise<{ + convoy: ConvoyEntry; + beads: Array<{ bead: Bead; agent: Agent | null }>; + }> { // Resolve staged: explicit request wins, otherwise fall back to town config default. const townConfig = await this.getTownConfig(); const isStaged = input.staged ?? townConfig.staged_convoys_default; @@ -2449,9 +2475,10 @@ export class TownDO extends DurableObject { /** * Transition a staged convoy to active: hook agents and begin dispatch. */ - async startConvoy( - convoyId: string - ): Promise<{ convoy: ConvoyEntry; beads: Array<{ bead: Bead; agent: Agent | null }> }> { + async startConvoy(convoyId: string): Promise<{ + convoy: ConvoyEntry; + beads: Array<{ bead: Bead; agent: Agent | null }>; + }> { const convoy = this.getConvoy(convoyId); if (!convoy) throw new Error(`Convoy not found: ${convoyId}`); if (!convoy.staged) throw new Error(`Convoy is not staged: ${convoyId}`); @@ -2994,9 +3021,14 @@ export class TownDO extends DurableObject { const violations = reconciler.checkInvariants(this.sql); metrics.invariantViolations = violations.length; if (violations.length > 0) { - console.error( - `${TOWN_LOG} [reconciler:invariants] town=${townId} ${violations.length} violation(s): ${JSON.stringify(violations)}` - ); + // Emit as an analytics event for observability dashboards instead + // of console.error (which spams Workers logs every 5s per town). + this.emitEvent({ + event: 'reconciler.invariant_violations', + townId, + label: violations.map(v => `[${v.invariant}] ${v.message}`).join('; '), + value: violations.length, + }); } } catch (err) { console.warn(`${TOWN_LOG} [reconciler:invariants] town=${townId} check failed`, err); @@ -3575,7 +3607,13 @@ export class TownDO extends DurableObject { [] ), ]; - const beadCounts = { open: 0, inProgress: 0, inReview: 0, failed: 0, triageRequests: 0 }; + const beadCounts = { + open: 0, + inProgress: 0, + inReview: 0, + failed: 0, + triageRequests: 0, + }; for (const row of beadRows) { const s = `${row.status as string}`; const c = Number(row.cnt); diff --git a/cloudflare-gastown/src/dos/town/agents.ts b/cloudflare-gastown/src/dos/town/agents.ts index 62d68acdc..c208a3834 100644 --- a/cloudflare-gastown/src/dos/town/agents.ts +++ b/cloudflare-gastown/src/dos/town/agents.ts @@ -546,11 +546,20 @@ export function touchAgent( activeTools?: string[]; } ): void { + // A heartbeat is proof the agent is alive in the container. + // If the agent's status is 'idle' (e.g. due to a dispatch timeout + // race — see #1358), restore it to 'working'. This prevents the + // reconciler from treating the agent as lost while it's actively + // sending heartbeats. query( sql, /* sql */ ` UPDATE ${agent_metadata} SET ${agent_metadata.columns.last_activity_at} = ?, + ${agent_metadata.columns.status} = CASE + WHEN ${agent_metadata.columns.status} = 'idle' THEN 'working' + ELSE ${agent_metadata.columns.status} + END, ${agent_metadata.columns.last_event_type} = COALESCE(?, ${agent_metadata.columns.last_event_type}), ${agent_metadata.columns.last_event_at} = COALESCE(?, ${agent_metadata.columns.last_event_at}), ${agent_metadata.columns.active_tools} = COALESCE(?, ${agent_metadata.columns.active_tools}) diff --git a/cloudflare-gastown/src/dos/town/reconciler.ts b/cloudflare-gastown/src/dos/town/reconciler.ts index fcaae03e4..0e9ff7b0e 100644 --- a/cloudflare-gastown/src/dos/town/reconciler.ts +++ b/cloudflare-gastown/src/dos/town/reconciler.ts @@ -249,7 +249,18 @@ export function applyEvent(sql: SqlStorage, event: TownEventRecord): void { const agent = agents.getAgent(sql, event.agent_id); if (!agent) return; - // Only act on working/stalled agents whose container has stopped + // Only act on working/stalled agents whose container has stopped. + // For 'not_found': skip if the agent was dispatched recently (#1358). + // During a cold start the container may 404 on /agents/:id/status + // because the agent hasn't registered in the process manager yet. + // The 3-minute grace period covers the 60s HTTP timeout plus + // typical cold start time (git clone + worktree). Truly dead + // agents are caught by reconcileAgents after 90s of no heartbeats. + if (containerStatus === 'not_found' && agent.last_activity_at) { + const ageSec = (Date.now() - new Date(agent.last_activity_at).getTime()) / 1000; + if (ageSec < 180) return; // 3-minute grace for cold starts + } + if ( (agent.status === 'working' || agent.status === 'stalled') && (containerStatus === 'exited' || containerStatus === 'not_found') @@ -340,6 +351,9 @@ export function reconcileAgents(sql: SqlStorage): Action[] { ]); for (const agent of workingAgents) { + // Mayors are always working with no hook — skip them + if (agent.role === 'mayor') continue; + if (!agent.last_activity_at) { // No heartbeat ever received — container may have failed to start actions.push({ @@ -357,6 +371,18 @@ export function reconcileAgents(sql: SqlStorage): Action[] { to: 'idle', reason: 'heartbeat lost (3 missed cycles)', }); + } else if (!agent.current_hook_bead_id) { + // Agent is working with fresh heartbeat but no hook — it's running + // in the container but has no bead to work on (gt_done already ran, + // or the hook was cleared by another code path). Set to idle so + // processReviewQueue / schedulePendingWork can use it. + actions.push({ + type: 'transition_agent', + agent_id: agent.bead_id, + from: 'working', + to: 'idle', + reason: 'working agent has no hook (gt_done already completed)', + }); } } @@ -599,18 +625,24 @@ export function reconcileBeads(sql: SqlStorage): Action[] { for (const bead of staleInProgress) { if (!staleMs(bead.updated_at, STALE_IN_PROGRESS_TIMEOUT_MS)) continue; - // Check if any agent is hooked AND working/stalled + // Check if any agent is hooked AND (working/stalled OR has a recent + // heartbeat). The heartbeat check is defense-in-depth for #1358: if + // the agent's status is wrong (e.g. stuck on 'idle' due to a dispatch + // timeout race), a fresh heartbeat proves the agent is alive. const hookedAgent = z - .object({ status: z.string() }) + .object({ status: z.string(), last_activity_at: z.string().nullable() }) .array() .parse([ ...query( sql, /* sql */ ` - SELECT ${agent_metadata.status} + SELECT ${agent_metadata.status}, ${agent_metadata.last_activity_at} FROM ${agent_metadata} WHERE ${agent_metadata.current_hook_bead_id} = ? - AND ${agent_metadata.status} IN ('working', 'stalled') + AND ( + ${agent_metadata.status} IN ('working', 'stalled') + OR ${agent_metadata.last_activity_at} > strftime('%Y-%m-%dT%H:%M:%fZ', 'now', '-90 seconds') + ) `, [bead.bead_id] ), @@ -991,7 +1023,11 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] { } const mrRows = z - .object({ status: z.string(), type: z.string(), rig_id: z.string().nullable() }) + .object({ + status: z.string(), + type: z.string(), + rig_id: z.string().nullable(), + }) .array() .parse([ ...query( diff --git a/cloudflare-gastown/src/dos/town/review-queue.ts b/cloudflare-gastown/src/dos/town/review-queue.ts index 5b746cb6c..5eb1ad69f 100644 --- a/cloudflare-gastown/src/dos/town/review-queue.ts +++ b/cloudflare-gastown/src/dos/town/review-queue.ts @@ -24,7 +24,7 @@ import { getConvoyFeatureBranch, getConvoyMergeMode, } from './beads'; -import { getAgent, unhookBead } from './agents'; +import { getAgent, unhookBead, updateAgentStatus } from './agents'; import { getRig } from './rigs'; import type { ReviewQueueInput, ReviewQueueEntry, AgentDoneInput, Molecule } from '../../types'; @@ -621,6 +621,11 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu } unhookBead(sql, agentId); + // Set refinery to idle immediately — the review is done and the + // refinery is available for new work. Without this, processReviewQueue + // sees the refinery as 'working' and won't pop the next MR bead until + // agentCompleted fires (when the container process eventually exits). + updateAgentStatus(sql, agentId, 'idle'); return; } @@ -722,7 +727,11 @@ export function agentCompleted( } } - // Mark agent idle. + // Mark agent idle — but ONLY if it hasn't been re-dispatched (status + // still 'working' on new work) since gt_done ran. agentCompleted can + // arrive after the agent has been re-hooked and dispatched for a new + // bead. Without this guard, the stale completion event would clobber + // the live dispatch. // For refineries, preserve dispatch_attempts so Rule 6's circuit-breaker // can track cumulative re-dispatch attempts across idle→dispatch cycles. // Resetting to 0 here was enabling infinite loops (#1342). Non-refineries @@ -737,6 +746,10 @@ export function agentCompleted( ELSE 0 END WHERE ${agent_metadata.bead_id} = ? + AND NOT ( + ${agent_metadata.columns.status} = 'working' + AND ${agent_metadata.columns.current_hook_bead_id} IS NOT NULL + ) `, [agentId] ); diff --git a/cloudflare-gastown/src/dos/town/scheduling.ts b/cloudflare-gastown/src/dos/town/scheduling.ts index 0fb9544ad..e18d14eff 100644 --- a/cloudflare-gastown/src/dos/town/scheduling.ts +++ b/cloudflare-gastown/src/dos/town/scheduling.ts @@ -159,20 +159,11 @@ export async function dispatchAgent( }); } else { // Container start returned false — but the container may have - // actually started the agent (timeout race). DON'T roll back - // the bead to open. Leave it in_progress with the agent idle+hooked. - // If the agent truly failed: rehookOrphanedBeads recovers after 2 min. - // If the agent actually started: it works and calls gt_done normally. - query( - ctx.sql, - /* sql */ ` - UPDATE ${agent_metadata} - SET ${agent_metadata.columns.status} = 'idle', - ${agent_metadata.columns.last_activity_at} = ? - WHERE ${agent_metadata.bead_id} = ? - `, - [now(), agent.id] - ); + // actually started the agent (timeout race). Leave the agent + // as 'working' so the reconciler doesn't treat it as lost. + // If the agent truly didn't start: reconcileAgents catches it + // after 90s of missing heartbeats and transitions to 'idle'. + // If the agent actually started: heartbeats keep it alive. (#1358) ctx.emitEvent({ event: 'agent.dispatch_failed', townId: ctx.townId, @@ -185,7 +176,9 @@ export async function dispatchAgent( return started; } catch (err) { console.error(`${LOG} dispatchAgent: failed for agent=${agent.id}:`, err); - Sentry.captureException(err, { extra: { agentId: agent.id, beadId: bead.bead_id } }); + Sentry.captureException(err, { + extra: { agentId: agent.id, beadId: bead.bead_id }, + }); try { query( ctx.sql, diff --git a/cloudflare-gastown/test/integration/reconciler.test.ts b/cloudflare-gastown/test/integration/reconciler.test.ts index b7373b345..e5e286924 100644 --- a/cloudflare-gastown/test/integration/reconciler.test.ts +++ b/cloudflare-gastown/test/integration/reconciler.test.ts @@ -126,6 +126,77 @@ describe('Reconciler', () => { }); }); + // ── #1358: Heartbeat restores working status ──────────────────────── + + describe('#1358: dispatch timeout race recovery', () => { + it('should restore idle agent to working on heartbeat', async () => { + const agent = await town.registerAgent({ + role: 'polecat', + name: 'P1', + identity: `heartbeat-restore-${townName}`, + rig_id: 'rig-1', + }); + const bead = await town.createBead({ + type: 'issue', + title: 'Heartbeat test', + rig_id: 'rig-1', + }); + + // Simulate dispatch timeout race: agent is hooked + idle + // (dispatchAgent set it to working, then timeout set it back to idle) + await town.hookBead(agent.id, bead.bead_id); + await town.updateAgentStatus(agent.id, 'idle'); + + const before = await town.getAgentAsync(agent.id); + expect(before?.status).toBe('idle'); + + // Agent sends a heartbeat (proving it's alive in the container) + await town.touchAgentHeartbeat(agent.id); + + // Status should be restored to working + const after = await town.getAgentAsync(agent.id); + expect(after?.status).toBe('working'); + }); + + it('should not change status of a working agent on heartbeat', async () => { + const agent = await town.registerAgent({ + role: 'polecat', + name: 'P2', + identity: `heartbeat-noop-${townName}`, + rig_id: 'rig-1', + }); + const bead = await town.createBead({ + type: 'issue', + title: 'Heartbeat noop test', + rig_id: 'rig-1', + }); + + await town.hookBead(agent.id, bead.bead_id); + await town.updateAgentStatus(agent.id, 'working'); + + await town.touchAgentHeartbeat(agent.id); + + const after = await town.getAgentAsync(agent.id); + expect(after?.status).toBe('working'); + }); + + it('should not change status of an exited agent on heartbeat', async () => { + const agent = await town.registerAgent({ + role: 'polecat', + name: 'P3', + identity: `heartbeat-exited-${townName}`, + rig_id: 'rig-1', + }); + + await town.updateAgentStatus(agent.id, 'exited'); + + await town.touchAgentHeartbeat(agent.id); + + const after = await town.getAgentAsync(agent.id); + expect(after?.status).toBe('exited'); + }); + }); + // ── Event-driven agentDone ────────────────────────────────────────── describe('event-driven agentDone', () => {