diff --git a/apps/web/src/environments/runtime/connection.test.ts b/apps/web/src/environments/runtime/connection.test.ts index d42471b861..3861101491 100644 --- a/apps/web/src/environments/runtime/connection.test.ts +++ b/apps/web/src/environments/runtime/connection.test.ts @@ -6,10 +6,12 @@ import type { WsRpcClient } from "~/rpc/wsRpcClient"; function createTestClient(options?: { readonly getSnapshot?: () => Promise<{ readonly snapshotSequence: number }>; + readonly replayEvents?: () => Promise>; }) { const lifecycleListeners = new Set<(event: any) => void>(); const configListeners = new Set<(event: any) => void>(); const terminalListeners = new Set<(event: any) => void>(); + let domainResubscribe: (() => void) | undefined; const getSnapshot = vi.fn( options?.getSnapshot ?? @@ -20,6 +22,7 @@ function createTestClient(options?: { threads: [], }) as any), ); + const replayEvents = vi.fn(options?.replayEvents ?? (async () => [])); const client = { dispose: vi.fn(async () => undefined), @@ -49,8 +52,15 @@ function createTestClient(options?: { dispatchCommand: vi.fn(async () => undefined), getTurnDiff: vi.fn(async () => undefined), getFullThreadDiff: vi.fn(async () => undefined), - replayEvents: vi.fn(async () => []), - onDomainEvent: () => () => undefined, + replayEvents, + onDomainEvent: vi.fn((_: (event: any) => void, options?: { onResubscribe?: () => void }) => { + domainResubscribe = options?.onResubscribe; + return () => { + if (domainResubscribe === options?.onResubscribe) { + domainResubscribe = undefined; + } + }; + }), }, terminal: { open: vi.fn(async () => undefined), @@ -114,6 +124,9 @@ function createTestClient(options?: { }); } }, + triggerDomainResubscribe: () => { + domainResubscribe?.(); + }, }; } @@ -213,4 +226,63 @@ describe("createEnvironmentConnection", () => { await connection.dispose(); }); + + it("swallows replay recovery failures triggered by resubscribe", async () => { + const environmentId = EnvironmentId.makeUnsafe("env-1"); + const snapshotError = new Error("snapshot failed"); + let snapshotCalls = 0; + const { client, triggerDomainResubscribe } = createTestClient({ + getSnapshot: async () => { + snapshotCalls += 1; + if (snapshotCalls === 1) { + return { + snapshotSequence: 1, + projects: [], + threads: [], + } as any; + } + + throw snapshotError; + }, + replayEvents: async () => { + throw new Error("SocketCloseError: 1006"); + }, + }); + + const connection = createEnvironmentConnection({ + kind: "saved", + knownEnvironment: { + id: "env-1", + label: "Remote env", + source: "manual", + target: { + httpBaseUrl: "http://example.test", + wsBaseUrl: "ws://example.test", + }, + environmentId, + }, + client, + applyEventBatch: vi.fn(), + syncSnapshot: vi.fn(), + applyTerminalEvent: vi.fn(), + }); + + await Promise.resolve(); + await Promise.resolve(); + + const onUnhandledRejection = vi.fn(); + process.on("unhandledRejection", onUnhandledRejection); + + try { + triggerDomainResubscribe(); + await new Promise((resolve) => setTimeout(resolve, 0)); + await new Promise((resolve) => setTimeout(resolve, 0)); + } finally { + process.off("unhandledRejection", onUnhandledRejection); + } + + expect(onUnhandledRejection).not.toHaveBeenCalled(); + + await connection.dispose(); + }); }); diff --git a/apps/web/src/environments/runtime/connection.ts b/apps/web/src/environments/runtime/connection.ts index 692092527e..aa171d4e86 100644 --- a/apps/web/src/environments/runtime/connection.ts +++ b/apps/web/src/environments/runtime/connection.ts @@ -128,6 +128,10 @@ export function createEnvironmentConnection( queueMicrotask(flushPendingDomainEvents); }; + const scheduleReplayRecovery = (reason: "sequence-gap" | "resubscribe") => { + void runReplayRecovery(reason).catch(() => undefined); + }; + const runReplayRecovery = async (reason: "sequence-gap" | "resubscribe"): Promise => { if (!recovery.beginReplayRecovery(reason)) { return; @@ -172,7 +176,7 @@ export function createEnvironmentConnection( return; } } - void runReplayRecovery(reason); + scheduleReplayRecovery(reason); } else if (replayCompletion.shouldReplay && import.meta.env.MODE !== "test") { console.warn( "[orchestration-recovery]", @@ -198,7 +202,7 @@ export function createEnvironmentConnection( if (!disposed) { input.syncSnapshot(snapshot, environmentId); if (recovery.completeSnapshotRecovery(snapshot.snapshotSequence)) { - void runReplayRecovery("sequence-gap"); + scheduleReplayRecovery("sequence-gap"); } } } catch (error) { @@ -245,7 +249,7 @@ export function createEnvironmentConnection( } if (action === "recover") { flushPendingDomainEvents(); - void runReplayRecovery("sequence-gap"); + scheduleReplayRecovery("sequence-gap"); } }, { @@ -254,7 +258,7 @@ export function createEnvironmentConnection( return; } flushPendingDomainEvents(); - void runReplayRecovery("resubscribe"); + scheduleReplayRecovery("resubscribe"); }, }, ); diff --git a/apps/web/src/rpc/wsTransport.test.ts b/apps/web/src/rpc/wsTransport.test.ts index e8948d8a2e..fea4454cd6 100644 --- a/apps/web/src/rpc/wsTransport.test.ts +++ b/apps/web/src/rpc/wsTransport.test.ts @@ -707,6 +707,39 @@ describe("WsTransport", () => { await transport.dispose(); }); + it("logs a transport disconnect once even when multiple subscriptions fail together", async () => { + const transport = new WsTransport("ws://localhost:3020"); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => undefined); + + const unsubscribeA = transport.subscribe( + () => Stream.fail(new Error("SocketCloseError: 1006")), + vi.fn(), + { retryDelay: 10 }, + ); + const unsubscribeB = transport.subscribe( + () => Stream.fail(new Error("SocketCloseError: 1006")), + vi.fn(), + { retryDelay: 10 }, + ); + + await waitFor(() => { + expect(sockets).toHaveLength(1); + }); + + getSocket().open(); + + await waitFor(() => { + expect(warnSpy).toHaveBeenCalledTimes(1); + }); + expect(warnSpy).toHaveBeenCalledWith("WebSocket RPC subscription disconnected", { + error: "SocketCloseError: 1006", + }); + + unsubscribeA(); + unsubscribeB(); + await transport.dispose(); + }); + it("streams finite request events without re-subscribing", async () => { const transport = new WsTransport("ws://localhost:3020"); const listener = vi.fn(); diff --git a/apps/web/src/rpc/wsTransport.ts b/apps/web/src/rpc/wsTransport.ts index 0bcc8a8a50..851cc5046c 100644 --- a/apps/web/src/rpc/wsTransport.ts +++ b/apps/web/src/rpc/wsTransport.ts @@ -50,6 +50,7 @@ export class WsTransport { private readonly url: WsRpcProtocolSocketUrlProvider; private readonly lifecycleHandlers: WsProtocolLifecycleHandlers | undefined; private disposed = false; + private hasReportedTransportDisconnect = false; private reconnectChain: Promise = Promise.resolve(); private session: TransportSession; @@ -136,6 +137,7 @@ export class WsTransport { listener, () => active, () => { + this.hasReportedTransportDisconnect = false; hasReceivedValue = true; }, ); @@ -156,9 +158,12 @@ export class WsTransport { return; } - console.warn("WebSocket RPC subscription disconnected", { - error: formattedError, - }); + if (!this.hasReportedTransportDisconnect) { + console.warn("WebSocket RPC subscription disconnected", { + error: formattedError, + }); + } + this.hasReportedTransportDisconnect = true; await sleep(retryDelayMs); } }