Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tricky-wings-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/ai-client': patch
---

Fixes a race condition in `ChatClient.streamResponse()` where `this.abortController.signal` could reference a stale or null controller by the time it is passed to `this.connection.send()`
19 changes: 14 additions & 5 deletions packages/typescript/ai-client/src/chat-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ export class ChatClient {
this.setError(undefined)
this.errorReportedGeneration = null
this.abortController = new AbortController()
// Capture the signal immediately so that a concurrent stop() or
// sendMessage() that reassigns this.abortController cannot cause
// connect() to receive a stale or null signal.
const signal = this.abortController.signal
// Reset pending tool executions for the new stream
this.pendingToolExecutions.clear()
let streamCompletedSuccessfully = false
Expand All @@ -583,6 +587,15 @@ export class ChatClient {
// Call onResponse callback
await this.callbacksRef.current.onResponse()

// If the stream was cancelled during the onResponse await (e.g. stop()
// from a callback or unmount, or reload() superseding this stream),
// bail out before allocating waitForProcessing() — otherwise the
// resolveProcessing() that ran during cancellation is a no-op and the
// await processingComplete below would deadlock.
if (signal.aborted) {
return false
}

// Merge body: base body + per-message body (per-message takes priority)
// Include conversationId for server-side event correlation
const mergedBody = {
Expand Down Expand Up @@ -610,11 +623,7 @@ export class ChatClient {
const processingComplete = this.waitForProcessing()

// Send through normalized connection (pushes chunks to subscription queue)
await this.connection.send(
messages,
mergedBody,
this.abortController.signal,
)
await this.connection.send(messages, mergedBody, signal)

// Wait for subscription loop to finish processing all chunks
await processingComplete
Expand Down
103 changes: 103 additions & 0 deletions packages/typescript/ai-client/tests/chat-client-abort.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,107 @@ describe('ChatClient - Abort Signal Handling', () => {
// Each should be a different signal instance
expect(abortSignals[0]).not.toBe(abortSignals[1])
})

it('should resolve cleanly when stop() is called during onResponse', async () => {
  const handleError = vi.fn()
  let reachedConnection = false

  const mockAdapter: ConnectionAdapter = {
    // eslint-disable-next-line @typescript-eslint/require-await
    async *connect(_messages, _data, _abortSignal) {
      reachedConnection = true
      yield asChunk({
        type: 'RUN_FINISHED',
        runId: 'run-1',
        model: 'test',
        timestamp: Date.now(),
        finishReason: 'stop',
      })
    },
  }

  const client = new ChatClient({
    connection: mockAdapter,
    onError: handleError,
    onResponse: () => {
      // Calling stop() while streamResponse awaits onResponse aborts the
      // captured signal and nulls this.abortController. Before the fix this
      // dereferenced null and threw a TypeError; with the captured signal
      // and the post-await signal.aborted check, streamResponse returns
      // early and cleanly.
      client.stop()
    },
  })

  await client.append({
    id: 'user-1',
    role: 'user',
    parts: [{ type: 'text', content: 'Hello' }],
    createdAt: new Date(),
  })

  // A cancelled stream must never reach the connection layer (no wasted
  // request), must not surface an error to user code, and must let the
  // append() promise settle instead of deadlocking.
  expect(reachedConnection).toBe(false)
  expect(handleError).not.toHaveBeenCalled()
  expect(client.getError()).toBeUndefined()
  expect(client.getIsLoading()).toBe(false)
})

it('should resolve cleanly when reload() supersedes the stream during onResponse', async () => {
const signalsPassedToConnect: Array<AbortSignal> = []
let reloadPromise: Promise<unknown> | undefined
const errorSpy = vi.fn()

const adapter: ConnectionAdapter = {
// eslint-disable-next-line @typescript-eslint/require-await
async *connect(_messages, _data, abortSignal) {
if (abortSignal) {
signalsPassedToConnect.push(abortSignal)
}
yield asChunk({
type: 'RUN_FINISHED',
runId: 'run-1',
model: 'test',
timestamp: Date.now(),
finishReason: 'stop',
})
},
}

let firstCall = true
const client = new ChatClient({
connection: adapter,
onError: errorSpy,
onResponse: () => {
if (firstCall) {
firstCall = false
// reload() aborts the in-flight stream's signal and starts a fresh
// streamResponse that assigns a new AbortController. Pre-fix, the
// first stream re-read this.abortController.signal after this
// await and would either crash or pass the second stream's signal
// to its own connect() call. With the fix, the first stream
// short-circuits because its captured signal was aborted.
reloadPromise = client.reload()
}
},
})

await client.append({
id: 'user-1',
role: 'user',
parts: [{ type: 'text', content: 'Hello' }],
createdAt: new Date(),
})

await reloadPromise

Comment thread
coderabbitai[bot] marked this conversation as resolved.
// Only the surviving (reload) stream invokes connect(); the cancelled
// first stream short-circuits before reaching the connection layer.
// Its signal must be fresh (not the aborted one from the cancelled stream).
expect(signalsPassedToConnect.length).toBe(1)
expect(signalsPassedToConnect[0]).toBeInstanceOf(AbortSignal)
expect(signalsPassedToConnect[0]?.aborted).toBe(false)
expect(errorSpy).not.toHaveBeenCalled()
expect(client.getError()).toBeUndefined()
})
})
Loading