From e79d43c80661f15db7d982bac56236ce31e03cb3 Mon Sep 17 00:00:00 2001 From: Fran Dias Date: Fri, 13 Mar 2026 22:51:53 -0400 Subject: [PATCH 1/7] capture abort signal before await to prevent race condition --- packages/typescript/ai-client/src/chat-client.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/typescript/ai-client/src/chat-client.ts b/packages/typescript/ai-client/src/chat-client.ts index 1a03af515..13154a6a1 100644 --- a/packages/typescript/ai-client/src/chat-client.ts +++ b/packages/typescript/ai-client/src/chat-client.ts @@ -572,6 +572,10 @@ export class ChatClient { this.setError(undefined) this.errorReportedGeneration = null this.abortController = new AbortController() + // Capture the signal immediately so that a concurrent stop() or + // sendMessage() that reassigns this.abortController cannot cause + // connect() to receive a stale or null signal. + const signal = this.abortController.signal // Reset pending tool executions for the new stream this.pendingToolExecutions.clear() let streamCompletedSuccessfully = false @@ -613,7 +617,7 @@ export class ChatClient { await this.connection.send( messages, mergedBody, - this.abortController.signal, + signal, ) // Wait for subscription loop to finish processing all chunks From e95122af0c636444f9838bf549b9f9e5add2e84c Mon Sep 17 00:00:00 2001 From: Fran Dias Date: Fri, 13 Mar 2026 23:24:42 -0400 Subject: [PATCH 2/7] add unit tests for abort signal race condition fix Co-Authored-By: Claude Opus 4.6 (1M context) --- .../ai-client/tests/chat-client-abort.test.ts | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/packages/typescript/ai-client/tests/chat-client-abort.test.ts b/packages/typescript/ai-client/tests/chat-client-abort.test.ts index 5853e75a2..e336bab06 100644 --- a/packages/typescript/ai-client/tests/chat-client-abort.test.ts +++ b/packages/typescript/ai-client/tests/chat-client-abort.test.ts @@ -310,4 +310,100 @@ describe('ChatClient - Abort Signal Handling', () => { // Each should be a different signal instance expect(abortSignals[0]).not.toBe(abortSignals[1]) }) + + it('should pass the original signal to connect() even if stop() is called during onResponse', async () => { + let signalPassedToConnect: AbortSignal | undefined + + const adapter: ConnectionAdapter = { + // eslint-disable-next-line @typescript-eslint/require-await + async *connect(_messages, _data, abortSignal) { + signalPassedToConnect = abortSignal + yield { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + } + }, + } + + const client = new ChatClient({ + connection: adapter, + onResponse: () => { + // Simulate a concurrent stop() during the onResponse callback, + // which sets this.abortController to null. Without the fix, + // the code would dereference this.abortController.signal after + // this point and crash with a null reference. + client.stop() + }, + }) + + await client.append({ + id: 'user-1', + role: 'user', + parts: [{ type: 'text', content: 'Hello' }], + createdAt: new Date(), + }) + + // The signal should still be a valid AbortSignal instance + // (captured before the await), not undefined/null + expect(signalPassedToConnect).toBeInstanceOf(AbortSignal) + }) + + it('should pass the original signal to connect() even if sendMessage() reassigns abortController during onResponse', async () => { + const signalsPassedToConnect: Array = [] + + const adapter: ConnectionAdapter = { + // eslint-disable-next-line @typescript-eslint/require-await + async *connect(_messages, _data, abortSignal) { + if (abortSignal) { + signalsPassedToConnect.push(abortSignal) + } + yield { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + } + }, + } + + let firstCall = true + const client = new ChatClient({ + connection: adapter, + onResponse: () => { + if (firstCall) { + firstCall = false + // Trigger a second message during onResponse callback. + // This queues a new streamResponse that would create a new + // AbortController, potentially overwriting this.abortController + // before the first connect() call reads the signal. + client.append({ + id: 'user-2', + role: 'user', + parts: [{ type: 'text', content: 'Second message' }], + createdAt: new Date(), + }) + } + }, + }) + + await client.append({ + id: 'user-1', + role: 'user', + parts: [{ type: 'text', content: 'Hello' }], + createdAt: new Date(), + }) + + // Wait for the queued second stream to complete + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Both calls should have received valid, distinct AbortSignal instances + expect(signalsPassedToConnect.length).toBe(2) + expect(signalsPassedToConnect[0]).toBeInstanceOf(AbortSignal) + expect(signalsPassedToConnect[1]).toBeInstanceOf(AbortSignal) + expect(signalsPassedToConnect[0]).not.toBe(signalsPassedToConnect[1]) + }) }) From fb3b4963b65d3f0b22679c67d92eca49e8a04012 Mon Sep 17 00:00:00 2001 From: Fran Dias Date: Fri, 13 Mar 2026 23:37:25 -0400 Subject: [PATCH 3/7] replace setTimeout with deterministic promise await in test Capture the nested append() promise and await it directly instead of relying on a fixed 50ms setTimeout. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../typescript/ai-client/tests/chat-client-abort.test.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/typescript/ai-client/tests/chat-client-abort.test.ts b/packages/typescript/ai-client/tests/chat-client-abort.test.ts index e336bab06..749ba5da3 100644 --- a/packages/typescript/ai-client/tests/chat-client-abort.test.ts +++ b/packages/typescript/ai-client/tests/chat-client-abort.test.ts @@ -353,6 +353,7 @@ describe('ChatClient - Abort Signal Handling', () => { it('should pass the original signal to connect() even if sendMessage() reassigns abortController during onResponse', async () => { const signalsPassedToConnect: Array = [] + let secondAppendPromise: Promise | undefined const adapter: ConnectionAdapter = { // eslint-disable-next-line @typescript-eslint/require-await @@ -380,7 +381,7 @@ describe('ChatClient - Abort Signal Handling', () => { // This queues a new streamResponse that would create a new // AbortController, potentially overwriting this.abortController // before the first connect() call reads the signal. - client.append({ + secondAppendPromise = client.append({ id: 'user-2', role: 'user', parts: [{ type: 'text', content: 'Second message' }], @@ -397,8 +398,8 @@ describe('ChatClient - Abort Signal Handling', () => { createdAt: new Date(), }) - // Wait for the queued second stream to complete - await new Promise((resolve) => setTimeout(resolve, 50)) + // Deterministically wait for the queued second stream + await secondAppendPromise // Both calls should have received valid, distinct AbortSignal instances expect(signalsPassedToConnect.length).toBe(2) From 51878a56e9fa49512ecf07463884dc9291853dab Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Thu, 7 May 2026 18:40:27 +1000 Subject: [PATCH 4/7] Added changeset --- .changeset/tricky-wings-sniff.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/tricky-wings-sniff.md diff --git a/.changeset/tricky-wings-sniff.md b/.changeset/tricky-wings-sniff.md new file mode 100644 index 000000000..79f8e7fce --- /dev/null +++ b/.changeset/tricky-wings-sniff.md @@ -0,0 +1,5 @@ +--- +'@tanstack/ai-client': patch +--- + +Fixes a race condition in ChatClient.streamResponse() where this.abortController.signal could reference a stale or null controller by the time it is passed to this.connection.connect() From 1d420c2d3e167ef667040ed4a95186f1530e4174 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 7 May 2026 08:41:41 +0000 Subject: [PATCH 5/7] ci: apply automated fixes --- packages/typescript/ai-client/src/chat-client.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/typescript/ai-client/src/chat-client.ts b/packages/typescript/ai-client/src/chat-client.ts index 13154a6a1..13c8a5aaf 100644 --- a/packages/typescript/ai-client/src/chat-client.ts +++ b/packages/typescript/ai-client/src/chat-client.ts @@ -614,11 +614,7 @@ export class ChatClient { const processingComplete = this.waitForProcessing() // Send through normalized connection (pushes chunks to subscription queue) - await this.connection.send( - messages, - mergedBody, - signal, - ) + await this.connection.send(messages, mergedBody, signal) // Wait for subscription loop to finish processing all chunks await processingComplete From f42d3b71a05c183e3873a3d3a98b0e8b15e043d8 Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Thu, 7 May 2026 18:53:26 +1000 Subject: [PATCH 6/7] test(ai-client): exercise actual reload() race + add asChunk casts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrites the second race-condition test to use reload() instead of a queued append() — append() early-returns when isLoading and queues via queuePostStreamAction, so it never reassigns this.abortController mid-flight. reload() calls cancelInFlightStream() synchronously then starts a new streamResponse(), which is the actual code path that triggers the race the fix protects against. Adds asChunk() casts so the new yields satisfy the strict AGUIEvent typing introduced by AG-UI core interop. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ai-client/tests/chat-client-abort.test.ts | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/packages/typescript/ai-client/tests/chat-client-abort.test.ts b/packages/typescript/ai-client/tests/chat-client-abort.test.ts index 749ba5da3..ae0840907 100644 --- a/packages/typescript/ai-client/tests/chat-client-abort.test.ts +++ b/packages/typescript/ai-client/tests/chat-client-abort.test.ts @@ -318,13 +318,13 @@ describe('ChatClient - Abort Signal Handling', () => { // eslint-disable-next-line @typescript-eslint/require-await async *connect(_messages, _data, abortSignal) { signalPassedToConnect = abortSignal - yield { + yield asChunk({ type: 'RUN_FINISHED', runId: 'run-1', model: 'test', timestamp: Date.now(), finishReason: 'stop', - } + }) }, } @@ -351,9 +351,9 @@ describe('ChatClient - Abort Signal Handling', () => { expect(signalPassedToConnect).toBeInstanceOf(AbortSignal) }) - it('should pass the original signal to connect() even if sendMessage() reassigns abortController during onResponse', async () => { + it('should pass the original signal to send() even if reload() reassigns abortController during onResponse', async () => { const signalsPassedToConnect: Array = [] - let secondAppendPromise: Promise | undefined + let reloadPromise: Promise | undefined const adapter: ConnectionAdapter = { // eslint-disable-next-line @typescript-eslint/require-await @@ -361,13 +361,13 @@ describe('ChatClient - Abort Signal Handling', () => { if (abortSignal) { signalsPassedToConnect.push(abortSignal) } - yield { + yield asChunk({ type: 'RUN_FINISHED', runId: 'run-1', model: 'test', timestamp: Date.now(), finishReason: 'stop', - } + }) }, } @@ -377,16 +377,12 @@ describe('ChatClient - Abort Signal Handling', () => { onResponse: () => { if (firstCall) { firstCall = false - // Trigger a second message during onResponse callback. - // This queues a new streamResponse that would create a new - // AbortController, potentially overwriting this.abortController - // before the first connect() call reads the signal. - secondAppendPromise = client.append({ - id: 'user-2', - role: 'user', - parts: [{ type: 'text', content: 'Second message' }], - createdAt: new Date(), - }) + // reload() synchronously aborts and nulls this.abortController via + // cancelInFlightStream(), then starts a fresh streamResponse that + // assigns a new AbortController. Without the fix, the first stream + // would re-read this.abortController.signal after this await and + // receive the *second* stream's signal instead of its own. + reloadPromise = client.reload() } }, }) @@ -398,13 +394,19 @@ describe('ChatClient - Abort Signal Handling', () => { createdAt: new Date(), }) - // Deterministically wait for the queued second stream - await secondAppendPromise + await reloadPromise - // Both calls should have received valid, distinct AbortSignal instances + // Both calls must have received distinct AbortSignal instances. + // Pre-fix, both would receive the second stream's signal because the + // first stream re-read this.abortController.signal after reload(). expect(signalsPassedToConnect.length).toBe(2) expect(signalsPassedToConnect[0]).toBeInstanceOf(AbortSignal) expect(signalsPassedToConnect[1]).toBeInstanceOf(AbortSignal) expect(signalsPassedToConnect[0]).not.toBe(signalsPassedToConnect[1]) + // Exactly one of the two should be aborted (the first stream's signal, + // aborted by cancelInFlightStream()); the other is fresh. Order-agnostic + // because microtask scheduling determines which connect() runs first. + const abortedCount = signalsPassedToConnect.filter((s) => s.aborted).length + expect(abortedCount).toBe(1) }) }) From 2aac5456415a337a11be219810f48017b025ac1a Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Thu, 7 May 2026 19:12:11 +1000 Subject: [PATCH 7/7] fix(ai-client): bail out when stream is cancelled during onResponse await MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Capturing the AbortController signal locally avoided the original null deref but exposed a latent deadlock: when stop() runs during the onResponse await, cancelInFlightStream() calls resolveProcessing() before waitForProcessing() has set processingResolve, so the call is a no-op. Post-fix, streamResponse no longer crashed on the now-null controller, reached waitForProcessing() (creating a fresh resolver nothing would resolve), and hung on `await processingComplete` — breaking the ai-react useChat unmount test. Add a `signal.aborted` check after the onResponse await to short-circuit cancelled or superseded streams cleanly, restoring main's pre-fix flow control without relying on a thrown TypeError. Update the two race tests to reflect the correct semantics: cancelled streams must not invoke the connection layer or surface errors. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../typescript/ai-client/src/chat-client.ts | 9 +++ .../ai-client/tests/chat-client-abort.test.ts | 60 ++++++++++--------- 2 files changed, 41 insertions(+), 28 deletions(-) diff --git a/packages/typescript/ai-client/src/chat-client.ts b/packages/typescript/ai-client/src/chat-client.ts index 13c8a5aaf..060547d08 100644 --- a/packages/typescript/ai-client/src/chat-client.ts +++ b/packages/typescript/ai-client/src/chat-client.ts @@ -587,6 +587,15 @@ export class ChatClient { // Call onResponse callback await this.callbacksRef.current.onResponse() + // If the stream was cancelled during the onResponse await (e.g. stop() + // from a callback or unmount, or reload() superseding this stream), + // bail out before allocating waitForProcessing() — otherwise the + // resolveProcessing() that ran during cancellation is a no-op and the + // await processingComplete below would deadlock. + if (signal.aborted) { + return false + } + // Merge body: base body + per-message body (per-message takes priority) // Include conversationId for server-side event correlation const mergedBody = { diff --git a/packages/typescript/ai-client/tests/chat-client-abort.test.ts b/packages/typescript/ai-client/tests/chat-client-abort.test.ts index ae0840907..71bf71522 100644 --- a/packages/typescript/ai-client/tests/chat-client-abort.test.ts +++ b/packages/typescript/ai-client/tests/chat-client-abort.test.ts @@ -311,13 +311,14 @@ describe('ChatClient - Abort Signal Handling', () => { expect(abortSignals[0]).not.toBe(abortSignals[1]) }) - it('should pass the original signal to connect() even if stop() is called during onResponse', async () => { - let signalPassedToConnect: AbortSignal | undefined + it('should resolve cleanly when stop() is called during onResponse', async () => { + let connectCalled = false + const errorSpy = vi.fn() const adapter: ConnectionAdapter = { // eslint-disable-next-line @typescript-eslint/require-await - async *connect(_messages, _data, abortSignal) { - signalPassedToConnect = abortSignal + async *connect(_messages, _data, _abortSignal) { + connectCalled = true yield asChunk({ type: 'RUN_FINISHED', runId: 'run-1', @@ -330,11 +331,12 @@ describe('ChatClient - Abort Signal Handling', () => { const client = new ChatClient({ connection: adapter, + onError: errorSpy, onResponse: () => { - // Simulate a concurrent stop() during the onResponse callback, - // which sets this.abortController to null. Without the fix, - // the code would dereference this.abortController.signal after - // this point and crash with a null reference. + // stop() during the onResponse await aborts the captured signal and + // nulls this.abortController. Pre-fix this dereferenced null and + // threw a TypeError; with the captured-signal fix and the post-await + // signal.aborted check, streamResponse short-circuits cleanly. client.stop() }, }) @@ -346,14 +348,18 @@ describe('ChatClient - Abort Signal Handling', () => { createdAt: new Date(), }) - // The signal should still be a valid AbortSignal instance - // (captured before the await), not undefined/null - expect(signalPassedToConnect).toBeInstanceOf(AbortSignal) + // Cancelled streams must not invoke the connection (no wasted request), + // surface no error to user code, and not deadlock the append() promise. + expect(connectCalled).toBe(false) + expect(errorSpy).not.toHaveBeenCalled() + expect(client.getError()).toBeUndefined() + expect(client.getIsLoading()).toBe(false) }) - it('should pass the original signal to send() even if reload() reassigns abortController during onResponse', async () => { + it('should resolve cleanly when reload() supersedes the stream during onResponse', async () => { const signalsPassedToConnect: Array = [] let reloadPromise: Promise | undefined + const errorSpy = vi.fn() const adapter: ConnectionAdapter = { // eslint-disable-next-line @typescript-eslint/require-await @@ -374,14 +380,16 @@ describe('ChatClient - Abort Signal Handling', () => { let firstCall = true const client = new ChatClient({ connection: adapter, + onError: errorSpy, onResponse: () => { if (firstCall) { firstCall = false - // reload() synchronously aborts and nulls this.abortController via - // cancelInFlightStream(), then starts a fresh streamResponse that - // assigns a new AbortController. Without the fix, the first stream - // would re-read this.abortController.signal after this await and - // receive the *second* stream's signal instead of its own. + // reload() aborts the in-flight stream's signal and starts a fresh + // streamResponse that assigns a new AbortController. Pre-fix, the + // first stream re-read this.abortController.signal after this + // await and would either crash or pass the second stream's signal + // to its own connect() call. With the fix, the first stream + // short-circuits because its captured signal was aborted. reloadPromise = client.reload() } }, @@ -396,17 +404,13 @@ describe('ChatClient - Abort Signal Handling', () => { await reloadPromise - // Both calls must have received distinct AbortSignal instances. - // Pre-fix, both would receive the second stream's signal because the - // first stream re-read this.abortController.signal after reload(). - expect(signalsPassedToConnect.length).toBe(2) + // Only the surviving (reload) stream invokes connect(); the cancelled + // first stream short-circuits before reaching the connection layer. + // Its signal must be fresh (not the aborted one from the cancelled stream). + expect(signalsPassedToConnect.length).toBe(1) expect(signalsPassedToConnect[0]).toBeInstanceOf(AbortSignal) - expect(signalsPassedToConnect[1]).toBeInstanceOf(AbortSignal) - expect(signalsPassedToConnect[0]).not.toBe(signalsPassedToConnect[1]) - // Exactly one of the two should be aborted (the first stream's signal, - // aborted by cancelInFlightStream()); the other is fresh. Order-agnostic - // because microtask scheduling determines which connect() runs first. - const abortedCount = signalsPassedToConnect.filter((s) => s.aborted).length - expect(abortedCount).toBe(1) + expect(signalsPassedToConnect[0]?.aborted).toBe(false) + expect(errorSpy).not.toHaveBeenCalled() + expect(client.getError()).toBeUndefined() }) })