diff --git a/src/lib/fetchWithBackoff.ts b/src/lib/fetchWithBackoff.ts index 083e4a97f..b58c53cfc 100644 --- a/src/lib/fetchWithBackoff.ts +++ b/src/lib/fetchWithBackoff.ts @@ -48,6 +48,8 @@ export async function fetchWithBackoff( ); return response; } + // Drain the body of retried responses to free the underlying socket/buffer. + response.body?.cancel().catch(() => {}); } catch (err) { if (hasElapsed()) { captureException(err, { diff --git a/src/lib/session-ingest-client.test.ts b/src/lib/session-ingest-client.test.ts index 4c5074af8..37777a152 100644 --- a/src/lib/session-ingest-client.test.ts +++ b/src/lib/session-ingest-client.test.ts @@ -1,4 +1,5 @@ import { captureException } from '@sentry/nextjs'; +import type { fetchWithBackoff as fetchWithBackoffType } from '@/lib/fetchWithBackoff'; import { generateInternalServiceToken } from '@/lib/tokens'; import type { SessionSnapshot } from './session-ingest-client'; import { @@ -24,11 +25,21 @@ jest.mock('@/lib/tokens', () => ({ generateInternalServiceToken: jest.fn().mockReturnValue('mock-jwt-token'), })); +// Passthrough to global.fetch so existing single-response mocks keep working. +jest.mock('@/lib/fetchWithBackoff', () => ({ + fetchWithBackoff: jest.fn((input: RequestInfo | URL, init?: RequestInit) => + global.fetch(input, init) + ), +})); + const mockFetch = jest.fn(); global.fetch = mockFetch; const mockCaptureException = jest.mocked(captureException); const mockGenerateInternalServiceToken = jest.mocked(generateInternalServiceToken); +const mockFetchWithBackoff = jest.mocked( + jest.requireMock<{ fetchWithBackoff: typeof fetchWithBackoffType }>('@/lib/fetchWithBackoff') +).fetchWithBackoff; // --------------------------------------------------------------------------- // Helpers @@ -174,6 +185,83 @@ describe('fetchSessionSnapshot', () => { }); }); +// --------------------------------------------------------------------------- +// fetchSessionSnapshot — retry integration (real fetchWithBackoff) +// --------------------------------------------------------------------------- + +describe('fetchSessionSnapshot retry integration', () => { + beforeEach(() => { + mockFetch.mockReset(); + mockCaptureException.mockReset(); + mockGenerateInternalServiceToken.mockReset().mockReturnValue('mock-jwt-token'); + + // Swap to the real fetchWithBackoff for this describe block. + const real = jest.requireActual<{ fetchWithBackoff: typeof fetchWithBackoffType }>( + '@/lib/fetchWithBackoff' + ); + mockFetchWithBackoff.mockImplementation(real.fetchWithBackoff); + }); + + afterEach(() => { + mockFetchWithBackoff.mockImplementation((input: RequestInfo | URL, init?: RequestInit) => + global.fetch(input, init) + ); + }); + + it('succeeds after a transient 500 without reporting to Sentry', async () => { + const snapshot = makeSnapshot([ + { role: 'assistant', parts: [{ type: 'text', text: 'hello' }] }, + ]); + + // First call: transient 500. Second call: success. + mockFetch + .mockResolvedValueOnce({ + ok: false, + status: 500, + statusText: 'Internal Server Error', + text: () => Promise.resolve('transient'), + }) + .mockResolvedValueOnce({ + ok: true, + status: 200, + json: () => Promise.resolve(snapshot), + }); + + const result = await fetchSessionSnapshot('ses_retry', 'user_123'); + + expect(result).toEqual(snapshot); + expect(mockFetch).toHaveBeenCalledTimes(2); + // captureException should NOT have been called — the retry recovered. + expect(mockCaptureException).not.toHaveBeenCalled(); + }); + + it('reports to Sentry only after retries are exhausted', async () => { + // Always return 500 — fetchWithBackoff will exhaust its 10s budget. + mockFetch.mockResolvedValue({ + ok: false, + status: 500, + statusText: 'Internal Server Error', + text: () => Promise.resolve('persistent failure'), + }); + + await expect(fetchSessionSnapshot('ses_exhaust', 'user_123')).rejects.toThrow( + 'Session ingest export failed: 500 Internal Server Error - persistent failure' + ); + + // Sentry should be called exactly once, after all retries failed. + expect(mockCaptureException).toHaveBeenCalledTimes(1); + expect(mockCaptureException).toHaveBeenCalledWith( + expect.any(Error), + expect.objectContaining({ + tags: { source: 'session-ingest-client', endpoint: 'export' }, + extra: { sessionId: 'ses_exhaust', status: 500 }, + }) + ); + // fetchWithBackoff should have retried multiple times. + expect(mockFetch.mock.calls.length).toBeGreaterThan(1); + }, 15_000); +}); + // --------------------------------------------------------------------------- // deleteSession // --------------------------------------------------------------------------- diff --git a/src/lib/session-ingest-client.ts b/src/lib/session-ingest-client.ts index c5fe3584a..396df10fb 100644 --- a/src/lib/session-ingest-client.ts +++ b/src/lib/session-ingest-client.ts @@ -3,6 +3,7 @@ import 'server-only'; import { captureException } from '@sentry/nextjs'; import { z } from 'zod'; import { SESSION_INGEST_WORKER_URL } from '@/lib/config.server'; +import { fetchWithBackoff } from '@/lib/fetchWithBackoff'; import { generateInternalServiceToken } from '@/lib/tokens'; import type { User } from '@kilocode/db/schema'; @@ -62,9 +63,11 @@ export async function fetchSessionSnapshot( const token = generateInternalServiceToken(userId); const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(sessionId)}/export`; - const response = await fetch(url, { - headers: { Authorization: `Bearer ${token}` }, - }); + const response = await fetchWithBackoff( + url, + { headers: { Authorization: `Bearer ${token}` } }, + { maxDelayMs: 10_000 } + ); if (response.status === 404) { return null; @@ -125,10 +128,11 @@ export async function shareSession( const token = generateInternalServiceToken(userId); const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(sessionId)}/share`; - const response = await fetch(url, { - method: 'POST', - headers: { Authorization: `Bearer ${token}` }, - }); + const response = await fetchWithBackoff( + url, + { method: 'POST', headers: { Authorization: `Bearer ${token}` } }, + { maxDelayMs: 10_000 } + ); if (response.status === 404) { throw new Error('Session not found'); @@ -168,10 +172,11 @@ export async function deleteSession(sessionId: string, userId: string): Promise< const token = generateInternalServiceToken(userId); const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(sessionId)}`; - const response = await fetch(url, { - method: 'DELETE', - headers: { Authorization: `Bearer ${token}` }, - }); + const response = await fetchWithBackoff( + url, + { method: 'DELETE', headers: { Authorization: `Bearer ${token}` } }, + { maxDelayMs: 10_000 } + ); if (response.status === 404) { // Session already deleted or was never ingested — treat as success (idempotent delete). diff --git a/src/routers/cli-sessions-v2-router.test.ts b/src/routers/cli-sessions-v2-router.test.ts index bfdf2ffa4..96f885413 100644 --- a/src/routers/cli-sessions-v2-router.test.ts +++ b/src/routers/cli-sessions-v2-router.test.ts @@ -19,6 +19,19 @@ jest.mock('@/lib/config.server', () => { }; }); +const mockShareSession = jest.fn< + Promise<{ public_id: string }>, + [sessionId: string, userId: string] +>(); + +jest.mock('@/lib/session-ingest-client', () => { + const actual: Record = jest.requireActual('@/lib/session-ingest-client'); + return { + ...actual, + shareSession: (...args: [string, string]) => mockShareSession(...args), + }; +}); + let regularUser: User; let otherUser: User; let testOrganization: Organization; @@ -84,7 +97,6 @@ describe('cli-sessions-v2-router', () => { }); const v2SessionId = 'ses_test_share_v2_session_1234'; - let fetchSpy: jest.SpyInstance; beforeEach(async () => { await db.insert(cli_sessions_v2).values({ @@ -93,16 +105,11 @@ describe('cli-sessions-v2-router', () => { created_on_platform: 'webhook', }); - fetchSpy = jest.spyOn(global, 'fetch').mockResolvedValue( - new Response(JSON.stringify({ success: true, public_id: 'test-public-uuid' }), { - status: 200, - headers: { 'Content-Type': 'application/json' }, - }) - ); + mockShareSession.mockResolvedValue({ public_id: 'test-public-uuid' }); }); afterEach(async () => { - fetchSpy.mockRestore(); + mockShareSession.mockReset(); await db.delete(cli_sessions_v2).where(eq(cli_sessions_v2.session_id, v2SessionId)); }); @@ -119,13 +126,8 @@ describe('cli-sessions-v2-router', () => { session_id: v2SessionId, }); - expect(fetchSpy).toHaveBeenCalledTimes(1); - const [fetchUrl, fetchOpts] = fetchSpy.mock.calls[0]; - expect(fetchUrl).toBe( - `https://test-ingest.example.com/api/session/${encodeURIComponent(v2SessionId)}/share` - ); - expect(fetchOpts.method).toBe('POST'); - expect(fetchOpts.headers.Authorization).toMatch(/^Bearer .+/); + expect(mockShareSession).toHaveBeenCalledTimes(1); + expect(mockShareSession).toHaveBeenCalledWith(v2SessionId, regularUser.id); }); it('should throw NOT_FOUND for non-existent v2 session', async () => { @@ -142,11 +144,8 @@ describe('cli-sessions-v2-router', () => { }); it('should throw INTERNAL_SERVER_ERROR when session-ingest returns an error', async () => { - fetchSpy.mockResolvedValueOnce( - new Response('Internal Server Error', { - status: 500, - statusText: 'Internal Server Error', - }) + mockShareSession.mockRejectedValueOnce( + new Error('Session ingest share failed: 500 Internal Server Error') ); const caller = await createCallerForUser(regularUser.id); @@ -156,7 +155,7 @@ describe('cli-sessions-v2-router', () => { kilo_session_id: v2SessionId, trigger_id: testTriggerId, }) - ).rejects.toThrow('Session share failed: 500 Internal Server Error'); + ).rejects.toThrow('Failed to share session'); }); it('should throw NOT_FOUND when session belongs to a different user (personal trigger)', async () => { diff --git a/src/routers/cli-sessions-v2-router.ts b/src/routers/cli-sessions-v2-router.ts index 40d4c6c2e..d066bdff6 100644 --- a/src/routers/cli-sessions-v2-router.ts +++ b/src/routers/cli-sessions-v2-router.ts @@ -8,13 +8,12 @@ import { captureException } from '@sentry/nextjs'; import { TRPCClientError } from '@trpc/client'; import { cli_sessions_v2 } from '@kilocode/db/schema'; import { createCloudAgentNextClient } from '@/lib/cloud-agent-next/cloud-agent-client'; -import { generateApiToken, generateInternalServiceToken } from '@/lib/tokens'; +import { generateApiToken } from '@/lib/tokens'; import { fetchSessionMessages, deleteSession as deleteSessionIngest, shareSession as shareSessionIngest, } from '@/lib/session-ingest-client'; -import { SESSION_INGEST_WORKER_URL } from '@/lib/config.server'; import { baseGetSessionNextOutputSchema } from './cloud-agent-next-schemas'; import { sanitizeGitUrl } from '@/routers/cli-sessions-router'; import { verifyWebhookTriggerAccess } from '@/lib/webhook-trigger-ownership'; @@ -438,40 +437,18 @@ export const cliSessionsV2Router = createTRPCRouter({ }); } - if (!SESSION_INGEST_WORKER_URL) { - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: 'SESSION_INGEST_WORKER_URL is not configured', - }); - } - - const token = generateInternalServiceToken(session.kilo_user_id); - const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(input.kilo_session_id)}/share`; - - const response = await fetch(url, { - method: 'POST', - headers: { Authorization: `Bearer ${token}` }, - }); - - if (!response.ok) { - const errorText = await response.text().catch(() => ''); - throw new TRPCError({ - code: 'INTERNAL_SERVER_ERROR', - message: `Session share failed: ${response.status} ${response.statusText}${errorText ? ` - ${errorText}` : ''}`, - }); - } - - const shareResponseSchema = z.object({ public_id: z.string() }); - let body: z.infer; try { - body = shareResponseSchema.parse(await response.json()); - } catch { + const result = await shareSessionIngest(input.kilo_session_id, session.kilo_user_id); + return { share_id: result.public_id, session_id: input.kilo_session_id }; + } catch (error) { + if (error instanceof Error && error.message === 'Session not found') { + throw new TRPCError({ code: 'NOT_FOUND', message: 'Session not found' }); + } throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', - message: 'Session share succeeded but response was malformed', + message: 'Failed to share session', + cause: error, }); } - - return { share_id: body.public_id, session_id: input.kilo_session_id }; }), });