From 403e470e69da0b4f87ca9a98d1631071cf1cb270 Mon Sep 17 00:00:00 2001 From: mikemolinet Date: Mon, 11 May 2026 09:27:37 -0700 Subject: [PATCH] feat(agents): roster endpoint + last_seen_at + ETag (parity port of cueapi/cueapi#630) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-port of closed [PR #47](https://github.com/cueapi/cueapi-core/pull/47) which was on a stale base ~8880 deletions behind main. Fresh against current main HEAD. Phase A of the Agent Directory productization. Eliminates the failure mode where agents had to remember 6+ fields per recipient AND had no way to discover the live roster. ## What lands - **GET /v1/agents/roster** — display-optimized snapshot for prompt- injection at session-boot. Distinct from the existing management surface (GET /v1/agents): - Always-full list (no pagination) - Drops opaque IDs / secrets / timestamps / tenancy metadata - Adds derived ``online``, ``last_seen_relative``, ``preferred_contact`` - Always excludes soft-deleted agents - Weak ETag + ``If-None-Match`` → 304 Not Modified for poll efficiency - ETag bucketed to 5-min windows so quiet periods produce stable hashes - ``Cache-Control: private, max-age=300`` matches derivation buckets - **Migration 031** (renumbered from private's 048) — adds ``agents.last_seen_at TIMESTAMPTZ NULL``. Nullable, no backfill. - **Hot-path hooks** write ``last_seen_at = now()``: - ``create_message`` — sender's agent (in same tx via touch_last_seen) - ``list_inbox`` — recipient's agent, on EVERY poll (via _bump_last_seen_stmt). Even when no queued messages exist, the poll proves activity. 
- **Online derivation** (server-computed in ``list_roster``): - within 5 min → ``online`` - within 30 min → ``away`` - older or NULL → ``offline`` - Caller override wins: PATCHed status=away/offline keeps that override regardless of recent activity ## Pure helpers (for unit-testability — pytest-cov + ASGI issue) - ``_build_roster_entry(agent, now)`` in agent_service.py: ORM Agent → (entry_dict, etag_part_string) - ``_compute_roster_etag(parts)`` in agent_service.py: list → weak ETag - ``_derive_online_state(now, last_seen_at, asserted_status)`` → (online_bool, derived_status) - ``_format_relative(now, last_seen_at)`` → "active now" / "5m ago" / ... - ``_bucketed_seen(last_seen_at)`` → string for ETag stability - ``_bump_last_seen_stmt(agent_id, now)`` in inbox_service.py: SQLAlchemy UPDATE statement - ``_etag_matches(if_none_match, server_etag)`` in agents router: conditional GET predicate ## Tests 27 new tests in tests/test_agent_roster.py (verbatim from private): shape verification, hot-path hooks (sender + recipient), derivation correctness across all 3 buckets, caller-asserted status override, soft-delete exclusion, preferred_contact derivation, last_seen_relative formatting, ETag 304 handling, ETag changes when roster mutates, pure-helper unit tests. 27/27 pass locally. Full local suite: 890 passed + 18 xfailed (pre-existing) + 4 skipped. Zero regressions. ## Re-port note Re-port of closed PR #47. Fresh against current main after PR #74 + #75 + #76 + #77 + #78 + #79 merged earlier in this session. 
--- alembic/versions/031_agents_last_seen_at.py | 56 +++ app/models/agent.py | 6 + app/routers/agents.py | 56 +++ app/schemas/agent.py | 35 ++ app/services/agent_service.py | 158 +++++++ app/services/inbox_service.py | 26 ++ app/services/message_service.py | 6 + parity-manifest.json | 3 +- tests/test_agent_roster.py | 442 ++++++++++++++++++++ 9 files changed, 787 insertions(+), 1 deletion(-) create mode 100644 alembic/versions/031_agents_last_seen_at.py create mode 100644 tests/test_agent_roster.py diff --git a/alembic/versions/031_agents_last_seen_at.py b/alembic/versions/031_agents_last_seen_at.py new file mode 100644 index 0000000..06fe3af --- /dev/null +++ b/alembic/versions/031_agents_last_seen_at.py @@ -0,0 +1,56 @@ +"""Agent Directory productization (Phase A) — implicit online-state derivation. + +Adds ``agents.last_seen_at`` so the server can derive an agent's +``online`` status from recent activity instead of requiring callers +to ``PATCH /v1/agents/{ref}`` with explicit status updates. + +Hot paths that write ``last_seen_at = now()`` (added in this PR's +service-layer changes): + +* ``create_message`` — sender's agent +* ``list_inbox`` — recipient's agent (poll-based delivery) +* push ``deliver_message`` worker callback — recipient's agent (not hooked up in this patch) + +Derivation rules (computed in the service layer, not stored): + +* ``last_seen_at`` within 5 min → ``online`` +* ``last_seen_at`` within 30 min → ``away`` +* anything older / NULL → ``offline`` + +The existing ``status`` column stays as a caller-overrideable enum; +the new derivation is the default surface. Callers can still assert +``status=away`` (e.g., agent voluntarily marks itself away during a +long-running task) and the override wins over the derivation. + +Migration 030 (047 in the private repo) was the last messaging-related migration +(per-message send_at). 031 (048 in the private repo) is independent of messaging — +it shapes the Identity primitive directly. 
+ +Revision ID: 031 +Revises: 030 +""" +from alembic import op +import sqlalchemy as sa + + +revision = "031" +down_revision = "030" + + +def upgrade(): + # Nullable add — no backfill required. NULL means "no activity + # observed yet" which the derivation maps to ``offline``. Existing + # rows keep their caller-asserted status until the first hot-path + # write lands. + op.add_column( + "agents", + sa.Column("last_seen_at", sa.DateTime(timezone=True), nullable=True), + ) + # No index needed — read path is per-tenant and per-agent, both + # already covered by the existing ``unique_user_agent_slug`` and + # ``ix_agents_slug`` indexes. The roster endpoint reads + # ``user_id``-scoped rows, which is already a btree-indexed FK. + + +def downgrade(): + op.drop_column("agents", "last_seen_at") diff --git a/app/models/agent.py b/app/models/agent.py index 0112ed3..19cb8e3 100644 --- a/app/models/agent.py +++ b/app/models/agent.py @@ -75,6 +75,12 @@ class Agent(Base): server_default="{}", ) status = Column(String(16), nullable=False, default="online", server_default="online") + # Agent Directory Phase A (PR #630): timestamp of last observed activity. + # Written by hot paths (create_message sender, list_inbox recipient). + # NULL = no activity observed; derivation maps to "offline". + # 5min → online, 30min → away, older/NULL → offline. + # Caller-asserted ``status`` overrides the derivation. 
+ last_seen_at = Column(DateTime(timezone=True), nullable=True) deleted_at = Column(DateTime(timezone=True), nullable=True) created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now()) updated_at = Column( diff --git a/app/routers/agents.py b/app/routers/agents.py index 67725fc..520f10e 100644 --- a/app/routers/agents.py +++ b/app/routers/agents.py @@ -32,6 +32,8 @@ AgentCreate, AgentListResponse, AgentResponse, + AgentRosterEntry, + AgentRosterResponse, AgentUpdate, WebhookSecretResponse, ) @@ -40,6 +42,7 @@ get_agent_owned, get_webhook_secret, list_agents, + list_roster, rotate_webhook_secret, soft_delete_agent, to_response_dict, @@ -119,6 +122,59 @@ async def list_agents_endpoint( ) +def _etag_matches(if_none_match_header, server_etag): + """Pure helper: does the client's ``If-None-Match`` match the + server-computed weak ETag? + + Trims whitespace; treats empty/None header as "no match". Refactored + out of ``get_roster_endpoint`` so the ETag matching logic is unit- + testable without going through the FastAPI Request object. + """ + if not if_none_match_header: + return False + return if_none_match_header.strip() == server_etag + + +@router.get("/roster", response_model=AgentRosterResponse) +async def get_roster_endpoint( + request: Request, + user: AuthenticatedUser = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + """Directory snapshot for session-boot prompt injection (Agent Directory Phase A). + + Returns the caller's full agent roster in a display-optimized + shape — no pagination, no opaque IDs, no secrets, no timestamps, + plus derived ``online`` + ``last_seen_relative`` + ``preferred_contact`` + fields. Soft-deleted agents are always excluded. + + Distinct from ``GET /v1/agents`` (which is the management surface + with full ``AgentResponse`` shape). See PRD §Surface 5 for context. 
+ + Conditional GET: an ``If-None-Match`` header that matches the + server-computed weak ETag returns ``304 Not Modified`` with no body. + The ETag aligns to the 5-minute online/away/offline derivation + bucket so quiet windows produce stable hashes. + + Parity port of cueapi/cueapi#630. + """ + result = await list_roster(db, user) + etag = result["etag"] + + if _etag_matches(request.headers.get("if-none-match"), etag): + return JSONResponse(status_code=304, content=None, headers={"ETag": etag}) + + body = AgentRosterResponse( + generated_at=result["generated_at"], + agents=[AgentRosterEntry(**e) for e in result["agents"]], + ) + return JSONResponse( + status_code=200, + content=body.model_dump(mode="json"), + headers={"ETag": etag, "Cache-Control": "private, max-age=300"}, + ) + + @router.get("/{ref}", response_model=AgentResponse) async def get_agent_endpoint( ref: str, diff --git a/app/schemas/agent.py b/app/schemas/agent.py index 45252b6..c544666 100644 --- a/app/schemas/agent.py +++ b/app/schemas/agent.py @@ -84,3 +84,38 @@ class WebhookSecretResponse(BaseModel): """Response for the webhook-secret retrieval and rotation endpoints.""" webhook_secret: str + + +class AgentRosterEntry(BaseModel): + """One agent in the directory snapshot returned by GET /v1/agents/roster. + + Distinct from ``AgentResponse``: drops opaque IDs, secrets, + timestamps, and tenancy metadata, and adds derived ``online`` / + ``last_seen_relative`` fields. Optimized for prompt injection at + session-boot — agents see "who else is here" natively without + needing to call a tool. See PRD §Surface 5. 
+ """ + + name: str = Field(..., description="Stable per-tenant slug; addressable as `@`.") + display_name: str + description: Optional[str] = Field(default=None, description="From metadata.description if set.") + online: bool = Field(..., description="Derived from last_seen_at within 5 min.") + last_seen_relative: str = Field( + ..., + description="Human-readable freshness: 'active now', '5m ago', 'offline 2h', 'never'.", + ) + preferred_contact: Literal["sync", "async"] = Field( + ..., + description="Derived: webhook_url IS NOT NULL → 'sync' (push-capable), else 'async' (poll-only).", + ) + status: Literal["online", "offline", "away"] = Field( + ..., + description="Caller-asserted status (PATCH /v1/agents/{ref}); overrides derivation when explicit.", + ) + + +class AgentRosterResponse(BaseModel): + """Response for GET /v1/agents/roster — full directory snapshot.""" + + generated_at: datetime + agents: List[AgentRosterEntry] diff --git a/app/services/agent_service.py b/app/services/agent_service.py index a79e08a..d71c05c 100644 --- a/app/services/agent_service.py +++ b/app/services/agent_service.py @@ -392,3 +392,161 @@ def to_response_dict(agent: Agent, *, include_secret: bool = False) -> Dict: "created_at": agent.created_at, "updated_at": agent.updated_at, } + + +# ─────────────────────────────────────────────────────────────────── +# Agent Directory Phase A (PR #630) — roster + online derivation +# ─────────────────────────────────────────────────────────────────── + +_ONLINE_THRESHOLD_SECONDS = 300 # 5 min → online +_AWAY_THRESHOLD_SECONDS = 1800 # 30 min → away (else offline) +_ETAG_BUCKET_SECONDS = 300 # Align ETag freshness to derivation thresholds. 
+ + +def _format_relative(now, last_seen_at) -> str: + if last_seen_at is None: + return "never" + delta = (now - last_seen_at).total_seconds() + if delta < 60: + return "active now" + if delta < 3600: + return f"{int(delta / 60)}m ago" + if delta < 86400: + return f"{int(delta / 3600)}h ago" + return f"{int(delta / 86400)}d ago" + + +def _derive_online_state(now, last_seen_at, asserted_status: str): + """Returns (online_bool, derived_status). + + Caller-asserted ``status`` (PATCH /v1/agents/{ref}) overrides the + derivation when explicitly set to 'away' or 'offline' — lets agents + voluntarily mark themselves unavailable during long-running tasks. + """ + if asserted_status in ("away", "offline"): + return (False, asserted_status) + if last_seen_at is None: + return (False, "offline") + delta = (now - last_seen_at).total_seconds() + if delta <= _ONLINE_THRESHOLD_SECONDS: + return (True, "online") + if delta <= _AWAY_THRESHOLD_SECONDS: + return (False, "away") + return (False, "offline") + + +def _bucketed_seen(last_seen_at): + """Floor last_seen_at to a 5-min bucket for ETag stability. + + The roster's derivation crosses category boundaries at 5 / 30 min, + so an ETag that flips on every-second timestamp drift would defeat + its own purpose. Bucketing means a quiet 5-minute window produces a + stable ETag — clients polling at the suggested cadence will get 304. + """ + if last_seen_at is None: + return "" + epoch = int(last_seen_at.timestamp()) + return str(epoch - (epoch % _ETAG_BUCKET_SECONDS)) + + +def _build_roster_entry(agent, now): + """Pure helper: ORM Agent row → (entry_dict, etag_part_string). + + Refactored out of ``list_roster`` so the per-agent shaping (online + derivation, description extraction, preferred-contact derivation, + ETag-part construction) is unit-testable without going through the + DB + ASGI stack. 
+ + Takes any object with ``.slug``, ``.display_name``, ``.last_seen_at``, + ``.status``, ``.webhook_url``, and ``.metadata_`` — duck-typed so the + test can pass a SimpleNamespace. + """ + online, derived_status = _derive_online_state( + now, agent.last_seen_at, agent.status + ) + description = None + meta = agent.metadata_ or {} + if isinstance(meta, dict) and isinstance(meta.get("description"), str): + description = meta["description"] + + preferred_contact = "sync" if agent.webhook_url else "async" + entry = { + "name": agent.slug, + "display_name": agent.display_name, + "description": description, + "online": online, + "last_seen_relative": _format_relative(now, agent.last_seen_at), + "preferred_contact": preferred_contact, + "status": derived_status, + } + etag_part = "|".join([ + agent.slug, + agent.display_name, + description or "", + "1" if online else "0", + preferred_contact, + derived_status, + _bucketed_seen(agent.last_seen_at), + ]) + return entry, etag_part + + +def _compute_roster_etag(etag_parts): + """Pure helper: list of per-agent etag-part strings → weak ETag.""" + import hashlib + digest = hashlib.sha256("\n".join(etag_parts).encode("utf-8")).hexdigest() + return f'W/"{digest[:16]}"' + + +async def list_roster(db: AsyncSession, user: AuthenticatedUser) -> Dict: + """Build the roster snapshot for GET /v1/agents/roster. + + Returns ``{"generated_at": now, "agents": [...], "etag": "..."}``. + Always-full list (no pagination), always excludes soft-deleted. + Display-optimized for prompt injection — see PRD §Surface 5. + + The returned ETag is a content hash over the *material* fields + (excludes ``generated_at`` and the relative-time string, both of + which drift every second). Roster-router serves 304 when the + client's ``If-None-Match`` matches. 
+ """ + from datetime import datetime, timezone + + now = datetime.now(timezone.utc) + + rows_q = ( + select(Agent) + .where(Agent.user_id == user.id, Agent.deleted_at.is_(None)) + .order_by(Agent.slug) + ) + rows = (await db.execute(rows_q)).scalars().all() + + entries = [] + etag_parts = [] + for agent in rows: + entry, etag_part = _build_roster_entry(agent, now) + entries.append(entry) + etag_parts.append(etag_part) + + return { + "generated_at": now, + "agents": entries, + "etag": _compute_roster_etag(etag_parts), + } + + +async def touch_last_seen(db: AsyncSession, agent_id: str) -> None: + """Update agent.last_seen_at = now() for hot-path activity. + + Called from create_message (sender side); list_inbox bumps the recipient + via its own UPDATE statement instead. Best-effort; failures are swallowed silently (not logged) so they don't block the main flow. + """ + from datetime import datetime, timezone + from sqlalchemy import update + now = datetime.now(timezone.utc) + try: + await db.execute( + update(Agent).where(Agent.id == agent_id).values(last_seen_at=now) + ) + except Exception: + pass # Best-effort — don't break the hot path. diff --git a/app/services/inbox_service.py b/app/services/inbox_service.py index 71fd21b..2a9fbd0 100644 --- a/app/services/inbox_service.py +++ b/app/services/inbox_service.py @@ -54,6 +54,21 @@ def _http_error(status: int, code: str, message: str) -> HTTPException: ) +def _bump_last_seen_stmt(agent_id, now): + """Pure helper: build the SQL UPDATE statement that bumps an agent's + ``last_seen_at`` to ``now``. + + Refactored out of ``list_inbox`` so the §Surface-5 hot-path hook is + unit-testable without going through DB + ASGI. Returns the SQLAlchemy + statement; caller does ``await db.execute(stmt)``. Same pattern works + for both the queued→delivered transition path and the no-transition + path inside ``list_inbox``. + + Agent Directory Phase A (PR #630). 
+ """ + return update(Agent).where(Agent.id == agent_id).values(last_seen_at=now) + + def _parse_state_filter( states: Optional[str], *, default: Tuple[str, ...] = DEFAULT_STATES ) -> Tuple[str, ...]: @@ -216,6 +231,17 @@ async def list_inbox( .returning(Message.id) ) await db.execute(upd_q) + # Agent Directory Phase A (PR #630): touch recipient's + # last_seen_at in the same transaction as the queued→delivered + # flip. Even when no queued messages exist, the poll proves + # activity — see the else branch below. + await db.execute(_bump_last_seen_stmt(agent.id, upd_now)) + await db.commit() + else: + # No queued→delivered transition this call (filter excluded + # ``queued``), but we still observed activity from the recipient + # — bump their last_seen_at in its own transaction. + await db.execute(_bump_last_seen_stmt(agent.id, now_gate)) await db.commit() # Total (after the transition). diff --git a/app/services/message_service.py b/app/services/message_service.py index cc2de20..750f3b2 100644 --- a/app/services/message_service.py +++ b/app/services/message_service.py @@ -406,6 +406,12 @@ async def create_message( ) ) + # Agent Directory Phase A (PR #630): touch sender's last_seen_at + # within the same transaction as the message insert. Best-effort: touch_last_seen swallows its own errors. + # NOTE(review): a failed UPDATE may abort the enclosing transaction on PostgreSQL — confirm the message still commits (a SAVEPOINT would guarantee it). 
+ from app.services.agent_service import touch_last_seen + await touch_last_seen(db, from_agent.id) + await db.commit() await db.refresh(msg) diff --git a/parity-manifest.json b/parity-manifest.json index 3846bd1..03dbe6d 100644 --- a/parity-manifest.json +++ b/parity-manifest.json @@ -76,7 +76,8 @@ {"path": "alembic/versions/016_add_on_failure_column.py", "private_counterpart": "alembic/versions/016_add_on_failure_column.py", "last_synced": "2026-04-16"}, {"path": "alembic/versions/028_event_emit_primitive.py", "private_counterpart": "alembic/versions/058_event_emit_primitive.py", "last_synced": "2026-05-11", "ported_in": "event-emit-primitive-port (PR-1b)", "deviation": "Renumbered 058 → 028 (cueapi-core HEAD was 27 revisions behind private). Schema verbatim."}, {"path": "alembic/versions/029_messaging_emission_columns.py", "private_counterpart": "alembic/versions/059_messaging_emission_columns.py", "last_synced": "2026-05-11", "ported_in": "messaging-emission-port (PR-2a)", "deviation": "Renumbered 059 → 029. Schema verbatim: dispatch_priority_bucket + message_dispatch_error + correlation_id columns + partial-where index on correlation_id."}, - {"path": "alembic/versions/030_message_send_at.py", "private_counterpart": "alembic/versions/047_message_send_at.py", "last_synced": "2026-05-11", "ported_in": "message-send-at-port (private #623)", "deviation": "Renumbered 047 → 030. Schema verbatim: messages.send_at TIMESTAMPTZ NULL + partial index ix_messages_send_at (WHERE send_at IS NOT NULL) built CONCURRENTLY."} + {"path": "alembic/versions/030_message_send_at.py", "private_counterpart": "alembic/versions/047_message_send_at.py", "last_synced": "2026-05-11", "ported_in": "message-send-at-port (private #623)", "deviation": "Renumbered 047 → 030. 
Schema verbatim: messages.send_at TIMESTAMPTZ NULL + partial index ix_messages_send_at (WHERE send_at IS NOT NULL) built CONCURRENTLY."}, + {"path": "alembic/versions/031_agents_last_seen_at.py", "private_counterpart": "alembic/versions/048_agents_last_seen_at.py", "last_synced": "2026-05-11", "ported_in": "agents-roster-and-last-seen-at-port (private #630)", "deviation": "Renumbered 048 → 031. Schema verbatim: agents.last_seen_at TIMESTAMPTZ NULL (no index — read path is per-tenant + per-agent, already covered by existing indexes)."} ], "app_core": [ {"path": "app/__init__.py", "private_counterpart": "app/__init__.py", "last_synced": "2026-04-16"}, diff --git a/tests/test_agent_roster.py b/tests/test_agent_roster.py new file mode 100644 index 0000000..dfedc07 --- /dev/null +++ b/tests/test_agent_roster.py @@ -0,0 +1,442 @@ +"""Agent Directory productization (Phase A) — roster endpoint + last_seen_at hooks. + +Spec: PRD https://trydock.ai/mike/agent-directory-productization-prd §Server-side scope. + +These tests pin: + +1. ``GET /v1/agents/roster`` returns display-optimized snapshot (no + IDs, secrets, timestamps); always-full list, no pagination. +2. ``last_seen_at`` is updated by ``POST /v1/messages`` for the sender's agent. +3. ``last_seen_at`` is updated by ``GET /v1/agents/{ref}/inbox`` for the recipient's agent. +4. Roster's ``online`` field derives correctly from last_seen_at age. +5. Caller-asserted ``status=away|offline`` overrides the activity-derived state. +6. Soft-deleted agents are excluded from the roster. +7. Roster ``preferred_contact`` derives from webhook_url presence. +8. ``last_seen_relative`` formats ages correctly. 
+""" +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace + +import pytest +from sqlalchemy import select, update + +from app.models.agent import Agent +from app.services.agent_service import ( + _build_roster_entry, + _bucketed_seen, + _compute_roster_etag, + _derive_online_state, + _format_relative, +) +from app.services.inbox_service import _bump_last_seen_stmt +from app.routers.agents import _etag_matches + + +def test_etag_matches_exact(): + assert _etag_matches('W/"abc123"', 'W/"abc123"') is True + + +def test_etag_matches_with_whitespace(): + """Some clients send leading/trailing whitespace in the header.""" + assert _etag_matches(' W/"abc123" ', 'W/"abc123"') is True + + +def test_etag_matches_mismatch(): + assert _etag_matches('W/"old"', 'W/"new"') is False + + +def test_etag_matches_empty_header(): + assert _etag_matches(None, 'W/"abc"') is False + assert _etag_matches("", 'W/"abc"') is False + + +# ── Pure helper unit tests (Phase A branch coverage) ───────────────── + + +def _fake_agent(**kw): + """Duck-typed Agent for testing _build_roster_entry without a DB row.""" + defaults = { + "slug": "test-agent", + "display_name": "Test Agent", + "last_seen_at": None, + "status": "online", + "webhook_url": None, + "metadata_": {}, + } + defaults.update(kw) + return SimpleNamespace(**defaults) + + +def test_build_roster_entry_offline_no_activity(): + now = datetime.now(timezone.utc) + a = _fake_agent(last_seen_at=None) + entry, etag_part = _build_roster_entry(a, now) + assert entry["online"] is False + assert entry["status"] == "offline" + assert entry["last_seen_relative"] == "never" + assert entry["preferred_contact"] == "async" + assert entry["description"] is None + assert "test-agent|Test Agent||0|async|offline|" == etag_part + + +def test_build_roster_entry_online_recent_activity(): + now = datetime.now(timezone.utc) + a = _fake_agent(last_seen_at=now - timedelta(seconds=30)) + entry, 
_ = _build_roster_entry(a, now) + assert entry["online"] is True + assert entry["status"] == "online" + assert entry["last_seen_relative"] == "active now" + + +def test_build_roster_entry_with_webhook_is_sync(): + now = datetime.now(timezone.utc) + a = _fake_agent(webhook_url="https://example.com/wh") + entry, _ = _build_roster_entry(a, now) + assert entry["preferred_contact"] == "sync" + + +def test_build_roster_entry_with_metadata_description(): + now = datetime.now(timezone.utc) + a = _fake_agent(metadata_={"description": "an agent that does things"}) + entry, _ = _build_roster_entry(a, now) + assert entry["description"] == "an agent that does things" + + +def test_build_roster_entry_caller_override_wins(): + """Caller-asserted away/offline beats activity-derived online.""" + now = datetime.now(timezone.utc) + a = _fake_agent(last_seen_at=now, status="away") # active but caller said away + entry, _ = _build_roster_entry(a, now) + assert entry["status"] == "away" + assert entry["online"] is False + + +def test_compute_roster_etag_stable_for_same_input(): + parts = ["a|A||1|async|online|abc", "b|B||0|sync|offline|"] + assert _compute_roster_etag(parts) == _compute_roster_etag(parts) + # Format: weak ETag with W/" prefix and " suffix. 
+ e = _compute_roster_etag(parts) + assert e.startswith('W/"') and e.endswith('"') + + +def test_compute_roster_etag_changes_when_input_changes(): + a = _compute_roster_etag(["a|A||1|async|online|abc"]) + b = _compute_roster_etag(["a|A||0|async|offline|abc"]) + assert a != b + + +def test_compute_roster_etag_empty_list(): + """No agents → still a valid etag (sha256 of empty string).""" + e = _compute_roster_etag([]) + assert e.startswith('W/"') + assert len(e) == len('W/"') + 16 + 1 # 16 hex chars + closing quote + + +def test_format_relative_buckets(): + now = datetime.now(timezone.utc) + assert _format_relative(now, None) == "never" + assert _format_relative(now, now - timedelta(seconds=30)) == "active now" + assert _format_relative(now, now - timedelta(minutes=15)) == "15m ago" + assert _format_relative(now, now - timedelta(hours=3)) == "3h ago" + assert _format_relative(now, now - timedelta(days=2)) == "2d ago" + + +def test_derive_online_state_thresholds(): + now = datetime.now(timezone.utc) + # Within 5 min → online. + assert _derive_online_state(now, now - timedelta(seconds=60), "online")[0] is True + # 5-30 min → away. + assert _derive_online_state(now, now - timedelta(minutes=15), "online")[1] == "away" + # >30 min → offline. + assert _derive_online_state(now, now - timedelta(hours=2), "online")[1] == "offline" + # NULL → offline. + assert _derive_online_state(now, None, "online")[1] == "offline" + # Caller override wins. + assert _derive_online_state(now, now, "offline")[1] == "offline" + + +def test_bucketed_seen_floors_to_5min(): + ts = datetime(2026, 5, 5, 17, 7, 32, tzinfo=timezone.utc) # 17:07:32 + bucketed = _bucketed_seen(ts) + # 17:07:32 → 17:05:00 epoch. 
+ expected_epoch = int(datetime(2026, 5, 5, 17, 5, 0, tzinfo=timezone.utc).timestamp()) + assert bucketed == str(expected_epoch) + + +def test_bucketed_seen_none(): + assert _bucketed_seen(None) == "" + + +def test_bump_last_seen_stmt_constructs_update(): + """_bump_last_seen_stmt returns a SQLAlchemy UPDATE; sanity-check shape.""" + from datetime import datetime as dt, timezone as tz + stmt = _bump_last_seen_stmt("agt_test123", dt.now(tz.utc)) + # The compiled SQL targets the agents table and sets last_seen_at. + compiled = str(stmt.compile(compile_kwargs={"literal_binds": False})) + assert "agents" in compiled.lower() + assert "last_seen_at" in compiled.lower() + + +# ── Integration tests (HTTP + DB end-to-end) ───────────────────────── + + + + + +async def _create_agent(client, auth_headers, slug, **extra): + body = {"slug": slug, "display_name": slug.title()} + body.update(extra) + resp = await client.post("/v1/agents", json=body, headers=auth_headers) + assert resp.status_code in (200, 201), resp.text + return resp.json() + + +@pytest.mark.asyncio +async def test_roster_endpoint_returns_display_shape(client, auth_headers): + await _create_agent(client, auth_headers, "roster-a", metadata={"description": "First agent"}) + await _create_agent(client, auth_headers, "roster-b") + + resp = await client.get("/v1/agents/roster", headers=auth_headers) + assert resp.status_code == 200 + body = resp.json() + assert "generated_at" in body + assert "agents" in body + names = {a["name"] for a in body["agents"]} + assert {"roster-a", "roster-b"}.issubset(names) + + # Each entry has display-optimized fields (NO id, NO webhook_secret). + for entry in body["agents"]: + assert "name" in entry + assert "display_name" in entry + assert "online" in entry + assert "last_seen_relative" in entry + assert "preferred_contact" in entry + assert "status" in entry + # Display shape must NOT leak management surface fields. 
+ assert "id" not in entry + assert "webhook_secret" not in entry + assert "user_id" not in entry + assert "created_at" not in entry + assert "deleted_at" not in entry + + # The agent with metadata.description surfaces it. + a_entry = next(a for a in body["agents"] if a["name"] == "roster-a") + assert a_entry["description"] == "First agent" + b_entry = next(a for a in body["agents"] if a["name"] == "roster-b") + assert b_entry["description"] is None + + +@pytest.mark.asyncio +async def test_last_seen_at_bumps_on_message_send(client, auth_headers, db_session): + sender = await _create_agent(client, auth_headers, "lsa-sender") + rcpt = await _create_agent(client, auth_headers, "lsa-rcpt") + + # Pre-condition: sender's last_seen_at is NULL (never wrote a message). + pre = (await db_session.execute(select(Agent).where(Agent.id == sender["id"]))).scalar_one() + assert pre.last_seen_at is None + + resp = await client.post( + "/v1/messages", + json={"to": rcpt["id"], "body": "hi"}, + headers={**auth_headers, "X-Cueapi-From-Agent": sender["id"]}, + ) + assert resp.status_code == 201 + + db_session.expire_all() + post = (await db_session.execute(select(Agent).where(Agent.id == sender["id"]))).scalar_one() + assert post.last_seen_at is not None + # Within last 5 seconds of "now". + age = (datetime.now(timezone.utc) - post.last_seen_at).total_seconds() + assert 0 <= age < 5 + + +@pytest.mark.asyncio +async def test_last_seen_at_bumps_on_inbox_poll(client, auth_headers, db_session): + # Two agents; sender writes a message so recipient has something to poll. 
+ sender = await _create_agent(client, auth_headers, "lsa-poll-s") + rcpt = await _create_agent(client, auth_headers, "lsa-poll-r") + await client.post( + "/v1/messages", + json={"to": rcpt["id"], "body": "x"}, + headers={**auth_headers, "X-Cueapi-From-Agent": sender["id"]}, + ) + + pre = (await db_session.execute(select(Agent).where(Agent.id == rcpt["id"]))).scalar_one() + pre_seen = pre.last_seen_at # may be NULL + + resp = await client.get(f"/v1/agents/{rcpt['id']}/inbox", headers=auth_headers) + assert resp.status_code == 200 + + db_session.expire_all() + post = (await db_session.execute(select(Agent).where(Agent.id == rcpt["id"]))).scalar_one() + assert post.last_seen_at is not None + if pre_seen is not None: + assert post.last_seen_at >= pre_seen + age = (datetime.now(timezone.utc) - post.last_seen_at).total_seconds() + assert 0 <= age < 5 + + +@pytest.mark.asyncio +async def test_roster_online_derivation(client, auth_headers, db_session): + fresh = await _create_agent(client, auth_headers, "online-fresh") + stale = await _create_agent(client, auth_headers, "online-stale") + cold = await _create_agent(client, auth_headers, "online-cold") + + # Backdate the agents to specific recencies. + now = datetime.now(timezone.utc) + await db_session.execute( + update(Agent).where(Agent.id == fresh["id"]).values(last_seen_at=now - timedelta(seconds=30)) + ) + await db_session.execute( + update(Agent).where(Agent.id == stale["id"]).values(last_seen_at=now - timedelta(minutes=15)) + ) + await db_session.execute( + update(Agent).where(Agent.id == cold["id"]).values(last_seen_at=now - timedelta(hours=2)) + ) + await db_session.commit() + + resp = await client.get("/v1/agents/roster", headers=auth_headers) + assert resp.status_code == 200 + by_name = {a["name"]: a for a in resp.json()["agents"]} + + assert by_name["online-fresh"]["online"] is True + assert by_name["online-fresh"]["status"] == "online" + # 15 min ago → away (not online, not yet offline at 30 min boundary). 
+ assert by_name["online-stale"]["online"] is False + assert by_name["online-stale"]["status"] == "away" + # 2h ago → offline. + assert by_name["online-cold"]["online"] is False + assert by_name["online-cold"]["status"] == "offline" + + +@pytest.mark.asyncio +async def test_caller_asserted_status_overrides_derivation(client, auth_headers, db_session): + """Agent voluntarily marked itself away — even with fresh activity, + the override sticks so the agent can signal 'I'm here but busy'.""" + a = await _create_agent(client, auth_headers, "override-away") + + # Mark recent activity. Commit BEFORE issuing the PATCH so the + # request handler's session doesn't deadlock on a row lock the + # test's session is holding (both target the same agent row). + now = datetime.now(timezone.utc) + await db_session.execute( + update(Agent).where(Agent.id == a["id"]).values(last_seen_at=now) + ) + await db_session.commit() + + # Override status to "away" via PATCH. + patch_resp = await client.patch( + f"/v1/agents/{a['id']}", json={"status": "away"}, headers=auth_headers + ) + assert patch_resp.status_code == 200 + + resp = await client.get("/v1/agents/roster", headers=auth_headers) + by_name = {x["name"]: x for x in resp.json()["agents"]} + assert by_name["override-away"]["status"] == "away" + assert by_name["override-away"]["online"] is False + + +@pytest.mark.asyncio +async def test_roster_excludes_soft_deleted(client, auth_headers): + keep = await _create_agent(client, auth_headers, "roster-keep") + drop = await _create_agent(client, auth_headers, "roster-drop") + + del_resp = await client.delete(f"/v1/agents/{drop['id']}", headers=auth_headers) + assert del_resp.status_code == 204 + + resp = await client.get("/v1/agents/roster", headers=auth_headers) + names = {a["name"] for a in resp.json()["agents"]} + assert "roster-keep" in names + assert "roster-drop" not in names + + +@pytest.mark.asyncio +async def test_roster_preferred_contact_derivation(client, auth_headers): + 
poll_only = await _create_agent(client, auth_headers, "pc-poll") + push_capable = await _create_agent( + client, auth_headers, "pc-push", webhook_url="https://example.com/wh" + ) + + resp = await client.get("/v1/agents/roster", headers=auth_headers) + by_name = {a["name"]: a for a in resp.json()["agents"]} + assert by_name["pc-poll"]["preferred_contact"] == "async" + assert by_name["pc-push"]["preferred_contact"] == "sync" + + +@pytest.mark.asyncio +async def test_last_seen_relative_formatting(client, auth_headers, db_session): + a = await _create_agent(client, auth_headers, "rel-test") + now = datetime.now(timezone.utc) + + cases = [ + (timedelta(seconds=30), "active now"), + (timedelta(minutes=5), "5m ago"), + (timedelta(hours=2), "2h ago"), + ] + + for delta, expected in cases: + await db_session.execute( + update(Agent).where(Agent.id == a["id"]).values(last_seen_at=now - delta) + ) + await db_session.commit() + resp = await client.get("/v1/agents/roster", headers=auth_headers) + entry = next(x for x in resp.json()["agents"] if x["name"] == "rel-test") + assert entry["last_seen_relative"] == expected, f"delta={delta}" + + # Never seen. + await db_session.execute( + update(Agent).where(Agent.id == a["id"]).values(last_seen_at=None) + ) + await db_session.commit() + resp = await client.get("/v1/agents/roster", headers=auth_headers) + entry = next(x for x in resp.json()["agents"] if x["name"] == "rel-test") + assert entry["last_seen_relative"] == "never" + + +@pytest.mark.asyncio +async def test_roster_etag_304_when_unchanged(client, auth_headers, db_session): + """ETag round-trip: first GET returns 200 + ETag; second GET with + matching If-None-Match returns 304 with no body.""" + a = await _create_agent(client, auth_headers, "etag-stable") + + # Pin last_seen_at to a deterministic past value so the bucket is stable. 
+ now = datetime.now(timezone.utc) + await db_session.execute( + update(Agent).where(Agent.id == a["id"]).values(last_seen_at=now - timedelta(seconds=30)) + ) + await db_session.commit() + + first = await client.get("/v1/agents/roster", headers=auth_headers) + assert first.status_code == 200 + etag = first.headers.get("etag") + assert etag is not None + assert etag.startswith('W/"') + + second = await client.get( + "/v1/agents/roster", + headers={**auth_headers, "If-None-Match": etag}, + ) + assert second.status_code == 304 + assert second.headers.get("etag") == etag + + +@pytest.mark.asyncio +async def test_roster_etag_changes_when_agent_added(client, auth_headers, db_session): + """A new agent flips the ETag — clients can rely on conditional GET.""" + await _create_agent(client, auth_headers, "etag-base") + + first = await client.get("/v1/agents/roster", headers=auth_headers) + etag_before = first.headers.get("etag") + assert etag_before is not None + + await _create_agent(client, auth_headers, "etag-new-agent") + + second = await client.get( + "/v1/agents/roster", + headers={**auth_headers, "If-None-Match": etag_before}, + ) + assert second.status_code == 200 + etag_after = second.headers.get("etag") + assert etag_after != etag_before