Skip to content
74 changes: 56 additions & 18 deletions cloudflare-gastown/src/dos/Town.do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,11 @@ export class TownDO extends DurableObject<Env> {
}

private emitEvent(data: Omit<GastownEventData, 'userId' | 'delivery'>): void {
writeEvent(this.env, { ...data, delivery: 'internal', userId: this._ownerUserId });
writeEvent(this.env, {
...data,
delivery: 'internal',
userId: this._ownerUserId,
});
}

/** Build the context object used by the scheduling sub-module. */
Expand Down Expand Up @@ -290,7 +294,9 @@ export class TownDO extends DurableObject<Env> {
});
}

return scheduling.dispatchAgent(schedulingCtx, agent, bead, { systemPromptOverride });
return scheduling.dispatchAgent(schedulingCtx, agent, bead, {
systemPromptOverride,
});
},
stopAgent: async agentId => {
await dispatch.stopAgentInContainer(this.env, this.townId, agentId);
Expand Down Expand Up @@ -677,7 +683,9 @@ export class TownDO extends DurableObject<Env> {
const townConfig = await this.getTownConfig();
if (!townConfig.kilocode_token || townConfig.kilocode_token !== rigConfig.kilocodeToken) {
console.log(`${TOWN_LOG} configureRig: propagating kilocodeToken to town config`);
await this.updateTownConfig({ kilocode_token: rigConfig.kilocodeToken });
await this.updateTownConfig({
kilocode_token: rigConfig.kilocodeToken,
});
}
}

Expand Down Expand Up @@ -1214,10 +1222,14 @@ export class TownDO extends DurableObject<Env> {
* Return undelivered, non-expired nudges for an agent.
* Urgent nudges are returned first, then FIFO within same priority.
*/
async getPendingNudges(
agentId: string
): Promise<
{ nudge_id: string; message: string; mode: string; priority: string; source: string }[]
async getPendingNudges(agentId: string): Promise<
{
nudge_id: string;
message: string;
mode: string;
priority: string;
source: string;
}[]
> {
const rows = [
...query(
Expand Down Expand Up @@ -1768,7 +1780,12 @@ export class TownDO extends DurableObject<Env> {

/** Build the rig list for mayor agent startup (browse worktree setup on fresh containers). */
private async rigListForMayor(): Promise<
Array<{ rigId: string; gitUrl: string; defaultBranch: string; platformIntegrationId?: string }>
Array<{
rigId: string;
gitUrl: string;
defaultBranch: string;
platformIntegrationId?: string;
}>
> {
const rigRecords = rigs.listRigs(this.sql);
return Promise.all(
Expand All @@ -1792,7 +1809,10 @@ export class TownDO extends DurableObject<Env> {
message: string,
_model?: string,
uiContext?: string
): Promise<{ agentId: string; sessionStatus: 'idle' | 'active' | 'starting' }> {
): Promise<{
agentId: string;
sessionStatus: 'idle' | 'active' | 'starting';
}> {
const townId = this.townId;

let mayor = agents.listAgents(this.sql, { role: 'mayor' })[0] ?? null;
Expand Down Expand Up @@ -1876,7 +1896,10 @@ export class TownDO extends DurableObject<Env> {
* Called eagerly on page load so the terminal is available immediately
* without requiring the user to send a message first.
*/
async ensureMayor(): Promise<{ agentId: string; sessionStatus: 'idle' | 'active' | 'starting' }> {
async ensureMayor(): Promise<{
agentId: string;
sessionStatus: 'idle' | 'active' | 'starting';
}> {
const townId = this.townId;

let mayor = agents.listAgents(this.sql, { role: 'mayor' })[0] ?? null;
Expand Down Expand Up @@ -2250,7 +2273,10 @@ export class TownDO extends DurableObject<Env> {
tasks: Array<{ title: string; body?: string; depends_on?: number[] }>;
merge_mode?: 'review-then-land' | 'review-and-merge';
staged?: boolean;
}): Promise<{ convoy: ConvoyEntry; beads: Array<{ bead: Bead; agent: Agent | null }> }> {
}): Promise<{
convoy: ConvoyEntry;
beads: Array<{ bead: Bead; agent: Agent | null }>;
}> {
// Resolve staged: explicit request wins, otherwise fall back to town config default.
const townConfig = await this.getTownConfig();
const isStaged = input.staged ?? townConfig.staged_convoys_default;
Expand Down Expand Up @@ -2449,9 +2475,10 @@ export class TownDO extends DurableObject<Env> {
/**
* Transition a staged convoy to active: hook agents and begin dispatch.
*/
async startConvoy(
convoyId: string
): Promise<{ convoy: ConvoyEntry; beads: Array<{ bead: Bead; agent: Agent | null }> }> {
async startConvoy(convoyId: string): Promise<{
convoy: ConvoyEntry;
beads: Array<{ bead: Bead; agent: Agent | null }>;
}> {
const convoy = this.getConvoy(convoyId);
if (!convoy) throw new Error(`Convoy not found: ${convoyId}`);
if (!convoy.staged) throw new Error(`Convoy is not staged: ${convoyId}`);
Expand Down Expand Up @@ -2994,9 +3021,14 @@ export class TownDO extends DurableObject<Env> {
const violations = reconciler.checkInvariants(this.sql);
metrics.invariantViolations = violations.length;
if (violations.length > 0) {
console.error(
`${TOWN_LOG} [reconciler:invariants] town=${townId} ${violations.length} violation(s): ${JSON.stringify(violations)}`
);
// Emit as an analytics event for observability dashboards instead
// of console.error (which spams Workers logs every 5s per town).
this.emitEvent({
event: 'reconciler.invariant_violations',
townId,
label: violations.map(v => `[${v.invariant}] ${v.message}`).join('; '),
value: violations.length,
});
}
} catch (err) {
console.warn(`${TOWN_LOG} [reconciler:invariants] town=${townId} check failed`, err);
Expand Down Expand Up @@ -3575,7 +3607,13 @@ export class TownDO extends DurableObject<Env> {
[]
),
];
const beadCounts = { open: 0, inProgress: 0, inReview: 0, failed: 0, triageRequests: 0 };
const beadCounts = {
open: 0,
inProgress: 0,
inReview: 0,
failed: 0,
triageRequests: 0,
};
for (const row of beadRows) {
const s = `${row.status as string}`;
const c = Number(row.cnt);
Expand Down
9 changes: 9 additions & 0 deletions cloudflare-gastown/src/dos/town/agents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,11 +546,20 @@ export function touchAgent(
activeTools?: string[];
}
): void {
// A heartbeat is proof the agent is alive in the container.
// If the agent's status is 'idle' (e.g. due to a dispatch timeout
// race — see #1358), restore it to 'working'. This prevents the
// reconciler from treating the agent as lost while it's actively
// sending heartbeats.
query(
sql,
/* sql */ `
UPDATE ${agent_metadata}
SET ${agent_metadata.columns.last_activity_at} = ?,
${agent_metadata.columns.status} = CASE
WHEN ${agent_metadata.columns.status} = 'idle' THEN 'working'
ELSE ${agent_metadata.columns.status}
END,
${agent_metadata.columns.last_event_type} = COALESCE(?, ${agent_metadata.columns.last_event_type}),
${agent_metadata.columns.last_event_at} = COALESCE(?, ${agent_metadata.columns.last_event_at}),
${agent_metadata.columns.active_tools} = COALESCE(?, ${agent_metadata.columns.active_tools})
Expand Down
48 changes: 42 additions & 6 deletions cloudflare-gastown/src/dos/town/reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,18 @@ export function applyEvent(sql: SqlStorage, event: TownEventRecord): void {
const agent = agents.getAgent(sql, event.agent_id);
if (!agent) return;

// Only act on working/stalled agents whose container has stopped
// Only act on working/stalled agents whose container has stopped.
// For 'not_found': skip if the agent was dispatched recently (#1358).
// Recency is judged by last_activity_at: the grace period applies only
// when that timestamp is set and less than 3 minutes old.
// During a cold start the container may 404 on /agents/:id/status
// because the agent hasn't registered in the process manager yet.
// The 3-minute grace period covers the 60s HTTP timeout plus
// typical cold start time (git clone + worktree). Truly dead
// agents are caught by reconcileAgents after 90s of no heartbeats.
if (containerStatus === 'not_found' && agent.last_activity_at) {
const ageSec = (Date.now() - new Date(agent.last_activity_at).getTime()) / 1000;
if (ageSec < 180) return; // 3-minute grace for cold starts
}

if (
(agent.status === 'working' || agent.status === 'stalled') &&
(containerStatus === 'exited' || containerStatus === 'not_found')
Expand Down Expand Up @@ -340,6 +351,9 @@ export function reconcileAgents(sql: SqlStorage): Action[] {
]);

for (const agent of workingAgents) {
// Mayors are always working with no hook — skip them
if (agent.role === 'mayor') continue;

if (!agent.last_activity_at) {
// No heartbeat ever received — container may have failed to start
actions.push({
Expand All @@ -357,6 +371,18 @@ export function reconcileAgents(sql: SqlStorage): Action[] {
to: 'idle',
reason: 'heartbeat lost (3 missed cycles)',
});
} else if (!agent.current_hook_bead_id) {
// Agent is working with fresh heartbeat but no hook — it's running
// in the container but has no bead to work on (gt_done already ran,
// or the hook was cleared by another code path). Set to idle so
// processReviewQueue / schedulePendingWork can use it.
actions.push({
type: 'transition_agent',
agent_id: agent.bead_id,
from: 'working',
to: 'idle',
reason: 'working agent has no hook (gt_done already completed)',
});
}
}

Expand Down Expand Up @@ -599,18 +625,24 @@ export function reconcileBeads(sql: SqlStorage): Action[] {
for (const bead of staleInProgress) {
if (!staleMs(bead.updated_at, STALE_IN_PROGRESS_TIMEOUT_MS)) continue;

// Check if any agent is hooked AND working/stalled
// Check if any agent is hooked AND (working/stalled OR has a recent
// heartbeat). The heartbeat check is defense-in-depth for #1358: if
// the agent's status is wrong (e.g. stuck on 'idle' due to a dispatch
// timeout race), a fresh heartbeat proves the agent is alive.
const hookedAgent = z
.object({ status: z.string() })
.object({ status: z.string(), last_activity_at: z.string().nullable() })
.array()
.parse([
...query(
sql,
/* sql */ `
SELECT ${agent_metadata.status}
SELECT ${agent_metadata.status}, ${agent_metadata.last_activity_at}
FROM ${agent_metadata}
WHERE ${agent_metadata.current_hook_bead_id} = ?
AND ${agent_metadata.status} IN ('working', 'stalled')
AND (
${agent_metadata.status} IN ('working', 'stalled')
OR ${agent_metadata.last_activity_at} > strftime('%Y-%m-%dT%H:%M:%fZ', 'now', '-90 seconds')
)
`,
[bead.bead_id]
),
Expand Down Expand Up @@ -991,7 +1023,11 @@ export function reconcileReviewQueue(sql: SqlStorage): Action[] {
}

const mrRows = z
.object({ status: z.string(), type: z.string(), rig_id: z.string().nullable() })
.object({
status: z.string(),
type: z.string(),
rig_id: z.string().nullable(),
})
.array()
.parse([
...query(
Expand Down
17 changes: 15 additions & 2 deletions cloudflare-gastown/src/dos/town/review-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
getConvoyFeatureBranch,
getConvoyMergeMode,
} from './beads';
import { getAgent, unhookBead } from './agents';
import { getAgent, unhookBead, updateAgentStatus } from './agents';
import { getRig } from './rigs';
import type { ReviewQueueInput, ReviewQueueEntry, AgentDoneInput, Molecule } from '../../types';

Expand Down Expand Up @@ -621,6 +621,11 @@ export function agentDone(sql: SqlStorage, agentId: string, input: AgentDoneInpu
}

unhookBead(sql, agentId);
// Set refinery to idle immediately — the review is done and the
// refinery is available for new work. Without this, processReviewQueue
// sees the refinery as 'working' and won't pop the next MR bead until
// agentCompleted fires (when the container process eventually exits).
updateAgentStatus(sql, agentId, 'idle');
return;
}

Expand Down Expand Up @@ -722,7 +727,11 @@ export function agentCompleted(
}
}

// Mark agent idle.
// Mark agent idle — but ONLY if it hasn't been re-dispatched since
// gt_done ran, i.e. it is not currently 'working' with a hooked bead.
// agentCompleted can arrive after the agent has been re-hooked and
// dispatched for a new bead. Without this guard, the stale completion
// event would clobber the live dispatch.
// For refineries, preserve dispatch_attempts so Rule 6's circuit-breaker
// can track cumulative re-dispatch attempts across idle→dispatch cycles.
// Resetting to 0 here was enabling infinite loops (#1342). Non-refineries
Expand All @@ -737,6 +746,10 @@ export function agentCompleted(
ELSE 0
END
WHERE ${agent_metadata.bead_id} = ?
AND NOT (
${agent_metadata.columns.status} = 'working'
AND ${agent_metadata.columns.current_hook_bead_id} IS NOT NULL
)
`,
[agentId]
);
Expand Down
23 changes: 8 additions & 15 deletions cloudflare-gastown/src/dos/town/scheduling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,11 @@ export async function dispatchAgent(
});
} else {
// Container start returned false — but the container may have
// actually started the agent (timeout race). DON'T roll back
// the bead to open. Leave it in_progress with the agent idle+hooked.
// If the agent truly failed: rehookOrphanedBeads recovers after 2 min.
// If the agent actually started: it works and calls gt_done normally.
query(
ctx.sql,
/* sql */ `
UPDATE ${agent_metadata}
SET ${agent_metadata.columns.status} = 'idle',
${agent_metadata.columns.last_activity_at} = ?
WHERE ${agent_metadata.bead_id} = ?
`,
[now(), agent.id]
);
// actually started the agent (timeout race). Leave the agent
// as 'working' so the reconciler doesn't treat it as lost.
// If the agent truly didn't start: reconcileAgents catches it
// after 90s of missing heartbeats and transitions to 'idle'.
// If the agent actually started: heartbeats keep it alive. (#1358)
ctx.emitEvent({
event: 'agent.dispatch_failed',
townId: ctx.townId,
Expand All @@ -185,7 +176,9 @@ export async function dispatchAgent(
return started;
} catch (err) {
console.error(`${LOG} dispatchAgent: failed for agent=${agent.id}:`, err);
Sentry.captureException(err, { extra: { agentId: agent.id, beadId: bead.bead_id } });
Sentry.captureException(err, {
extra: { agentId: agent.id, beadId: bead.bead_id },
});
try {
query(
ctx.sql,
Expand Down
Loading
Loading