From 255e742b2cbd6cfecc86bd5504e449f4867e5ab3 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 7 Apr 2026 19:27:59 -0700 Subject: [PATCH 1/6] Queue messages during conversation compaction --- .../features/sessions/service/service.test.ts | 16 +++++++ .../features/sessions/service/service.ts | 45 ++++++++++++++++++- .../features/sessions/stores/sessionStore.ts | 1 + packages/agent/src/index.ts | 1 + 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 1bf42f202..3c38cee7f 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -235,6 +235,7 @@ const createMockSession = ( startedAt: Date.now(), status: "connected", isPromptPending: false, + isCompacting: false, promptStartedAt: null, pendingPermissions: new Map(), pausedDurationMs: 0, @@ -551,6 +552,21 @@ describe("SessionService", () => { ); }); + it("queues message when compaction is in progress", async () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ isCompacting: true }), + ); + + const result = await service.sendPrompt("task-123", "Hello"); + + expect(result.stopReason).toBe("queued"); + expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( + "task-123", + "Hello", + ); + }); + it("sends prompt via tRPC when session is ready", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 4d97a7868..7d7ae7feb 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -26,6 +26,7 @@ import { } from "@features/sessions/stores/sessionStore"; import { useSettingsStore } from "@features/settings/stores/settingsStore"; import { taskViewedApi } from "@features/sidebar/hooks/useTaskViewed"; +import { POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import { DEFAULT_GATEWAY_MODEL } from "@posthog/agent/gateway-models"; import { getIsOnline } from "@renderer/stores/connectivityStore"; import { trpcClient } from "@renderer/trpc/client"; @@ -495,6 +496,7 @@ export class SessionService { errorMessage: "Session disconnected due to inactivity. Click Retry to reconnect.", isPromptPending: false, + isCompacting: false, promptStartedAt: null, }); } @@ -831,7 +833,7 @@ export class SessionService { // Handle _posthog/sdk_session notifications for adapter info if ( "method" in msg && - msg.method === "_posthog/sdk_session" && + msg.method === POSTHOG_NOTIFICATIONS.SDK_SESSION && "params" in msg ) { const params = msg.params as { @@ -848,6 +850,41 @@ export class SessionService { }); } } + + if ( + "method" in msg && + "params" in msg && + msg.method === POSTHOG_NOTIFICATIONS.STATUS + ) { + const params = msg.params as { status?: string; isComplete?: boolean }; + if (params?.status === "compacting" && !params.isComplete) { + sessionStoreSetters.updateSession(taskRunId, { + isCompacting: true, + }); + } + } + + if ( + "method" in msg && + msg.method === POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY + ) { + sessionStoreSetters.updateSession(taskRunId, { + isCompacting: false, + }); + + const hasQueuedMessages = + session.messageQueue.length > 0 && session.status === "connected"; + if (hasQueuedMessages) { + setTimeout(() => { + this.sendQueuedMessages(session.taskId).catch((err) => { + log.error("Failed to send queued messages after compaction", { + taskId: session.taskId, + error: err, + }); + }); + }, 0); + } + } } private handlePermissionRequest( @@ -921,12 +958,13 @@ export class SessionService { throw new Error(`Session is not ready (status: ${session.status})`); } - if (session.isPromptPending) { + if (session.isPromptPending || session.isCompacting) { const promptText = extractPromptText(prompt); sessionStoreSetters.enqueueMessage(taskId, promptText); log.info("Message queued", { taskId, queueLength: session.messageQueue.length + 1, + reason: session.isCompacting ? "compacting" : "prompt_pending", }); return { stopReason: "queued" }; } @@ -1053,11 +1091,13 @@ export class SessionService { errorDetails || "Session connection lost. Please retry or start a new session.", isPromptPending: false, + isCompacting: false, promptStartedAt: null, }); } else { sessionStoreSetters.updateSession(session.taskRunId, { isPromptPending: false, + isCompacting: false, promptStartedAt: null, }); } @@ -2150,6 +2190,7 @@ export class SessionService { startedAt: Date.now(), status: "connecting", isPromptPending: false, + isCompacting: false, promptStartedAt: null, pendingPermissions: new Map(), pausedDurationMs: 0, diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index cd3fa84ee..882b018bb 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -47,6 +47,7 @@ export interface AgentSession { errorTitle?: string; errorMessage?: string; isPromptPending: boolean; + isCompacting: boolean; promptStartedAt: number | null; logUrl?: string; processedLineCount?: number; diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 3817c4909..300bb1edc 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -1,3 +1,4 @@ +export { POSTHOG_NOTIFICATIONS } from "./acp-extensions"; export { getMcpToolMetadata, isMcpToolReadOnly, From c7da9fdf8b1045ff2788bf00e5f971bd818098f5 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 7 Apr 2026 19:42:03 -0700 Subject: [PATCH 2/6] Use POSTHOG_NOTIFICATIONS constants --- apps/code/src/main/services/agent/service.ts | 4 +-- .../components/buildConversationItems.ts | 13 +++------ .../features/sessions/service/service.ts | 2 +- .../task-detail/utils/cloudToolChanges.ts | 7 ++--- packages/agent/src/acp-extensions.ts | 4 --- .../agent/src/adapters/claude/claude-agent.ts | 2 +- .../adapters/claude/conversion/sdk-to-acp.ts | 7 +++-- packages/agent/src/sagas/resume-saga.test.ts | 29 ------------------- packages/agent/src/sagas/resume-saga.ts | 8 +---- 9 files changed, 15 insertions(+), 61 deletions(-) diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index e5bcf5cd8..3a8724fcc 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -12,7 +12,7 @@ import { type SessionConfigOption, type SessionNotification, } from "@agentclientprotocol/sdk"; -import { isMcpToolReadOnly } from "@posthog/agent"; +import { isMcpToolReadOnly, POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import { hydrateSessionJsonl } from "@posthog/agent/adapters/claude/session/jsonl-hydration"; import { getEffortOptions } from "@posthog/agent/adapters/claude/session/models"; import { Agent } from "@posthog/agent/agent"; @@ -1350,7 +1350,7 @@ For git operations while detached: method: string, params: Record, ): Promise => { - if (method === "_posthog/sdk_session") { + if (method === POSTHOG_NOTIFICATIONS.SDK_SESSION) { const { taskRunId: notifTaskRunId, sessionId, diff --git a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts index 9f6002c1f..88532e89d 100644 --- a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts +++ b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts @@ -4,6 +4,7 @@ import type { } from "@agentclientprotocol/sdk"; import type { QueuedMessage } from "@features/sessions/stores/sessionStore"; import type { SessionUpdate, ToolCall } from "@features/sessions/types"; +import { POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import { type AcpMessage, isJsonRpcNotification, @@ -282,12 +283,6 @@ function handlePromptResponse( b.pendingPrompts.delete(msg.id); } -/** Check if a method matches a PostHog notification name, accounting for - * the SDK sometimes double-prefixing (`__posthog/` instead of `_posthog/`). */ -function isPosthogMethod(method: string, name: string): boolean { - return method === `_posthog/${name}` || method === `__posthog/${name}`; -} - function handleNotification( b: ItemBuilder, msg: { method: string; params?: unknown }, @@ -323,7 +318,7 @@ function handleNotification( return; } - if (isPosthogMethod(msg.method, "console")) { + if (msg.method === POSTHOG_NOTIFICATIONS.CONSOLE) { if (!b.currentTurn) { ensureImplicitTurn(b, ts); } @@ -339,7 +334,7 @@ function handleNotification( return; } - if (isPosthogMethod(msg.method, "compact_boundary")) { + if (msg.method === POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY) { if (!b.currentTurn) ensureImplicitTurn(b, ts); const params = msg.params as { trigger: "manual" | "auto"; @@ -356,7 +351,7 @@ function handleNotification( return; } - if (isPosthogMethod(msg.method, "status")) { + if (msg.method === POSTHOG_NOTIFICATIONS.STATUS) { if (!b.currentTurn) ensureImplicitTurn(b, ts); const params = msg.params as { status: string; isComplete?: boolean }; if (params.status === "compacting" && !params.isComplete) { diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 7d7ae7feb..0276a8dd0 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -830,7 +830,7 @@ export class SessionService { } } - // Handle _posthog/sdk_session notifications for adapter info + // Handle SDK_SESSION notifications for adapter info if ( "method" in msg && msg.method === POSTHOG_NOTIFICATIONS.SDK_SESSION && diff --git a/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts b/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts index f72fca800..b89097aaa 100644 --- a/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts +++ b/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts @@ -2,6 +2,7 @@ import type { ToolCallContent, ToolCallLocation, } from "@features/sessions/types"; +import { POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import type { ChangedFile, GitFileStatus } from "@shared/types"; import { type AcpMessage, @@ -169,7 +170,7 @@ export function buildCloudEventSummary( const merged = mergeToolCall(toolCalls.get(toolCallId), patch); toolCalls.set(toolCallId, merged); - } else if (isPosthogMethod(message.method, "tree_snapshot")) { + } else if (message.method === POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT) { const params = message.params as | { changes?: Array<{ path: string; status: "A" | "M" | "D" }>; @@ -229,10 +230,6 @@ export function extractCloudFileDiff( }; } -function isPosthogMethod(method: string, name: string): boolean { - return method === `_posthog/${name}` || method === `__posthog/${name}`; -} - export function extractCloudToolChangedFiles( toolCalls: Map, ): ChangedFile[] { diff --git a/packages/agent/src/acp-extensions.ts b/packages/agent/src/acp-extensions.ts index ef43cce4c..9f3404f02 100644 --- a/packages/agent/src/acp-extensions.ts +++ b/packages/agent/src/acp-extensions.ts @@ -5,10 +5,6 @@ * - Custom notification methods are prefixed with `_posthog/` * - Custom data can be attached via `_meta` fields * - * Note: When using `extNotification()` from the ACP SDK, it automatically - * adds an extra underscore prefix (e.g., `_posthog/tree_snapshot` becomes - * `__posthog/tree_snapshot` in the log). Code that reads logs should handle both. - * * See: https://agentclientprotocol.com/docs/extensibility */ diff --git a/packages/agent/src/adapters/claude/claude-agent.ts b/packages/agent/src/adapters/claude/claude-agent.ts index 4679ae2bd..6774b73a6 100644 --- a/packages/agent/src/adapters/claude/claude-agent.ts +++ b/packages/agent/src/adapters/claude/claude-agent.ts @@ -931,7 +931,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { ), ...(meta?.taskRunId ? [ - this.client.extNotification("_posthog/sdk_session", { + this.client.extNotification(POSTHOG_NOTIFICATIONS.SDK_SESSION, { taskRunId: meta.taskRunId, sessionId, adapter: "claude", diff --git a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts index 938e7704c..15471b63a 100644 --- a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts +++ b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts @@ -17,6 +17,7 @@ import type { BetaContentBlock, BetaRawContentBlockDelta, } from "@anthropic-ai/sdk/resources/beta.mjs"; +import { POSTHOG_NOTIFICATIONS } from "@/index"; import { image, text } from "../../../utils/acp-content"; import { unreachable } from "../../../utils/common"; import type { Logger } from "../../../utils/logger"; @@ -550,7 +551,7 @@ export async function handleSystemMessage( case "init": break; case "compact_boundary": - await client.extNotification("_posthog/compact_boundary", { + await client.extNotification(POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY, { sessionId, trigger: message.compact_metadata.trigger, preTokens: message.compact_metadata.pre_tokens, @@ -566,7 +567,7 @@ export async function handleSystemMessage( case "status": if (message.status === "compacting") { logger.info("Session compacting started", { sessionId }); - await client.extNotification("_posthog/status", { + await client.extNotification(POSTHOG_NOTIFICATIONS.STATUS, { sessionId, status: "compacting", }); @@ -579,7 +580,7 @@ export async function handleSystemMessage( status: message.status, summary: message.summary, }); - await client.extNotification("_posthog/task_notification", { + await client.extNotification(POSTHOG_NOTIFICATIONS.TASK_NOTIFICATION, { sessionId, taskId: message.task_id, status: message.status, diff --git a/packages/agent/src/sagas/resume-saga.test.ts b/packages/agent/src/sagas/resume-saga.test.ts index f37509202..b924adbb3 100644 --- a/packages/agent/src/sagas/resume-saga.test.ts +++ b/packages/agent/src/sagas/resume-saga.test.ts @@ -441,35 +441,6 @@ describe("ResumeSaga", () => { expect(result.data.latestSnapshot?.treeHash).toBe("hash-2"); }); - it("finds snapshot with SDK double-underscore prefix", async () => { - (mockApiClient.getTaskRun as ReturnType).mockResolvedValue( - createTaskRun(), - ); - ( - mockApiClient.fetchTaskRunLogs as ReturnType - ).mockResolvedValue([ - createNotification(`_${POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT}`, { - treeHash: "sdk-prefixed-hash", - baseCommit: "abc", - changes: [], - timestamp: new Date().toISOString(), - }), - ]); - - const saga = new ResumeSaga(mockLogger); - const result = await saga.run({ - taskId: "task-1", - runId: "run-1", - repositoryPath: repo.path, - apiClient: mockApiClient, - }); - - expect(result.success).toBe(true); - if (!result.success) return; - - expect(result.data.latestSnapshot?.treeHash).toBe("sdk-prefixed-hash"); - }); - it("returns interrupted flag from snapshot", async () => { (mockApiClient.getTaskRun as ReturnType).mockResolvedValue( createTaskRun(), diff --git a/packages/agent/src/sagas/resume-saga.ts b/packages/agent/src/sagas/resume-saga.ts index cc76e1bc3..760d4a32a 100644 --- a/packages/agent/src/sagas/resume-saga.ts +++ b/packages/agent/src/sagas/resume-saga.ts @@ -178,15 +178,9 @@ export class ResumeSaga extends Saga { private findLatestTreeSnapshot( entries: StoredNotification[], ): TreeSnapshotEvent | null { - const sdkPrefixedMethod = `_${POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT}`; - for (let i = entries.length - 1; i >= 0; i--) { const entry = entries[i]; - const method = entry.notification?.method; - if ( - method === sdkPrefixedMethod || - method === POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT - ) { + if (entry.notification?.method === POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT) { const params = entry.notification.params as | TreeSnapshotEvent | undefined; From 0bb2a007e63b139502ec27fe5aabb7f49a3cb135 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 7 Apr 2026 19:47:55 -0700 Subject: [PATCH 3/6] use isNotification() for all PostHog method comparisons --- apps/code/src/main/services/agent/service.ts | 8 ++++++-- .../sessions/components/buildConversationItems.ts | 10 ++++++---- .../renderer/features/sessions/service/service.ts | 11 +++++++---- .../features/task-detail/utils/cloudToolChanges.ts | 9 +++++++-- packages/agent/src/acp-extensions.ts | 14 ++++++++++++++ packages/agent/src/index.ts | 2 +- packages/agent/src/sagas/resume-saga.ts | 9 +++++++-- 7 files changed, 48 insertions(+), 15 deletions(-) diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index 3a8724fcc..5ae20bf0b 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -12,7 +12,11 @@ import { type SessionConfigOption, type SessionNotification, } from "@agentclientprotocol/sdk"; -import { isMcpToolReadOnly, POSTHOG_NOTIFICATIONS } from "@posthog/agent"; +import { + isMcpToolReadOnly, + isNotification, + POSTHOG_NOTIFICATIONS, +} from "@posthog/agent"; import { hydrateSessionJsonl } from "@posthog/agent/adapters/claude/session/jsonl-hydration"; import { getEffortOptions } from "@posthog/agent/adapters/claude/session/models"; import { Agent } from "@posthog/agent/agent"; @@ -1350,7 +1354,7 @@ For git operations while detached: method: string, params: Record, ): Promise => { - if (method === POSTHOG_NOTIFICATIONS.SDK_SESSION) { + if (isNotification(method, POSTHOG_NOTIFICATIONS.SDK_SESSION)) { const { taskRunId: notifTaskRunId, sessionId, diff --git a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts index 88532e89d..88607a8d3 100644 --- a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts +++ b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts @@ -4,7 +4,7 @@ import type { } from "@agentclientprotocol/sdk"; import type { QueuedMessage } from "@features/sessions/stores/sessionStore"; import type { SessionUpdate, ToolCall } from "@features/sessions/types"; -import { POSTHOG_NOTIFICATIONS } from "@posthog/agent"; +import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import { type AcpMessage, isJsonRpcNotification, @@ -318,7 +318,7 @@ function handleNotification( return; } - if (msg.method === POSTHOG_NOTIFICATIONS.CONSOLE) { + if (isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.CONSOLE)) { if (!b.currentTurn) { ensureImplicitTurn(b, ts); } @@ -334,7 +334,9 @@ function handleNotification( return; } - if (msg.method === POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY) { + if ( + isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY) + ) { if (!b.currentTurn) ensureImplicitTurn(b, ts); const params = msg.params as { trigger: "manual" | "auto"; @@ -351,7 +353,7 @@ function handleNotification( return; } - if (msg.method === POSTHOG_NOTIFICATIONS.STATUS) { + if (isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.STATUS)) { if (!b.currentTurn) ensureImplicitTurn(b, ts); const params = msg.params as { status: string; isComplete?: boolean }; if (params.status === "compacting" && !params.isComplete) { diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 0276a8dd0..87d7c75a6 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -26,7 +26,7 @@ import { } from "@features/sessions/stores/sessionStore"; import { useSettingsStore } from "@features/settings/stores/settingsStore"; import { taskViewedApi } from "@features/sidebar/hooks/useTaskViewed"; -import { POSTHOG_NOTIFICATIONS } from "@posthog/agent"; +import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import { DEFAULT_GATEWAY_MODEL } from "@posthog/agent/gateway-models"; import { getIsOnline } from "@renderer/stores/connectivityStore"; import { trpcClient } from "@renderer/trpc/client"; @@ -833,7 +833,7 @@ export class SessionService { // Handle SDK_SESSION notifications for adapter info if ( "method" in msg && - msg.method === POSTHOG_NOTIFICATIONS.SDK_SESSION && + isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.SDK_SESSION) && "params" in msg ) { const params = msg.params as { @@ -854,7 +854,7 @@ export class SessionService { if ( "method" in msg && "params" in msg && - msg.method === POSTHOG_NOTIFICATIONS.STATUS + isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.STATUS) ) { const params = msg.params as { status?: string; isComplete?: boolean }; if (params?.status === "compacting" && !params.isComplete) { @@ -866,7 +866,10 @@ export class SessionService { if ( "method" in msg && - msg.method === POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY + isNotification( + msg.method as string, + POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY, + ) ) { sessionStoreSetters.updateSession(taskRunId, { isCompacting: false, diff --git a/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts b/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts index b89097aaa..cb869c1cb 100644 --- a/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts +++ b/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts @@ -2,7 +2,7 @@ import type { ToolCallContent, ToolCallLocation, } from "@features/sessions/types"; -import { POSTHOG_NOTIFICATIONS } from "@posthog/agent"; +import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import type { ChangedFile, GitFileStatus } from "@shared/types"; import { type AcpMessage, @@ -170,7 +170,12 @@ export function buildCloudEventSummary( const merged = mergeToolCall(toolCalls.get(toolCallId), patch); toolCalls.set(toolCallId, merged); - } else if (message.method === POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT) { + } else if ( + isNotification( + message.method as string, + POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, + ) + ) { const params = message.params as | { changes?: Array<{ path: string; status: "A" | "M" | "D" }>; diff --git a/packages/agent/src/acp-extensions.ts b/packages/agent/src/acp-extensions.ts index 9f3404f02..915844bb8 100644 --- a/packages/agent/src/acp-extensions.ts +++ b/packages/agent/src/acp-extensions.ts @@ -64,3 +64,17 @@ export const POSTHOG_NOTIFICATIONS = { /** Token usage update for a session turn */ USAGE_UPDATE: "_posthog/usage_update", } as const; + +type NotificationMethod = + (typeof POSTHOG_NOTIFICATIONS)[keyof typeof POSTHOG_NOTIFICATIONS]; + +/** + * Check if an ACP method matches a PostHog notification, handling the + * possible `__posthog/` double-prefix from extNotification(). + */ +export function isNotification( + method: string, + notification: NotificationMethod, +): boolean { + return method === notification || method === `_${notification}`; +} diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 300bb1edc..05dc8bae5 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -1,4 +1,4 @@ -export { POSTHOG_NOTIFICATIONS } from "./acp-extensions"; +export { isNotification, POSTHOG_NOTIFICATIONS } from "./acp-extensions"; export { getMcpToolMetadata, isMcpToolReadOnly, diff --git a/packages/agent/src/sagas/resume-saga.ts b/packages/agent/src/sagas/resume-saga.ts index 760d4a32a..e90d35a87 100644 --- a/packages/agent/src/sagas/resume-saga.ts +++ b/packages/agent/src/sagas/resume-saga.ts @@ -1,6 +1,6 @@ import type { ContentBlock } from "@agentclientprotocol/sdk"; import { Saga } from "@posthog/shared"; -import { POSTHOG_NOTIFICATIONS } from "../acp-extensions"; +import { isNotification, POSTHOG_NOTIFICATIONS } from "../acp-extensions"; import type { PostHogAPIClient } from "../posthog-api"; import { TreeTracker } from "../tree-tracker"; import type { @@ -180,7 +180,12 @@ export class ResumeSaga extends Saga { ): TreeSnapshotEvent | null { for (let i = entries.length - 1; i >= 0; i--) { const entry = entries[i]; - if (entry.notification?.method === POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT) { + if ( + isNotification( + entry.notification?.method ?? "", + POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, + ) + ) { const params = entry.notification.params as | TreeSnapshotEvent | undefined; From cd74b5d5beec2ef186915406c4dffb9e319d0418 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 7 Apr 2026 21:09:54 -0700 Subject: [PATCH 4/6] Fix stale session refs, as-string casts and import path --- .../components/buildConversationItems.ts | 8 +++---- .../features/sessions/service/service.ts | 23 +++++++++++-------- .../task-detail/utils/cloudToolChanges.ts | 5 +--- packages/agent/src/acp-extensions.ts | 3 ++- .../adapters/claude/conversion/sdk-to-acp.ts | 2 +- 5 files changed, 20 insertions(+), 21 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts index 88607a8d3..38f442e42 100644 --- a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts +++ b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts @@ -318,7 +318,7 @@ function handleNotification( return; } - if (isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.CONSOLE)) { + if (isNotification(msg.method, POSTHOG_NOTIFICATIONS.CONSOLE)) { if (!b.currentTurn) { ensureImplicitTurn(b, ts); } @@ -334,9 +334,7 @@ function handleNotification( return; } - if ( - isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY) - ) { + if (isNotification(msg.method, POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY)) { if (!b.currentTurn) ensureImplicitTurn(b, ts); const params = msg.params as { trigger: "manual" | "auto"; @@ -353,7 +351,7 @@ function handleNotification( return; } - if (isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.STATUS)) { + if (isNotification(msg.method, POSTHOG_NOTIFICATIONS.STATUS)) { if (!b.currentTurn) ensureImplicitTurn(b, ts); const params = msg.params as { status: string; isComplete?: boolean }; if (params.status === "compacting" && !params.isComplete) { diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 87d7c75a6..c2fa3a472 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -767,8 +767,11 @@ export class SessionService { "stopReason" in msg.result ) { const stopReason = (msg.result as { stopReason?: string }).stopReason; + const freshSession = sessionStoreSetters.getSessions()[taskRunId]; const hasQueuedMessages = - session.messageQueue.length > 0 && session.status === "connected"; + freshSession && + freshSession.messageQueue.length > 0 && + freshSession.status === "connected"; // Only notify when queue is empty - queued messages will start a new turn if (stopReason && !hasQueuedMessages) { @@ -833,7 +836,7 @@ export class SessionService { // Handle SDK_SESSION notifications for adapter info if ( "method" in msg && - isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.SDK_SESSION) && + isNotification(msg.method, POSTHOG_NOTIFICATIONS.SDK_SESSION) && "params" in msg ) { const params = msg.params as { @@ -854,29 +857,29 @@ export class SessionService { if ( "method" in msg && "params" in msg && - isNotification(msg.method as string, POSTHOG_NOTIFICATIONS.STATUS) + isNotification(msg.method, POSTHOG_NOTIFICATIONS.STATUS) ) { const params = msg.params as { status?: string; isComplete?: boolean }; - if (params?.status === "compacting" && !params.isComplete) { + if (params?.status === "compacting") { sessionStoreSetters.updateSession(taskRunId, { - isCompacting: true, + isCompacting: !params.isComplete, }); } } if ( "method" in msg && - isNotification( - msg.method as string, - POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY, - ) + isNotification(msg.method, POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY) ) { sessionStoreSetters.updateSession(taskRunId, { isCompacting: false, }); + const freshSession = sessionStoreSetters.getSessions()[taskRunId]; const hasQueuedMessages = - session.messageQueue.length > 0 && session.status === "connected"; + freshSession && + freshSession.messageQueue.length > 0 && + freshSession.status === "connected"; if (hasQueuedMessages) { setTimeout(() => { this.sendQueuedMessages(session.taskId).catch((err) => { diff --git a/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts b/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts index cb869c1cb..f4e7f1430 100644 --- a/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts +++ b/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts @@ -171,10 +171,7 @@ export function buildCloudEventSummary( const merged = mergeToolCall(toolCalls.get(toolCallId), patch); toolCalls.set(toolCallId, merged); } else if ( - isNotification( - message.method as string, - POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, - ) + isNotification(message.method, POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT) ) { const params = message.params as | { diff --git a/packages/agent/src/acp-extensions.ts b/packages/agent/src/acp-extensions.ts index 915844bb8..62a2a1083 100644 --- a/packages/agent/src/acp-extensions.ts +++ b/packages/agent/src/acp-extensions.ts @@ -73,8 +73,9 @@ type NotificationMethod = * possible `__posthog/` double-prefix from extNotification(). */ export function isNotification( - method: string, + method: string | undefined, notification: NotificationMethod, ): boolean { + if (!method) return false; return method === notification || method === `_${notification}`; } diff --git a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts index 15471b63a..cf0e7b4d3 100644 --- a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts +++ b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts @@ -17,7 +17,7 @@ import type { BetaContentBlock, BetaRawContentBlockDelta, } from "@anthropic-ai/sdk/resources/beta.mjs"; -import { POSTHOG_NOTIFICATIONS } from "@/index"; +import { POSTHOG_NOTIFICATIONS } from "@/acp-extensions"; import { image, text } from "../../../utils/acp-content"; import { unreachable } from "../../../utils/common"; import type { Logger } from "../../../utils/logger"; From 9b444578552324c6b4f2a5562b689defca062f92 Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Tue, 7 Apr 2026 21:11:51 -0700 Subject: [PATCH 5/6] Remove redundant ?? "" fallback --- packages/agent/src/sagas/resume-saga.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/agent/src/sagas/resume-saga.ts b/packages/agent/src/sagas/resume-saga.ts index e90d35a87..c363020f2 100644 --- a/packages/agent/src/sagas/resume-saga.ts +++ b/packages/agent/src/sagas/resume-saga.ts @@ -182,7 +182,7 @@ export class ResumeSaga extends Saga { const entry = entries[i]; if ( isNotification( - entry.notification?.method ?? "", + entry.notification?.method, POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, ) ) { From 2e603eade6c2f40f9ab35cd108e76915dbcc968d Mon Sep 17 00:00:00 2001 From: Charles Vien Date: Wed, 8 Apr 2026 15:49:20 -0700 Subject: [PATCH 6/6] Extract drainQueuedMessages helper to deduplicate queue logic --- .../features/sessions/service/service.ts | 56 +++++++++---------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index c2fa3a472..faca70e45 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -767,11 +767,7 @@ export class SessionService { "stopReason" in msg.result ) { const stopReason = (msg.result as { stopReason?: string }).stopReason; - const freshSession = sessionStoreSetters.getSessions()[taskRunId]; - const hasQueuedMessages = - freshSession && - freshSession.messageQueue.length > 0 && - freshSession.status === "connected"; + const hasQueuedMessages = this.drainQueuedMessages(taskRunId, session); // Only notify when queue is empty - queued messages will start a new turn if (stopReason && !hasQueuedMessages) { @@ -779,18 +775,6 @@ export class SessionService { } taskViewedApi.markActivity(session.taskId); - - // Process queued messages after turn completes - send all as one prompt - if (hasQueuedMessages) { - setTimeout(() => { - this.sendQueuedMessages(session.taskId).catch((err) => { - log.error("Failed to send queued messages", { - taskId: session.taskId, - error: err, - }); - }); - }, 0); - } } if ("method" in msg && msg.method === "session/update" && "params" in msg) { @@ -875,22 +859,32 @@ export class SessionService { isCompacting: false, }); - const freshSession = sessionStoreSetters.getSessions()[taskRunId]; - const hasQueuedMessages = - freshSession && - freshSession.messageQueue.length > 0 && - freshSession.status === "connected"; - if (hasQueuedMessages) { - setTimeout(() => { - this.sendQueuedMessages(session.taskId).catch((err) => { - log.error("Failed to send queued messages after compaction", { - taskId: session.taskId, - error: err, - }); + this.drainQueuedMessages(taskRunId, session); + } + } + + private drainQueuedMessages( + taskRunId: string, + session: AgentSession, + ): boolean { + const freshSession = sessionStoreSetters.getSessions()[taskRunId]; + const hasQueuedMessages = + freshSession && + freshSession.messageQueue.length > 0 && + freshSession.status === "connected"; + + if (hasQueuedMessages) { + setTimeout(() => { + this.sendQueuedMessages(session.taskId).catch((err) => { + log.error("Failed to send queued messages", { + taskId: session.taskId, + error: err, }); - }, 0); - } + }); + }, 0); } + + return hasQueuedMessages; } private handlePermissionRequest(