Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions alembic/versions/031_agents_last_seen_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Agent Directory productization (Phase A) — implicit online-state derivation.

Adds ``agents.last_seen_at`` so the server can derive an agent's
``online`` status from recent activity instead of requiring callers
to ``PATCH /v1/agents/{ref}`` with explicit status updates.

Hot paths that write ``last_seen_at = now()`` (added in this PR's
service-layer changes):

* ``create_message`` — sender's agent
* ``list_inbox`` — recipient's agent (poll-based delivery)
* push ``deliver_message`` worker callback — recipient's agent

Derivation rules (computed in the service layer, not stored):

* ``last_seen_at`` within 5 min → ``online``
* ``last_seen_at`` within 30 min → ``away``
* anything older / NULL → ``offline``

The existing ``status`` column stays as a caller-overrideable enum;
the new derivation is the default surface. Callers can still assert
``status=away`` (e.g., agent voluntarily marks itself away during a
long-running task) and the override wins over the derivation.

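Migration 047 was the last messaging-related migration (per-message
send_at). 048 is independent of messaging — it shapes the Identity
primitive directly.

NOTE(review): the numbers 047/048 do not match this file's own revision
chain (030 → 031, see below); presumably they are the migration numbers
of the upstream repository this change was ported from — confirm.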

Revision ID: 031
Revises: 030
"""
from alembic import op
import sqlalchemy as sa


# Alembic revision identifiers: this migration's own ID and the ID of the
# migration it applies on top of.
revision = "031"
down_revision = "030"


def upgrade():
    """Add the nullable ``agents.last_seen_at`` timestamp column."""
    # The column is nullable, so no backfill is needed: NULL stands for
    # "no activity observed yet", which the service-layer derivation
    # reports as ``offline``. Existing rows keep their caller-asserted
    # status until the first hot-path write lands.
    last_seen_column = sa.Column(
        "last_seen_at",
        sa.DateTime(timezone=True),
        nullable=True,
    )
    op.add_column("agents", last_seen_column)
    # Deliberately no new index. Per the original author's note, the
    # read path is per-tenant and per-agent, both already served by the
    # existing ``unique_user_agent_slug`` and ``ix_agents_slug`` indexes,
    # and the roster endpoint reads ``user_id``-scoped rows, which is
    # already a btree-indexed FK.


def downgrade():
    """Reverse ``upgrade`` by dropping ``agents.last_seen_at``."""
    table, column = "agents", "last_seen_at"
    op.drop_column(table, column)
6 changes: 6 additions & 0 deletions app/models/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ class Agent(Base):
server_default="{}",
)
status = Column(String(16), nullable=False, default="online", server_default="online")
# Agent Directory Phase A (PR #630): timestamp of last observed activity.
# Written by hot paths (create_message sender, list_inbox recipient).
# NULL = no activity observed; derivation maps to "offline".
# 5min → online, 30min → away, older/NULL → offline.
# A caller-asserted ``status`` of "away" or "offline" overrides the derivation.
last_seen_at = Column(DateTime(timezone=True), nullable=True)
deleted_at = Column(DateTime(timezone=True), nullable=True)
created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
updated_at = Column(
Expand Down
56 changes: 56 additions & 0 deletions app/routers/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
AgentCreate,
AgentListResponse,
AgentResponse,
AgentRosterEntry,
AgentRosterResponse,
AgentUpdate,
WebhookSecretResponse,
)
Expand All @@ -40,6 +42,7 @@
get_agent_owned,
get_webhook_secret,
list_agents,
list_roster,
rotate_webhook_secret,
soft_delete_agent,
to_response_dict,
Expand Down Expand Up @@ -119,6 +122,59 @@ async def list_agents_endpoint(
)


def _etag_matches(if_none_match_header, server_etag):
"""Pure helper: does the client's ``If-None-Match`` match the
server-computed weak ETag?

Trims whitespace; treats empty/None header as "no match". Refactored
out of ``get_roster_endpoint`` so the ETag matching logic is unit-
testable without going through the FastAPI Request object.
"""
if not if_none_match_header:
return False
return if_none_match_header.strip() == server_etag


@router.get("/roster", response_model=AgentRosterResponse)
async def get_roster_endpoint(
    request: Request,
    user: AuthenticatedUser = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Directory snapshot for session-boot prompt injection (Agent Directory Phase A).

    Returns the caller's full agent roster in a display-optimized
    shape — no pagination, no opaque IDs, no secrets, no timestamps,
    plus derived ``online`` + ``last_seen_relative`` + ``preferred_contact``
    fields. Soft-deleted agents are always excluded.

    Distinct from ``GET /v1/agents`` (which is the management surface
    with full ``AgentResponse`` shape). See PRD §Surface 5 for context.

    Conditional GET: an ``If-None-Match`` header that matches the
    server-computed weak ETag returns ``304 Not Modified`` with an empty
    body. The ETag aligns to the 5-minute online/away/offline derivation
    bucket so quiet windows produce stable hashes.

    Both the 200 and the 304 carry the same ``ETag`` and ``Cache-Control``
    headers, so a revalidating client refreshes its freshness lifetime.

    Parity port of cueapi/cueapi#630.
    """
    # Function-scope import: a plain ``Response`` is needed for the 304.
    # ``JSONResponse(content=None)`` would serialize ``None`` to the 4-byte
    # body ``null``, and a 304 must not carry a body.
    from fastapi import Response

    result = await list_roster(db, user)
    etag = result["etag"]
    cache_headers = {"ETag": etag, "Cache-Control": "private, max-age=300"}

    if _etag_matches(request.headers.get("if-none-match"), etag):
        return Response(status_code=304, headers=cache_headers)

    body = AgentRosterResponse(
        generated_at=result["generated_at"],
        agents=[AgentRosterEntry(**e) for e in result["agents"]],
    )
    return JSONResponse(
        status_code=200,
        content=body.model_dump(mode="json"),
        headers=cache_headers,
    )


@router.get("/{ref}", response_model=AgentResponse)
async def get_agent_endpoint(
ref: str,
Expand Down
35 changes: 35 additions & 0 deletions app/schemas/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,38 @@ class WebhookSecretResponse(BaseModel):
"""Response for the webhook-secret retrieval and rotation endpoints."""

webhook_secret: str


class AgentRosterEntry(BaseModel):
    """One agent in the directory snapshot returned by GET /v1/agents/roster.

    Distinct from ``AgentResponse``: drops opaque IDs, secrets,
    timestamps, and tenancy metadata, and adds derived ``online`` /
    ``last_seen_relative`` fields. Optimized for prompt injection at
    session-boot — agents see "who else is here" natively without
    needing to call a tool. See PRD §Surface 5.

    The field descriptions below are part of the published API schema and
    are worded to match what the roster service actually emits.
    """

    name: str = Field(..., description="Stable per-tenant slug; addressable as `<name>@<user_slug>`.")
    display_name: str
    description: Optional[str] = Field(default=None, description="From metadata.description if set.")
    online: bool = Field(
        ...,
        description=(
            "True when last_seen_at is within 5 min and the caller has not "
            "asserted status 'away' or 'offline'."
        ),
    )
    last_seen_relative: str = Field(
        ...,
        description="Human-readable freshness: 'active now', '5m ago', '2h ago', '3d ago', 'never'.",
    )
    preferred_contact: Literal["sync", "async"] = Field(
        ...,
        description="Derived: webhook_url IS NOT NULL → 'sync' (push-capable), else 'async' (poll-only).",
    )
    status: Literal["online", "offline", "away"] = Field(
        ...,
        description=(
            "Effective status: a caller-asserted 'away'/'offline' "
            "(PATCH /v1/agents/{ref}) wins; otherwise derived from last_seen_at "
            "(5 min → online, 30 min → away, older/never → offline)."
        ),
    )


class AgentRosterResponse(BaseModel):
    """Response for GET /v1/agents/roster — full directory snapshot.

    Unpaginated: carries every roster entry for the caller in one payload.
    """

    # Timestamp at which this snapshot was generated.
    generated_at: datetime
    # One display-optimized entry per agent in the roster.
    agents: List[AgentRosterEntry]
158 changes: 158 additions & 0 deletions app/services/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,161 @@ def to_response_dict(agent: Agent, *, include_secret: bool = False) -> Dict:
"created_at": agent.created_at,
"updated_at": agent.updated_at,
}


# ───────────────────────────────────────────────────────────────────
# Agent Directory Phase A (PR #630) — roster + online derivation
# ───────────────────────────────────────────────────────────────────

# Activity-age thresholds (seconds) used to derive an agent's status from
# ``last_seen_at``, plus the bucket width used when hashing it into the ETag.
_ONLINE_THRESHOLD_SECONDS = 300  # seen within 5 min → online
_AWAY_THRESHOLD_SECONDS = 1800  # seen within 30 min → away (older → offline)
_ETAG_BUCKET_SECONDS = 300  # 5-min bucket; aligns ETag freshness to the derivation thresholds.


def _format_relative(now, last_seen_at) -> str:
if last_seen_at is None:
return "never"
delta = (now - last_seen_at).total_seconds()
if delta < 60:
return "active now"
if delta < 3600:
return f"{int(delta / 60)}m ago"
if delta < 86400:
return f"{int(delta / 3600)}h ago"
return f"{int(delta / 86400)}d ago"


def _derive_online_state(now, last_seen_at, asserted_status: str):
"""Returns (online_bool, derived_status).

Caller-asserted ``status`` (PATCH /v1/agents/{ref}) overrides the
derivation when explicitly set to 'away' or 'offline' — lets agents
voluntarily mark themselves unavailable during long-running tasks.
"""
if asserted_status in ("away", "offline"):
return (False, asserted_status)
if last_seen_at is None:
return (False, "offline")
delta = (now - last_seen_at).total_seconds()
if delta <= _ONLINE_THRESHOLD_SECONDS:
return (True, "online")
if delta <= _AWAY_THRESHOLD_SECONDS:
return (False, "away")
return (False, "offline")


def _bucketed_seen(last_seen_at):
"""Floor last_seen_at to a 5-min bucket for ETag stability.

The roster's derivation crosses category boundaries at 5 / 30 min,
so an ETag that flips on every-second timestamp drift would defeat
its own purpose. Bucketing means a quiet 5-minute window produces a
stable ETag — clients polling at the suggested cadence will get 304.
"""
if last_seen_at is None:
return ""
epoch = int(last_seen_at.timestamp())
return str(epoch - (epoch % _ETAG_BUCKET_SECONDS))


def _build_roster_entry(agent, now):
    """Shape one agent row into ``(entry_dict, etag_part_string)``.

    Pure per-agent helper behind ``list_roster``: it derives the online
    state, pulls the description out of the metadata, picks the preferred
    contact mode and builds the string that this agent contributes to the
    roster ETag. Kept free of DB and ASGI concerns so it can be unit-tested.

    ``agent`` is duck-typed: any object exposing ``.slug``,
    ``.display_name``, ``.last_seen_at``, ``.status``, ``.webhook_url``
    and ``.metadata_`` works (e.g. a ``SimpleNamespace`` in tests).
    """
    is_online, effective_status = _derive_online_state(
        now, agent.last_seen_at, agent.status
    )

    # Only a string ``description`` inside a dict-shaped metadata is surfaced.
    raw_meta = agent.metadata_ or {}
    candidate = raw_meta.get("description") if isinstance(raw_meta, dict) else None
    blurb = candidate if isinstance(candidate, str) else None

    # A configured webhook means the agent can be reached by push.
    contact_mode = "sync" if agent.webhook_url else "async"

    entry = dict(
        name=agent.slug,
        display_name=agent.display_name,
        description=blurb,
        online=is_online,
        last_seen_relative=_format_relative(now, agent.last_seen_at),
        preferred_contact=contact_mode,
        status=effective_status,
    )

    # The ETag input leaves out the relative-time string and uses the
    # bucketed timestamp instead.
    etag_fields = (
        agent.slug,
        agent.display_name,
        blurb or "",
        "1" if is_online else "0",
        contact_mode,
        effective_status,
        _bucketed_seen(agent.last_seen_at),
    )
    return entry, "|".join(etag_fields)


def _compute_roster_etag(etag_parts):
"""Pure helper: list of per-agent etag-part strings → weak ETag."""
import hashlib
digest = hashlib.sha256("\n".join(etag_parts).encode("utf-8")).hexdigest()
return f'W/"{digest[:16]}"'


async def list_roster(db: AsyncSession, user: AuthenticatedUser) -> Dict:
    """Assemble the roster snapshot served by GET /v1/agents/roster.

    Loads every non-soft-deleted agent owned by ``user`` (ordered by slug,
    never paginated), shapes each row with ``_build_roster_entry`` and
    returns ``{"generated_at": now, "agents": [...], "etag": "..."}``.
    Display-optimized for prompt injection — see PRD §Surface 5.

    The ETag is a content hash over the per-agent *material* fields only;
    ``generated_at`` and the relative-time strings are left out because
    they drift continuously. The router uses it to answer a matching
    ``If-None-Match`` with a 304.
    """
    from datetime import datetime, timezone

    snapshot_time = datetime.now(timezone.utc)

    owned_and_live = (Agent.user_id == user.id, Agent.deleted_at.is_(None))
    query = select(Agent).where(*owned_and_live).order_by(Agent.slug)
    result = await db.execute(query)
    agents = result.scalars().all()

    # Each element is an (entry_dict, etag_part) pair, in slug order.
    shaped = [_build_roster_entry(agent, snapshot_time) for agent in agents]

    return {
        "generated_at": snapshot_time,
        "agents": [entry for entry, _ in shaped],
        "etag": _compute_roster_etag([part for _, part in shaped]),
    }


async def touch_last_seen(db: AsyncSession, agent_id: str) -> None:
    """Update agent.last_seen_at = now() for hot-path activity.

    Called from create_message (sender side) + list_inbox (recipient
    side). Best-effort: a failure is logged at WARNING level with its
    traceback, but never propagates, so it cannot block the main flow.

    This function only issues the UPDATE; it does not commit. The write
    becomes durable when the caller commits the session.

    Args:
        db: Session used to execute the UPDATE.
        agent_id: Primary key of the agent whose activity was observed.
    """
    import logging
    from datetime import datetime, timezone
    from sqlalchemy import update

    now = datetime.now(timezone.utc)
    try:
        await db.execute(
            update(Agent).where(Agent.id == agent_id).values(last_seen_at=now)
        )
    except Exception:
        # Deliberately broad: this is a best-effort side write at a hot-path
        # boundary. Record the failure instead of swallowing it silently.
        logging.getLogger(__name__).warning(
            "touch_last_seen failed for agent %s", agent_id, exc_info=True
        )
26 changes: 26 additions & 0 deletions app/services/inbox_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ def _http_error(status: int, code: str, message: str) -> HTTPException:
)


def _bump_last_seen_stmt(agent_id, now):
    """Build (but do not execute) the UPDATE that sets one agent's
    ``last_seen_at`` to ``now``.

    Pure statement factory split out of ``list_inbox`` so the §Surface-5
    hot-path hook can be unit-tested without DB + ASGI. The caller runs
    the result with ``await db.execute(stmt)``. The same statement serves
    both ``list_inbox`` paths: the queued→delivered transition and the
    no-transition poll.

    Agent Directory Phase A (PR #630).
    """
    targets_agent = Agent.id == agent_id
    return update(Agent).where(targets_agent).values(last_seen_at=now)


def _parse_state_filter(
states: Optional[str], *, default: Tuple[str, ...] = DEFAULT_STATES
) -> Tuple[str, ...]:
Expand Down Expand Up @@ -216,6 +231,17 @@ async def list_inbox(
.returning(Message.id)
)
await db.execute(upd_q)
# Agent Directory Phase A (PR #630): touch recipient's
# last_seen_at in the same transaction as the queued→delivered
# flip. Even when no queued messages exist, the poll proves
# activity — see the else branch below.
await db.execute(_bump_last_seen_stmt(agent.id, upd_now))
await db.commit()
else:
# No queued→delivered transition this call (filter excluded
# ``queued``), but we still observed activity from the recipient
# — bump their last_seen_at in its own transaction.
await db.execute(_bump_last_seen_stmt(agent.id, now_gate))
await db.commit()

# Total (after the transition).
Expand Down
Loading
Loading