@@ -6,10 +6,12 @@ import type { WsRpcClient } from "~/rpc/wsRpcClient";
66
77function createTestClient ( options ?: {
88 readonly getSnapshot ?: ( ) => Promise < { readonly snapshotSequence : number } > ;
9+ readonly replayEvents ?: ( ) => Promise < ReadonlyArray < any > > ;
910} ) {
1011 const lifecycleListeners = new Set < ( event : any ) => void > ( ) ;
1112 const configListeners = new Set < ( event : any ) => void > ( ) ;
1213 const terminalListeners = new Set < ( event : any ) => void > ( ) ;
14+ let domainResubscribe : ( ( ) => void ) | undefined ;
1315
1416 const getSnapshot = vi . fn (
1517 options ?. getSnapshot ??
@@ -20,6 +22,7 @@ function createTestClient(options?: {
2022 threads : [ ] ,
2123 } ) as any ) ,
2224 ) ;
25+ const replayEvents = vi . fn ( options ?. replayEvents ?? ( async ( ) => [ ] ) ) ;
2326
2427 const client = {
2528 dispose : vi . fn ( async ( ) => undefined ) ,
@@ -49,8 +52,15 @@ function createTestClient(options?: {
4952 dispatchCommand : vi . fn ( async ( ) => undefined ) ,
5053 getTurnDiff : vi . fn ( async ( ) => undefined ) ,
5154 getFullThreadDiff : vi . fn ( async ( ) => undefined ) ,
52- replayEvents : vi . fn ( async ( ) => [ ] ) ,
53- onDomainEvent : ( ) => ( ) => undefined ,
55+ replayEvents,
56+ onDomainEvent : vi . fn ( ( _ : ( event : any ) => void , options ?: { onResubscribe ?: ( ) => void } ) => {
57+ domainResubscribe = options ?. onResubscribe ;
58+ return ( ) => {
59+ if ( domainResubscribe === options ?. onResubscribe ) {
60+ domainResubscribe = undefined ;
61+ }
62+ } ;
63+ } ) ,
5464 } ,
5565 terminal : {
5666 open : vi . fn ( async ( ) => undefined ) ,
@@ -114,6 +124,9 @@ function createTestClient(options?: {
114124 } ) ;
115125 }
116126 } ,
127+ triggerDomainResubscribe : ( ) => {
128+ domainResubscribe ?.( ) ;
129+ } ,
117130 } ;
118131}
119132
@@ -213,4 +226,63 @@ describe("createEnvironmentConnection", () => {
213226
214227 await connection . dispose ( ) ;
215228 } ) ;
229+
230+ it ( "swallows replay recovery failures triggered by resubscribe" , async ( ) => {
231+ const environmentId = EnvironmentId . makeUnsafe ( "env-1" ) ;
232+ const snapshotError = new Error ( "snapshot failed" ) ;
233+ let snapshotCalls = 0 ;
234+ const { client, triggerDomainResubscribe } = createTestClient ( {
235+ getSnapshot : async ( ) => {
236+ snapshotCalls += 1 ;
237+ if ( snapshotCalls === 1 ) {
238+ return {
239+ snapshotSequence : 1 ,
240+ projects : [ ] ,
241+ threads : [ ] ,
242+ } as any ;
243+ }
244+
245+ throw snapshotError ;
246+ } ,
247+ replayEvents : async ( ) => {
248+ throw new Error ( "SocketCloseError: 1006" ) ;
249+ } ,
250+ } ) ;
251+
252+ const connection = createEnvironmentConnection ( {
253+ kind : "saved" ,
254+ knownEnvironment : {
255+ id : "env-1" ,
256+ label : "Remote env" ,
257+ source : "manual" ,
258+ target : {
259+ httpBaseUrl : "http://example.test" ,
260+ wsBaseUrl : "ws://example.test" ,
261+ } ,
262+ environmentId,
263+ } ,
264+ client,
265+ applyEventBatch : vi . fn ( ) ,
266+ syncSnapshot : vi . fn ( ) ,
267+ applyTerminalEvent : vi . fn ( ) ,
268+ } ) ;
269+
270+ await Promise . resolve ( ) ;
271+ await Promise . resolve ( ) ;
272+
273+ const onUnhandledRejection = vi . fn ( ) ;
274+ process . on ( "unhandledRejection" , onUnhandledRejection ) ;
275+
276+ try {
277+ triggerDomainResubscribe ( ) ;
278+ await new Promise ( ( resolve ) => setTimeout ( resolve , 0 ) ) ;
279+ await new Promise ( ( resolve ) => setTimeout ( resolve , 0 ) ) ;
280+ } finally {
281+ process . off ( "unhandledRejection" , onUnhandledRejection ) ;
282+ }
283+
284+ expect ( onUnhandledRejection ) . not . toHaveBeenCalled ( ) ;
285+
286+ await connection . dispose ( ) ;
287+ } ) ;
216288} ) ;
0 commit comments