Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/lib/fetchWithBackoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export async function fetchWithBackoff(
);
return response;
}
// Drain the body of retried responses to free the underlying socket/buffer.
response.body?.cancel().catch(() => {});
} catch (err) {
if (hasElapsed()) {
captureException(err, {
Expand Down
88 changes: 88 additions & 0 deletions src/lib/session-ingest-client.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { captureException } from '@sentry/nextjs';
import type { fetchWithBackoff as fetchWithBackoffType } from '@/lib/fetchWithBackoff';
import { generateInternalServiceToken } from '@/lib/tokens';
import type { SessionSnapshot } from './session-ingest-client';
import {
Expand All @@ -24,11 +25,21 @@ jest.mock('@/lib/tokens', () => ({
generateInternalServiceToken: jest.fn().mockReturnValue('mock-jwt-token'),
}));

// Passthrough to global.fetch so existing single-response mocks keep working.
jest.mock('@/lib/fetchWithBackoff', () => ({
fetchWithBackoff: jest.fn((input: RequestInfo | URL, init?: RequestInit) =>
global.fetch(input, init)
),
}));

const mockFetch = jest.fn();
global.fetch = mockFetch;

const mockCaptureException = jest.mocked(captureException);
const mockGenerateInternalServiceToken = jest.mocked(generateInternalServiceToken);
const mockFetchWithBackoff = jest.mocked(
jest.requireMock<{ fetchWithBackoff: typeof fetchWithBackoffType }>('@/lib/fetchWithBackoff')
).fetchWithBackoff;

// ---------------------------------------------------------------------------
// Helpers
Expand Down Expand Up @@ -174,6 +185,83 @@ describe('fetchSessionSnapshot', () => {
});
});

// ---------------------------------------------------------------------------
// fetchSessionSnapshot — retry integration (real fetchWithBackoff)
// ---------------------------------------------------------------------------

describe('fetchSessionSnapshot retry integration', () => {
beforeEach(() => {
mockFetch.mockReset();
mockCaptureException.mockReset();
mockGenerateInternalServiceToken.mockReset().mockReturnValue('mock-jwt-token');

// Swap to the real fetchWithBackoff for this describe block.
const real = jest.requireActual<{ fetchWithBackoff: typeof fetchWithBackoffType }>(
'@/lib/fetchWithBackoff'
);
mockFetchWithBackoff.mockImplementation(real.fetchWithBackoff);
});

afterEach(() => {
mockFetchWithBackoff.mockImplementation((input: RequestInfo | URL, init?: RequestInit) =>
global.fetch(input, init)
);
});

it('succeeds after a transient 500 without reporting to Sentry', async () => {
const snapshot = makeSnapshot([
{ role: 'assistant', parts: [{ type: 'text', text: 'hello' }] },
]);

// First call: transient 500. Second call: success.
mockFetch
.mockResolvedValueOnce({
ok: false,
status: 500,
statusText: 'Internal Server Error',
text: () => Promise.resolve('transient'),
})
.mockResolvedValueOnce({
ok: true,
status: 200,
json: () => Promise.resolve(snapshot),
});

const result = await fetchSessionSnapshot('ses_retry', 'user_123');

expect(result).toEqual(snapshot);
expect(mockFetch).toHaveBeenCalledTimes(2);
// captureException should NOT have been called — the retry recovered.
expect(mockCaptureException).not.toHaveBeenCalled();
});

it('reports to Sentry only after retries are exhausted', async () => {
// Always return 500 — fetchWithBackoff will exhaust its 10s budget.
mockFetch.mockResolvedValue({
ok: false,
status: 500,
statusText: 'Internal Server Error',
text: () => Promise.resolve('persistent failure'),
});

await expect(fetchSessionSnapshot('ses_exhaust', 'user_123')).rejects.toThrow(
'Session ingest export failed: 500 Internal Server Error - persistent failure'
);

// Sentry should be called exactly once, after all retries failed.
expect(mockCaptureException).toHaveBeenCalledTimes(1);
expect(mockCaptureException).toHaveBeenCalledWith(
expect.any(Error),
expect.objectContaining({
tags: { source: 'session-ingest-client', endpoint: 'export' },
extra: { sessionId: 'ses_exhaust', status: 500 },
})
);
// fetchWithBackoff should have retried multiple times.
expect(mockFetch.mock.calls.length).toBeGreaterThan(1);
}, 15_000);
});

// ---------------------------------------------------------------------------
// deleteSession
// ---------------------------------------------------------------------------
Expand Down
27 changes: 16 additions & 11 deletions src/lib/session-ingest-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'server-only';
import { captureException } from '@sentry/nextjs';
import { z } from 'zod';
import { SESSION_INGEST_WORKER_URL } from '@/lib/config.server';
import { fetchWithBackoff } from '@/lib/fetchWithBackoff';
import { generateInternalServiceToken } from '@/lib/tokens';
import type { User } from '@kilocode/db/schema';

Expand Down Expand Up @@ -62,9 +63,11 @@ export async function fetchSessionSnapshot(
const token = generateInternalServiceToken(userId);
const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(sessionId)}/export`;

const response = await fetch(url, {
headers: { Authorization: `Bearer ${token}` },
});
const response = await fetchWithBackoff(
url,
{ headers: { Authorization: `Bearer ${token}` } },
{ maxDelayMs: 10_000 }
);

if (response.status === 404) {
return null;
Expand Down Expand Up @@ -125,10 +128,11 @@ export async function shareSession(
const token = generateInternalServiceToken(userId);
const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(sessionId)}/share`;

const response = await fetch(url, {
method: 'POST',
headers: { Authorization: `Bearer ${token}` },
});
const response = await fetchWithBackoff(
url,
{ method: 'POST', headers: { Authorization: `Bearer ${token}` } },
{ maxDelayMs: 10_000 }
);

if (response.status === 404) {
throw new Error('Session not found');
Expand Down Expand Up @@ -168,10 +172,11 @@ export async function deleteSession(sessionId: string, userId: string): Promise<
const token = generateInternalServiceToken(userId);
const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(sessionId)}`;

const response = await fetch(url, {
method: 'DELETE',
headers: { Authorization: `Bearer ${token}` },
});
const response = await fetchWithBackoff(
url,
{ method: 'DELETE', headers: { Authorization: `Bearer ${token}` } },
{ maxDelayMs: 10_000 }
);

if (response.status === 404) {
// Session already deleted or was never ingested — treat as success (idempotent delete).
Expand Down
41 changes: 20 additions & 21 deletions src/routers/cli-sessions-v2-router.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ jest.mock('@/lib/config.server', () => {
};
});

const mockShareSession = jest.fn<
Promise<{ public_id: string }>,
[sessionId: string, userId: string]
>();

jest.mock('@/lib/session-ingest-client', () => {
const actual: Record<string, unknown> = jest.requireActual('@/lib/session-ingest-client');
return {
...actual,
shareSession: (...args: [string, string]) => mockShareSession(...args),
};
});

let regularUser: User;
let otherUser: User;
let testOrganization: Organization;
Expand Down Expand Up @@ -84,7 +97,6 @@ describe('cli-sessions-v2-router', () => {
});

const v2SessionId = 'ses_test_share_v2_session_1234';
let fetchSpy: jest.SpyInstance;

beforeEach(async () => {
await db.insert(cli_sessions_v2).values({
Expand All @@ -93,16 +105,11 @@ describe('cli-sessions-v2-router', () => {
created_on_platform: 'webhook',
});

fetchSpy = jest.spyOn(global, 'fetch').mockResolvedValue(
new Response(JSON.stringify({ success: true, public_id: 'test-public-uuid' }), {
status: 200,
headers: { 'Content-Type': 'application/json' },
})
);
mockShareSession.mockResolvedValue({ public_id: 'test-public-uuid' });
});

afterEach(async () => {
fetchSpy.mockRestore();
mockShareSession.mockReset();
await db.delete(cli_sessions_v2).where(eq(cli_sessions_v2.session_id, v2SessionId));
});

Expand All @@ -119,13 +126,8 @@ describe('cli-sessions-v2-router', () => {
session_id: v2SessionId,
});

expect(fetchSpy).toHaveBeenCalledTimes(1);
const [fetchUrl, fetchOpts] = fetchSpy.mock.calls[0];
expect(fetchUrl).toBe(
`https://test-ingest.example.com/api/session/${encodeURIComponent(v2SessionId)}/share`
);
expect(fetchOpts.method).toBe('POST');
expect(fetchOpts.headers.Authorization).toMatch(/^Bearer .+/);
expect(mockShareSession).toHaveBeenCalledTimes(1);
expect(mockShareSession).toHaveBeenCalledWith(v2SessionId, regularUser.id);
});

it('should throw NOT_FOUND for non-existent v2 session', async () => {
Expand All @@ -142,11 +144,8 @@ describe('cli-sessions-v2-router', () => {
});

it('should throw INTERNAL_SERVER_ERROR when session-ingest returns an error', async () => {
fetchSpy.mockResolvedValueOnce(
new Response('Internal Server Error', {
status: 500,
statusText: 'Internal Server Error',
})
mockShareSession.mockRejectedValueOnce(
new Error('Session ingest share failed: 500 Internal Server Error')
);

const caller = await createCallerForUser(regularUser.id);
Expand All @@ -156,7 +155,7 @@ describe('cli-sessions-v2-router', () => {
kilo_session_id: v2SessionId,
trigger_id: testTriggerId,
})
).rejects.toThrow('Session share failed: 500 Internal Server Error');
).rejects.toThrow('Failed to share session');
});

it('should throw NOT_FOUND when session belongs to a different user (personal trigger)', async () => {
Expand Down
41 changes: 9 additions & 32 deletions src/routers/cli-sessions-v2-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import { captureException } from '@sentry/nextjs';
import { TRPCClientError } from '@trpc/client';
import { cli_sessions_v2 } from '@kilocode/db/schema';
import { createCloudAgentNextClient } from '@/lib/cloud-agent-next/cloud-agent-client';
import { generateApiToken, generateInternalServiceToken } from '@/lib/tokens';
import { generateApiToken } from '@/lib/tokens';
import {
fetchSessionMessages,
deleteSession as deleteSessionIngest,
shareSession as shareSessionIngest,
} from '@/lib/session-ingest-client';
import { SESSION_INGEST_WORKER_URL } from '@/lib/config.server';
import { baseGetSessionNextOutputSchema } from './cloud-agent-next-schemas';
import { sanitizeGitUrl } from '@/routers/cli-sessions-router';
import { verifyWebhookTriggerAccess } from '@/lib/webhook-trigger-ownership';
Expand Down Expand Up @@ -438,40 +437,18 @@ export const cliSessionsV2Router = createTRPCRouter({
});
}

if (!SESSION_INGEST_WORKER_URL) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'SESSION_INGEST_WORKER_URL is not configured',
});
}

const token = generateInternalServiceToken(session.kilo_user_id);
const url = `${SESSION_INGEST_WORKER_URL}/api/session/${encodeURIComponent(input.kilo_session_id)}/share`;

const response = await fetch(url, {
method: 'POST',
headers: { Authorization: `Bearer ${token}` },
});

if (!response.ok) {
const errorText = await response.text().catch(() => '');
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: `Session share failed: ${response.status} ${response.statusText}${errorText ? ` - ${errorText}` : ''}`,
});
}

const shareResponseSchema = z.object({ public_id: z.string() });
let body: z.infer<typeof shareResponseSchema>;
try {
body = shareResponseSchema.parse(await response.json());
} catch {
const result = await shareSessionIngest(input.kilo_session_id, session.kilo_user_id);
return { share_id: result.public_id, session_id: input.kilo_session_id };
} catch (error) {
if (error instanceof Error && error.message === 'Session not found') {
throw new TRPCError({ code: 'NOT_FOUND', message: 'Session not found' });
}
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Session share succeeded but response was malformed',
message: 'Failed to share session',
cause: error,
});
}

return { share_id: body.public_id, session_id: input.kilo_session_id };
}),
});
Loading