From a49523d99377411f87cf680e4ecb898db3ec9741 Mon Sep 17 00:00:00 2001 From: mikemolinet Date: Mon, 11 May 2026 09:07:06 -0700 Subject: [PATCH] feat(cues): per-fire send_at scheduling on POST /v1/cues/{id}/fire (parity port of cueapi/cueapi#618) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-port of closed [PR #45](https://github.com/cueapi/cueapi-core/pull/45) which was on a stale base ~8880 deletions behind main. Fresh against current main HEAD. Closes §13 / Phase 12.1.7 (cue side). Optional `send_at` timestamp on the fire body delays dispatch until the time elapses. ## Why this is small The dispatch loop in `worker/poller.py:dispatch_outbox` already gates on `DispatchOutbox.scheduled_at` (added in slice 3b for messages, just merged via PR #77 for messages send_at). This PR plumbs `send_at` from FireRequest through to `Execution.scheduled_for` and `DispatchOutbox.scheduled_at`. No poller changes required. ## What lands - **app/schemas/cue.py** — new `FireRequest` Pydantic model with optional `send_at: Optional[datetime]` field. - **app/routers/cues.py** — `fire_cue` endpoint accepts optional `body: Optional[FireRequest] = None`. Computes effective `scheduled_for` (future send_at → that timestamp; past or omitted → now). Sets `DispatchOutbox.scheduled_at` on the outbox row when scheduled. - **tests/test_fire_send_at.py** — 6 tests verbatim from private (5 active + 1 skipped). The skipped test relies on `payload_override` which is a separate parity port (cueapi/cueapi#589/#590 — not yet in cueapi-core); marked with a `@pytest.mark.skip` + reason pointer so it un-skips automatically when those ports land. ## Semantics (per private cueapi#618) - `send_at` omitted (or no body) → existing behavior: dispatch immediately, outbox.scheduled_at = NULL. - `send_at` in the future → execution.scheduled_for = send_at, outbox.scheduled_at = send_at, dispatcher gates until that time. - `send_at` in the past → forgiving fallback to "fire now". No error. Same shape as send_at omitted. Idempotent — caller doesn't have to worry about clock skew or being a few ms late. ## Tests 5 new tests pass (omitted, future-delays-dispatch, past-falls-back, invalid-timestamp-422, worker-transport-no-outbox). 1 skipped (composes-with-payload-override, depends on PR #589/#590 port). Full local suite: 834 passed + 18 xfailed (pre-existing) + 4 skipped (1 new, 3 pre-existing). Zero regressions. ## Re-port note Re-port of closed PR #45. Fresh against current main after PR #74 + #75 + #77 merged earlier in this session. --- app/routers/cues.py | 42 +++++++- app/schemas/cue.py | 17 +++ tests/test_fire_send_at.py | 209 +++++++++++++++++++++++++++++++++++++ 3 files changed, 263 insertions(+), 5 deletions(-) create mode 100644 tests/test_fire_send_at.py diff --git a/app/routers/cues.py b/app/routers/cues.py index cefa183..927a3b5 100644 --- a/app/routers/cues.py +++ b/app/routers/cues.py @@ -7,7 +7,7 @@ from app.auth import AuthenticatedUser, get_current_user from app.database import get_db -from app.schemas.cue import CueCreate, CueDetailResponse, CueListResponse, CueResponse, CueUpdate +from app.schemas.cue import CueCreate, CueDetailResponse, CueListResponse, CueResponse, CueUpdate, FireRequest from app.services.cue_service import create_cue, delete_cue, get_cue, list_cues, update_cue router = APIRouter(prefix="/v1/cues", tags=["cues"]) @@ -91,10 +91,17 @@ async def delete( @router.post("/{cue_id}/fire", status_code=200) async def fire_cue( cue_id: str, + body: Optional[FireRequest] = None, user: AuthenticatedUser = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): - """Manually fire a cue — creates an execution immediately regardless of schedule.""" + """Manually fire a cue — creates an execution immediately regardless of schedule. + + Optional body fields: + - ``send_at`` — schedule the fire for a future UTC timestamp. + Past timestamps treated as fire-now (forgiving fallback). + See FireRequest for full semantics (parity port of cueapi/cueapi#618). + """ import uuid as uuid_mod from datetime import datetime, timezone from sqlalchemy import select @@ -108,8 +115,25 @@ async def fire_cue( raise HTTPException(status_code=404, detail={"error": {"code": "cue_not_found", "message": "Cue not found", "status": 404}}) now = datetime.now(timezone.utc) + + # §13 (Phase 12.1.7): per-fire scheduling. ``send_at`` in the future + # delays dispatch until that time; past timestamps are forgiving and + # treated as "fire now" (idempotent — caller doesn't need to worry + # about clock skew or being a few ms late). The dispatch loop in + # worker/poller.py:dispatch_outbox already gates on + # ``DispatchOutbox.scheduled_at`` (NULL or <= now), so we just plumb + # the timestamp through. ``Execution.scheduled_for`` records the + # intended fire time for audit; the worker doesn't actually use this + # for gating (the outbox does), but listings + dashboards show it. + requested_at = body.send_at if body and body.send_at else None + effective_scheduled_for = requested_at if requested_at and requested_at > now else now + is_scheduled = effective_scheduled_for > now + execution_id = uuid_mod.uuid4() - execution = Execution(id=execution_id, cue_id=cue.id, scheduled_for=now, status="pending", triggered_by="manual_fire") + execution = Execution( + id=execution_id, cue_id=cue.id, scheduled_for=effective_scheduled_for, + status="pending", triggered_by="manual_fire", + ) db.add(execution) if cue.callback_transport == "webhook" and cue.callback_url: @@ -118,11 +142,15 @@ async def fire_cue( ws = user_row.scalar_one_or_none() or "" outbox = DispatchOutbox( execution_id=execution_id, cue_id=cue.id, task_type="deliver", + # §13: when send_at is in the future, set scheduled_at so the + # dispatcher gates dispatch until then. NULL = dispatch + # immediately (existing behavior). + scheduled_at=effective_scheduled_for if is_scheduled else None, payload={ "execution_id": str(execution_id), "cue_id": cue.id, "cue_name": cue.name, "user_id": str(user.id), "callback_url": cue.callback_url, "callback_method": cue.callback_method, "callback_headers": cue.callback_headers or {}, - "payload": cue.payload or {}, "scheduled_for": now.isoformat(), + "payload": cue.payload or {}, "scheduled_for": effective_scheduled_for.isoformat(), "retry_max_attempts": cue.retry_max_attempts, "retry_backoff_minutes": cue.retry_backoff_minutes or [1, 5, 15], "webhook_secret": ws, @@ -131,4 +159,8 @@ async def fire_cue( db.add(outbox) await db.commit() - return {"id": str(execution_id), "cue_id": cue.id, "scheduled_for": now.isoformat(), "status": "pending", "triggered_by": "manual_fire"} + return { + "id": str(execution_id), "cue_id": cue.id, + "scheduled_for": effective_scheduled_for.isoformat(), + "status": "pending", "triggered_by": "manual_fire", + } diff --git a/app/schemas/cue.py b/app/schemas/cue.py index f613031..5047ee8 100644 --- a/app/schemas/cue.py +++ b/app/schemas/cue.py @@ -63,6 +63,23 @@ class OnFailureConfig(BaseModel): pause: bool = False +class FireRequest(BaseModel): + """Body for POST /v1/cues/{id}/fire — all fields optional.""" + + send_at: Optional[datetime] = Field( + default=None, + description=( + "Optional UTC timestamp to schedule this fire for the future. " + "When set, the resulting execution sits in `pending` until " + "`send_at <= now()`, then enters the normal dispatch path. " + "Same shape as the existing per-cue schedule, but per-fire. " + "Past timestamps are treated as 'fire now' (idempotent + " + "forgiving — no error, just dispatches immediately). Phase " + "12.1.7 / roadmap §13 (parity port of cueapi/cueapi#618)." + ), + ) + + class CueCreate(BaseModel): name: str = Field(..., max_length=255) description: Optional[str] = None diff --git a/tests/test_fire_send_at.py b/tests/test_fire_send_at.py new file mode 100644 index 0000000..02e0dba --- /dev/null +++ b/tests/test_fire_send_at.py @@ -0,0 +1,209 @@ +"""Tests for §13 (Phase 12.1.7): per-fire scheduling on POST /v1/cues/{id}/fire. + +Roadmap doc §13: optional `send_at` timestamp on fire that delays dispatch +until the time elapsed. Same shape as cue's per-cue schedule, but per-fire. + +These tests pin: + +1. No `send_at` (or omitted) → dispatch immediately (existing behavior). +2. `send_at` in the future → execution's ``scheduled_for`` is set to + send_at; outbox row has ``scheduled_at`` set so the dispatcher's + existing ``scheduled_at IS NULL OR scheduled_at <= now()`` filter + gates dispatch until then. +3. `send_at` in the past → forgiving fallback to "fire now" (no error). + ``scheduled_for`` set to now; outbox ``scheduled_at`` left NULL. +4. `send_at` composes with `payload_override` — both fields can coexist + on the same fire. +5. Response payload reflects the effective ``scheduled_for``. +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy import select + +from app.models.dispatch_outbox import DispatchOutbox +from app.models.execution import Execution + + +async def _create_cue(client, auth_headers, name="send-at-test"): + resp = await client.post( + "/v1/cues", + json={ + "name": name, + "schedule": {"type": "recurring", "cron": "0 * * * *"}, + "callback": {"url": "https://example.com/webhook"}, + "payload": {"task": "send_at_default"}, + }, + headers=auth_headers, + ) + assert resp.status_code == 201, resp.text + return resp.json()["id"] + + +@pytest.mark.asyncio +async def test_fire_no_send_at_dispatches_immediately(client, auth_headers, db_session): + """Existing behavior preserved: no body or no send_at → outbox.scheduled_at NULL.""" + cue_id = await _create_cue(client, auth_headers, "send-at-immediate") + + # No body + resp = await client.post(f"/v1/cues/{cue_id}/fire", headers=auth_headers) + assert resp.status_code == 200 + exec_id = resp.json()["id"] + + outbox = ( + await db_session.execute( + select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id)) + ) + ).scalar_one() + assert outbox.scheduled_at is None, ( + "no send_at → outbox.scheduled_at must be NULL so dispatcher fires immediately" + ) + + +@pytest.mark.asyncio +async def test_fire_send_at_future_delays_dispatch(client, auth_headers, db_session): + cue_id = await _create_cue(client, auth_headers, "send-at-future") + future = datetime.now(timezone.utc) + timedelta(hours=2) + + resp = await client.post( + f"/v1/cues/{cue_id}/fire", + json={"send_at": future.isoformat()}, + headers=auth_headers, + ) + assert resp.status_code == 200 + body = resp.json() + exec_id = body["id"] + + # Response reflects the future scheduled_for. + parsed = datetime.fromisoformat(body["scheduled_for"]) + assert abs((parsed - future).total_seconds()) < 1.0 + + # Execution row + outbox row both reflect the schedule. + execution = ( + await db_session.execute(select(Execution).where(Execution.id == uuid.UUID(exec_id))) + ).scalar_one() + assert abs((execution.scheduled_for - future).total_seconds()) < 1.0 + + outbox = ( + await db_session.execute( + select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id)) + ) + ).scalar_one() + assert outbox.scheduled_at is not None, "send_at in future → outbox.scheduled_at must be set" + assert abs((outbox.scheduled_at - future).total_seconds()) < 1.0 + + +@pytest.mark.asyncio +async def test_fire_send_at_past_falls_back_to_now(client, auth_headers, db_session): + """Past timestamps are forgiving — no error, treated as 'fire now'. + + Idempotent: callers don't need to worry about clock skew or being + a few ms late after computing a send_at locally. + """ + cue_id = await _create_cue(client, auth_headers, "send-at-past") + past = datetime.now(timezone.utc) - timedelta(hours=1) + + resp = await client.post( + f"/v1/cues/{cue_id}/fire", + json={"send_at": past.isoformat()}, + headers=auth_headers, + ) + assert resp.status_code == 200 + exec_id = resp.json()["id"] + + outbox = ( + await db_session.execute( + select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id)) + ) + ).scalar_one() + assert outbox.scheduled_at is None, ( + "send_at in past → forgiving fallback; outbox.scheduled_at must be NULL" + ) + + +@pytest.mark.skip( + reason="payload_override on FireRequest is a separate parity port (cueapi/cueapi#589/#590). " + "Once those land in cueapi-core, un-skip this test to verify send_at + payload_override compose." +) +@pytest.mark.asyncio +async def test_fire_send_at_composes_with_payload_override(client, auth_headers, db_session): + cue_id = await _create_cue(client, auth_headers, "send-at-with-override") + future = datetime.now(timezone.utc) + timedelta(hours=1) + + resp = await client.post( + f"/v1/cues/{cue_id}/fire", + json={ + "send_at": future.isoformat(), + "payload_override": {"task": "scheduled_team_comm", "from": "test-agent"}, + }, + headers=auth_headers, + ) + assert resp.status_code == 200 + exec_id = resp.json()["id"] + + execution = ( + await db_session.execute(select(Execution).where(Execution.id == uuid.UUID(exec_id))) + ).scalar_one() + # Both behaviors apply. + assert abs((execution.scheduled_for - future).total_seconds()) < 1.0 + assert execution.payload_override == { + "task": "scheduled_team_comm", + "from": "test-agent", + } + + +@pytest.mark.asyncio +async def test_fire_send_at_invalid_timestamp_returns_422(client, auth_headers): + """Pydantic catches malformed datetime strings.""" + cue_id = await _create_cue(client, auth_headers, "send-at-invalid") + + resp = await client.post( + f"/v1/cues/{cue_id}/fire", + json={"send_at": "not-a-date"}, + headers=auth_headers, + ) + assert resp.status_code in (400, 422) + + +@pytest.mark.asyncio +async def test_fire_send_at_worker_transport_no_outbox(client, auth_headers, db_session): + """Worker-transport cues don't create an outbox row, but ``scheduled_for`` + on the Execution row still reflects send_at so worker pull endpoints + can filter by it (not done here — just pin the Execution shape).""" + create = await client.post( + "/v1/cues", + json={ + "name": "send-at-worker", + "schedule": {"type": "recurring", "cron": "0 * * * *"}, + "transport": "worker", + "payload": {"task": "scheduled_worker_task"}, + }, + headers=auth_headers, + ) + assert create.status_code == 201, create.text + cue_id = create.json()["id"] + + future = datetime.now(timezone.utc) + timedelta(minutes=30) + resp = await client.post( + f"/v1/cues/{cue_id}/fire", + json={"send_at": future.isoformat()}, + headers=auth_headers, + ) + assert resp.status_code == 200 + exec_id = resp.json()["id"] + + execution = ( + await db_session.execute(select(Execution).where(Execution.id == uuid.UUID(exec_id))) + ).scalar_one() + assert abs((execution.scheduled_for - future).total_seconds()) < 1.0 + + # No outbox row for worker transport. + outbox = ( + await db_session.execute( + select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id)) + ) + ).scalar_one_or_none() + assert outbox is None