From 822e657d62ef165594c2264f0e630b42e434ffce Mon Sep 17 00:00:00 2001 From: Jean du Plessis Date: Mon, 23 Mar 2026 11:22:05 +0200 Subject: [PATCH 1/2] fix(session-ingest): add retry with backoff to session-ingest-client calls Replace direct fetch() calls in session-ingest-client with fetchWithBackoff() to handle transient failures from the session ingest worker. Consolidate the duplicated share endpoint logic in cli-sessions-v2-router to use the shared shareSessionIngest helper. Drain response bodies on retry to prevent socket/buffer leaks. --- src/lib/fetchWithBackoff.ts | 2 + src/lib/session-ingest-client.test.ts | 88 +++++++++++++++++++++++++++ src/lib/session-ingest-client.ts | 27 ++++---- src/routers/cli-sessions-v2-router.ts | 41 +++---------- 4 files changed, 115 insertions(+), 43 deletions(-) diff --git a/src/lib/fetchWithBackoff.ts b/src/lib/fetchWithBackoff.ts index 083e4a97f..b58c53cfc 100644 --- a/src/lib/fetchWithBackoff.ts +++ b/src/lib/fetchWithBackoff.ts @@ -48,6 +48,8 @@ export async function fetchWithBackoff( ); return response; } + // Drain the body of retried responses to free the underlying socket/buffer. + response.body?.cancel().catch(() => {}); } catch (err) { if (hasElapsed()) { captureException(err, { diff --git a/src/lib/session-ingest-client.test.ts b/src/lib/session-ingest-client.test.ts index 4c5074af8..37777a152 100644 --- a/src/lib/session-ingest-client.test.ts +++ b/src/lib/session-ingest-client.test.ts @@ -1,4 +1,5 @@ import { captureException } from '@sentry/nextjs'; +import type { fetchWithBackoff as fetchWithBackoffType } from '@/lib/fetchWithBackoff'; import { generateInternalServiceToken } from '@/lib/tokens'; import type { SessionSnapshot } from './session-ingest-client'; import { @@ -24,11 +25,21 @@ jest.mock('@/lib/tokens', () => ({ generateInternalServiceToken: jest.fn().mockReturnValue('mock-jwt-token'), })); +// Passthrough to global.fetch so existing single-response mocks keep working. 
+jest.mock('@/lib/fetchWithBackoff', () => ({ + fetchWithBackoff: jest.fn((input: RequestInfo | URL, init?: RequestInit) => + global.fetch(input, init) + ), +})); + const mockFetch = jest.fn(); global.fetch = mockFetch; const mockCaptureException = jest.mocked(captureException); const mockGenerateInternalServiceToken = jest.mocked(generateInternalServiceToken); +const mockFetchWithBackoff = jest.mocked( + jest.requireMock<{ fetchWithBackoff: typeof fetchWithBackoffType }>('@/lib/fetchWithBackoff') +).fetchWithBackoff; // --------------------------------------------------------------------------- // Helpers @@ -174,6 +185,83 @@ describe('fetchSessionSnapshot', () => { }); }); +// --------------------------------------------------------------------------- +// fetchSessionSnapshot — retry integration (real fetchWithBackoff) +// --------------------------------------------------------------------------- + +describe('fetchSessionSnapshot retry integration', () => { + beforeEach(() => { + mockFetch.mockReset(); + mockCaptureException.mockReset(); + mockGenerateInternalServiceToken.mockReset().mockReturnValue('mock-jwt-token'); + + // Swap to the real fetchWithBackoff for this describe block. + const real = jest.requireActual<{ fetchWithBackoff: typeof fetchWithBackoffType }>( + '@/lib/fetchWithBackoff' + ); + mockFetchWithBackoff.mockImplementation(real.fetchWithBackoff); + }); + + afterEach(() => { + mockFetchWithBackoff.mockImplementation((input: RequestInfo | URL, init?: RequestInit) => + global.fetch(input, init) + ); + }); + + it('succeeds after a transient 500 without reporting to Sentry', async () => { + const snapshot = makeSnapshot([ + { role: 'assistant', parts: [{ type: 'text', text: 'hello' }] }, + ]); + + // First call: transient 500. Second call: success. 
+ mockFetch + .mockResolvedValueOnce({ + ok: false, + status: 500, + statusText: 'Internal Server Error', + text: () => Promise.resolve('transient'), + }) + .mockResolvedValueOnce({ + ok: true, + status: 200, + json: () => Promise.resolve(snapshot), + }); + + const result = await fetchSessionSnapshot('ses_retry', 'user_123'); + + expect(result).toEqual(snapshot); + expect(mockFetch).toHaveBeenCalledTimes(2); + // captureException should NOT have been called — the retry recovered. + expect(mockCaptureException).not.toHaveBeenCalled(); + }); + + it('reports to Sentry only after retries are exhausted', async () => { + // Always return 500 — fetchWithBackoff will exhaust its 10s budget. + mockFetch.mockResolvedValue({ + ok: false, + status: 500, + statusText: 'Internal Server Error', + text: () => Promise.resolve('persistent failure'), + }); + + await expect(fetchSessionSnapshot('ses_exhaust', 'user_123')).rejects.toThrow( + 'Session ingest export failed: 500 Internal Server Error - persistent failure' + ); + + // Sentry should be called exactly once, after all retries failed. + expect(mockCaptureException).toHaveBeenCalledTimes(1); + expect(mockCaptureException).toHaveBeenCalledWith( + expect.any(Error), + expect.objectContaining({ + tags: { source: 'session-ingest-client', endpoint: 'export' }, + extra: { sessionId: 'ses_exhaust', status: 500 }, + }) + ); + // fetchWithBackoff should have retried multiple times. 
+ expect(mockFetch.mock.calls.length).toBeGreaterThan(1); + }, 15_000); +}); + // --------------------------------------------------------------------------- // deleteSession // --------------------------------------------------------------------------- diff --git a/src/lib/session-ingest-client.ts b/src/lib/session-ingest-client.ts index c5fe3584a..396df10fb 100644 --- a/src/lib/session-ingest-client.ts +++ b/src/lib/session-ingest-client.ts @@ -3,6 +3,7 @@ import 'server-only'; import { captureException } from '@sentry/nextjs'; import { z } from 'zod'; import { SESSION_INGEST_WORKER_URL } from '@/lib/config.server'; +import { fetchWithBackoff } from '@/lib/fetchWithBackoff'; import { generateInternalServiceToken } from '@/lib/tokens'; import type { User } from '@kilocode/db/schema'; @@ -62,9 +63,11 @@ export async function fetchSessionSnapshot( const token = generateInternalServiceToken(userId); const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(sessionId)}/export`; - const response = await fetch(url, { - headers: { Authorization: `Bearer ${token}` }, - }); + const response = await fetchWithBackoff( + url, + { headers: { Authorization: `Bearer ${token}` } }, + { maxDelayMs: 10_000 } + ); if (response.status === 404) { return null; @@ -125,10 +128,11 @@ export async function shareSession( const token = generateInternalServiceToken(userId); const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(sessionId)}/share`; - const response = await fetch(url, { - method: 'POST', - headers: { Authorization: `Bearer ${token}` }, - }); + const response = await fetchWithBackoff( + url, + { method: 'POST', headers: { Authorization: `Bearer ${token}` } }, + { maxDelayMs: 10_000 } + ); if (response.status === 404) { throw new Error('Session not found'); @@ -168,10 +172,11 @@ export async function deleteSession(sessionId: string, userId: string): Promise< const token = generateInternalServiceToken(userId); const url = 
`${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(sessionId)}`; - const response = await fetch(url, { - method: 'DELETE', - headers: { Authorization: `Bearer ${token}` }, - }); + const response = await fetchWithBackoff( + url, + { method: 'DELETE', headers: { Authorization: `Bearer ${token}` } }, + { maxDelayMs: 10_000 } + ); if (response.status === 404) { // Session already deleted or was never ingested — treat as success (idempotent delete). diff --git a/src/routers/cli-sessions-v2-router.ts b/src/routers/cli-sessions-v2-router.ts index 40d4c6c2e..d066bdff6 100644 --- a/src/routers/cli-sessions-v2-router.ts +++ b/src/routers/cli-sessions-v2-router.ts @@ -8,13 +8,12 @@ import { captureException } from '@sentry/nextjs'; import { TRPCClientError } from '@trpc/client'; import { cli_sessions_v2 } from '@kilocode/db/schema'; import { createCloudAgentNextClient } from '@/lib/cloud-agent-next/cloud-agent-client'; -import { generateApiToken, generateInternalServiceToken } from '@/lib/tokens'; +import { generateApiToken } from '@/lib/tokens'; import { fetchSessionMessages, deleteSession as deleteSessionIngest, shareSession as shareSessionIngest, } from '@/lib/session-ingest-client'; -import { SESSION_INGEST_WORKER_URL } from '@/lib/config.server'; import { baseGetSessionNextOutputSchema } from './cloud-agent-next-schemas'; import { sanitizeGitUrl } from '@/routers/cli-sessions-router'; import { verifyWebhookTriggerAccess } from '@/lib/webhook-trigger-ownership'; @@ -438,40 +437,18 @@ export const cliSessionsV2Router = createTRPCRouter({ }); } - if (!SESSION_INGEST_WORKER_URL) { - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: 'SESSION_INGEST_WORKER_URL is not configured', - }); - } - - const token = generateInternalServiceToken(session.kilo_user_id); - const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(input.kilo_session_id)}/share`; - - const response = await fetch(url, { - method: 'POST', - headers: { Authorization: 
`Bearer ${token}` }, - }); - - if (!response.ok) { - const errorText = await response.text().catch(() => ''); - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `Session share failed: ${response.status} ${response.statusText}${errorText ? ` - ${errorText}` : ''}`, - }); - } - - const shareResponseSchema = z.object({ public_id: z.string() }); - let body: z.infer<typeof shareResponseSchema>; try { - body = shareResponseSchema.parse(await response.json()); - } catch { + const result = await shareSessionIngest(input.kilo_session_id, session.kilo_user_id); + return { share_id: result.public_id, session_id: input.kilo_session_id }; + } catch (error) { + if (error instanceof Error && error.message === 'Session not found') { + throw new TRPCError({ code: 'NOT_FOUND', message: 'Session not found' }); + } throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', - message: 'Session share succeeded but response was malformed', + message: 'Failed to share session', + cause: error, }); } - - return { share_id: body.public_id, session_id: input.kilo_session_id }; }), }); From ecf5ec04731901aabedb0bedf1705438fbcf34cc Mon Sep 17 00:00:00 2001 From: Jean du Plessis Date: Mon, 23 Mar 2026 14:19:18 +0200 Subject: [PATCH 2/2] fix(session-ingest): update router tests to mock shareSession instead of global.fetch The shareForWebhookTrigger endpoint now delegates to the shared shareSessionIngest client. The router test was mocking global.fetch directly, which broke because fetchWithBackoff retries transient 500s and the mockResolvedValueOnce only covered the first attempt. 
--- src/routers/cli-sessions-v2-router.test.ts | 41 +++++++++++----------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/src/routers/cli-sessions-v2-router.test.ts b/src/routers/cli-sessions-v2-router.test.ts index bfdf2ffa4..96f885413 100644 --- a/src/routers/cli-sessions-v2-router.test.ts +++ b/src/routers/cli-sessions-v2-router.test.ts @@ -19,6 +19,19 @@ jest.mock('@/lib/config.server', () => { }; }); +const mockShareSession = jest.fn< + Promise<{ public_id: string }>, + [sessionId: string, userId: string] +>(); + +jest.mock('@/lib/session-ingest-client', () => { + const actual: Record<string, unknown> = jest.requireActual('@/lib/session-ingest-client'); + return { + ...actual, + shareSession: (...args: [string, string]) => mockShareSession(...args), + }; +}); + let regularUser: User; let otherUser: User; let testOrganization: Organization; @@ -84,7 +97,6 @@ describe('cli-sessions-v2-router', () => { }); const v2SessionId = 'ses_test_share_v2_session_1234'; - let fetchSpy: jest.SpyInstance; beforeEach(async () => { await db.insert(cli_sessions_v2).values({ @@ -93,16 +105,11 @@ describe('cli-sessions-v2-router', () => { created_on_platform: 'webhook', }); - fetchSpy = jest.spyOn(global, 'fetch').mockResolvedValue( - new Response(JSON.stringify({ success: true, public_id: 'test-public-uuid' }), { - status: 200, - headers: { 'Content-Type': 'application/json' }, - }) - ); + mockShareSession.mockResolvedValue({ public_id: 'test-public-uuid' }); }); afterEach(async () => { - fetchSpy.mockRestore(); + mockShareSession.mockReset(); await db.delete(cli_sessions_v2).where(eq(cli_sessions_v2.session_id, v2SessionId)); }); @@ -119,13 +126,8 @@ describe('cli-sessions-v2-router', () => { session_id: v2SessionId, }); - expect(fetchSpy).toHaveBeenCalledTimes(1); - const [fetchUrl, fetchOpts] = fetchSpy.mock.calls[0]; - expect(fetchUrl).toBe( - `https://test-ingest.example.com/api/session/${encodeURIComponent(v2SessionId)}/share` - ); - expect(fetchOpts.method).toBe('POST'); - 
expect(fetchOpts.headers.Authorization).toMatch(/^Bearer .+/); + expect(mockShareSession).toHaveBeenCalledTimes(1); + expect(mockShareSession).toHaveBeenCalledWith(v2SessionId, regularUser.id); }); it('should throw NOT_FOUND for non-existent v2 session', async () => { @@ -142,11 +144,8 @@ describe('cli-sessions-v2-router', () => { }); it('should throw INTERNAL_SERVER_ERROR when session-ingest returns an error', async () => { - fetchSpy.mockResolvedValueOnce( - new Response('Internal Server Error', { - status: 500, - statusText: 'Internal Server Error', - }) + mockShareSession.mockRejectedValueOnce( + new Error('Session ingest share failed: 500 Internal Server Error') ); const caller = await createCallerForUser(regularUser.id); @@ -156,7 +155,7 @@ describe('cli-sessions-v2-router', () => { kilo_session_id: v2SessionId, trigger_id: testTriggerId, }) - ).rejects.toThrow('Session share failed: 500 Internal Server Error'); + ).rejects.toThrow('Failed to share session'); }); it('should throw NOT_FOUND when session belongs to a different user (personal trigger)', async () => {