Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions apps/code/src/main/services/agent/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import {
type SessionConfigOption,
type SessionNotification,
} from "@agentclientprotocol/sdk";
import { isMcpToolReadOnly } from "@posthog/agent";
import {
isMcpToolReadOnly,
isNotification,
POSTHOG_NOTIFICATIONS,
} from "@posthog/agent";
import { hydrateSessionJsonl } from "@posthog/agent/adapters/claude/session/jsonl-hydration";
import { getEffortOptions } from "@posthog/agent/adapters/claude/session/models";
import { Agent } from "@posthog/agent/agent";
Expand Down Expand Up @@ -1350,7 +1354,7 @@ For git operations while detached:
method: string,
params: Record<string, unknown>,
): Promise<void> => {
if (method === "_posthog/sdk_session") {
if (isNotification(method, POSTHOG_NOTIFICATIONS.SDK_SESSION)) {
const {
taskRunId: notifTaskRunId,
sessionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
} from "@agentclientprotocol/sdk";
import type { QueuedMessage } from "@features/sessions/stores/sessionStore";
import type { SessionUpdate, ToolCall } from "@features/sessions/types";
import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent";
import {
type AcpMessage,
isJsonRpcNotification,
Expand Down Expand Up @@ -282,12 +283,6 @@ function handlePromptResponse(
b.pendingPrompts.delete(msg.id);
}

/** Check if a method matches a PostHog notification name, accounting for
* the SDK sometimes double-prefixing (`__posthog/` instead of `_posthog/`). */
function isPosthogMethod(method: string, name: string): boolean {
return method === `_posthog/${name}` || method === `__posthog/${name}`;
}

function handleNotification(
b: ItemBuilder,
msg: { method: string; params?: unknown },
Expand Down Expand Up @@ -323,7 +318,7 @@ function handleNotification(
return;
}

if (isPosthogMethod(msg.method, "console")) {
if (isNotification(msg.method, POSTHOG_NOTIFICATIONS.CONSOLE)) {
if (!b.currentTurn) {
ensureImplicitTurn(b, ts);
}
Expand All @@ -339,7 +334,7 @@ function handleNotification(
return;
}

if (isPosthogMethod(msg.method, "compact_boundary")) {
if (isNotification(msg.method, POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY)) {
if (!b.currentTurn) ensureImplicitTurn(b, ts);
const params = msg.params as {
trigger: "manual" | "auto";
Expand All @@ -356,7 +351,7 @@ function handleNotification(
return;
}

if (isPosthogMethod(msg.method, "status")) {
if (isNotification(msg.method, POSTHOG_NOTIFICATIONS.STATUS)) {
if (!b.currentTurn) ensureImplicitTurn(b, ts);
const params = msg.params as { status: string; isComplete?: boolean };
if (params.status === "compacting" && !params.isComplete) {
Expand Down
16 changes: 16 additions & 0 deletions apps/code/src/renderer/features/sessions/service/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ const createMockSession = (
startedAt: Date.now(),
status: "connected",
isPromptPending: false,
isCompacting: false,
promptStartedAt: null,
pendingPermissions: new Map(),
pausedDurationMs: 0,
Expand Down Expand Up @@ -551,6 +552,21 @@ describe("SessionService", () => {
);
});

it("queues message when compaction is in progress", async () => {
const service = getSessionService();
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
createMockSession({ isCompacting: true }),
);

const result = await service.sendPrompt("task-123", "Hello");

expect(result.stopReason).toBe("queued");
expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith(
"task-123",
"Hello",
);
});

it("sends prompt via tRPC when session is ready", async () => {
const service = getSessionService();
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
Expand Down
75 changes: 58 additions & 17 deletions apps/code/src/renderer/features/sessions/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
} from "@features/sessions/stores/sessionStore";
import { useSettingsStore } from "@features/settings/stores/settingsStore";
import { taskViewedApi } from "@features/sidebar/hooks/useTaskViewed";
import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent";
import { DEFAULT_GATEWAY_MODEL } from "@posthog/agent/gateway-models";
import { getIsOnline } from "@renderer/stores/connectivityStore";
import { trpcClient } from "@renderer/trpc/client";
Expand Down Expand Up @@ -495,6 +496,7 @@ export class SessionService {
errorMessage:
"Session disconnected due to inactivity. Click Retry to reconnect.",
isPromptPending: false,
isCompacting: false,
promptStartedAt: null,
});
}
Expand Down Expand Up @@ -765,27 +767,14 @@ export class SessionService {
"stopReason" in msg.result
) {
const stopReason = (msg.result as { stopReason?: string }).stopReason;
const hasQueuedMessages =
session.messageQueue.length > 0 && session.status === "connected";
const hasQueuedMessages = this.drainQueuedMessages(taskRunId, session);

// Only notify when queue is empty - queued messages will start a new turn
if (stopReason && !hasQueuedMessages) {
notifyPromptComplete(session.taskTitle, stopReason, session.taskId);
}

taskViewedApi.markActivity(session.taskId);

// Process queued messages after turn completes - send all as one prompt
if (hasQueuedMessages) {
setTimeout(() => {
this.sendQueuedMessages(session.taskId).catch((err) => {
log.error("Failed to send queued messages", {
taskId: session.taskId,
error: err,
});
});
}, 0);
}
}

if ("method" in msg && msg.method === "session/update" && "params" in msg) {
Expand Down Expand Up @@ -828,10 +817,10 @@ export class SessionService {
}
}

// Handle _posthog/sdk_session notifications for adapter info
// Handle SDK_SESSION notifications for adapter info
if (
"method" in msg &&
msg.method === "_posthog/sdk_session" &&
isNotification(msg.method, POSTHOG_NOTIFICATIONS.SDK_SESSION) &&
"params" in msg
) {
const params = msg.params as {
Expand All @@ -848,6 +837,54 @@ export class SessionService {
});
}
}

if (
"method" in msg &&
"params" in msg &&
isNotification(msg.method, POSTHOG_NOTIFICATIONS.STATUS)
) {
const params = msg.params as { status?: string; isComplete?: boolean };
if (params?.status === "compacting") {
sessionStoreSetters.updateSession(taskRunId, {
isCompacting: !params.isComplete,
});
}
}

if (
"method" in msg &&
isNotification(msg.method, POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY)
) {
sessionStoreSetters.updateSession(taskRunId, {
isCompacting: false,
});

this.drainQueuedMessages(taskRunId, session);
}
}

private drainQueuedMessages(
taskRunId: string,
session: AgentSession,
): boolean {
const freshSession = sessionStoreSetters.getSessions()[taskRunId];
const hasQueuedMessages =
freshSession &&
freshSession.messageQueue.length > 0 &&
freshSession.status === "connected";

if (hasQueuedMessages) {
setTimeout(() => {
this.sendQueuedMessages(session.taskId).catch((err) => {
log.error("Failed to send queued messages", {
taskId: session.taskId,
error: err,
});
});
}, 0);
}

return hasQueuedMessages;
}

private handlePermissionRequest(
Expand Down Expand Up @@ -921,12 +958,13 @@ export class SessionService {
throw new Error(`Session is not ready (status: ${session.status})`);
}

if (session.isPromptPending) {
if (session.isPromptPending || session.isCompacting) {
const promptText = extractPromptText(prompt);
sessionStoreSetters.enqueueMessage(taskId, promptText);
log.info("Message queued", {
taskId,
queueLength: session.messageQueue.length + 1,
reason: session.isCompacting ? "compacting" : "prompt_pending",
});
return { stopReason: "queued" };
}
Expand Down Expand Up @@ -1053,11 +1091,13 @@ export class SessionService {
errorDetails ||
"Session connection lost. Please retry or start a new session.",
isPromptPending: false,
isCompacting: false,
promptStartedAt: null,
});
} else {
sessionStoreSetters.updateSession(session.taskRunId, {
isPromptPending: false,
isCompacting: false,
promptStartedAt: null,
});
}
Expand Down Expand Up @@ -2150,6 +2190,7 @@ export class SessionService {
startedAt: Date.now(),
status: "connecting",
isPromptPending: false,
isCompacting: false,
promptStartedAt: null,
pendingPermissions: new Map(),
pausedDurationMs: 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export interface AgentSession {
errorTitle?: string;
errorMessage?: string;
isPromptPending: boolean;
isCompacting: boolean;
promptStartedAt: number | null;
logUrl?: string;
processedLineCount?: number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type {
ToolCallContent,
ToolCallLocation,
} from "@features/sessions/types";
import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent";
import type { ChangedFile, GitFileStatus } from "@shared/types";
import {
type AcpMessage,
Expand Down Expand Up @@ -169,7 +170,9 @@ export function buildCloudEventSummary(

const merged = mergeToolCall(toolCalls.get(toolCallId), patch);
toolCalls.set(toolCallId, merged);
} else if (isPosthogMethod(message.method, "tree_snapshot")) {
} else if (
isNotification(message.method, POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT)
) {
const params = message.params as
| {
changes?: Array<{ path: string; status: "A" | "M" | "D" }>;
Expand Down Expand Up @@ -229,10 +232,6 @@ export function extractCloudFileDiff(
};
}

function isPosthogMethod(method: string, name: string): boolean {
return method === `_posthog/${name}` || method === `__posthog/${name}`;
}

export function extractCloudToolChangedFiles(
toolCalls: Map<string, ParsedToolCall>,
): ChangedFile[] {
Expand Down
19 changes: 15 additions & 4 deletions packages/agent/src/acp-extensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
* - Custom notification methods are prefixed with `_posthog/`
* - Custom data can be attached via `_meta` fields
*
* Note: When using `extNotification()` from the ACP SDK, it automatically
* adds an extra underscore prefix (e.g., `_posthog/tree_snapshot` becomes
* `__posthog/tree_snapshot` in the log). Code that reads logs should handle both.
*
* See: https://agentclientprotocol.com/docs/extensibility
*/

Expand Down Expand Up @@ -68,3 +64,18 @@ export const POSTHOG_NOTIFICATIONS = {
/** Token usage update for a session turn */
USAGE_UPDATE: "_posthog/usage_update",
} as const;

type NotificationMethod =
(typeof POSTHOG_NOTIFICATIONS)[keyof typeof POSTHOG_NOTIFICATIONS];

/**
* Check if an ACP method matches a PostHog notification, handling the
* possible `__posthog/` double-prefix from extNotification().
*/
export function isNotification(
method: string | undefined,
notification: NotificationMethod,
): boolean {
if (!method) return false;
return method === notification || method === `_${notification}`;
}
2 changes: 1 addition & 1 deletion packages/agent/src/adapters/claude/claude-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
),
...(meta?.taskRunId
? [
this.client.extNotification("_posthog/sdk_session", {
this.client.extNotification(POSTHOG_NOTIFICATIONS.SDK_SESSION, {
taskRunId: meta.taskRunId,
sessionId,
adapter: "claude",
Expand Down
7 changes: 4 additions & 3 deletions packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type {
BetaContentBlock,
BetaRawContentBlockDelta,
} from "@anthropic-ai/sdk/resources/beta.mjs";
import { POSTHOG_NOTIFICATIONS } from "@/acp-extensions";
import { image, text } from "../../../utils/acp-content";
import { unreachable } from "../../../utils/common";
import type { Logger } from "../../../utils/logger";
Expand Down Expand Up @@ -550,7 +551,7 @@ export async function handleSystemMessage(
case "init":
break;
case "compact_boundary":
await client.extNotification("_posthog/compact_boundary", {
await client.extNotification(POSTHOG_NOTIFICATIONS.COMPACT_BOUNDARY, {
sessionId,
trigger: message.compact_metadata.trigger,
preTokens: message.compact_metadata.pre_tokens,
Expand All @@ -566,7 +567,7 @@ export async function handleSystemMessage(
case "status":
if (message.status === "compacting") {
logger.info("Session compacting started", { sessionId });
await client.extNotification("_posthog/status", {
await client.extNotification(POSTHOG_NOTIFICATIONS.STATUS, {
sessionId,
status: "compacting",
});
Expand All @@ -579,7 +580,7 @@ export async function handleSystemMessage(
status: message.status,
summary: message.summary,
});
await client.extNotification("_posthog/task_notification", {
await client.extNotification(POSTHOG_NOTIFICATIONS.TASK_NOTIFICATION, {
sessionId,
taskId: message.task_id,
status: message.status,
Expand Down
1 change: 1 addition & 0 deletions packages/agent/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { isNotification, POSTHOG_NOTIFICATIONS } from "./acp-extensions";
export {
getMcpToolMetadata,
isMcpToolReadOnly,
Expand Down
Loading
Loading