From 1bda522b4c07c09e4099e5eb4fa2b626e496f1bc Mon Sep 17 00:00:00 2001 From: mikemolinet Date: Mon, 11 May 2026 20:09:56 -0700 Subject: [PATCH] =?UTF-8?q?feat(parity):=20Item=20B=20Phase=201=20substrat?= =?UTF-8?q?e=20=E2=80=94=20live-delivery-v3=20daemon-IPC=20attachment=20pr?= =?UTF-8?q?imitive?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parity port of cueapi/cueapi#805 (merged 03:07:18Z). Mike Q-B ratify locked the ASYNC fire-accept dispatcher path 2026-05-12 ~00:38Z. Direction of parity: private-leads, OSS-lags. ## Files added - alembic/versions/035_agent_live_sessions_ipc_attachment.py — OSS migration (renumbered from private's 063; down_revision=034 matches OSS alembic head) - app/schemas/ipc_attachment.py (verbatim from private) - app/services/ipc_attachment_service.py (verbatim from private) - app/routers/ipc_attachments.py (verbatim from private) - tests/test_ipc_attachments.py (verbatim from private; 42 tests pass) ## Files modified - app/models/agent_live_session.py — added 4 cols (ipc_session_token, transport, daemon_id, last_reconciled_at) + 2 indexes (ix_agent_live_sessions_daemon, ix_agent_live_sessions_transport) + CheckConstraint valid_transport - app/routers/cues.py — added _build_ipc_delivery_metadata pure helper + fire_cue stamps execution.outcome_metadata when target row has transport='ipc' - app/main.py — registered ipc_attachments.router + ipc_attachments.reconcile_router - parity-manifest.json — 3 new entries + 3 deviation note bumps ## Schema (migration 035) All 4 columns nullable/defaulted so existing v2.x rows continue unchanged: - ipc_session_token VARCHAR(32) — daemon-issued ULID - transport VARCHAR(8) NOT NULL DEFAULT 'poll' + CHECK IN ('ipc','poll') - daemon_id UUID — stable per-install Desktop daemon identity - last_reconciled_at TIMESTAMPTZ ## Local: 42/42 tests pass. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../035_agent_live_sessions_ipc_attachment.py | 116 ++ app/main.py | 4 +- app/models/agent_live_session.py | 29 + app/routers/cues.py | 43 + app/routers/ipc_attachments.py | 323 +++++ app/schemas/ipc_attachment.py | 245 ++++ app/services/ipc_attachment_service.py | 327 +++++ parity-manifest.json | 37 +- tests/test_ipc_attachments.py | 1053 +++++++++++++++++ 9 files changed, 2170 insertions(+), 7 deletions(-) create mode 100644 alembic/versions/035_agent_live_sessions_ipc_attachment.py create mode 100644 app/routers/ipc_attachments.py create mode 100644 app/schemas/ipc_attachment.py create mode 100644 app/services/ipc_attachment_service.py create mode 100644 tests/test_ipc_attachments.py diff --git a/alembic/versions/035_agent_live_sessions_ipc_attachment.py b/alembic/versions/035_agent_live_sessions_ipc_attachment.py new file mode 100644 index 0000000..b15d2ac --- /dev/null +++ b/alembic/versions/035_agent_live_sessions_ipc_attachment.py @@ -0,0 +1,116 @@ +"""Item B Phase 1 — extend agent_live_sessions for live-delivery-v3 IPC attachments. + +Adds the substrate-side primitives needed for the daemon-IPC delivery path +locked in the live-delivery-v3 joint design at: + + https://trydock.ai/mike/live-delivery-v3-build-hub + +Schema additions (4 columns + 2 indexes; all nullable / defaulted so existing +v2.x rows continue to work unchanged): + +* ``ipc_session_token VARCHAR(32)`` — daemon-issued ULID identifying the + attachment for fire-accept routing. NULL for non-IPC rows. VARCHAR(32) per + primary's Q-A REDIRECT (room for future versioned-prefix shapes like + ``v3a_<26-char-ULID>``); application-layer validates format (no DB regex + CHECK per CueAPI convention). +* ``transport VARCHAR(8) NOT NULL DEFAULT 'poll'`` + CHECK IN ('ipc', 'poll') + — routing-mode for fire-accept dispatcher. Existing rows default to 'poll' + so behavior is unchanged until the daemon issues a v3 attach. 
VARCHAR+CHECK + matches CueAPI convention (cues.callback_transport, executions.status, + users.plan all use this shape; Postgres ENUM ALTER is painful to evolve). +* ``daemon_id UUID`` — stable per-install Desktop daemon identity, sent in + ``X-CueAPI-Daemon-Id`` header on attach + reconcile + DELETE requests. + Server scopes reconcile transactions per-daemon so daemon X's view can't + affect daemon Y's rows. NULL for v2.x rows (no daemon identity tracked). +* ``last_reconciled_at TIMESTAMPTZ`` — bumps every time a row appears in a + daemon reconcile batch. Powers the conservative downgrade-to-poll cleanup + for orphaned rows (rows in DB not mentioned in current reconcile batch get + ``transport='poll'`` and ``last_reconciled_at`` left untouched; daily + cleanup deletes ``transport='poll'`` rows where ``last_reconciled_at < + now() - 24h``). + +Indexes: + +* ``ix_agent_live_sessions_daemon`` on ``daemon_id`` — supports per-daemon + reconcile WHERE clauses. +* ``ix_agent_live_sessions_transport`` on ``(transport, last_reconciled_at)`` + — supports the daily cleanup job's filter ``transport='poll' AND + last_reconciled_at < now() - 24h``. + +Mike Q-B ratification 2026-05-12 ~00:38Z locked the **ASYNC** fire-accept +dispatcher path: server fires + returns immediately with +``delivery_mode_requested='ipc'``; daemon-side delivery ack happens via the +existing ``POST /v1/executions/<execution_id>/outcome`` path. NO inline ack-callback +machinery in Phase 1. Sync-inline-ack-3s alternative deferred to a future +Backlog row (meeting-room-style live agent discussions). + +Backwards-compat: all v2.x rows inherit ``transport='poll'`` + NULL on the +other 3 columns. Existing fire-accept dispatcher logic untouched until a row +carries ``transport='ipc'``. 
+""" +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects.postgresql import UUID + + +revision = "035" +down_revision = "034" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "agent_live_sessions", + sa.Column("ipc_session_token", sa.String(32), nullable=True), + ) + op.add_column( + "agent_live_sessions", + sa.Column( + "transport", + sa.String(8), + nullable=False, + server_default=sa.text("'poll'"), + ), + ) + op.add_column( + "agent_live_sessions", + sa.Column("daemon_id", UUID(as_uuid=True), nullable=True), + ) + op.add_column( + "agent_live_sessions", + sa.Column( + "last_reconciled_at", + sa.DateTime(timezone=True), + nullable=True, + ), + ) + + op.create_check_constraint( + "valid_transport", + "agent_live_sessions", + "transport IN ('ipc', 'poll')", + ) + + op.create_index( + "ix_agent_live_sessions_daemon", + "agent_live_sessions", + ["daemon_id"], + ) + op.create_index( + "ix_agent_live_sessions_transport", + "agent_live_sessions", + ["transport", "last_reconciled_at"], + ) + + +def downgrade() -> None: + op.drop_index("ix_agent_live_sessions_transport", table_name="agent_live_sessions") + op.drop_index("ix_agent_live_sessions_daemon", table_name="agent_live_sessions") + op.drop_constraint("valid_transport", "agent_live_sessions", type_="check") + op.drop_column("agent_live_sessions", "last_reconciled_at") + op.drop_column("agent_live_sessions", "daemon_id") + op.drop_column("agent_live_sessions", "transport") + op.drop_column("agent_live_sessions", "ipc_session_token") diff --git a/app/main.py b/app/main.py index 0a98b34..36c0db7 100644 --- a/app/main.py +++ b/app/main.py @@ -12,7 +12,7 @@ from app.middleware.request_id import RequestIdMiddleware from app.middleware.verify_echo import VerifyEchoMiddleware from app.redis import close_redis -from app.routers import agent_live_sessions, agents, alerts, auth_routes, cues, device_code, echo, events, executions, 
health, info, internal_users, messages, usage, webhook_secret, workers +from app.routers import agent_live_sessions, agents, alerts, auth_routes, cues, device_code, echo, events, executions, health, info, internal_users, ipc_attachments, messages, usage, webhook_secret, workers from app.utils.logging import setup_logging @@ -177,6 +177,8 @@ async def generic_error_handler(request: Request, exc: Exception): app.include_router(alerts.router) app.include_router(agents.router) app.include_router(agent_live_sessions.router) +app.include_router(ipc_attachments.router) +app.include_router(ipc_attachments.reconcile_router) app.include_router(events.router) app.include_router(messages.router) diff --git a/app/models/agent_live_session.py b/app/models/agent_live_session.py index 16d834e..5558845 100644 --- a/app/models/agent_live_session.py +++ b/app/models/agent_live_session.py @@ -45,6 +45,7 @@ from sqlalchemy import ( Boolean, + CheckConstraint, Column, DateTime, ForeignKey, @@ -96,6 +97,21 @@ class AgentLiveSession(Base): unique=True, postgresql_where=text("detached_at IS NULL"), ), + # Item B Phase 1 indexes (migration 035): support per-daemon + # reconcile + daily transport='poll' cleanup queries. + Index("ix_agent_live_sessions_daemon", "daemon_id"), + Index( + "ix_agent_live_sessions_transport", + "transport", + "last_reconciled_at", + ), + # Item B Phase 1 CHECK constraint (migration 035): transport + # values restricted to 'ipc' or 'poll'. VARCHAR+CHECK is the + # CueAPI convention. + CheckConstraint( + "transport IN ('ipc', 'poll')", + name="valid_transport", + ), ) id = Column( @@ -147,6 +163,19 @@ class AgentLiveSession(Base): # grace accepts bare task_name on existing rows. session_token = Column(String(80), nullable=True) + # Item B Phase 1 columns (migration 035, live-delivery-v3). + # NB: existing v2.x rows inherit transport='poll' + NULL on others + # — fire-accept dispatcher unchanged unless transport='ipc' is set. 
+ ipc_session_token = Column(String(32), nullable=True) + transport = Column( + String(8), + nullable=False, + server_default=text("'poll'"), + default="poll", + ) + daemon_id = Column(UUID(as_uuid=True), nullable=True) + last_reconciled_at = Column(DateTime(timezone=True), nullable=True) + created_at = Column( DateTime(timezone=True), nullable=False, diff --git a/app/routers/cues.py b/app/routers/cues.py index b3c7d8e..db5c75a 100644 --- a/app/routers/cues.py +++ b/app/routers/cues.py @@ -89,6 +89,42 @@ async def delete( return Response(status_code=204) +async def _build_ipc_delivery_metadata(db, cue_id: str): + """Item B Phase 1 ASYNC fire-accept dispatcher (Mike Q-B ratify 2026-05-12). + + Stamps ``delivery_mode_requested='ipc'`` on a new execution's + ``outcome_metadata`` IFF the cue's target ``agent_live_sessions`` row is + currently on ``transport='ipc'`` (i.e. a daemon-IPC attachment is live). + + Returns: + - ``{"delivery_mode_requested": "ipc"}`` when an active IPC attachment + exists for ``cue_id`` + - ``None`` otherwise (preserves the default execution.outcome_metadata + NULL invariant for non-IPC fires; webhook + worker paths unchanged) + + Per design lock: no inline ack-callback machinery. The daemon ACKs delivery + by writing to the existing ``POST /v1/executions//outcome`` path. The + metadata stamp is the *request* signal; actual delivery is async. + + Module-level so pytest-cov traces branches without going through ASGI + dispatch (CLAUDE.md ASGI coverage discipline). 
+ """ + from sqlalchemy import select + from app.models.agent_live_session import AgentLiveSession + + result = await db.execute( + select(AgentLiveSession.id).where( + AgentLiveSession.cue_id == cue_id, + AgentLiveSession.transport == "ipc", + AgentLiveSession.detached_at.is_(None), + ) + ) + row = result.scalar_one_or_none() + if row is None: + return None + return {"delivery_mode_requested": "ipc"} + + @router.post("/{cue_id}/fire", status_code=200) async def fire_cue( cue_id: str, @@ -132,9 +168,16 @@ async def fire_cue( is_scheduled = effective_scheduled_for > now execution_id = uuid_mod.uuid4() + # Item B Phase 1: ASYNC fire-accept IPC routing (Mike Q-B ratify + # 2026-05-12 ~00:38Z, ported from cueapi/cueapi#805). If the cue's + # target agent_live_sessions row is on transport='ipc', stamp the + # execution's outcome_metadata with delivery_mode_requested='ipc'. + # Helper is pure + module-level (CLAUDE.md ASGI coverage discipline). + ipc_outcome_metadata = await _build_ipc_delivery_metadata(db, cue.id) execution = Execution( id=execution_id, cue_id=cue.id, scheduled_for=effective_scheduled_for, status="pending", triggered_by="manual_fire", + outcome_metadata=ipc_outcome_metadata, ) db.add(execution) diff --git a/app/routers/ipc_attachments.py b/app/routers/ipc_attachments.py new file mode 100644 index 0000000..f82c13d --- /dev/null +++ b/app/routers/ipc_attachments.py @@ -0,0 +1,323 @@ +"""Item B Phase 1 — endpoints for daemon-IPC attachment lifecycle. + +Live-delivery-v3 substrate primitive (cf. https://trydock.ai/mike/live-delivery-v3-build-hub). +Mike Q-B ratify locked the ASYNC fire-accept dispatcher path 2026-05-12 ~00:38Z. + +Three endpoints: + +* ``POST /v1/agents//attachments`` — daemon attaches a Live session. +* ``DELETE /v1/agents//attachments/`` — daemon revokes. +* ``POST /v1/agents/reconcile-attachments`` — daemon boot-reconcile. + +Daemon identity via the ``X-CueAPI-Daemon-Id`` header (UUID; mismatch +with body.daemon_id on reconcile → 400). 
+ +Router stays thin — service layer (``app/services/ipc_attachment_service.py``) +does the heavy lifting so pytest-cov traces branches per CLAUDE.md ASGI +dispatch discipline. +""" +from __future__ import annotations + +from typing import Optional, Tuple +from uuid import UUID + +from fastapi import APIRouter, Depends, Header, Response +from fastapi.responses import JSONResponse +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.auth import AuthenticatedUser, get_current_user +from app.database import get_db +from app.models.agent import Agent +from app.schemas.ipc_attachment import ( + AttachmentCreate, + AttachmentDeleteIdempotent, + AttachmentExistsError, + AttachmentReconcileRequest, + AttachmentReconcileResponse, + AttachmentResponse, +) +from app.services.ipc_attachment_service import ( + create_attachment, + delete_attachment, + reconcile_attachments, +) + + +router = APIRouter(prefix="/v1/agents", tags=["ipc-attachments"]) +# Reconcile is daemon-scoped (not per-agent in the URL), so register on a +# second router at the same prefix to keep routing clean. +reconcile_router = APIRouter(prefix="/v1/agents", tags=["ipc-attachments"]) + +_DAEMON_ID_HEADER = "X-CueAPI-Daemon-Id" + + +# ─────────────────────────────────────────────────────────────────────── +# Pure helpers — extracted per CLAUDE.md ASGI dispatch coverage discipline +# ─────────────────────────────────────────────────────────────────────── + + +def _parse_daemon_id( + raw: Optional[str], +) -> Tuple[Optional[UUID], Optional[JSONResponse]]: + """Validate the X-CueAPI-Daemon-Id header. + + Returns ``(uuid, None)`` on success, ``(None, error_response)`` on + failure. Module-level helper so pytest-cov traces branches directly + (the ASGI dispatch wrap on the endpoint handlers hides them otherwise). 
+ """ + if not raw: + return None, JSONResponse( + status_code=400, + content={ + "error": { + "code": "missing_daemon_id", + "message": ( + "X-CueAPI-Daemon-Id header is required on this " + "endpoint. Generate a UUID at daemon install time " + "and send it on every attach/reconcile/delete." + ), + "status": 400, + } + }, + ) + try: + return UUID(raw.strip()), None + except (ValueError, AttributeError): + return None, JSONResponse( + status_code=400, + content={ + "error": { + "code": "invalid_daemon_id", + "message": "X-CueAPI-Daemon-Id must be a valid UUID.", + "status": 400, + } + }, + ) + + +async def _resolve_agent_ref( + db: AsyncSession, ref: str, user: AuthenticatedUser +) -> Tuple[Optional[Agent], Optional[JSONResponse]]: + """Resolve agent_ref to an Agent owned by the caller. + + Phase 1 supports opaque ``agt_xxx`` only; slug-form deferred (daemon + already has the opaque ID from earlier POST /v1/agents response). + Returns ``(agent, None)`` or ``(None, error_response)``. + """ + if ref.startswith("agt_"): + row = ( + await db.execute( + select(Agent).where(Agent.id == ref, Agent.user_id == user.id) + ) + ).scalar_one_or_none() + if row is None: + return None, JSONResponse( + status_code=404, + content={ + "error": { + "code": "agent_not_found", + "message": f"Agent {ref} not found", + "status": 404, + } + }, + ) + return row, None + return None, JSONResponse( + status_code=400, + content={ + "error": { + "code": "invalid_agent_ref", + "message": ( + "Pass the agent's opaque ID (agt_xxx) on this endpoint, " + "not slug-form." 
+ ), + "status": 400, + } + }, + ) + + +# ─────────────────────────────────────────────────────────────────────── +# POST /v1/agents//attachments +# ─────────────────────────────────────────────────────────────────────── + + +@router.post("/{ref}/attachments", status_code=201) +async def post_attachment( + ref: str, + body: AttachmentCreate, + x_cueapi_daemon_id: Optional[str] = Header(default=None, alias=_DAEMON_ID_HEADER), + user: AuthenticatedUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Daemon attaches an IPC Live session. + + Idempotency: + - Same-(agent, label, daemon_id) reattach → REPLACE (new token issued; + old row detached; old token returns 401 on subsequent use). + - Cross-daemon collision on (agent, label) → 409 with + ``existing_daemon_id`` + ``existing_last_reconciled_at`` so daemon + can decide whether to escalate or DELETE+re-POST. + """ + daemon_id, err = _parse_daemon_id(x_cueapi_daemon_id) + if err is not None: + return err + + agent, ag_err = await _resolve_agent_ref(db, ref, user) + if ag_err is not None: + return ag_err + assert agent is not None # narrowed by ag_err is None + assert daemon_id is not None + + result = await create_attachment( + db, + agent_id=agent.id, + label=body.label, + task_name=body.task_name, + ipc_session_token=body.ipc_session_token, + daemon_id=daemon_id, + attached_at=body.attached_at, + monitor_version=body.monitor_version, + ) + + if result.status == "conflict_cross_daemon": + assert result.existing is not None + existing = result.existing + err_body = AttachmentExistsError( + existing_token=existing.ipc_session_token or "", + existing_daemon_id=str(existing.daemon_id) if existing.daemon_id else "", + existing_attached_at=existing.attached_at, + existing_last_reconciled_at=existing.last_reconciled_at, + ).model_dump(mode="json") + return JSONResponse(status_code=409, content={"error": err_body}) + + await db.commit() + assert result.row is not None + row = result.row + return 
AttachmentResponse( + id=str(row.id), + agent_id=row.agent_id, + label=row.label, + task_name=row.task_name, + transport=row.transport, + ipc_session_token=row.ipc_session_token or "", + daemon_id=str(row.daemon_id) if row.daemon_id else "", + attached_at=row.attached_at, + last_reconciled_at=row.last_reconciled_at, + supersedes_token=result.supersedes_token, + ).model_dump(mode="json") + + +# ─────────────────────────────────────────────────────────────────────── +# DELETE /v1/agents//attachments/ +# ─────────────────────────────────────────────────────────────────────── + + +@router.delete("/{ref}/attachments/{token}") +async def delete_attachment_endpoint( + ref: str, + token: str, + x_cueapi_daemon_id: Optional[str] = Header(default=None, alias=_DAEMON_ID_HEADER), + user: AuthenticatedUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Idempotent DELETE by token, scoped to caller's daemon_id. + + - First-time: 204 No Content + - Idempotent hit: 200 with ``{"deleted": false, "reason": "already_deleted"}`` + so daemon-side debugging can distinguish 'I won the race' from + 'someone else cleaned up'. 
+ """ + daemon_id, err = _parse_daemon_id(x_cueapi_daemon_id) + if err is not None: + return err + + agent, ag_err = await _resolve_agent_ref(db, ref, user) + if ag_err is not None: + return ag_err + assert agent is not None + assert daemon_id is not None + + result = await delete_attachment( + db, + agent_id=agent.id, + ipc_session_token=token, + daemon_id=daemon_id, + ) + await db.commit() + + if result.status == "deleted": + return Response(status_code=204) + return JSONResponse( + status_code=200, + content=AttachmentDeleteIdempotent().model_dump(), + ) + + +# ─────────────────────────────────────────────────────────────────────── +# POST /v1/agents/reconcile-attachments +# ─────────────────────────────────────────────────────────────────────── + + +@reconcile_router.post("/reconcile-attachments", status_code=200) +async def post_reconcile_attachments( + body: AttachmentReconcileRequest, + x_cueapi_daemon_id: Optional[str] = Header(default=None, alias=_DAEMON_ID_HEADER), + user: AuthenticatedUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Daemon reports full local attachment view; server reconciles. + + Atomic transaction: + 1. UPSERT each reported attachment as ``transport='ipc'`` with + ``last_reconciled_at=now()``. + 2. UPDATE all rows for this daemon_id NOT in batch → ``transport='poll'`` + (conservative downgrade per CMA Q-G lean). + 3. Daily cleanup job (separate) deletes ``transport='poll'`` rows >24h + stale. + + Daemon scoping enforced via X-CueAPI-Daemon-Id header AND body.daemon_id + must match — defends against transport-layer spoof or body-tampering + mismatches. + """ + daemon_id, err = _parse_daemon_id(x_cueapi_daemon_id) + if err is not None: + return err + assert daemon_id is not None + + if daemon_id != body.daemon_id: + return JSONResponse( + status_code=400, + content={ + "error": { + "code": "daemon_id_mismatch", + "message": ( + "X-CueAPI-Daemon-Id header and body.daemon_id must " + "match. 
Both identify the same calling daemon." + ), + "status": 400, + } + }, + ) + + # `user` dependency is included so the call is auth-required even + # though the reconcile-attachments endpoint doesn't currently filter + # by user_id (daemon_id is the primary scope key). The auth check + # gates calls behind a valid API key; daemon_id provides identity + # within the user's tenant. + _ = user # explicit no-op so linters don't strip the dependency + + result = await reconcile_attachments( + db, + daemon_id=daemon_id, + attachments=body.attachments, + ) + await db.commit() + + return AttachmentReconcileResponse( + daemon_id=str(daemon_id), + reconciled_at=body.reconciled_at, + upserted_count=result.upserted_count, + downgraded_count=result.downgraded_count, + ).model_dump(mode="json") diff --git a/app/schemas/ipc_attachment.py b/app/schemas/ipc_attachment.py new file mode 100644 index 0000000..5c6e884 --- /dev/null +++ b/app/schemas/ipc_attachment.py @@ -0,0 +1,245 @@ +"""Item B Phase 1 — Pydantic wire schemas for IPC attachment endpoints. + +Live-delivery-v3 substrate primitive (cf. https://trydock.ai/mike/live-delivery-v3-build-hub). +Three endpoint surfaces: + +* ``POST /v1/agents//attachments`` — register an IPC attachment. +* ``DELETE /v1/agents//attachments/`` — revoke explicitly. +* ``POST /v1/agents/reconcile-attachments`` — daemon-driven boot-reconcile. + +Substrate-side design owner: cueapi-primary. Joint-design lock with CMA in +the build hub; Mike Q-B ratify locked ASYNC dispatcher path 2026-05-12 ~00:38Z. + +Wire shape rules baked in: + +* ``ipc_session_token`` is daemon-issued ULID (26 chars typical, 32-char + schema cap leaves room for versioned prefixes like ``v3a_``). + App-layer regex validates the format on POST (no DB regex CHECK per + CueAPI convention). 
+* 409 ``attachment_exists`` carries ``existing_daemon_id`` + + ``existing_last_reconciled_at`` so the daemon can distinguish + same-daemon-prior-session (safe DELETE+re-POST) from cross-daemon + conflict (escalate / refuse to overwrite). +* DELETE is idempotent: 204 first-time, 200 with ``{"deleted": false, + "reason": "already_deleted"}`` on idempotent hit. Helps daemon-side + debugging which cleanup path won the race. +* Reconcile body is a full daemon-local view; server applies atomic + UPSERT + downgrades unmentioned-for-this-daemon rows to ``transport='poll'`` + (CMA Q-G lean — conservative; no delete on first absence). Daily + cleanup job deletes ``transport='poll'`` rows >24h stale. +""" +from __future__ import annotations + +from datetime import datetime +from typing import List, Optional +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, Field, field_validator + + +# Application-layer ULID validator. Daemon may prefix with v_ +# (Crockford base32). 26-char body is the standard ULID; up to 6-char +# prefix expansion fits in VARCHAR(32). +_TOKEN_REGEX_BODY = r"[0-9A-HJKMNP-TV-Z]{26}" +_TOKEN_REGEX_PREFIX = r"(?:v[a-z0-9]+_)?" +_TOKEN_REGEX = _TOKEN_REGEX_PREFIX + _TOKEN_REGEX_BODY + + +def _validate_token_shape(value: str) -> str: + """Reject malformed tokens at the wire layer (preferred to DB regex CHECK).""" + import re + + if not isinstance(value, str) or not re.fullmatch(_TOKEN_REGEX, value): + raise ValueError( + "invalid ipc_session_token format — expect 26-char ULID, optional " + "v_ prefix" + ) + return value + + +class AttachmentCreate(BaseModel): + """POST /v1/agents//attachments — daemon attaches a Live session.""" + + model_config = ConfigDict(extra="forbid") + + label: str = Field( + ..., + min_length=1, + max_length=64, + description="Per-agent label for this attachment (`main` / `pr-watcher` / etc.).", + ) + task_name: str = Field( + ..., + min_length=1, + max_length=255, + description="Local task identifier (e.g. 
`max-claude-code-cueapi-live`).", + ) + ipc_session_token: str = Field( + ..., + min_length=26, + max_length=32, + description=( + "Daemon-issued ULID identifying this attachment for fire-accept " + "routing. App-layer validates shape; substrate stores opaque." + ), + ) + attached_at: Optional[datetime] = Field( + default=None, + description=( + "Daemon's wall-clock at attach time (informational; server uses " + "now() if absent)." + ), + ) + monitor_version: Optional[str] = Field( + default=None, + max_length=64, + description="Optional capability stamp for cross-daemon-version observability.", + ) + + @field_validator("ipc_session_token") + @classmethod + def _check_token(cls, value: str) -> str: + return _validate_token_shape(value) + + +class AttachmentResponse(BaseModel): + """Server response on successful attach (201) — minimal echo of stored state.""" + + id: str = Field(..., description="agent_live_sessions.id (UUID stringified)") + agent_id: str + label: str + task_name: str + transport: str = Field(..., description="Always `ipc` on this endpoint's 201.") + ipc_session_token: str + daemon_id: str + attached_at: datetime + last_reconciled_at: datetime + supersedes_token: Optional[str] = Field( + default=None, + description=( + "Set iff reattach displaced an existing same-(agent,label,daemon) " + "row. Old token is invalid from this moment forward (returns 401 " + "`token_revoked` on subsequent /outcome callbacks or DELETE)." + ), + ) + + +class AttachmentExistsError(BaseModel): + """409 ``attachment_exists`` body shape. + + Daemon distinguishes: + + * ``existing_daemon_id == my_daemon_id`` → safe DELETE+re-POST (own + prior session forgot to clean up). + * ``existing_daemon_id != my_daemon_id`` → escalate. User likely + moved machines; manual confirmation needed before clobbering. 
+ + ``existing_last_reconciled_at`` provides freshness signal — if the + existing attachment is stale (>24h), daemon can confidently + DELETE+re-POST without risking live-attachment overwrite. + """ + + code: str = Field(default="attachment_exists") + existing_token: str + existing_daemon_id: str + existing_attached_at: datetime + existing_last_reconciled_at: Optional[datetime] = None + hint: str = Field( + default=( + "DELETE /attachments/<token> first if the existing " + "attachment is yours; escalate if existing_daemon_id != your " + "daemon_id." + ), + ) + + +class AttachmentDeleteIdempotent(BaseModel): + """200 body for idempotent-hit DELETE. + + First-time deletes return 204 (no body); subsequent deletes return 200 + with this body so daemon-side debugging can distinguish ``I just deleted + it`` from ``someone else already did`` (useful for tracing which cleanup + path won the race: explicit-DELETE / reattach-supersede / heartbeat- + stale / daemon-absence-cleanup). + """ + + deleted: bool = Field(default=False) + reason: str = Field(default="already_deleted") + + +class AttachmentReconcileEntry(BaseModel): + """One row in a daemon's reconcile batch.""" + + model_config = ConfigDict(extra="forbid") + + label: str = Field(..., min_length=1, max_length=64) + task_name: str = Field(..., min_length=1, max_length=255) + ipc_session_token: str = Field(..., min_length=26, max_length=32) + attached_at: datetime + + @field_validator("ipc_session_token") + @classmethod + def _check_token(cls, value: str) -> str: + return _validate_token_shape(value) + + +class AttachmentReconcileRequest(BaseModel): + """POST /v1/agents/reconcile-attachments — full daemon-local view. + + Daemon reports every attachment it currently holds locally; server + applies as a single atomic UPSERT transaction: + + 1. UPSERT each reported attachment as ``transport='ipc'`` with + ``last_reconciled_at=now()``. + 2. 
UPDATE all rows for this daemon_id NOT in this batch → + ``transport='poll'`` (conservative downgrade per CMA Q-G lean; + does NOT delete on first absence). + 3. Daily cleanup job (separate) deletes ``transport='poll'`` rows + where ``last_reconciled_at < now() - 24h``. + + No periodic server-side reconcile — server is passive; daemon drives. + """ + + model_config = ConfigDict(extra="forbid") + + daemon_id: UUID = Field( + ..., + description=( + "Stable per-install daemon identity. Must match the " + "X-CueAPI-Daemon-Id header (validated server-side)." + ), + ) + reconciled_at: datetime = Field( + ..., + description=( + "Daemon's wall-clock at reconcile time (informational; server " + "uses now() for last_reconciled_at column values)." + ), + ) + attachments: List[AttachmentReconcileEntry] = Field( + default_factory=list, + description=( + "Full daemon-local attachment list. Empty list means daemon " + "reports zero attachments → all daemon's rows downgrade to " + "transport='poll'." + ), + ) + + +class AttachmentReconcileResponse(BaseModel): + """200 body for /reconcile-attachments — daemon sees what server did.""" + + daemon_id: str + reconciled_at: datetime + upserted_count: int = Field( + ..., + description="Rows inserted-or-updated to transport='ipc'.", + ) + downgraded_count: int = Field( + ..., + description=( + "Rows for this daemon NOT in the batch — downgraded to " + "transport='poll' (still queryable for poll-based delivery; " + "cleaned up after 24h stale by the daily job)." + ), + ) diff --git a/app/services/ipc_attachment_service.py b/app/services/ipc_attachment_service.py new file mode 100644 index 0000000..11c9331 --- /dev/null +++ b/app/services/ipc_attachment_service.py @@ -0,0 +1,327 @@ +"""Item B Phase 1 — service layer for IPC attachment lifecycle. + +Pure-helper functions backing the three IPC attachment endpoints. 
Extracted +from the router layer so pytest-cov can trace branches without going through +the ASGI dispatch wrapper (per CLAUDE.md "Pure-helper extraction" discipline, +established for the verify_echo + cursor-advance-as-ack work). + +Design contract (from joint design lock + Mike Q-B ratify 2026-05-12 ~00:38Z): + +* Attachments are scoped per-(agent_id, label, daemon_id). Same-label + re-attach from same daemon REPLACES (issues new token, supersedes old). +* Cross-daemon collision on same (agent_id, label) returns 409 with the + existing row's daemon_id so daemon can escalate. +* DELETE is idempotent. First-time = 204; subsequent = 200 with reason. +* Reconcile is one atomic UPSERT transaction. Daemon-scoped: rows belonging + to other daemons untouched. +* Token revocation paths: explicit DELETE / reattach supersede / daemon- + absence cleanup (>24h). All log structured ``attachment_token_revoked``. +* Fire-accept dispatcher uses ASYNC path (Mike Q-B): server fires, returns + immediately with ``delivery_mode_requested='ipc'``; daemon ACKs via the + existing ``POST /v1/executions//outcome`` path. NO inline sync ack. +""" +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import List, Optional +from uuid import UUID + +from sqlalchemy import and_, select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.agent import Agent +from app.models.agent_live_session import AgentLiveSession +from app.schemas.ipc_attachment import ( + AttachmentReconcileEntry, +) + + +logger = logging.getLogger(__name__) + + +# ─────────────────────────────────────────────────────────────────────── +# Outcomes — discriminated-union results for each service-layer call so +# the router stays a thin wrapper that maps to HTTP shapes. +# ─────────────────────────────────────────────────────────────────────── + + +class AttachmentCreateResult: + """Result of ``create_attachment``. 
One of: created, conflict_same_daemon + (supersede), conflict_cross_daemon (409).""" + + __slots__ = ("status", "row", "existing", "supersedes_token") + + def __init__( + self, + *, + status: str, + row: Optional[AgentLiveSession] = None, + existing: Optional[AgentLiveSession] = None, + supersedes_token: Optional[str] = None, + ): + # status ∈ {"created", "conflict_cross_daemon"} + # supersede-same-daemon is handled inline by replacing the row; result + # carries status="created" with supersedes_token set. + self.status = status + self.row = row + self.existing = existing + self.supersedes_token = supersedes_token + + +class AttachmentDeleteResult: + """Result of ``delete_attachment``. One of: deleted, already_deleted.""" + + __slots__ = ("status",) + + def __init__(self, *, status: str): + # status ∈ {"deleted", "already_deleted"} + self.status = status + + +class AttachmentReconcileResult: + """Result of ``reconcile_attachments``.""" + + __slots__ = ("upserted_count", "downgraded_count") + + def __init__(self, *, upserted_count: int, downgraded_count: int): + self.upserted_count = upserted_count + self.downgraded_count = downgraded_count + + +# ─────────────────────────────────────────────────────────────────────── +# Resolution + scoping helpers +# ─────────────────────────────────────────────────────────────────────── + + +async def _resolve_agent(db: AsyncSession, agent_ref: str) -> Optional[Agent]: + """Find an Agent by opaque ID (`agt_xxx`) or slug-form (`slug@owner`). + + Mirrors ``app.services.agent_service.resolve_address`` but kept local + so we can return ``None`` instead of raising — the router maps to 404. + """ + if agent_ref.startswith("agt_"): + row = (await db.execute(select(Agent).where(Agent.id == agent_ref))).scalar_one_or_none() + return row + # Slug-form: `slug@user_slug`. Phase 1 keeps this resolution minimal; + # multi-tenant slug resolution lives in agent_service.resolve_address. 
+ return None + + +# ─────────────────────────────────────────────────────────────────────── +# create_attachment — POST /v1/agents//attachments +# ─────────────────────────────────────────────────────────────────────── + + +async def create_attachment( + db: AsyncSession, + *, + agent_id: str, + label: str, + task_name: str, + ipc_session_token: str, + daemon_id: UUID, + attached_at: Optional[datetime] = None, + monitor_version: Optional[str] = None, +) -> AttachmentCreateResult: + """Create or supersede an IPC attachment for ``(agent_id, label, daemon_id)``. + + Behavior matrix: + + * No active row with same ``(agent_id, label)`` → INSERT new row, return + ``status='created'``. + * Active row exists with same ``(agent_id, label, daemon_id)`` → SUPERSEDE + (mark old token revoked, INSERT new). Returns ``status='created'`` with + ``supersedes_token`` set so the daemon knows their prior session was + replaced (informational; old token is invalid immediately). + * Active row exists with same ``(agent_id, label)`` but DIFFERENT + ``daemon_id`` → return ``status='conflict_cross_daemon'`` with + ``existing`` populated. Router maps to 409 ``attachment_exists``. + """ + now = attached_at or datetime.now(timezone.utc) + + # Find any active row for this (agent_id, label) — supersede vs conflict + # is determined by daemon_id comparison. + existing_query = select(AgentLiveSession).where( + and_( + AgentLiveSession.agent_id == agent_id, + AgentLiveSession.label == label, + AgentLiveSession.detached_at.is_(None), + ) + ) + existing = (await db.execute(existing_query)).scalar_one_or_none() + + if existing is not None: + if existing.daemon_id == daemon_id: + # Same daemon → supersede. Mark old row detached + insert new. 
+ existing.detached_at = now + old_token = existing.ipc_session_token + logger.info( + "attachment_token_revoked", + extra={ + "agent_id": agent_id, + "label": label, + "daemon_id": str(daemon_id), + "old_token": old_token, + "new_token": ipc_session_token, + "reason": "reattach_supersede", + }, + ) + # cue_id space note: the table's ix_agent_live_sessions_cue_id + # is globally unique INCLUDING soft-detached rows. Reusing the + # old row's cue_id on the new row would collide with the + # detached row. Use the new ipc_session_token as cue_id — + # daemon-issued ULIDs are globally unique by construction. + new_row = AgentLiveSession( + agent_id=agent_id, + label=label, + task_name=task_name, + cue_id=ipc_session_token, + is_default=existing.is_default, + attached_at=now, + ipc_session_token=ipc_session_token, + transport="ipc", + daemon_id=daemon_id, + last_reconciled_at=now, + monitor_version=monitor_version, + ) + db.add(new_row) + await db.flush() + return AttachmentCreateResult( + status="created", row=new_row, supersedes_token=old_token + ) + # Cross-daemon → caller should escalate; do NOT clobber. + return AttachmentCreateResult(status="conflict_cross_daemon", existing=existing) + + # No existing row → simple INSERT. 
+ new_row = AgentLiveSession( + agent_id=agent_id, + label=label, + task_name=task_name, + cue_id=ipc_session_token, # Phase 1: use token as cue_id stand-in + is_default=(label == "main"), + attached_at=now, + ipc_session_token=ipc_session_token, + transport="ipc", + daemon_id=daemon_id, + last_reconciled_at=now, + monitor_version=monitor_version, + ) + db.add(new_row) + await db.flush() + return AttachmentCreateResult(status="created", row=new_row) + + +# ─────────────────────────────────────────────────────────────────────── +# delete_attachment — DELETE /v1/agents//attachments/ +# ─────────────────────────────────────────────────────────────────────── + + +async def delete_attachment( + db: AsyncSession, + *, + agent_id: str, + ipc_session_token: str, + daemon_id: UUID, +) -> AttachmentDeleteResult: + """Idempotent DELETE by token. Scoped to caller's daemon_id.""" + query = select(AgentLiveSession).where( + and_( + AgentLiveSession.agent_id == agent_id, + AgentLiveSession.ipc_session_token == ipc_session_token, + AgentLiveSession.daemon_id == daemon_id, + AgentLiveSession.detached_at.is_(None), + ) + ) + row = (await db.execute(query)).scalar_one_or_none() + if row is None: + return AttachmentDeleteResult(status="already_deleted") + row.detached_at = datetime.now(timezone.utc) + logger.info( + "attachment_token_revoked", + extra={ + "agent_id": agent_id, + "label": row.label, + "daemon_id": str(daemon_id), + "old_token": ipc_session_token, + "new_token": None, + "reason": "explicit_delete", + }, + ) + return AttachmentDeleteResult(status="deleted") + + +# ─────────────────────────────────────────────────────────────────────── +# reconcile_attachments — POST /v1/agents/reconcile-attachments +# ─────────────────────────────────────────────────────────────────────── + + +async def reconcile_attachments( + db: AsyncSession, + *, + daemon_id: UUID, + attachments: List[AttachmentReconcileEntry], +) -> AttachmentReconcileResult: + """Single atomic transaction: UPSERT 
reported attachments + downgrade + unmentioned-for-this-daemon rows to ``transport='poll'``. + + Conservative downgrade-not-delete per CMA Q-G lean — daemon might be + flapping; first absence shouldn't lose the row. Daily cleanup job + deletes ``transport='poll'`` rows >24h stale. + """ + now = datetime.now(timezone.utc) + reported_tokens = {a.ipc_session_token for a in attachments} + + upserted_count = 0 + for entry in attachments: + # Try update existing row (matched by daemon_id + label + token). + result = await db.execute( + update(AgentLiveSession) + .where( + and_( + AgentLiveSession.daemon_id == daemon_id, + AgentLiveSession.label == entry.label, + AgentLiveSession.ipc_session_token == entry.ipc_session_token, + AgentLiveSession.detached_at.is_(None), + ) + ) + .values( + transport="ipc", + last_reconciled_at=now, + task_name=entry.task_name, + ) + .execution_options(synchronize_session=False) + ) + if result.rowcount > 0: + upserted_count += result.rowcount + continue + # No matching row → would-be-INSERT path. Phase 1 reconcile is + # idempotent-on-existing only; daemon should have called POST + # /attachments first. Log + skip for forensics. + logger.info( + "reconcile_unknown_attachment", + extra={ + "daemon_id": str(daemon_id), + "label": entry.label, + "token": entry.ipc_session_token, + "task_name": entry.task_name, + }, + ) + + # Downgrade unmentioned rows for this daemon to transport='poll'. 
+ downgrade_query = update(AgentLiveSession).where( + and_( + AgentLiveSession.daemon_id == daemon_id, + AgentLiveSession.transport == "ipc", + AgentLiveSession.detached_at.is_(None), + AgentLiveSession.ipc_session_token.notin_(reported_tokens) + if reported_tokens + else (True == True), # noqa: E712 — empty reconcile: downgrade ALL + ) + ).values(transport="poll").execution_options(synchronize_session=False) + downgrade_result = await db.execute(downgrade_query) + downgraded_count = downgrade_result.rowcount or 0 + + return AttachmentReconcileResult( + upserted_count=upserted_count, downgraded_count=downgraded_count + ) diff --git a/parity-manifest.json b/parity-manifest.json index 1e1bb99..871a78b 100644 --- a/parity-manifest.json +++ b/parity-manifest.json @@ -242,6 +242,13 @@ "last_synced": "2026-05-11", "ported_in": "item-2b-cursor-advance-ack-port", "deviation": "Renumbered 062 \u2192 034. Schema verbatim: subscriptions.last_acked_event_id BIGINT NULL." + }, + { + "path": "alembic/versions/035_agent_live_sessions_ipc_attachment.py", + "private_counterpart": "alembic/versions/063_agent_live_sessions_ipc_attachment.py", + "last_synced": "2026-05-12", + "ported_in": "item-b-phase-1-substrate", + "deviation": "OSS renumber 063\u2192035 (OSS alembic head was 034 when ported; private chain at 064 post-rebase). down_revision adjusted accordingly." } ], "app_core": [ @@ -268,8 +275,8 @@ { "path": "app/main.py", "private_counterpart": "app/main.py", - "last_synced": "2026-05-11", - "ported_in": "bodyverify-layer-1-5-universal-middleware", + "last_synced": "2026-05-12", + "ported_in": "bodyverify-layer-1-5-universal-middleware + item-b-phase-1-substrate", "deviation": "BodyVerify port: VerifyEchoMiddleware registered between CORS/RequestId and RateLimit. Private has additional middlewares (SecurityHeaders, AccessAudit) hosted-only; ordering preserved relative to shared middlewares." 
}, { @@ -405,9 +412,9 @@ { "path": "app/routers/cues.py", "private_counterpart": "app/routers/cues.py", - "last_synced": "2026-05-11", - "ported_in": "bodyverify-layer-1-substrate-echo-back", - "deviation": "BodyVerify port: fire_cue handler integrates apply_verify_echo on response. OSS FireRequest carries only send_at; private's payload_override (dict with user-supplied string content carrying corruption-vector) is hosted-only. Metachar-class parametrization tests on the fire path are intentionally private-only \u2014 substrate helper coverage is exercised via messages endpoint tests. BodyVerify hotfix: helper takes body_text=None on OSS (no string user-content field on FireRequest)." + "last_synced": "2026-05-12", + "ported_in": "bodyverify-layer-1-substrate-echo-back + item-b-phase-1-substrate", + "deviation": "BodyVerify port: fire_cue handler integrates apply_verify_echo on response. OSS FireRequest carries only send_at; private's payload_override (dict with user-supplied string content carrying corruption-vector) is hosted-only. Metachar-class parametrization tests on the fire path are intentionally private-only \u2014 substrate helper coverage is exercised via messages endpoint tests. BodyVerify hotfix: helper takes body_text=None on OSS (no string user-content field on FireRequest). Item B Phase 1: added _build_ipc_delivery_metadata helper + outcome_metadata stamping for transport=ipc agent_live_sessions." }, { "path": "app/routers/device_code.py", @@ -463,6 +470,12 @@ "last_synced": "2026-05-11", "ported_in": "event-emit-primitive-port (PR-1b) + long-poll-port (PR-1b spec Q1) + item-1-inline-body-port + item-2b-cursor-advance-ack-port", "deviation": "Long-poll uses internal polling loop rather than PostgreSQL LISTEN/NOTIFY per impl trade-off; wire contract identical. Future LISTEN/NOTIFY swap tracked at cmp0m7n7r (private) \u2014 file OSS counterpart row when prioritized. 
Item 1 addition: SubscriptionCreate accepts inline_body field; SubscriptionResponse surfaces it. Item 2(b) addition: PATCH /subscriptions/{id}/ack endpoint + AckSubscriptionRequest schema + _advance_ack_after_pull helper + last_acked_event_id surfaced on SubscriptionResponse." + }, + { + "path": "app/routers/ipc_attachments.py", + "private_counterpart": "app/routers/ipc_attachments.py", + "last_synced": "2026-05-12", + "ported_in": "item-b-phase-1-substrate" } ], "schemas": [ @@ -509,6 +522,12 @@ "last_synced": "2026-05-11", "ported_in": "messaging-primitive-port + messaging-emission-port (PR-2a)", "deviation": "from_api_key_id field omitted from MessageResponse (multi-key scoping is hosted-only). PR-2a additions verbatim: correlation_id on MessageCreate; correlation_id + dispatch_priority_bucket + message_dispatch_error on MessageResponse." + }, + { + "path": "app/schemas/ipc_attachment.py", + "private_counterpart": "app/schemas/ipc_attachment.py", + "last_synced": "2026-05-12", + "ported_in": "item-b-phase-1-substrate" } ], "services": [ @@ -596,6 +615,12 @@ "last_synced": "2026-05-11", "ported_in": "event-emit-primitive-port (PR-1b) + phase-4b-digest-batching-port + item-2a-turn-pass-port + item-1-inline-body-port + item-2b-cursor-advance-ack-port", "deviation": "Phase 4b addition: `message.digest` added to KNOWN_EVENT_TYPES registry. Item 2(a) addition: `turn.pass` added \u2014 META-only envelope for inbox-watcher recipes. Item 1 addition: INLINE_BODY_MAX_BYTES constant (32KB) + `_maybe_embed_body` pure helper + emit_event accepts body_text=None kwarg + create_subscription accepts inline_body=False kwarg. Body embedding gated on active subscription's inline_body=True; size guardrail with body_omitted flag for >32KB bodies. Item 2(b) addition: advance_ack_watermark + ack_subscription service helpers with watermark monotonicity (never rewinds)." 
+ }, + { + "path": "app/services/ipc_attachment_service.py", + "private_counterpart": "app/services/ipc_attachment_service.py", + "last_synced": "2026-05-12", + "ported_in": "item-b-phase-1-substrate" } ], "utils": [ @@ -694,4 +719,4 @@ } ] } -} +} \ No newline at end of file diff --git a/tests/test_ipc_attachments.py b/tests/test_ipc_attachments.py new file mode 100644 index 0000000..66c02b3 --- /dev/null +++ b/tests/test_ipc_attachments.py @@ -0,0 +1,1053 @@ +"""Item B Phase 1 — substrate tests for IPC attachment endpoints + service layer ++ ASYNC fire-accept dispatcher. + +Live-delivery-v3 substrate primitive. Joint design lock at +https://trydock.ai/mike/live-delivery-v3-build-hub. Mike Q-B ratify +2026-05-12 ~00:38Z: ASYNC fire-accept dispatcher path. + +Coverage targets: + +- Service layer: ``create_attachment`` (3 branches: created / same-daemon + supersede / cross-daemon conflict), ``delete_attachment`` (idempotent / + delete), ``reconcile_attachments`` (UPSERT + downgrade-unmentioned). +- Router endpoints: POST /attachments (201 + 409), DELETE (204 + 200), + POST /reconcile-attachments (200 + 400 mismatch). +- Daemon-id header: missing / malformed → 400. +- Token-format validation: app-layer ULID regex on AttachmentCreate. +- ASYNC dispatcher: _build_ipc_delivery_metadata helper (returns dict on + match, None otherwise). +- Backwards-compat: existing webhook + worker-transport cues unchanged + (no outcome_metadata stamp). 
+""" +from __future__ import annotations + +import uuid +from datetime import datetime, timezone + +import pytest +from sqlalchemy import select + +from app.models.agent import Agent +from app.models.agent_live_session import AgentLiveSession + + +DAEMON_ID_HEADER = "X-CueAPI-Daemon-Id" + + +# ─────────────────────────────────────────────────────────────────────── +# Helper fixtures + utilities +# ─────────────────────────────────────────────────────────────────────── + + +async def _make_agent(client, auth_headers, slug=None): + payload = {"display_name": f"Agent {uuid.uuid4().hex[:6]}"} + if slug: + payload["slug"] = slug + r = await client.post("/v1/agents", json=payload, headers=auth_headers) + assert r.status_code == 201, r.text + return r.json() + + +def _make_ulid() -> str: + """Generate a valid 26-char ULID-shaped token for testing.""" + # Crockford base32 alphabet (no I/L/O/U); just need a regex-passing string. + return "01ABCDEFGHJKMNPQRSTV" + uuid.uuid4().hex[:6].upper().replace("I", "J").replace("L", "M").replace("O", "P").replace("U", "V") + + +def _daemon_headers(daemon_id: str | None = None) -> dict: + return {DAEMON_ID_HEADER: daemon_id or str(uuid.uuid4())} + + +# ─────────────────────────────────────────────────────────────────────── +# Helper unit tests — _parse_daemon_id, _build_ipc_delivery_metadata +# ─────────────────────────────────────────────────────────────────────── + + +def test_parse_daemon_id_valid_uuid(): + from app.routers.ipc_attachments import _parse_daemon_id + valid = str(uuid.uuid4()) + parsed, err = _parse_daemon_id(valid) + assert err is None + assert str(parsed) == valid + + +def test_parse_daemon_id_missing_returns_400(): + from app.routers.ipc_attachments import _parse_daemon_id + parsed, err = _parse_daemon_id(None) + assert parsed is None + assert err is not None + assert err.status_code == 400 + + +def test_parse_daemon_id_empty_string_returns_400(): + from app.routers.ipc_attachments import _parse_daemon_id + parsed, 
err = _parse_daemon_id("") + assert parsed is None + assert err is not None and err.status_code == 400 + + +def test_parse_daemon_id_malformed_returns_400(): + from app.routers.ipc_attachments import _parse_daemon_id + parsed, err = _parse_daemon_id("not-a-uuid") + assert parsed is None + assert err is not None and err.status_code == 400 + + +def test_parse_daemon_id_whitespace_stripped(): + from app.routers.ipc_attachments import _parse_daemon_id + valid = str(uuid.uuid4()) + parsed, err = _parse_daemon_id(f" {valid} ") + assert err is None + assert str(parsed) == valid + + +@pytest.mark.asyncio +async def test_build_ipc_delivery_metadata_none_when_no_attachment(db_session): + """Cue with no agent_live_sessions row → None (no metadata stamp).""" + from app.routers.cues import _build_ipc_delivery_metadata + result = await _build_ipc_delivery_metadata(db_session, "cue_nonexistent") + assert result is None + + +@pytest.mark.asyncio +async def test_build_ipc_delivery_metadata_ipc_active_returns_dict( + db_session, registered_user +): + """Cue with an active IPC attachment → returns {delivery_mode_requested: ipc}.""" + from app.routers.cues import _build_ipc_delivery_metadata + from app.utils.ids import generate_agent_id + + user = ( + await db_session.execute( + select(__import__("app.models.user", fromlist=["User"]).User).where( + __import__("app.models.user", fromlist=["User"]).User.email + == registered_user["email"] + ) + ) + ).scalar_one() + + agent = Agent( + id=generate_agent_id(), + user_id=user.id, + slug=f"ipc-meta-{uuid.uuid4().hex[:6]}", + display_name="IPC Meta Agent", + ) + db_session.add(agent) + await db_session.flush() + sess = AgentLiveSession( + agent_id=agent.id, + label="main", + cue_id="cue_ipctest12345", + task_name="max-claude-code-test", + attached_at=datetime.now(timezone.utc), + ipc_session_token=_make_ulid(), + transport="ipc", + daemon_id=uuid.uuid4(), + last_reconciled_at=datetime.now(timezone.utc), + ) + db_session.add(sess) + await 
db_session.commit() + + result = await _build_ipc_delivery_metadata(db_session, "cue_ipctest12345") + assert result == {"delivery_mode_requested": "ipc"} + + +@pytest.mark.asyncio +async def test_build_ipc_delivery_metadata_poll_returns_none( + db_session, registered_user +): + """Cue with attachment on transport='poll' → None (not IPC).""" + from app.routers.cues import _build_ipc_delivery_metadata + from app.utils.ids import generate_agent_id + + user = ( + await db_session.execute( + select(__import__("app.models.user", fromlist=["User"]).User).where( + __import__("app.models.user", fromlist=["User"]).User.email + == registered_user["email"] + ) + ) + ).scalar_one() + + agent = Agent( + id=generate_agent_id(), + user_id=user.id, + slug=f"ipc-poll-{uuid.uuid4().hex[:6]}", + display_name="IPC Poll Agent", + ) + db_session.add(agent) + await db_session.flush() + sess = AgentLiveSession( + agent_id=agent.id, + label="main", + cue_id="cue_polltest12345", + task_name="max-claude-code-test", + attached_at=datetime.now(timezone.utc), + transport="poll", # default — not IPC + ) + db_session.add(sess) + await db_session.commit() + + result = await _build_ipc_delivery_metadata(db_session, "cue_polltest12345") + assert result is None + + +@pytest.mark.asyncio +async def test_build_ipc_delivery_metadata_detached_returns_none( + db_session, registered_user +): + """Cue with IPC attachment but detached_at set → None (treated as gone).""" + from app.routers.cues import _build_ipc_delivery_metadata + from app.utils.ids import generate_agent_id + + user = ( + await db_session.execute( + select(__import__("app.models.user", fromlist=["User"]).User).where( + __import__("app.models.user", fromlist=["User"]).User.email + == registered_user["email"] + ) + ) + ).scalar_one() + + agent = Agent( + id=generate_agent_id(), + user_id=user.id, + slug=f"ipc-det-{uuid.uuid4().hex[:6]}", + display_name="IPC Detached Agent", + ) + db_session.add(agent) + await db_session.flush() + sess = 
AgentLiveSession( + agent_id=agent.id, + label="main", + cue_id="cue_detached12345", + task_name="max-claude-code-test", + attached_at=datetime.now(timezone.utc), + detached_at=datetime.now(timezone.utc), # soft-detached + ipc_session_token=_make_ulid(), + transport="ipc", + daemon_id=uuid.uuid4(), + ) + db_session.add(sess) + await db_session.commit() + + result = await _build_ipc_delivery_metadata(db_session, "cue_detached12345") + assert result is None + + +# ─────────────────────────────────────────────────────────────────────── +# Endpoint integration — POST /v1/agents/{ref}/attachments +# ─────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_post_attachment_missing_daemon_header_400(client, auth_headers): + agent = await _make_agent(client, auth_headers, slug=f"miss-{uuid.uuid4().hex[:6]}") + r = await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": _make_ulid(), + }, + headers=auth_headers, # no X-CueAPI-Daemon-Id + ) + assert r.status_code == 400 + assert r.json()["error"]["code"] == "missing_daemon_id" + + +@pytest.mark.asyncio +async def test_post_attachment_malformed_daemon_header_400(client, auth_headers): + agent = await _make_agent(client, auth_headers, slug=f"mal-{uuid.uuid4().hex[:6]}") + r = await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": _make_ulid(), + }, + headers={**auth_headers, DAEMON_ID_HEADER: "not-a-uuid"}, + ) + assert r.status_code == 400 + assert r.json()["error"]["code"] == "invalid_daemon_id" + + +@pytest.mark.asyncio +async def test_post_attachment_agent_not_found_404(client, auth_headers): + r = await client.post( + "/v1/agents/agt_doesnotexist/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": _make_ulid(), + }, + 
headers={**auth_headers, **_daemon_headers()}, + ) + assert r.status_code == 404 + assert r.json()["error"]["code"] == "agent_not_found" + + +@pytest.mark.asyncio +async def test_post_attachment_slug_form_rejected_400(client, auth_headers): + """Phase 1: only opaque agent_id; slug-form deferred.""" + r = await client.post( + "/v1/agents/some-slug@user/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": _make_ulid(), + }, + headers={**auth_headers, **_daemon_headers()}, + ) + assert r.status_code == 400 + assert r.json()["error"]["code"] == "invalid_agent_ref" + + +@pytest.mark.asyncio +async def test_post_attachment_invalid_token_format_422(client, auth_headers): + """App-layer ULID regex rejects bad token shapes at the Pydantic layer.""" + agent = await _make_agent(client, auth_headers, slug=f"bad-{uuid.uuid4().hex[:6]}") + r = await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": "not-a-ulid", # < 26 chars + }, + headers={**auth_headers, **_daemon_headers()}, + ) + # Pydantic rejects with 422 validation error (min_length=26) + assert r.status_code == 422 + + +@pytest.mark.asyncio +async def test_post_attachment_happy_201(client, auth_headers): + agent = await _make_agent(client, auth_headers, slug=f"ok-{uuid.uuid4().hex[:6]}") + daemon_id = str(uuid.uuid4()) + token = _make_ulid() + r = await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": token, + }, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + assert r.status_code == 201, r.text + data = r.json() + assert data["agent_id"] == agent["id"] + assert data["label"] == "main" + assert data["transport"] == "ipc" + assert data["ipc_session_token"] == token + assert data["daemon_id"] == daemon_id + assert data["supersedes_token"] is None + + 
+@pytest.mark.asyncio +async def test_post_attachment_same_daemon_supersede(client, auth_headers): + """Same (agent, label, daemon_id) reattach: REPLACE; supersedes_token set.""" + agent = await _make_agent(client, auth_headers, slug=f"sup-{uuid.uuid4().hex[:6]}") + daemon_id = str(uuid.uuid4()) + token1 = _make_ulid() + r1 = await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": token1, + }, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + assert r1.status_code == 201 + + token2 = _make_ulid() + r2 = await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": token2, + }, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + assert r2.status_code == 201, r2.text + data2 = r2.json() + assert data2["ipc_session_token"] == token2 + assert data2["supersedes_token"] == token1 + + +@pytest.mark.asyncio +async def test_post_attachment_cross_daemon_conflict_409(client, auth_headers): + """Different daemon attempts same (agent, label): 409 with existing_daemon_id.""" + agent = await _make_agent(client, auth_headers, slug=f"x-{uuid.uuid4().hex[:6]}") + daemon_a = str(uuid.uuid4()) + token_a = _make_ulid() + r1 = await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": token_a, + }, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_a}, + ) + assert r1.status_code == 201 + + daemon_b = str(uuid.uuid4()) + r2 = await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={ + "label": "main", + "task_name": "max-claude-code-test", + "ipc_session_token": _make_ulid(), + }, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_b}, + ) + assert r2.status_code == 409, r2.text + err = r2.json()["error"] + assert err["code"] == "attachment_exists" + assert 
err["existing_daemon_id"] == daemon_a + assert err["existing_token"] == token_a + assert "existing_attached_at" in err + + +# ─────────────────────────────────────────────────────────────────────── +# DELETE /v1/agents/{ref}/attachments/{token} +# ─────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_delete_attachment_first_time_204(client, auth_headers): + agent = await _make_agent(client, auth_headers, slug=f"d-{uuid.uuid4().hex[:6]}") + daemon_id = str(uuid.uuid4()) + token = _make_ulid() + await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={"label": "main", "task_name": "x", "ipc_session_token": token}, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + r = await client.delete( + f"/v1/agents/{agent['id']}/attachments/{token}", + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + assert r.status_code == 204 + + +@pytest.mark.asyncio +async def test_delete_attachment_idempotent_200(client, auth_headers): + """Second DELETE on same token: 200 with already_deleted reason.""" + agent = await _make_agent(client, auth_headers, slug=f"id-{uuid.uuid4().hex[:6]}") + daemon_id = str(uuid.uuid4()) + token = _make_ulid() + await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={"label": "main", "task_name": "x", "ipc_session_token": token}, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + # First delete: 204 + await client.delete( + f"/v1/agents/{agent['id']}/attachments/{token}", + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + # Second delete: 200 idempotent + r = await client.delete( + f"/v1/agents/{agent['id']}/attachments/{token}", + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + assert r.status_code == 200 + body = r.json() + assert body["deleted"] is False + assert body["reason"] == "already_deleted" + + +@pytest.mark.asyncio +async def test_delete_attachment_unknown_token_idempotent_200(client, auth_headers): + 
"""DELETE on a token that was never created: 200 idempotent (not 404).""" + agent = await _make_agent(client, auth_headers, slug=f"u-{uuid.uuid4().hex[:6]}") + r = await client.delete( + f"/v1/agents/{agent['id']}/attachments/{_make_ulid()}", + headers={**auth_headers, **_daemon_headers()}, + ) + assert r.status_code == 200 + assert r.json()["deleted"] is False + + +@pytest.mark.asyncio +async def test_delete_attachment_wrong_daemon_idempotent_200(client, auth_headers): + """Daemon B trying to DELETE daemon A's token: scoped lookup misses → 200 idempotent. + + (Phase 1 design: daemon scoping prevents cross-daemon deletion by silently + no-op-ing; daemon A's token stays alive. Daemon A's reconcile or explicit + DELETE remains the path to revoke.) + """ + agent = await _make_agent(client, auth_headers, slug=f"wd-{uuid.uuid4().hex[:6]}") + daemon_a = str(uuid.uuid4()) + daemon_b = str(uuid.uuid4()) + token = _make_ulid() + await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={"label": "main", "task_name": "x", "ipc_session_token": token}, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_a}, + ) + r = await client.delete( + f"/v1/agents/{agent['id']}/attachments/{token}", + headers={**auth_headers, DAEMON_ID_HEADER: daemon_b}, + ) + assert r.status_code == 200 + assert r.json()["deleted"] is False + + +# ─────────────────────────────────────────────────────────────────────── +# POST /v1/agents/reconcile-attachments +# ─────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_reconcile_missing_daemon_header_400(client, auth_headers): + r = await client.post( + "/v1/agents/reconcile-attachments", + json={ + "daemon_id": str(uuid.uuid4()), + "reconciled_at": datetime.now(timezone.utc).isoformat(), + "attachments": [], + }, + headers=auth_headers, + ) + assert r.status_code == 400 + assert r.json()["error"]["code"] == "missing_daemon_id" + + +@pytest.mark.asyncio +async def 
test_reconcile_header_body_mismatch_400(client, auth_headers): + """Header daemon_id != body daemon_id → 400 daemon_id_mismatch.""" + header_id = str(uuid.uuid4()) + body_id = str(uuid.uuid4()) + r = await client.post( + "/v1/agents/reconcile-attachments", + json={ + "daemon_id": body_id, + "reconciled_at": datetime.now(timezone.utc).isoformat(), + "attachments": [], + }, + headers={**auth_headers, DAEMON_ID_HEADER: header_id}, + ) + assert r.status_code == 400 + assert r.json()["error"]["code"] == "daemon_id_mismatch" + + +@pytest.mark.asyncio +async def test_reconcile_empty_downgrades_all_daemons_rows(client, auth_headers): + """Reconcile with empty attachments list downgrades all this daemon's IPC rows + to transport='poll'.""" + agent = await _make_agent(client, auth_headers, slug=f"rc-{uuid.uuid4().hex[:6]}") + daemon_id = str(uuid.uuid4()) + # Attach 2 sessions + await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={"label": "main", "task_name": "x", "ipc_session_token": _make_ulid()}, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={"label": "pr-watcher", "task_name": "y", "ipc_session_token": _make_ulid()}, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + + # Reconcile with empty attachments — should downgrade both + r = await client.post( + "/v1/agents/reconcile-attachments", + json={ + "daemon_id": daemon_id, + "reconciled_at": datetime.now(timezone.utc).isoformat(), + "attachments": [], + }, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + assert r.status_code == 200, r.text + data = r.json() + assert data["upserted_count"] == 0 + assert data["downgraded_count"] == 2 + + +@pytest.mark.asyncio +async def test_reconcile_partial_upsert_and_downgrade(client, auth_headers): + """Reconcile reporting 1 of 2 attachments: 1 upserted, 1 downgraded.""" + agent = await _make_agent(client, auth_headers, slug=f"rp-{uuid.uuid4().hex[:6]}") + 
daemon_id = str(uuid.uuid4()) + token_main = _make_ulid() + token_pr = _make_ulid() + await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={"label": "main", "task_name": "x", "ipc_session_token": token_main}, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + await client.post( + f"/v1/agents/{agent['id']}/attachments", + json={"label": "pr-watcher", "task_name": "y", "ipc_session_token": token_pr}, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + + # Reconcile reports only "main" — "pr-watcher" should downgrade + now = datetime.now(timezone.utc).isoformat() + r = await client.post( + "/v1/agents/reconcile-attachments", + json={ + "daemon_id": daemon_id, + "reconciled_at": now, + "attachments": [ + { + "label": "main", + "task_name": "x", + "ipc_session_token": token_main, + "attached_at": now, + } + ], + }, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_id}, + ) + assert r.status_code == 200, r.text + data = r.json() + assert data["upserted_count"] == 1 + assert data["downgraded_count"] == 1 + + +@pytest.mark.asyncio +async def test_reconcile_does_not_affect_other_daemons_rows(client, auth_headers): + """Daemon X reconcile does NOT touch daemon Y's rows (daemon-id scoping).""" + agent_a = await _make_agent(client, auth_headers, slug=f"da-{uuid.uuid4().hex[:6]}") + agent_b = await _make_agent(client, auth_headers, slug=f"db-{uuid.uuid4().hex[:6]}") + daemon_x = str(uuid.uuid4()) + daemon_y = str(uuid.uuid4()) + await client.post( + f"/v1/agents/{agent_a['id']}/attachments", + json={"label": "main", "task_name": "x", "ipc_session_token": _make_ulid()}, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_x}, + ) + await client.post( + f"/v1/agents/{agent_b['id']}/attachments", + json={"label": "main", "task_name": "y", "ipc_session_token": _make_ulid()}, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_y}, + ) + + # daemon_y reconciles with empty list → should downgrade daemon_y's row only + r = await client.post( + 
"/v1/agents/reconcile-attachments", + json={ + "daemon_id": daemon_y, + "reconciled_at": datetime.now(timezone.utc).isoformat(), + "attachments": [], + }, + headers={**auth_headers, DAEMON_ID_HEADER: daemon_y}, + ) + assert r.status_code == 200 + assert r.json()["downgraded_count"] == 1 # daemon_y's row only + + +# ─────────────────────────────────────────────────────────────────────── +# Backwards-compat: existing webhook + worker-transport paths unchanged +# ─────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_backcompat_existing_agent_live_sessions_rows_default_to_poll( + db_session, registered_user +): + """Existing v2.x rows inserted without specifying transport inherit 'poll'.""" + from app.utils.ids import generate_agent_id + + user = ( + await db_session.execute( + select(__import__("app.models.user", fromlist=["User"]).User).where( + __import__("app.models.user", fromlist=["User"]).User.email + == registered_user["email"] + ) + ) + ).scalar_one() + + agent = Agent( + id=generate_agent_id(), + user_id=user.id, + slug=f"bc-{uuid.uuid4().hex[:6]}", + display_name="Backcompat Agent", + ) + db_session.add(agent) + await db_session.flush() + # Insert WITHOUT specifying transport — should default to 'poll' + sess = AgentLiveSession( + agent_id=agent.id, + label="main", + cue_id=f"cue_bc{uuid.uuid4().hex[:6]}", + task_name="max-claude-code-test", + attached_at=datetime.now(timezone.utc), + ) + db_session.add(sess) + await db_session.commit() + await db_session.refresh(sess) + assert sess.transport == "poll" + assert sess.daemon_id is None + assert sess.ipc_session_token is None + assert sess.last_reconciled_at is None + + +# ─────────────────────────────────────────────────────────────────────── +# ASGI dispatch integration — exercises fire_cue's outcome_metadata= +# parameter line through real route + DB so pytest-cov on CI Py 3.11 +# traces it. 
# The pure helper _build_ipc_delivery_metadata is unit-tested
# above; this test pins the ASSIGNMENT line where the helper result
# flows into the Execution row.
# ───────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_fire_cue_with_ipc_attachment_stamps_outcome_metadata(
    client, auth_headers, db_session
):
    """Fire a cue with an active IPC attachment → execution.outcome_metadata
    carries {"delivery_mode_requested": "ipc"}.

    This integration test exercises the ASGI-dispatched assignment line
    `outcome_metadata=ipc_outcome_metadata` in app/routers/cues.py:fire_cue.
    Pure helper is unit-tested above; this test ensures the wiring line
    actually flows through.
    """
    from app.models.cue import Cue
    from app.models.execution import Execution
    from app.utils.ids import generate_agent_id

    # Create a cue via the API (so it has the right shape + ownership)
    cue_resp = await client.post(
        "/v1/cues",
        json={
            "name": f"ipc-fire-{uuid.uuid4().hex[:6]}",
            "schedule": {"type": "recurring", "cron": "0 * * * *"},
            "transport": "worker",  # avoids webhook outbox creation noise
            "payload": {"task": "ipc-fire-test"},
        },
        headers=auth_headers,
    )
    assert cue_resp.status_code == 201, cue_resp.text
    cue_id = cue_resp.json()["id"]

    # Resolve the calling user + create an agent_live_session row that
    # matches the cue_id with transport='ipc'.
    cue_row = (
        await db_session.execute(select(Cue).where(Cue.id == cue_id))
    ).scalar_one()
    ipc_agent = Agent(
        id=generate_agent_id(),
        user_id=cue_row.user_id,
        slug=f"ipc-fire-{uuid.uuid4().hex[:6]}",
        display_name="IPC Fire Test Agent",
    )
    db_session.add(ipc_agent)
    await db_session.flush()
    db_session.add(
        AgentLiveSession(
            agent_id=ipc_agent.id,
            label="main",
            cue_id=cue_id,  # match the cue
            task_name="max-claude-code-test",
            attached_at=datetime.now(timezone.utc),
            ipc_session_token=_make_ulid(),
            transport="ipc",
            daemon_id=uuid.uuid4(),
            last_reconciled_at=datetime.now(timezone.utc),
        )
    )
    await db_session.commit()

    # Fire the cue via the API
    fire_resp = await client.post(f"/v1/cues/{cue_id}/fire", headers=auth_headers)
    assert fire_resp.status_code == 200, fire_resp.text
    execution_id = fire_resp.json()["id"]

    # Verify the execution's outcome_metadata carries the IPC stamp
    exec_row = (
        await db_session.execute(
            select(Execution).where(Execution.id == uuid.UUID(execution_id))
        )
    ).scalar_one()
    assert exec_row.outcome_metadata == {"delivery_mode_requested": "ipc"}
+ """ + from app.models.execution import Execution + + cue_resp = await client.post( + "/v1/cues", + json={ + "name": f"no-ipc-{uuid.uuid4().hex[:6]}", + "schedule": {"type": "recurring", "cron": "0 * * * *"}, + "transport": "worker", + "payload": {"task": "no-ipc-test"}, + }, + headers=auth_headers, + ) + assert cue_resp.status_code == 201, cue_resp.text + cue_id = cue_resp.json()["id"] + + fire_resp = await client.post( + f"/v1/cues/{cue_id}/fire", + headers=auth_headers, + ) + assert fire_resp.status_code == 200, fire_resp.text + execution_id = fire_resp.json()["id"] + + exec_row = ( + await db_session.execute( + select(Execution).where(Execution.id == uuid.UUID(execution_id)) + ) + ).scalar_one() + assert exec_row.outcome_metadata is None + + +# ─────────────────────────────────────────────────────────────────────── +# Service-layer direct unit tests (cover ASGI-dispatched service code +# bodies that pytest-cov on CI Py 3.11 misses; integration tests above +# exercise them at runtime but pytest-cov has known ASGI tracing gaps). 
+# ─────────────────────────────────────────────────────────────────────── + + +async def _make_agent_row(db_session, registered_user, slug_suffix: str): + """Test fixture: create + return an Agent row owned by registered_user.""" + from app.utils.ids import generate_agent_id + from app.models.user import User + + user = ( + await db_session.execute( + select(User).where(User.email == registered_user["email"]) + ) + ).scalar_one() + agent = Agent( + id=generate_agent_id(), + user_id=user.id, + slug=f"svc-{slug_suffix}-{uuid.uuid4().hex[:6]}", + display_name=f"Svc Test {slug_suffix}", + ) + db_session.add(agent) + await db_session.flush() + return agent + + +@pytest.mark.asyncio +async def test_service_create_attachment_happy_returns_created(db_session, registered_user): + from app.services.ipc_attachment_service import create_attachment + agent = await _make_agent_row(db_session, registered_user, "ca-h") + token = _make_ulid() + result = await create_attachment( + db_session, agent_id=agent.id, label="main", task_name="t", + ipc_session_token=token, daemon_id=uuid.uuid4(), + ) + assert result.status == "created" + assert result.row is not None + assert result.row.ipc_session_token == token + assert result.row.transport == "ipc" + assert result.row.is_default is True + assert result.supersedes_token is None + + +@pytest.mark.asyncio +async def test_service_create_attachment_non_main_label_not_default(db_session, registered_user): + from app.services.ipc_attachment_service import create_attachment + agent = await _make_agent_row(db_session, registered_user, "ca-n") + result = await create_attachment( + db_session, agent_id=agent.id, label="pr-watcher", task_name="t", + ipc_session_token=_make_ulid(), daemon_id=uuid.uuid4(), + ) + assert result.row is not None + assert result.row.is_default is False + + +@pytest.mark.asyncio +async def test_service_create_attachment_same_daemon_supersedes(db_session, registered_user): + from app.services.ipc_attachment_service import 
create_attachment + agent = await _make_agent_row(db_session, registered_user, "ca-s") + daemon_id = uuid.uuid4() + token1 = _make_ulid() + await create_attachment( + db_session, agent_id=agent.id, label="main", task_name="t", + ipc_session_token=token1, daemon_id=daemon_id, + ) + token2 = _make_ulid() + result = await create_attachment( + db_session, agent_id=agent.id, label="main", task_name="t2", + ipc_session_token=token2, daemon_id=daemon_id, + ) + assert result.status == "created" + assert result.row.ipc_session_token == token2 + assert result.supersedes_token == token1 + + +@pytest.mark.asyncio +async def test_service_create_attachment_cross_daemon_conflict(db_session, registered_user): + from app.services.ipc_attachment_service import create_attachment + agent = await _make_agent_row(db_session, registered_user, "ca-x") + daemon_a = uuid.uuid4() + daemon_b = uuid.uuid4() + token_a = _make_ulid() + await create_attachment( + db_session, agent_id=agent.id, label="main", task_name="t", + ipc_session_token=token_a, daemon_id=daemon_a, + ) + result = await create_attachment( + db_session, agent_id=agent.id, label="main", task_name="t", + ipc_session_token=_make_ulid(), daemon_id=daemon_b, + ) + assert result.status == "conflict_cross_daemon" + assert result.existing is not None + assert result.existing.ipc_session_token == token_a + assert result.existing.daemon_id == daemon_a + assert result.row is None + + +@pytest.mark.asyncio +async def test_service_delete_attachment_deleted_branch(db_session, registered_user): + from app.services.ipc_attachment_service import create_attachment, delete_attachment + agent = await _make_agent_row(db_session, registered_user, "dl-d") + daemon_id = uuid.uuid4() + token = _make_ulid() + await create_attachment( + db_session, agent_id=agent.id, label="main", task_name="t", + ipc_session_token=token, daemon_id=daemon_id, + ) + result = await delete_attachment( + db_session, agent_id=agent.id, + ipc_session_token=token, 
daemon_id=daemon_id, + ) + assert result.status == "deleted" + + +@pytest.mark.asyncio +async def test_service_delete_attachment_already_deleted_branch(db_session, registered_user): + from app.services.ipc_attachment_service import delete_attachment + agent = await _make_agent_row(db_session, registered_user, "dl-i") + result = await delete_attachment( + db_session, agent_id=agent.id, + ipc_session_token=_make_ulid(), daemon_id=uuid.uuid4(), + ) + assert result.status == "already_deleted" + + +@pytest.mark.asyncio +async def test_service_reconcile_empty_downgrades_all(db_session, registered_user): + from app.services.ipc_attachment_service import create_attachment, reconcile_attachments + agent = await _make_agent_row(db_session, registered_user, "rc-e") + daemon_id = uuid.uuid4() + await create_attachment( + db_session, agent_id=agent.id, label="main", task_name="t", + ipc_session_token=_make_ulid(), daemon_id=daemon_id, + ) + await create_attachment( + db_session, agent_id=agent.id, label="pr", task_name="t", + ipc_session_token=_make_ulid(), daemon_id=daemon_id, + ) + result = await reconcile_attachments(db_session, daemon_id=daemon_id, attachments=[]) + assert result.upserted_count == 0 + assert result.downgraded_count == 2 + + +@pytest.mark.asyncio +async def test_service_reconcile_partial_upserts_and_downgrades(db_session, registered_user): + from app.schemas.ipc_attachment import AttachmentReconcileEntry + from app.services.ipc_attachment_service import create_attachment, reconcile_attachments + agent = await _make_agent_row(db_session, registered_user, "rc-p") + daemon_id = uuid.uuid4() + token_a = _make_ulid() + await create_attachment( + db_session, agent_id=agent.id, label="main", task_name="t-main", + ipc_session_token=token_a, daemon_id=daemon_id, + ) + await create_attachment( + db_session, agent_id=agent.id, label="pr", task_name="t-pr", + ipc_session_token=_make_ulid(), daemon_id=daemon_id, + ) + now = datetime.now(timezone.utc) + result = await 
reconcile_attachments( + db_session, daemon_id=daemon_id, + attachments=[ + AttachmentReconcileEntry( + label="main", task_name="t-main", + ipc_session_token=token_a, attached_at=now, + ) + ], + ) + assert result.upserted_count == 1 + assert result.downgraded_count == 1 + + +@pytest.mark.asyncio +async def test_service_reconcile_daemon_scoping(db_session, registered_user): + from app.services.ipc_attachment_service import create_attachment, reconcile_attachments + agent_a = await _make_agent_row(db_session, registered_user, "rc-da") + agent_b = await _make_agent_row(db_session, registered_user, "rc-db") + daemon_x = uuid.uuid4() + daemon_y = uuid.uuid4() + await create_attachment( + db_session, agent_id=agent_a.id, label="main", task_name="t", + ipc_session_token=_make_ulid(), daemon_id=daemon_x, + ) + await create_attachment( + db_session, agent_id=agent_b.id, label="main", task_name="t", + ipc_session_token=_make_ulid(), daemon_id=daemon_y, + ) + result = await reconcile_attachments(db_session, daemon_id=daemon_y, attachments=[]) + assert result.downgraded_count == 1 + + +@pytest.mark.asyncio +async def test_service_reconcile_unknown_token_skipped(db_session, registered_user): + from app.schemas.ipc_attachment import AttachmentReconcileEntry + from app.services.ipc_attachment_service import reconcile_attachments + now = datetime.now(timezone.utc) + result = await reconcile_attachments( + db_session, daemon_id=uuid.uuid4(), + attachments=[ + AttachmentReconcileEntry( + label="main", task_name="t", + ipc_session_token=_make_ulid(), attached_at=now, + ) + ], + ) + assert result.upserted_count == 0 + assert result.downgraded_count == 0 + + +# ─────────────────────────────────────────────────────────────────────── +# Schema validator direct unit test (covers app/schemas/ipc_attachment.py:53) +# ─────────────────────────────────────────────────────────────────────── + + +def test_schema_token_validator_rejects_malformed(): + from app.schemas.ipc_attachment import 
AttachmentCreate + import pytest as _pytest + with _pytest.raises(Exception): # Pydantic ValidationError + AttachmentCreate( + label="main", + task_name="t", + ipc_session_token="this-is-not-a-ulid-shape!", + ) + + +def test_schema_token_validator_accepts_valid(): + from app.schemas.ipc_attachment import AttachmentCreate + valid = "01ABCDEFGHJKMNPQRSTV" + "XYZ123" + m = AttachmentCreate(label="main", task_name="t", ipc_session_token=valid) + assert m.ipc_session_token == valid + + +def test_schema_token_validator_accepts_versioned_prefix(): + from app.schemas.ipc_attachment import AttachmentCreate + valid = "v3a_01ABCDEFGHJKMNPQRSTVXYZ123"[:32] + m = AttachmentCreate(label="main", task_name="t", ipc_session_token=valid) + assert m.ipc_session_token == valid