Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions alembic/versions/035_agent_live_sessions_ipc_attachment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Item B Phase 1 — extend agent_live_sessions for live-delivery-v3 IPC attachments.

Adds the substrate-side primitives needed for the daemon-IPC delivery path
locked in the live-delivery-v3 joint design at:

https://trydock.ai/mike/live-delivery-v3-build-hub

Schema additions (4 columns + 2 indexes; all nullable / defaulted so existing
v2.x rows continue to work unchanged):

* ``ipc_session_token VARCHAR(32)`` — daemon-issued ULID identifying the
attachment for fire-accept routing. NULL for non-IPC rows. VARCHAR(32) per
primary's Q-A REDIRECT (room for future versioned-prefix shapes like
``v3a_<26-char-ULID>``); application-layer validates format (no DB regex
CHECK per CueAPI convention).
* ``transport VARCHAR(8) NOT NULL DEFAULT 'poll'`` + CHECK IN ('ipc', 'poll')
— routing-mode for fire-accept dispatcher. Existing rows default to 'poll'
so behavior is unchanged until the daemon issues a v3 attach. VARCHAR+CHECK
matches CueAPI convention (cues.callback_transport, executions.status,
users.plan all use this shape; Postgres ENUM ALTER is painful to evolve).
* ``daemon_id UUID`` — stable per-install Desktop daemon identity, sent in
``X-CueAPI-Daemon-Id`` header on attach + reconcile + DELETE requests.
Server scopes reconcile transactions per-daemon so daemon X's view can't
affect daemon Y's rows. NULL for v2.x rows (no daemon identity tracked).
* ``last_reconciled_at TIMESTAMPTZ`` — bumps every time a row appears in a
daemon reconcile batch. Powers the conservative downgrade-to-poll cleanup
for orphaned rows (rows in DB not mentioned in current reconcile batch get
``transport='poll'`` and ``last_reconciled_at`` left untouched; daily
cleanup deletes ``transport='poll'`` rows where ``last_reconciled_at <
now() - 24h``).

Indexes:

* ``ix_agent_live_sessions_daemon`` on ``daemon_id`` — supports per-daemon
reconcile WHERE clauses.
* ``ix_agent_live_sessions_transport`` on ``(transport, last_reconciled_at)``
— supports the daily cleanup job's filter ``transport='poll' AND
last_reconciled_at < now() - 24h``.

Mike Q-B ratification 2026-05-12 ~00:38Z locked the **ASYNC** fire-accept
dispatcher path: server fires + returns immediately with
``delivery_mode_requested='ipc'``; daemon-side delivery ack happens via the
existing ``POST /v1/executions/<id>/outcome`` path. NO inline ack-callback
machinery in Phase 1. Sync-inline-ack-3s alternative deferred to a future
Backlog row (meeting-room-style live agent discussions).

Backwards-compat: all v2.x rows inherit ``transport='poll'`` + NULL on the
other 3 columns. Existing fire-accept dispatcher logic untouched until a row
carries ``transport='ipc'``.
"""
from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import UUID


revision = "035"
down_revision = "034"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"agent_live_sessions",
sa.Column("ipc_session_token", sa.String(32), nullable=True),
)
op.add_column(
"agent_live_sessions",
sa.Column(
"transport",
sa.String(8),
nullable=False,
server_default=sa.text("'poll'"),
),
)
op.add_column(
"agent_live_sessions",
sa.Column("daemon_id", UUID(as_uuid=True), nullable=True),
)
op.add_column(
"agent_live_sessions",
sa.Column(
"last_reconciled_at",
sa.DateTime(timezone=True),
nullable=True,
),
)

op.create_check_constraint(
"valid_transport",
"agent_live_sessions",
"transport IN ('ipc', 'poll')",
)

op.create_index(
"ix_agent_live_sessions_daemon",
"agent_live_sessions",
["daemon_id"],
)
op.create_index(
"ix_agent_live_sessions_transport",
"agent_live_sessions",
["transport", "last_reconciled_at"],
)


def downgrade() -> None:
op.drop_index("ix_agent_live_sessions_transport", table_name="agent_live_sessions")
op.drop_index("ix_agent_live_sessions_daemon", table_name="agent_live_sessions")
op.drop_constraint("valid_transport", "agent_live_sessions", type_="check")
op.drop_column("agent_live_sessions", "last_reconciled_at")
op.drop_column("agent_live_sessions", "daemon_id")
op.drop_column("agent_live_sessions", "transport")
op.drop_column("agent_live_sessions", "ipc_session_token")
4 changes: 3 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from app.middleware.request_id import RequestIdMiddleware
from app.middleware.verify_echo import VerifyEchoMiddleware
from app.redis import close_redis
from app.routers import agent_live_sessions, agents, alerts, auth_routes, cues, device_code, echo, events, executions, health, info, internal_users, messages, usage, webhook_secret, workers
from app.routers import agent_live_sessions, agents, alerts, auth_routes, cues, device_code, echo, events, executions, health, info, internal_users, ipc_attachments, messages, usage, webhook_secret, workers
from app.utils.logging import setup_logging


Expand Down Expand Up @@ -177,6 +177,8 @@ async def generic_error_handler(request: Request, exc: Exception):
app.include_router(alerts.router)
app.include_router(agents.router)
app.include_router(agent_live_sessions.router)
app.include_router(ipc_attachments.router)
app.include_router(ipc_attachments.reconcile_router)
app.include_router(events.router)
app.include_router(messages.router)

Expand Down
29 changes: 29 additions & 0 deletions app/models/agent_live_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

from sqlalchemy import (
Boolean,
CheckConstraint,
Column,
DateTime,
ForeignKey,
Expand Down Expand Up @@ -96,6 +97,21 @@ class AgentLiveSession(Base):
unique=True,
postgresql_where=text("detached_at IS NULL"),
),
# Item B Phase 1 indexes (migration 035): support per-daemon
# reconcile + daily transport='poll' cleanup queries.
Index("ix_agent_live_sessions_daemon", "daemon_id"),
Index(
"ix_agent_live_sessions_transport",
"transport",
"last_reconciled_at",
),
# Item B Phase 1 CHECK constraint (migration 035): transport
# values restricted to 'ipc' or 'poll'. VARCHAR+CHECK is the
# CueAPI convention.
CheckConstraint(
"transport IN ('ipc', 'poll')",
name="valid_transport",
),
)

id = Column(
Expand Down Expand Up @@ -147,6 +163,19 @@ class AgentLiveSession(Base):
# grace accepts bare task_name on existing rows.
session_token = Column(String(80), nullable=True)

# Item B Phase 1 columns (migration 035, live-delivery-v3).
# NB: existing v2.x rows inherit transport='poll' + NULL on others
# — fire-accept dispatcher unchanged unless transport='ipc' is set.
ipc_session_token = Column(String(32), nullable=True)
transport = Column(
String(8),
nullable=False,
server_default=text("'poll'"),
default="poll",
)
daemon_id = Column(UUID(as_uuid=True), nullable=True)
last_reconciled_at = Column(DateTime(timezone=True), nullable=True)

created_at = Column(
DateTime(timezone=True),
nullable=False,
Expand Down
43 changes: 43 additions & 0 deletions app/routers/cues.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,42 @@ async def delete(
return Response(status_code=204)


async def _build_ipc_delivery_metadata(db, cue_id: str):
"""Item B Phase 1 ASYNC fire-accept dispatcher (Mike Q-B ratify 2026-05-12).

Stamps ``delivery_mode_requested='ipc'`` on a new execution's
``outcome_metadata`` IFF the cue's target ``agent_live_sessions`` row is
currently on ``transport='ipc'`` (i.e. a daemon-IPC attachment is live).

Returns:
- ``{"delivery_mode_requested": "ipc"}`` when an active IPC attachment
exists for ``cue_id``
- ``None`` otherwise (preserves the default execution.outcome_metadata
NULL invariant for non-IPC fires; webhook + worker paths unchanged)

Per design lock: no inline ack-callback machinery. The daemon ACKs delivery
by writing to the existing ``POST /v1/executions/<id>/outcome`` path. The
metadata stamp is the *request* signal; actual delivery is async.

Module-level so pytest-cov traces branches without going through ASGI
dispatch (CLAUDE.md ASGI coverage discipline).
"""
from sqlalchemy import select
from app.models.agent_live_session import AgentLiveSession

result = await db.execute(
select(AgentLiveSession.id).where(
AgentLiveSession.cue_id == cue_id,
AgentLiveSession.transport == "ipc",
AgentLiveSession.detached_at.is_(None),
)
)
row = result.scalar_one_or_none()
if row is None:
return None
return {"delivery_mode_requested": "ipc"}


@router.post("/{cue_id}/fire", status_code=200)
async def fire_cue(
cue_id: str,
Expand Down Expand Up @@ -132,9 +168,16 @@ async def fire_cue(
is_scheduled = effective_scheduled_for > now

execution_id = uuid_mod.uuid4()
# Item B Phase 1: ASYNC fire-accept IPC routing (Mike Q-B ratify
# 2026-05-12 ~00:38Z, ported from cueapi/cueapi#805). If the cue's
# target agent_live_sessions row is on transport='ipc', stamp the
# execution's outcome_metadata with delivery_mode_requested='ipc'.
# Helper is pure + module-level (CLAUDE.md ASGI coverage discipline).
ipc_outcome_metadata = await _build_ipc_delivery_metadata(db, cue.id)
execution = Execution(
id=execution_id, cue_id=cue.id, scheduled_for=effective_scheduled_for,
status="pending", triggered_by="manual_fire",
outcome_metadata=ipc_outcome_metadata,
)
db.add(execution)

Expand Down
Loading
Loading