From bd550bf2a61ae4f77bf0238aef7e6ea633beaa47 Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Tue, 5 May 2026 11:48:49 +1000 Subject: [PATCH 1/9] feat: streaming structured output (chat outputSchema + stream:true) - @tanstack/ai: typed StructuredOutputStream with terminal CUSTOM structured-output.complete event { object, raw, reasoning? }; optional TextAdapter.structuredOutputStream + activity-layer fallback; orchestrator hardening (always-finalize, typed RUN_ERROR with runId/model/timestamp, exactly-one-terminal-pair on tools branch, sync pre-flight errors, UI->Model message conversion on no-tools path). - @tanstack/ai-openrouter: native structuredOutputStream via single stream:true + response_format:json_schema request; always-finalize on upstream close; empty-response and parse-error surface as typed RUN_ERROR; in-stream provider errors terminate the run; chain-of-thought reasoning threaded through the final CUSTOM event. - E2E: structured-output-stream feature in matrix with happy-path + abort specs; useChat onCustomEvent/onChunk wiring exposes CUSTOM payload + delta count to DOM. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../streaming-structured-output-chat.md | 5 + .../streaming-structured-output-openrouter.md | 5 + .../ai-openrouter/src/adapters/text.ts | 276 ++++++++++++- .../tests/openrouter-adapter.test.ts | 387 ++++++++++++++++++ .../ai/src/activities/chat/adapter.ts | 17 + .../ai/src/activities/chat/index.ts | 324 ++++++++++++++- packages/typescript/ai/src/types.ts | 49 +++ testing/e2e/README.md | 41 +- .../structured-output-stream/abort.json | 16 + .../structured-output-stream/basic.json | 12 + testing/e2e/src/components/ChatUI.tsx | 23 ++ testing/e2e/src/lib/feature-support.ts | 4 + testing/e2e/src/lib/features.ts | 4 + testing/e2e/src/lib/types.ts | 2 + testing/e2e/src/routes/$provider/$feature.tsx | 22 + testing/e2e/src/routes/api.chat.ts | 34 +- .../tests/structured-output-stream.spec.ts | 87 ++++ 17 files changed, 1265 insertions(+), 43 deletions(-) create mode 100644 .changeset/streaming-structured-output-chat.md create mode 100644 .changeset/streaming-structured-output-openrouter.md create mode 100644 testing/e2e/fixtures/structured-output-stream/abort.json create mode 100644 testing/e2e/fixtures/structured-output-stream/basic.json create mode 100644 testing/e2e/tests/structured-output-stream.spec.ts diff --git a/.changeset/streaming-structured-output-chat.md b/.changeset/streaming-structured-output-chat.md new file mode 100644 index 000000000..68aa7302f --- /dev/null +++ b/.changeset/streaming-structured-output-chat.md @@ -0,0 +1,5 @@ +--- +'@tanstack/ai': minor +--- + +feat: `chat({ outputSchema, stream: true })` returns `AsyncIterable` with raw JSON deltas plus a final `CUSTOM` `structured-output.complete` event carrying the validated parsed object. The existing `chat({ outputSchema })` (non-streaming) path is unchanged. Adapters expose this via a new optional `structuredOutputStream` method on `TextAdapter`; `BaseTextAdapter` provides a default that wraps the non-streaming `structuredOutput` so adapters without native streaming JSON support still satisfy the new combination. diff --git a/.changeset/streaming-structured-output-openrouter.md b/.changeset/streaming-structured-output-openrouter.md new file mode 100644 index 000000000..2b0b7fe2b --- /dev/null +++ b/.changeset/streaming-structured-output-openrouter.md @@ -0,0 +1,5 @@ +--- +'@tanstack/ai-openrouter': minor +--- + +feat: native streaming structured output. `OpenRouterTextAdapter.structuredOutputStream()` issues a single request with `stream: true` + `response_format: { type: 'json_schema', strict: true }`, surfacing JSON deltas as `TEXT_MESSAGE_CONTENT` chunks and a final `CUSTOM` `structured-output.complete` event with the parsed object — replacing the previous two-request (streamed text → non-streamed JSON) flow when used with `chat({ outputSchema, stream: true })`. diff --git a/packages/typescript/ai-openrouter/src/adapters/text.ts b/packages/typescript/ai-openrouter/src/adapters/text.ts index 29427171c..8ef62a882 100644 --- a/packages/typescript/ai-openrouter/src/adapters/text.ts +++ b/packages/typescript/ai-openrouter/src/adapters/text.ts @@ -81,7 +81,7 @@ interface AGUIState { hasEmittedRunStarted: boolean hasEmittedTextMessageStart: boolean hasEmittedTextMessageEnd: boolean - hasEmittedRunFinished: boolean + hasFinalizedChoice: boolean hasEmittedStepStarted: boolean deferredUsage: | { promptTokens: number; completionTokens: number; totalTokens: number } @@ -131,7 +131,7 @@ export class OpenRouterTextAdapter< hasEmittedRunStarted: false, hasEmittedTextMessageStart: false, hasEmittedTextMessageEnd: false, - hasEmittedRunFinished: false, + hasFinalizedChoice: false, hasEmittedStepStarted: false, deferredUsage: undefined, computedFinishReason: undefined, @@ -204,7 +204,7 @@ export class OpenRouterTextAdapter< // Emit RUN_FINISHED after the stream ends so we capture usage from // any chunk (some SDKs send usage on a separate trailing chunk). - if (aguiState.hasEmittedRunFinished && aguiState.computedFinishReason) { + if (aguiState.hasFinalizedChoice && aguiState.computedFinishReason) { yield asChunk({ type: 'RUN_FINISHED', runId: aguiState.runId, @@ -263,6 +263,272 @@ export class OpenRouterTextAdapter< } } + async *structuredOutputStream( + options: StructuredOutputOptions>, + ): AsyncIterable { + const { chatOptions, outputSchema } = options + const { logger } = chatOptions + const timestamp = Date.now() + const toolCallBuffers = new Map() + let accumulatedReasoning = '' + let accumulatedContent = '' + let responseId: string | null = null + let currentModel = chatOptions.model + const aguiState: AGUIState = { + runId: chatOptions.runId ?? this.generateId(), + threadId: chatOptions.threadId ?? this.generateId(), + messageId: this.generateId(), + stepId: null, + reasoningMessageId: null, + hasClosedReasoning: false, + hasEmittedRunStarted: false, + hasEmittedTextMessageStart: false, + hasEmittedTextMessageEnd: false, + hasFinalizedChoice: false, + hasEmittedStepStarted: false, + deferredUsage: undefined, + computedFinishReason: undefined, + } + + const strictSchema = convertSchemaToJsonSchema(outputSchema, { + forStructuredOutput: true, + }) + + try { + // Strip tools — structured-output mode shouldn't mix tool calls into the + // request body. Matches the non-streaming `structuredOutput` behavior. + const { tools: _tools, ...baseParams } = + this.mapTextOptionsToSDK(chatOptions) + logger.request( + `activity=structured-stream provider=openrouter model=${this.model} messages=${chatOptions.messages.length} stream=true`, + { provider: 'openrouter', model: this.model }, + ) + const stream = await this.client.chat.send( + { + chatRequest: { + ...baseParams, + stream: true, + responseFormat: { + type: 'json_schema', + jsonSchema: { + name: 'structured_output', + schema: strictSchema, + strict: true, + }, + }, + }, + }, + { signal: chatOptions.request?.signal }, + ) + + for await (const chunk of stream) { + logger.provider(`provider=openrouter`, { chunk }) + if (chunk.id) responseId = chunk.id + if (chunk.model) currentModel = chunk.model + + if (!aguiState.hasEmittedRunStarted) { + aguiState.hasEmittedRunStarted = true + yield asChunk({ + type: 'RUN_STARTED', + runId: aguiState.runId, + threadId: aguiState.threadId, + model: currentModel || chatOptions.model, + timestamp, + }) + } + + if (chunk.error) { + // Provider error mid-stream is terminal: emit RUN_ERROR and stop. + // Continuing risks emitting RUN_FINISHED after RUN_ERROR. + yield asChunk({ + type: 'RUN_ERROR', + runId: aguiState.runId, + model: currentModel || chatOptions.model, + timestamp, + message: chunk.error.message || 'Unknown error', + code: String(chunk.error.code), + error: { + message: chunk.error.message || 'Unknown error', + code: String(chunk.error.code), + }, + }) + return + } + + for (const choice of chunk.choices) { + yield* this.processChoice( + choice, + toolCallBuffers, + { + id: responseId || this.generateId(), + model: currentModel, + timestamp, + }, + { reasoning: accumulatedReasoning, content: accumulatedContent }, + (r, c) => { + accumulatedReasoning = r + accumulatedContent = c + }, + chunk.usage, + aguiState, + ) + } + } + + // Finalize the run unconditionally. If the upstream stream closed + // without a finishReason (truncation, transport drop), processChoice + // never closed reasoning/text or computed a finish reason — we still + // owe consumers a CUSTOM + RUN_FINISHED (or RUN_ERROR), never silence. + const resolvedModel = currentModel || chatOptions.model + + if (aguiState.reasoningMessageId && !aguiState.hasClosedReasoning) { + aguiState.hasClosedReasoning = true + yield asChunk({ + type: 'REASONING_MESSAGE_END', + messageId: aguiState.reasoningMessageId, + model: resolvedModel, + timestamp, + }) + yield asChunk({ + type: 'REASONING_END', + messageId: aguiState.reasoningMessageId, + model: resolvedModel, + timestamp, + }) + if (aguiState.stepId) { + yield asChunk({ + type: 'STEP_FINISHED', + stepName: aguiState.stepId, + stepId: aguiState.stepId, + model: resolvedModel, + timestamp, + content: accumulatedReasoning, + }) + } + } + + if ( + aguiState.hasEmittedTextMessageStart && + !aguiState.hasEmittedTextMessageEnd + ) { + aguiState.hasEmittedTextMessageEnd = true + yield asChunk({ + type: 'TEXT_MESSAGE_END', + messageId: aguiState.messageId, + model: resolvedModel, + timestamp, + }) + } + + if (!accumulatedContent) { + // Mirrors the non-streaming `structuredOutput` empty-content error so + // refused/truncated responses surface as failures, not `null` data. + const message = 'Structured output response contained no content' + logger.errors(message, { + source: 'openrouter.structuredOutputStream', + }) + yield asChunk({ + type: 'RUN_ERROR', + runId: aguiState.runId, + model: resolvedModel, + timestamp, + message, + code: 'empty-response', + error: { message, code: 'empty-response' }, + }) + return + } + + let parsed: unknown + try { + parsed = JSON.parse(accumulatedContent) + } catch (parseError) { + const message = + parseError instanceof SyntaxError + ? `Failed to parse structured output as JSON: ${parseError.message}` + : 'Failed to parse structured output as JSON' + logger.errors(message, { + source: 'openrouter.structuredOutputStream', + error: parseError, + }) + yield asChunk({ + type: 'RUN_ERROR', + runId: aguiState.runId, + model: resolvedModel, + timestamp, + message, + code: 'parse-error', + error: { message, code: 'parse-error' }, + }) + return + } + + yield asChunk({ + type: 'CUSTOM', + name: 'structured-output.complete', + value: { + object: parsed, + raw: accumulatedContent, + // Surface accumulated chain-of-thought (if any) on the terminal + // event so consumers that only subscribe to the final result can + // still recover what the model thought through to get there. + ...(accumulatedReasoning ? { reasoning: accumulatedReasoning } : {}), + }, + model: resolvedModel, + timestamp, + }) + + yield asChunk({ + type: 'RUN_FINISHED', + runId: aguiState.runId, + threadId: aguiState.threadId, + model: resolvedModel, + timestamp, + usage: aguiState.deferredUsage, + finishReason: aguiState.computedFinishReason ?? 'stop', + }) + } catch (error) { + logger.errors('openrouter.structuredOutputStream fatal', { + error, + source: 'openrouter.structuredOutputStream', + }) + if (!aguiState.hasEmittedRunStarted) { + aguiState.hasEmittedRunStarted = true + yield asChunk({ + type: 'RUN_STARTED', + runId: aguiState.runId, + threadId: aguiState.threadId, + model: chatOptions.model, + timestamp, + }) + } + + if (error instanceof RequestAbortedError) { + yield asChunk({ + type: 'RUN_ERROR', + runId: aguiState.runId, + model: chatOptions.model, + timestamp, + message: 'Request aborted', + code: 'aborted', + error: { message: 'Request aborted', code: 'aborted' }, + }) + return + } + + yield asChunk({ + type: 'RUN_ERROR', + runId: aguiState.runId, + model: chatOptions.model, + timestamp, + message: (error as Error).message || 'Unknown error', + error: { + message: (error as Error).message || 'Unknown error', + }, + }) + } + } + async structuredOutput( options: StructuredOutputOptions>, ): Promise> { @@ -578,8 +844,8 @@ export class OpenRouterTextAdapter< // send two chunks with finishReason (one for the finish, one carrying // usage data). Without this guard TEXT_MESSAGE_END and RUN_FINISHED // would be emitted twice. - if (!aguiState.hasEmittedRunFinished) { - aguiState.hasEmittedRunFinished = true + if (!aguiState.hasFinalizedChoice) { + aguiState.hasFinalizedChoice = true // Emit all completed tool calls when finish reason indicates tool usage if (finishReason === 'tool_calls' || toolCallBuffers.size > 0) { diff --git a/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts b/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts index 206d16525..53d79e2fd 100644 --- a/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts +++ b/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts @@ -1664,3 +1664,390 @@ describe('OpenRouter STEP event consistency', () => { expect(stepFinished).toHaveLength(1) }) }) + +describe('OpenRouter structuredOutputStream', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('issues a single streaming request with response_format json_schema and emits parsed object', async () => { + const streamChunks = [ + { + id: 'chatcmpl-stream-1', + model: 'openai/gpt-4o-mini', + choices: [{ delta: { content: '{"name":"Ali' }, finishReason: null }], + }, + { + id: 'chatcmpl-stream-1', + model: 'openai/gpt-4o-mini', + choices: [ + { delta: { content: 'ce","age":30}' }, finishReason: 'stop' }, + ], + usage: { promptTokens: 5, completionTokens: 9, totalTokens: 14 }, + }, + ] + + setupMockSdkClient(streamChunks) + const adapter = createAdapter() + + const outputSchema = { + type: 'object', + properties: { + name: { type: 'string' }, + age: { type: 'number' }, + }, + required: ['name', 'age'], + } + + const chunks: Array = [] + for await (const chunk of adapter.structuredOutputStream({ + chatOptions: { + model: 'openai/gpt-4o-mini', + messages: [{ role: 'user', content: 'Give me a person' }], + logger: testLogger, + }, + outputSchema, + })) { + chunks.push(chunk) + } + + // Single SDK call with stream:true + responseFormat + expect(mockSend).toHaveBeenCalledTimes(1) + const [rawParams] = mockSend.mock.calls[0]! + const params = rawParams.chatRequest + expect(params.stream).toBe(true) + expect(params.responseFormat).toEqual({ + type: 'json_schema', + jsonSchema: { + name: 'structured_output', + schema: { + ...outputSchema, + additionalProperties: false, + }, + strict: true, + }, + }) + expect(params.tools).toBeUndefined() + + // Lifecycle events present and in the contractual order: consumers + // observing the CUSTOM `structured-output.complete` payload before + // RUN_FINISHED is part of the public contract; assert it. + const types: Array = chunks.map((c) => c.type) + const idx = (t: string) => types.indexOf(t) + expect(idx('RUN_STARTED')).toBeGreaterThanOrEqual(0) + expect(idx('TEXT_MESSAGE_START')).toBeGreaterThan(idx('RUN_STARTED')) + expect(idx('TEXT_MESSAGE_CONTENT')).toBeGreaterThan( + idx('TEXT_MESSAGE_START'), + ) + expect(idx('TEXT_MESSAGE_END')).toBeGreaterThan(idx('TEXT_MESSAGE_CONTENT')) + expect(idx('CUSTOM')).toBeGreaterThan(idx('TEXT_MESSAGE_END')) + expect(idx('RUN_FINISHED')).toBeGreaterThan(idx('CUSTOM')) + + // Two CONTENT deltas — one per stream chunk — carrying raw JSON deltas + const contentChunks = chunks.filter( + (c): c is Extract => + c.type === 'TEXT_MESSAGE_CONTENT', + ) + expect(contentChunks).toHaveLength(2) + expect(contentChunks[0]!.delta).toBe('{"name":"Ali') + expect(contentChunks[1]!.delta).toBe('ce","age":30}') + + // Final CUSTOM event carries the parsed object + raw text + const customChunks = chunks.filter( + (c): c is Extract => c.type === 'CUSTOM', + ) + expect(customChunks).toHaveLength(1) + expect(customChunks[0]!.name).toBe('structured-output.complete') + expect(customChunks[0]!.value).toEqual({ + object: { name: 'Alice', age: 30 }, + raw: '{"name":"Alice","age":30}', + }) + }) + + it('emits RUN_ERROR when accumulated content is not valid JSON', async () => { + const streamChunks = [ + { + id: 'chatcmpl-stream-bad', + model: 'openai/gpt-4o-mini', + choices: [{ delta: { content: 'not json' }, finishReason: 'stop' }], + usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 }, + }, + ] + + setupMockSdkClient(streamChunks) + const adapter = createAdapter() + + const chunks: Array = [] + for await (const chunk of adapter.structuredOutputStream({ + chatOptions: { + model: 'openai/gpt-4o-mini', + messages: [{ role: 'user', content: 'Give me a person' }], + logger: testLogger, + }, + outputSchema: { + type: 'object', + properties: { name: { type: 'string' } }, + required: ['name'], + }, + })) { + chunks.push(chunk) + } + + const errorChunks = chunks.filter((c) => c.type === 'RUN_ERROR') + expect(errorChunks).toHaveLength(1) + expect(errorChunks[0]).toMatchObject({ + type: 'RUN_ERROR', + message: expect.stringContaining('Failed to parse structured output'), + }) + + const customChunks = chunks.filter((c) => c.type === 'CUSTOM') + expect(customChunks).toHaveLength(0) + }) + + it('emits empty-response RUN_ERROR when no content is streamed', async () => { + // No content delta, just a finish — mirrors a refused/truncated response. + const streamChunks = [ + { + id: 'chatcmpl-stream-empty', + model: 'openai/gpt-4o-mini', + choices: [{ delta: { content: '' }, finishReason: 'stop' }], + usage: { promptTokens: 1, completionTokens: 0, totalTokens: 1 }, + }, + ] + + setupMockSdkClient(streamChunks) + const adapter = createAdapter() + + const chunks: Array = [] + for await (const chunk of adapter.structuredOutputStream({ + chatOptions: { + model: 'openai/gpt-4o-mini', + messages: [{ role: 'user', content: 'Give me a person' }], + logger: testLogger, + }, + outputSchema: { + type: 'object', + properties: { name: { type: 'string' } }, + required: ['name'], + }, + })) { + chunks.push(chunk) + } + + const errorChunks = chunks.filter((c) => c.type === 'RUN_ERROR') + expect(errorChunks).toHaveLength(1) + expect(errorChunks[0]).toMatchObject({ + type: 'RUN_ERROR', + code: 'empty-response', + }) + expect(chunks.filter((c) => c.type === 'CUSTOM')).toHaveLength(0) + }) + + it('finalizes the run when upstream stream closes without finishReason', async () => { + // Truncated SDK stream: deltas arrive but no finishReason. The adapter + // must still emit the terminal CUSTOM + RUN_FINISHED so consumers + // never hang waiting for completion. + const streamChunks = [ + { + id: 'chatcmpl-stream-trunc', + model: 'openai/gpt-4o-mini', + choices: [ + { delta: { content: '{"name":"Alice"}' }, finishReason: null }, + ], + }, + ] + + setupMockSdkClient(streamChunks) + const adapter = createAdapter() + + const chunks: Array = [] + for await (const chunk of adapter.structuredOutputStream({ + chatOptions: { + model: 'openai/gpt-4o-mini', + messages: [{ role: 'user', content: 'Give me a person' }], + logger: testLogger, + }, + outputSchema: { + type: 'object', + properties: { name: { type: 'string' } }, + required: ['name'], + }, + })) { + chunks.push(chunk) + } + + const customChunks = chunks.filter((c) => c.type === 'CUSTOM') + expect(customChunks).toHaveLength(1) + expect(customChunks[0]).toMatchObject({ + name: 'structured-output.complete', + value: { object: { name: 'Alice' }, raw: '{"name":"Alice"}' }, + }) + expect(chunks.filter((c) => c.type === 'RUN_FINISHED')).toHaveLength(1) + expect(chunks.filter((c) => c.type === 'RUN_ERROR')).toHaveLength(0) + }) + + it('terminates on mid-stream provider error without emitting RUN_FINISHED', async () => { + const streamChunks = [ + { + id: 'chatcmpl-stream-err', + model: 'openai/gpt-4o-mini', + choices: [{ delta: { content: '{"name":"Al' }, finishReason: null }], + }, + { + id: 'chatcmpl-stream-err', + model: 'openai/gpt-4o-mini', + error: { message: 'Upstream rate limit', code: 429 }, + choices: [], + }, + ] + + setupMockSdkClient(streamChunks) + const adapter = createAdapter() + + const chunks: Array = [] + for await (const chunk of adapter.structuredOutputStream({ + chatOptions: { + model: 'openai/gpt-4o-mini', + messages: [{ role: 'user', content: 'Give me a person' }], + logger: testLogger, + }, + outputSchema: { + type: 'object', + properties: { name: { type: 'string' } }, + required: ['name'], + }, + })) { + chunks.push(chunk) + } + + const errorChunks = chunks.filter((c) => c.type === 'RUN_ERROR') + expect(errorChunks).toHaveLength(1) + // After RUN_ERROR the stream is terminal — no RUN_FINISHED, no CUSTOM. + expect(chunks.filter((c) => c.type === 'RUN_FINISHED')).toHaveLength(0) + expect(chunks.filter((c) => c.type === 'CUSTOM')).toHaveLength(0) + }) + + it('surfaces accumulated reasoning on the structured-output.complete event', async () => { + // Thinking-model stream: reasoning deltas before content. Consumers that + // subscribe only to the terminal CUSTOM event should still recover the + // chain-of-thought via `value.reasoning`. + const streamChunks = [ + { + id: 'chatcmpl-stream-reasoning', + model: 'openai/gpt-4o-mini', + choices: [ + { + delta: { + reasoningDetails: [ + { type: 'reasoning.text', text: 'Let me think... ' }, + ], + }, + finishReason: null, + }, + ], + }, + { + id: 'chatcmpl-stream-reasoning', + model: 'openai/gpt-4o-mini', + choices: [ + { + delta: { + reasoningDetails: [ + { type: 'reasoning.text', text: 'a Strat would suit them.' }, + ], + }, + finishReason: null, + }, + ], + }, + { + id: 'chatcmpl-stream-reasoning', + model: 'openai/gpt-4o-mini', + choices: [ + { + delta: { content: '{"name":"Strat","price":1299}' }, + finishReason: 'stop', + }, + ], + }, + ] + + setupMockSdkClient(streamChunks) + const adapter = createAdapter() + + const chunks: Array = [] + for await (const chunk of adapter.structuredOutputStream({ + chatOptions: { + model: 'openai/gpt-4o-mini', + messages: [{ role: 'user', content: 'Recommend a guitar' }], + logger: testLogger, + }, + outputSchema: { + type: 'object', + properties: { + name: { type: 'string' }, + price: { type: 'number' }, + }, + required: ['name', 'price'], + }, + })) { + chunks.push(chunk) + } + + const customChunks = chunks.filter( + (c): c is Extract => c.type === 'CUSTOM', + ) + expect(customChunks).toHaveLength(1) + expect(customChunks[0]!.value).toEqual({ + object: { name: 'Strat', price: 1299 }, + raw: '{"name":"Strat","price":1299}', + reasoning: 'Let me think... a Strat would suit them.', + }) + }) + + it('omits reasoning from the CUSTOM event when none was streamed', async () => { + // Non-thinking model: no reasoning deltas. The `reasoning` field should + // be absent (not an empty string) so downstream consumers can branch on + // `value.reasoning != null` without false positives. + const streamChunks = [ + { + id: 'chatcmpl-stream-noreasoning', + model: 'openai/gpt-4o-mini', + choices: [ + { + delta: { content: '{"name":"Strat","price":1299}' }, + finishReason: 'stop', + }, + ], + }, + ] + + setupMockSdkClient(streamChunks) + const adapter = createAdapter() + + const chunks: Array = [] + for await (const chunk of adapter.structuredOutputStream({ + chatOptions: { + model: 'openai/gpt-4o-mini', + messages: [{ role: 'user', content: 'Recommend a guitar' }], + logger: testLogger, + }, + outputSchema: { + type: 'object', + properties: { + name: { type: 'string' }, + price: { type: 'number' }, + }, + required: ['name', 'price'], + }, + })) { + chunks.push(chunk) + } + + const customChunks = chunks.filter( + (c): c is Extract => c.type === 'CUSTOM', + ) + expect(customChunks).toHaveLength(1) + expect(customChunks[0]!.value).not.toHaveProperty('reasoning') + }) +}) diff --git a/packages/typescript/ai/src/activities/chat/adapter.ts b/packages/typescript/ai/src/activities/chat/adapter.ts index 4ccc6fc09..198b89ffa 100644 --- a/packages/typescript/ai/src/activities/chat/adapter.ts +++ b/packages/typescript/ai/src/activities/chat/adapter.ts @@ -97,6 +97,23 @@ export interface TextAdapter< structuredOutput: ( options: StructuredOutputOptions, ) => Promise> + + /** + * Stream structured output using the provider's native streaming structured + * output API (stream + response_format json_schema in a single request). + * + * Optional — adapters without native streaming JSON omit this method and the + * activity layer synthesizes a stream around the non-streaming + * `structuredOutput` call. + * + * Implementations must emit standard AG-UI lifecycle events (RUN_STARTED, + * TEXT_MESSAGE_*, RUN_FINISHED) carrying raw JSON text deltas, plus a final + * `CUSTOM` event named `structured-output.complete` whose `value` is + * `{ object, raw }`. + */ + structuredOutputStream?: ( + options: StructuredOutputOptions, + ) => AsyncIterable } /** diff --git a/packages/typescript/ai/src/activities/chat/index.ts b/packages/typescript/ai/src/activities/chat/index.ts index e1327fdb5..7a3c332c4 100644 --- a/packages/typescript/ai/src/activities/chat/index.ts +++ b/packages/typescript/ai/src/activities/chat/index.ts @@ -38,6 +38,8 @@ import type { RunFinishedEvent, SchemaInput, StreamChunk, + StructuredOutputCompleteEvent, + StructuredOutputStream, TextMessageContentEvent, TextOptions, Tool, @@ -213,7 +215,7 @@ export interface TextActivityOptions< export function createChatOptions< TAdapter extends AnyTextAdapter, TSchema extends SchemaInput | undefined = undefined, - TStream extends boolean = true, + TStream extends boolean = boolean, >( options: TextActivityOptions, ): TextActivityOptions { @@ -226,16 +228,28 @@ export function createChatOptions< /** * Result type for the text activity. - * - If outputSchema is provided: Promise> - * - If stream is false: Promise - * - Otherwise (stream is true, default): AsyncIterable + * - If outputSchema is provided AND stream is explicitly true: + * StructuredOutputStream> — yields raw JSON deltas + * via TEXT_MESSAGE_CONTENT plus a terminal StructuredOutputCompleteEvent + * carrying the validated object. + * - If outputSchema is provided without explicit stream:true: + * Promise>. + * - If stream is explicitly false (no schema): Promise. + * - Otherwise (default): AsyncIterable. + * + * `[TStream] extends [true]` is used (not `TStream extends true`) so that the + * default `boolean` value of `TStream` does *not* match the streaming branch. + * Without this, plain `chat({ outputSchema })` would type as a stream while + * the runtime returns a Promise — see issue #526. */ export type TextActivityResult< TSchema extends SchemaInput | undefined, - TStream extends boolean = true, + TStream extends boolean = boolean, > = TSchema extends SchemaInput - ? Promise> - : TStream extends false + ? [TStream] extends [true] + ? StructuredOutputStream> + : Promise> + : [TStream] extends [false] ? Promise : AsyncIterable @@ -1512,13 +1526,26 @@ class TextEngine< export function chat< TAdapter extends AnyTextAdapter, TSchema extends SchemaInput | undefined = undefined, - TStream extends boolean = true, + TStream extends boolean = boolean, >( options: TextActivityOptions, ): TextActivityResult { const { outputSchema, stream } = options - // If outputSchema is provided, run agentic structured output + // outputSchema + stream:true is the only branch that streams structured + // output. Without an explicit `stream: true`, schema-bearing calls run the + // agent loop and resolve to a typed Promise>. + if (outputSchema && stream === true) { + return runStreamingStructuredOutput( + options as unknown as TextActivityOptions< + AnyTextAdapter, + SchemaInput, + true + >, + ) as TextActivityResult + } + + // If outputSchema is provided, run agentic structured output (Promise) if (outputSchema) { return runAgenticStructuredOutput( options as unknown as TextActivityOptions< @@ -1678,6 +1705,285 @@ async function runAgenticStructuredOutput( return result.data as InferSchemaType } +/** + * Synthesize a streaming structured-output stream by wrapping a non-streaming + * `structuredOutput` call. Used when an adapter doesn't implement + * `structuredOutputStream` natively. + */ +async function* fallbackStructuredOutputStream( + adapter: AnyTextAdapter, + options: { chatOptions: TextOptions; outputSchema: any }, +): AsyncIterable { + const { chatOptions } = options + const runId = chatOptions.runId ?? `mock-${Date.now()}` + const threadId = chatOptions.threadId ?? `mock-${Date.now()}` + const messageId = `mock-${Date.now()}-${Math.random().toString(36).slice(2)}` + const model = chatOptions.model + const timestamp = Date.now() + + yield { + type: 'RUN_STARTED', + runId, + threadId, + model, + timestamp, + } as unknown as StreamChunk + + let result: { data: unknown; rawText: string } + try { + result = await adapter.structuredOutput(options) + } catch (error) { + const message = error instanceof Error ? error.message : 'Unknown error' + yield { + type: 'RUN_ERROR', + runId, + model, + timestamp, + message, + error: { message }, + } as unknown as StreamChunk + return + } + + yield { + type: 'TEXT_MESSAGE_START', + messageId, + role: 'assistant', + model, + timestamp, + } as unknown as StreamChunk + + yield { + type: 'TEXT_MESSAGE_CONTENT', + messageId, + delta: result.rawText, + model, + timestamp, + } as unknown as StreamChunk + + yield { + type: 'TEXT_MESSAGE_END', + messageId, + model, + timestamp, + } as unknown as StreamChunk + + yield { + type: 'CUSTOM', + name: 'structured-output.complete', + value: { object: result.data, raw: result.rawText }, + model, + timestamp, + } as unknown as StreamChunk + + yield { + type: 'RUN_FINISHED', + runId, + threadId, + model, + timestamp, + finishReason: 'stop', + } as unknown as StreamChunk +} + +/** + * Run streaming structured output: + * - Without tools: call adapter.structuredOutputStream directly (single + * provider request emitting JSON deltas + a final CUSTOM event). + * - With tools: run the agent loop, yield its non-terminal chunks, then call + * structuredOutputStream on the final messages so the structured stream's + * own RUN_STARTED/RUN_FINISHED bracket the run. + * + * Validates the parsed object against the original Standard Schema (if + * applicable) when forwarding the final `structured-output.complete` event. + * + * Pre-flight validation (missing schema, unconvertible schema) throws + * synchronously at call time rather than as a yielded RUN_ERROR mid-stream — + * those are programmer errors, not runtime conditions. + */ +function runStreamingStructuredOutput( + options: TextActivityOptions, +): StructuredOutputStream> { + const { outputSchema } = options + + if (!outputSchema) { + throw new Error('outputSchema is required for streaming structured output') + } + + // forStructuredOutput strict-converts the schema once at the activity + // boundary. Adapters can re-convert if their wire format diverges, but the + // default flow hands them a strict-ready schema. + const jsonSchema = convertSchemaToJsonSchema(outputSchema, { + forStructuredOutput: true, + }) + if (!jsonSchema) { + throw new Error('Failed to convert output schema to JSON Schema') + } + + return runStreamingStructuredOutputImpl(options, jsonSchema) +} + +async function* runStreamingStructuredOutputImpl( + options: TextActivityOptions, + jsonSchema: NonNullable>, +): StructuredOutputStream> { + const { adapter, outputSchema, middleware, context, debug, ...textOptions } = + options + const model = adapter.model + const logger = resolveDebugOption(debug) + const runId = textOptions.runId + + // Inputs may be UIMessages (from useChat) or ModelMessages (from server-side + // callers). The agent-loop branch converts via TextEngine; the no-tools + // branch must convert here so the adapter sees a uniform ModelMessage shape. + let finalMessages = convertMessagesToModelMessages( + (textOptions.messages ?? []) as Array, + ) + + if (textOptions.tools?.length) { + const engine = new TextEngine( + { + adapter, + params: { ...textOptions, model, logger } as TextOptions< + Record, + Record + >, + middleware, + context, + }, + logger, + ) + + // The structured-output stream emits its own RUN_STARTED + RUN_FINISHED + // pair to bracket the run — drop both from the engine's output so + // consumers see exactly one terminal lifecycle pair. + try { + for await (const chunk of engine.run()) { + if (chunk.type === 'RUN_STARTED' || chunk.type === 'RUN_FINISHED') { + continue + } + yield chunk + } + } catch (engineError) { + const message = (engineError as Error).message || 'Agent loop failed' + logger.errors('runStreamingStructuredOutput agent loop failed', { + error: engineError, + source: 'runStreamingStructuredOutput', + }) + yield { + type: 'RUN_ERROR', + runId, + model, + timestamp: Date.now(), + message, + code: 'agent-loop-failed', + error: { message, code: 'agent-loop-failed' }, + } as unknown as StreamChunk + return + } + + finalMessages = engine.getMessages() + } + + const { + tools: _tools, + agentLoopStrategy: _als, + ...structuredTextOptions + } = textOptions + + const providerName = + (adapter as { provider?: string }).provider ?? adapter.name + logger.request( + `activity=chat-structured-stream provider=${providerName} model=${model} messages=${finalMessages.length}`, + { + provider: providerName, + model, + messageCount: finalMessages.length, + }, + ) + + const structuredChatOptions = { + ...structuredTextOptions, + model, + messages: finalMessages, + logger, + } + + // Adapters that don't implement structuredOutputStream natively fall back + // to wrapping the non-streaming `structuredOutput` — `fallbackStructuredOutputStream` + // synthesizes the AG-UI lifecycle events around it. + const stream = adapter.structuredOutputStream + ? adapter.structuredOutputStream({ + chatOptions: structuredChatOptions, + outputSchema: jsonSchema, + }) + : fallbackStructuredOutputStream(adapter, { + chatOptions: structuredChatOptions, + outputSchema: jsonSchema, + }) + + for await (const chunk of stream) { + if ( + chunk.type === 'CUSTOM' && + chunk.name === 'structured-output.complete' + ) { + const customChunk = chunk + const value = customChunk.value as { + object: unknown + raw: string + reasoning?: string + } + if (isStandardSchema(outputSchema)) { + try { + const validated = parseWithStandardSchema(outputSchema, value.object) + yield { + ...customChunk, + // Forward `reasoning` through schema validation so consumers that + // only listen for the terminal event don't lose chain-of-thought. + value: { + object: validated, + raw: value.raw, + ...(value.reasoning ? { reasoning: value.reasoning } : {}), + }, + } as StructuredOutputCompleteEvent> + continue + } catch (err) { + const message = (err as Error).message || 'Schema validation failed' + logger.errors( + 'runStreamingStructuredOutput schema validation failed', + { + error: err, + source: 'runStreamingStructuredOutput', + // Include reasoning in error meta so post-mortems can recover + // what the model thought through before producing invalid JSON. + ...(value.reasoning ? { reasoning: value.reasoning } : {}), + }, + ) + yield { + type: 'RUN_ERROR', + runId, + model: customChunk.model ?? model, + timestamp: customChunk.timestamp ?? Date.now(), + message, + code: 'schema-validation', + error: { + message, + code: 'schema-validation', + ...(value.reasoning ? { reasoning: value.reasoning } : {}), + }, + } as unknown as StreamChunk + return + } + } + yield customChunk as StructuredOutputCompleteEvent< + InferSchemaType + > + continue + } + yield chunk + } +} + // Re-export adapter types export type { TextAdapter, diff --git a/packages/typescript/ai/src/types.ts b/packages/typescript/ai/src/types.ts index e11e7176f..abcca84c1 100644 --- a/packages/typescript/ai/src/types.ts +++ b/packages/typescript/ai/src/types.ts @@ -1044,6 +1044,55 @@ export interface CustomEvent extends AGUICustomEvent { model?: string } +/** + * Final event of a streaming structured-output run. Carries the validated + * `object` (typed as `T` after the orchestrator runs Standard Schema parsing), + * the `raw` JSON text that produced it, and — for thinking/reasoning models — + * the accumulated reasoning text. Adapters emit this with `T = unknown`; the + * chat orchestrator narrows to the schema's inferred type after validation. + * + * `reasoning` is `undefined` when the model produced none (most non-thinking + * models) and when the underlying adapter doesn't expose reasoning streams. + */ +export interface StructuredOutputCompleteEvent extends Omit< + CustomEvent, + 'name' | 'value' +> { + name: 'structured-output.complete' + value: { object: T; raw: string; reasoning?: string } +} + +/** + * Public type for streams returned by `chat({ outputSchema, stream: true })`. + * Yields all standard `StreamChunk` lifecycle events plus a terminal + * `StructuredOutputCompleteEvent` whose `value.object` is typed against the + * caller's schema. + */ +export type StructuredOutputStream = AsyncIterable< + StreamChunk | StructuredOutputCompleteEvent +> + +/** + * Type guard for the terminal `structured-output.complete` event. Use to + * narrow chunks while iterating a `StructuredOutputStream`: + * + * ```ts + * for await (const chunk of stream) { + * if (isStructuredOutputCompleteEvent(chunk)) { + * chunk.value.object // typed as MySchema + * } + * } + * ``` + */ +export function isStructuredOutputCompleteEvent( + chunk: StreamChunk | StructuredOutputCompleteEvent, +): chunk is StructuredOutputCompleteEvent { + return ( + chunk.type === 'CUSTOM' && + (chunk as CustomEvent).name === 'structured-output.complete' + ) +} + // ============================================================================ // AG-UI Reasoning Event Interfaces // ============================================================================ diff --git a/testing/e2e/README.md b/testing/e2e/README.md index cc1fb3873..285620db4 100644 --- a/testing/e2e/README.md +++ b/testing/e2e/README.md @@ -12,25 +12,26 @@ End-to-end tests for TanStack AI using Playwright and [aimock](https://github.co Each test iterates over supported providers using `providersFor('feature')`: -| Feature | Providers | Spec file | -| --------------------- | --------- | ------------------------------------- | -| chat | 7 | `tests/chat.spec.ts` | -| one-shot-text | 7 | `tests/one-shot-text.spec.ts` | -| multi-turn | 7 | `tests/multi-turn.spec.ts` | -| structured-output | 7 | `tests/structured-output.spec.ts` | -| tool-calling | 7 | `tests/tool-calling.spec.ts` | -| parallel-tool-calls | 6 | `tests/parallel-tool-calls.spec.ts` | -| tool-approval | 6 | `tests/tool-approval.spec.ts` | -| text-tool-text | 6 | `tests/text-tool-text.spec.ts` | -| agentic-structured | 7 | `tests/agentic-structured.spec.ts` | -| reasoning | 3 | `tests/reasoning.spec.ts` | -| multimodal-image | 5 | `tests/multimodal-image.spec.ts` | -| multimodal-structured | 5 | `tests/multimodal-structured.spec.ts` | -| summarize | 6 | `tests/summarize.spec.ts` | -| summarize-stream | 6 | `tests/summarize-stream.spec.ts` | -| image-gen | 7 | `tests/image-gen.spec.ts` | -| tts | 7 | `tests/tts.spec.ts` | -| transcription | 7 | `tests/transcription.spec.ts` | +| Feature | Providers | Spec file | +| ------------------------ | --------- | ---------------------------------------- | +| chat | 7 | `tests/chat.spec.ts` | +| one-shot-text | 7 | `tests/one-shot-text.spec.ts` | +| multi-turn | 7 | `tests/multi-turn.spec.ts` | +| structured-output | 7 | `tests/structured-output.spec.ts` | +| structured-output-stream | 1 | `tests/structured-output-stream.spec.ts` | +| tool-calling | 7 | `tests/tool-calling.spec.ts` | +| parallel-tool-calls | 6 | `tests/parallel-tool-calls.spec.ts` | +| tool-approval | 6 | `tests/tool-approval.spec.ts` | +| text-tool-text | 6 | `tests/text-tool-text.spec.ts` | +| agentic-structured | 7 | `tests/agentic-structured.spec.ts` | +| reasoning | 3 | `tests/reasoning.spec.ts` | +| multimodal-image | 5 | `tests/multimodal-image.spec.ts` | +| multimodal-structured | 5 | `tests/multimodal-structured.spec.ts` | +| summarize | 6 | `tests/summarize.spec.ts` | +| summarize-stream | 6 | `tests/summarize-stream.spec.ts` | +| image-gen | 7 | `tests/image-gen.spec.ts` | +| tts | 7 | `tests/tts.spec.ts` | +| transcription | 7 | `tests/transcription.spec.ts` | ### Tools-test page @@ -122,7 +123,7 @@ Clean up the fixture: } ``` -Existing prefixes: `[chat]`, `[oneshot]`, `[reasoning]`, `[multiturn-1]`, `[multiturn-2]`, `[toolcall]`, `[parallel]`, `[approval]`, `[approval-deny]`, `[text-tool-text]`, `[structured]`, `[agentic]`, `[mmimage]`, `[mmstruct]`, `[summarize]`, `[imagegen]`, `[tts]`, `[transcription]`, `[abort-test]`, `[error-test]`. +Existing prefixes: `[chat]`, `[oneshot]`, `[reasoning]`, `[multiturn-1]`, `[multiturn-2]`, `[toolcall]`, `[parallel]`, `[approval]`, `[approval-deny]`, `[text-tool-text]`, `[structured]`, `[structured-stream]`, `[agentic]`, `[mmimage]`, `[mmstruct]`, `[summarize]`, `[imagegen]`, `[tts]`, `[transcription]`, `[abort-test]`, `[error-test]`. ## 4. Writing a Test diff --git a/testing/e2e/fixtures/structured-output-stream/abort.json b/testing/e2e/fixtures/structured-output-stream/abort.json new file mode 100644 index 000000000..6ed750774 --- /dev/null +++ b/testing/e2e/fixtures/structured-output-stream/abort.json @@ -0,0 +1,16 @@ +{ + "fixtures": [ + { + "match": { + "userMessage": "[structured-stream-abort] recommend a guitar slowly" + }, + "response": { + "content": "{\"name\":\"Fender Stratocaster\",\"price\":1299,\"reason\":\"Versatile tone and comfortable playability across many genres including blues rock jazz country and pop with a wide tonal palette\",\"rating\":5}" + }, + "opts": { + "tokensPerSecond": 1, + "chunkSize": 2 + } + } + ] +} diff --git a/testing/e2e/fixtures/structured-output-stream/basic.json b/testing/e2e/fixtures/structured-output-stream/basic.json new file mode 100644 index 000000000..62ef046fc --- /dev/null +++ b/testing/e2e/fixtures/structured-output-stream/basic.json @@ -0,0 +1,12 @@ +{ + "fixtures": [ + { + "match": { + "userMessage": "[structured-stream] recommend a guitar as json" + }, + "response": { + "content": "{\"name\":\"Fender Stratocaster\",\"price\":1299,\"reason\":\"Versatile tone and comfortable playability\",\"rating\":5}" + } + } + ] +} diff --git a/testing/e2e/src/components/ChatUI.tsx b/testing/e2e/src/components/ChatUI.tsx index 7183115ae..da5f0c595 100644 --- a/testing/e2e/src/components/ChatUI.tsx +++ b/testing/e2e/src/components/ChatUI.tsx @@ -18,6 +18,13 @@ interface ChatUIProps { }) => Promise showImageInput?: boolean onStop?: () => void + /** When the streaming structured-output CUSTOM event lands, the page + * exposes the parsed object here so e2e tests can assert that the event + * reached the client (not just that the JSON text was rendered). */ + structuredObject?: unknown + /** Number of TEXT_MESSAGE_CONTENT chunks observed. Used by streaming e2e + * tests to verify the response actually streamed in multiple deltas. */ + contentDeltaCount?: number } export function ChatUI({ @@ -28,6 +35,8 @@ export function ChatUI({ addToolApprovalResponse, showImageInput, onStop, + structuredObject, + contentDeltaCount, }: ChatUIProps) { const [input, setInput] = useState('') const messagesRef = useRef(null) @@ -46,6 +55,20 @@ export function ChatUI({ return (
+ {structuredObject != null && ( +