Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 74 additions & 2 deletions apps/web/src/environments/runtime/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import type { WsRpcClient } from "~/rpc/wsRpcClient";

function createTestClient(options?: {
readonly getSnapshot?: () => Promise<{ readonly snapshotSequence: number }>;
readonly replayEvents?: () => Promise<ReadonlyArray<any>>;
}) {
const lifecycleListeners = new Set<(event: any) => void>();
const configListeners = new Set<(event: any) => void>();
const terminalListeners = new Set<(event: any) => void>();
let domainResubscribe: (() => void) | undefined;

const getSnapshot = vi.fn(
options?.getSnapshot ??
Expand All @@ -20,6 +22,7 @@ function createTestClient(options?: {
threads: [],
}) as any),
);
const replayEvents = vi.fn(options?.replayEvents ?? (async () => []));

const client = {
dispose: vi.fn(async () => undefined),
Expand Down Expand Up @@ -49,8 +52,15 @@ function createTestClient(options?: {
dispatchCommand: vi.fn(async () => undefined),
getTurnDiff: vi.fn(async () => undefined),
getFullThreadDiff: vi.fn(async () => undefined),
replayEvents: vi.fn(async () => []),
onDomainEvent: () => () => undefined,
replayEvents,
onDomainEvent: vi.fn((_: (event: any) => void, options?: { onResubscribe?: () => void }) => {
domainResubscribe = options?.onResubscribe;
return () => {
if (domainResubscribe === options?.onResubscribe) {
domainResubscribe = undefined;
}
};
}),
},
terminal: {
open: vi.fn(async () => undefined),
Expand Down Expand Up @@ -114,6 +124,9 @@ function createTestClient(options?: {
});
}
},
triggerDomainResubscribe: () => {
domainResubscribe?.();
},
};
}

Expand Down Expand Up @@ -213,4 +226,63 @@ describe("createEnvironmentConnection", () => {

await connection.dispose();
});

it("swallows replay recovery failures triggered by resubscribe", async () => {
const environmentId = EnvironmentId.makeUnsafe("env-1");
const snapshotError = new Error("snapshot failed");
let snapshotCalls = 0;
const { client, triggerDomainResubscribe } = createTestClient({
getSnapshot: async () => {
snapshotCalls += 1;
if (snapshotCalls === 1) {
return {
snapshotSequence: 1,
projects: [],
threads: [],
} as any;
}

throw snapshotError;
},
replayEvents: async () => {
throw new Error("SocketCloseError: 1006");
},
});

const connection = createEnvironmentConnection({
kind: "saved",
knownEnvironment: {
id: "env-1",
label: "Remote env",
source: "manual",
target: {
httpBaseUrl: "http://example.test",
wsBaseUrl: "ws://example.test",
},
environmentId,
},
client,
applyEventBatch: vi.fn(),
syncSnapshot: vi.fn(),
applyTerminalEvent: vi.fn(),
});

await Promise.resolve();
await Promise.resolve();

const onUnhandledRejection = vi.fn();
process.on("unhandledRejection", onUnhandledRejection);

try {
triggerDomainResubscribe();
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));
} finally {
process.off("unhandledRejection", onUnhandledRejection);
}

expect(onUnhandledRejection).not.toHaveBeenCalled();

await connection.dispose();
});
});
12 changes: 8 additions & 4 deletions apps/web/src/environments/runtime/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ export function createEnvironmentConnection(
queueMicrotask(flushPendingDomainEvents);
};

const scheduleReplayRecovery = (reason: "sequence-gap" | "resubscribe") => {
void runReplayRecovery(reason).catch(() => undefined);
};

const runReplayRecovery = async (reason: "sequence-gap" | "resubscribe"): Promise<void> => {
if (!recovery.beginReplayRecovery(reason)) {
return;
Expand Down Expand Up @@ -172,7 +176,7 @@ export function createEnvironmentConnection(
return;
}
}
void runReplayRecovery(reason);
scheduleReplayRecovery(reason);
} else if (replayCompletion.shouldReplay && import.meta.env.MODE !== "test") {
console.warn(
"[orchestration-recovery]",
Expand All @@ -198,7 +202,7 @@ export function createEnvironmentConnection(
if (!disposed) {
input.syncSnapshot(snapshot, environmentId);
if (recovery.completeSnapshotRecovery(snapshot.snapshotSequence)) {
void runReplayRecovery("sequence-gap");
scheduleReplayRecovery("sequence-gap");
}
}
} catch (error) {
Expand Down Expand Up @@ -245,7 +249,7 @@ export function createEnvironmentConnection(
}
if (action === "recover") {
flushPendingDomainEvents();
void runReplayRecovery("sequence-gap");
scheduleReplayRecovery("sequence-gap");
}
},
{
Expand All @@ -254,7 +258,7 @@ export function createEnvironmentConnection(
return;
}
flushPendingDomainEvents();
void runReplayRecovery("resubscribe");
scheduleReplayRecovery("resubscribe");
},
},
);
Expand Down
33 changes: 33 additions & 0 deletions apps/web/src/rpc/wsTransport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,39 @@ describe("WsTransport", () => {
await transport.dispose();
});

it("logs a transport disconnect once even when multiple subscriptions fail together", async () => {
const transport = new WsTransport("ws://localhost:3020");
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => undefined);

const unsubscribeA = transport.subscribe(
() => Stream.fail(new Error("SocketCloseError: 1006")),
vi.fn(),
{ retryDelay: 10 },
);
const unsubscribeB = transport.subscribe(
() => Stream.fail(new Error("SocketCloseError: 1006")),
vi.fn(),
{ retryDelay: 10 },
);

await waitFor(() => {
expect(sockets).toHaveLength(1);
});

getSocket().open();

await waitFor(() => {
expect(warnSpy).toHaveBeenCalledTimes(1);
});
expect(warnSpy).toHaveBeenCalledWith("WebSocket RPC subscription disconnected", {
error: "SocketCloseError: 1006",
});

unsubscribeA();
unsubscribeB();
await transport.dispose();
});

it("streams finite request events without re-subscribing", async () => {
const transport = new WsTransport("ws://localhost:3020");
const listener = vi.fn();
Expand Down
11 changes: 8 additions & 3 deletions apps/web/src/rpc/wsTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export class WsTransport {
private readonly url: WsRpcProtocolSocketUrlProvider;
private readonly lifecycleHandlers: WsProtocolLifecycleHandlers | undefined;
private disposed = false;
private hasReportedTransportDisconnect = false;
private reconnectChain: Promise<void> = Promise.resolve();
private session: TransportSession;

Expand Down Expand Up @@ -136,6 +137,7 @@ export class WsTransport {
listener,
() => active,
() => {
this.hasReportedTransportDisconnect = false;
hasReceivedValue = true;
},
);
Expand All @@ -156,9 +158,12 @@ export class WsTransport {
return;
}

console.warn("WebSocket RPC subscription disconnected", {
error: formattedError,
});
if (!this.hasReportedTransportDisconnect) {
console.warn("WebSocket RPC subscription disconnected", {
error: formattedError,
});
}
this.hasReportedTransportDisconnect = true;
await sleep(retryDelayMs);
}
}
Expand Down
Loading