diff --git a/.changeset/tricky-wings-sniff.md b/.changeset/tricky-wings-sniff.md new file mode 100644 index 000000000..79f8e7fce --- /dev/null +++ b/.changeset/tricky-wings-sniff.md @@ -0,0 +1,5 @@ +--- +'@tanstack/ai-client': patch +--- + +Fixes a race condition in ChatClient.streamResponse() where this.abortController.signal could reference a stale or null controller by the time it is passed to this.connection.connect() diff --git a/packages/typescript/ai-client/src/chat-client.ts b/packages/typescript/ai-client/src/chat-client.ts index 1a03af515..060547d08 100644 --- a/packages/typescript/ai-client/src/chat-client.ts +++ b/packages/typescript/ai-client/src/chat-client.ts @@ -572,6 +572,10 @@ export class ChatClient { this.setError(undefined) this.errorReportedGeneration = null this.abortController = new AbortController() + // Capture the signal immediately so that a concurrent stop() or + // sendMessage() that reassigns this.abortController cannot cause + // connect() to receive a stale or null signal. + const signal = this.abortController.signal // Reset pending tool executions for the new stream this.pendingToolExecutions.clear() let streamCompletedSuccessfully = false @@ -583,6 +587,15 @@ export class ChatClient { // Call onResponse callback await this.callbacksRef.current.onResponse() + // If the stream was cancelled during the onResponse await (e.g. stop() + // from a callback or unmount, or reload() superseding this stream), + // bail out before calling waitForProcessing() — otherwise the + // resolveProcessing() that ran during cancellation is a no-op and the + // await processingComplete below would deadlock.
+ if (signal.aborted) { + return false + } + // Merge body: base body + per-message body (per-message takes priority) // Include conversationId for server-side event correlation const mergedBody = { @@ -610,11 +623,7 @@ const processingComplete = this.waitForProcessing() // Send through normalized connection (pushes chunks to subscription queue) - await this.connection.send( - messages, - mergedBody, - this.abortController.signal, - ) + await this.connection.send(messages, mergedBody, signal) // Wait for subscription loop to finish processing all chunks await processingComplete diff --git a/packages/typescript/ai-client/tests/chat-client-abort.test.ts b/packages/typescript/ai-client/tests/chat-client-abort.test.ts index 5853e75a2..71bf71522 100644 --- a/packages/typescript/ai-client/tests/chat-client-abort.test.ts +++ b/packages/typescript/ai-client/tests/chat-client-abort.test.ts @@ -310,4 +310,107 @@ // Each should be a different signal instance expect(abortSignals[0]).not.toBe(abortSignals[1]) }) + + it('should resolve cleanly when stop() is called during onResponse', async () => { + let connectCalled = false + const errorSpy = vi.fn() + + const adapter: ConnectionAdapter = { + // eslint-disable-next-line @typescript-eslint/require-await + async *connect(_messages, _data, _abortSignal) { + connectCalled = true + yield asChunk({ + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }) + }, + } + + const client = new ChatClient({ + connection: adapter, + onError: errorSpy, + onResponse: () => { + // stop() during the onResponse await aborts the captured signal and + // nulls this.abortController. Pre-fix, this dereferenced null and + // threw a TypeError; with the captured-signal fix and the post-await + // signal.aborted check, streamResponse short-circuits cleanly.
+ client.stop() + }, + }) + + await client.append({ + id: 'user-1', + role: 'user', + parts: [{ type: 'text', content: 'Hello' }], + createdAt: new Date(), + }) + + // Cancelled streams must not invoke the connection (no wasted request), + // surface no error to user code, and not deadlock the append() promise. + expect(connectCalled).toBe(false) + expect(errorSpy).not.toHaveBeenCalled() + expect(client.getError()).toBeUndefined() + expect(client.getIsLoading()).toBe(false) + }) + + it('should resolve cleanly when reload() supersedes the stream during onResponse', async () => { + const signalsPassedToConnect: Array<AbortSignal> = [] + let reloadPromise: Promise<void> | undefined + const errorSpy = vi.fn() + + const adapter: ConnectionAdapter = { + // eslint-disable-next-line @typescript-eslint/require-await + async *connect(_messages, _data, abortSignal) { + if (abortSignal) { + signalsPassedToConnect.push(abortSignal) + } + yield asChunk({ + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + }) + }, + } + + let firstCall = true + const client = new ChatClient({ + connection: adapter, + onError: errorSpy, + onResponse: () => { + if (firstCall) { + firstCall = false + // reload() aborts the in-flight stream's signal and starts a fresh + // streamResponse that assigns a new AbortController. Pre-fix, the + // first stream re-read this.abortController.signal after this + // await and would either crash or pass the second stream's signal + // to its own connect() call. With the fix, the first stream + // short-circuits because its captured signal was aborted. + reloadPromise = client.reload() + } + }, + }) + + await client.append({ + id: 'user-1', + role: 'user', + parts: [{ type: 'text', content: 'Hello' }], + createdAt: new Date(), + }) + + await reloadPromise + + // Only the surviving (reload) stream invokes connect(); the cancelled + // first stream short-circuits before reaching the connection layer.
+ // Its signal must be fresh (not the aborted one from the cancelled stream). + expect(signalsPassedToConnect.length).toBe(1) + expect(signalsPassedToConnect[0]).toBeInstanceOf(AbortSignal) + expect(signalsPassedToConnect[0]?.aborted).toBe(false) + expect(errorSpy).not.toHaveBeenCalled() + expect(client.getError()).toBeUndefined() + }) })