Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions app/routers/cues.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from app.auth import AuthenticatedUser, get_current_user
from app.database import get_db
from app.schemas.cue import CueCreate, CueDetailResponse, CueListResponse, CueResponse, CueUpdate
from app.schemas.cue import CueCreate, CueDetailResponse, CueListResponse, CueResponse, CueUpdate, FireRequest
from app.services.cue_service import create_cue, delete_cue, get_cue, list_cues, update_cue

router = APIRouter(prefix="/v1/cues", tags=["cues"])
Expand Down Expand Up @@ -91,10 +91,17 @@ async def delete(
@router.post("/{cue_id}/fire", status_code=200)
async def fire_cue(
cue_id: str,
body: Optional[FireRequest] = None,
user: AuthenticatedUser = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Manually fire a cue — creates an execution immediately regardless of schedule."""
"""Manually fire a cue — creates an execution immediately regardless of schedule.

Optional body fields:
- ``send_at`` — schedule the fire for a future UTC timestamp.
Past timestamps treated as fire-now (forgiving fallback).
See FireRequest for full semantics (parity port of cueapi/cueapi#618).
"""
import uuid as uuid_mod
from datetime import datetime, timezone
from sqlalchemy import select
Expand All @@ -108,8 +115,25 @@ async def fire_cue(
raise HTTPException(status_code=404, detail={"error": {"code": "cue_not_found", "message": "Cue not found", "status": 404}})

now = datetime.now(timezone.utc)

# §13 (Phase 12.1.7): per-fire scheduling. ``send_at`` in the future
# delays dispatch until that time; past timestamps are forgiving and
# treated as "fire now" (idempotent — caller doesn't need to worry
# about clock skew or being a few ms late). The dispatch loop in
# worker/poller.py:dispatch_outbox already gates on
# ``DispatchOutbox.scheduled_at`` (NULL or <= now), so we just plumb
# the timestamp through. ``Execution.scheduled_for`` records the
# intended fire time for audit; the worker doesn't actually use this
# for gating (the outbox does), but listings + dashboards show it.
requested_at = body.send_at if body and body.send_at else None
effective_scheduled_for = requested_at if requested_at and requested_at > now else now
is_scheduled = effective_scheduled_for > now

execution_id = uuid_mod.uuid4()
execution = Execution(id=execution_id, cue_id=cue.id, scheduled_for=now, status="pending", triggered_by="manual_fire")
execution = Execution(
id=execution_id, cue_id=cue.id, scheduled_for=effective_scheduled_for,
status="pending", triggered_by="manual_fire",
)
db.add(execution)

if cue.callback_transport == "webhook" and cue.callback_url:
Expand All @@ -118,11 +142,15 @@ async def fire_cue(
ws = user_row.scalar_one_or_none() or ""
outbox = DispatchOutbox(
execution_id=execution_id, cue_id=cue.id, task_type="deliver",
# §13: when send_at is in the future, set scheduled_at so the
# dispatcher gates dispatch until then. NULL = dispatch
# immediately (existing behavior).
scheduled_at=effective_scheduled_for if is_scheduled else None,
payload={
"execution_id": str(execution_id), "cue_id": cue.id, "cue_name": cue.name,
"user_id": str(user.id), "callback_url": cue.callback_url,
"callback_method": cue.callback_method, "callback_headers": cue.callback_headers or {},
"payload": cue.payload or {}, "scheduled_for": now.isoformat(),
"payload": cue.payload or {}, "scheduled_for": effective_scheduled_for.isoformat(),
"retry_max_attempts": cue.retry_max_attempts,
"retry_backoff_minutes": cue.retry_backoff_minutes or [1, 5, 15],
"webhook_secret": ws,
Expand All @@ -131,4 +159,8 @@ async def fire_cue(
db.add(outbox)

await db.commit()
return {"id": str(execution_id), "cue_id": cue.id, "scheduled_for": now.isoformat(), "status": "pending", "triggered_by": "manual_fire"}
return {
"id": str(execution_id), "cue_id": cue.id,
"scheduled_for": effective_scheduled_for.isoformat(),
"status": "pending", "triggered_by": "manual_fire",
}
17 changes: 17 additions & 0 deletions app/schemas/cue.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ class OnFailureConfig(BaseModel):
pause: bool = False


class FireRequest(BaseModel):
"""Body for POST /v1/cues/{id}/fire — all fields optional."""

send_at: Optional[datetime] = Field(
default=None,
description=(
"Optional UTC timestamp to schedule this fire for the future. "
"When set, the resulting execution sits in `pending` until "
"`send_at <= now()`, then enters the normal dispatch path. "
"Same shape as the existing per-cue schedule, but per-fire. "
"Past timestamps are treated as 'fire now' (idempotent + "
"forgiving — no error, just dispatches immediately). Phase "
"12.1.7 / roadmap §13 (parity port of cueapi/cueapi#618)."
),
)


class CueCreate(BaseModel):
name: str = Field(..., max_length=255)
description: Optional[str] = None
Expand Down
209 changes: 209 additions & 0 deletions tests/test_fire_send_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
"""Tests for §13 (Phase 12.1.7): per-fire scheduling on POST /v1/cues/{id}/fire.

Roadmap doc §13: optional `send_at` timestamp on fire that delays dispatch
until the time elapsed. Same shape as cue's per-cue schedule, but per-fire.

These tests pin:

1. No `send_at` (or omitted) → dispatch immediately (existing behavior).
2. `send_at` in the future → execution's ``scheduled_for`` is set to
send_at; outbox row has ``scheduled_at`` set so the dispatcher's
existing ``scheduled_at IS NULL OR scheduled_at <= now()`` filter
gates dispatch until then.
3. `send_at` in the past → forgiving fallback to "fire now" (no error).
``scheduled_for`` set to now; outbox ``scheduled_at`` left NULL.
4. `send_at` composes with `payload_override` — both fields can coexist
on the same fire.
5. Response payload reflects the effective ``scheduled_for``.
"""
from __future__ import annotations

import uuid
from datetime import datetime, timedelta, timezone

import pytest
from sqlalchemy import select

from app.models.dispatch_outbox import DispatchOutbox
from app.models.execution import Execution


async def _create_cue(client, auth_headers, name="send-at-test"):
resp = await client.post(
"/v1/cues",
json={
"name": name,
"schedule": {"type": "recurring", "cron": "0 * * * *"},
"callback": {"url": "https://example.com/webhook"},
"payload": {"task": "send_at_default"},
},
headers=auth_headers,
)
assert resp.status_code == 201, resp.text
return resp.json()["id"]


@pytest.mark.asyncio
async def test_fire_no_send_at_dispatches_immediately(client, auth_headers, db_session):
"""Existing behavior preserved: no body or no send_at → outbox.scheduled_at NULL."""
cue_id = await _create_cue(client, auth_headers, "send-at-immediate")

# No body
resp = await client.post(f"/v1/cues/{cue_id}/fire", headers=auth_headers)
assert resp.status_code == 200
exec_id = resp.json()["id"]

outbox = (
await db_session.execute(
select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id))
)
).scalar_one()
assert outbox.scheduled_at is None, (
"no send_at → outbox.scheduled_at must be NULL so dispatcher fires immediately"
)


@pytest.mark.asyncio
async def test_fire_send_at_future_delays_dispatch(client, auth_headers, db_session):
cue_id = await _create_cue(client, auth_headers, "send-at-future")
future = datetime.now(timezone.utc) + timedelta(hours=2)

resp = await client.post(
f"/v1/cues/{cue_id}/fire",
json={"send_at": future.isoformat()},
headers=auth_headers,
)
assert resp.status_code == 200
body = resp.json()
exec_id = body["id"]

# Response reflects the future scheduled_for.
parsed = datetime.fromisoformat(body["scheduled_for"])
assert abs((parsed - future).total_seconds()) < 1.0

# Execution row + outbox row both reflect the schedule.
execution = (
await db_session.execute(select(Execution).where(Execution.id == uuid.UUID(exec_id)))
).scalar_one()
assert abs((execution.scheduled_for - future).total_seconds()) < 1.0

outbox = (
await db_session.execute(
select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id))
)
).scalar_one()
assert outbox.scheduled_at is not None, "send_at in future → outbox.scheduled_at must be set"
assert abs((outbox.scheduled_at - future).total_seconds()) < 1.0


@pytest.mark.asyncio
async def test_fire_send_at_past_falls_back_to_now(client, auth_headers, db_session):
"""Past timestamps are forgiving — no error, treated as 'fire now'.

Idempotent: callers don't need to worry about clock skew or being
a few ms late after computing a send_at locally.
"""
cue_id = await _create_cue(client, auth_headers, "send-at-past")
past = datetime.now(timezone.utc) - timedelta(hours=1)

resp = await client.post(
f"/v1/cues/{cue_id}/fire",
json={"send_at": past.isoformat()},
headers=auth_headers,
)
assert resp.status_code == 200
exec_id = resp.json()["id"]

outbox = (
await db_session.execute(
select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id))
)
).scalar_one()
assert outbox.scheduled_at is None, (
"send_at in past → forgiving fallback; outbox.scheduled_at must be NULL"
)


@pytest.mark.skip(
reason="payload_override on FireRequest is a separate parity port (cueapi/cueapi#589/#590). "
"Once those land in cueapi-core, un-skip this test to verify send_at + payload_override compose."
)
@pytest.mark.asyncio
async def test_fire_send_at_composes_with_payload_override(client, auth_headers, db_session):
cue_id = await _create_cue(client, auth_headers, "send-at-with-override")
future = datetime.now(timezone.utc) + timedelta(hours=1)

resp = await client.post(
f"/v1/cues/{cue_id}/fire",
json={
"send_at": future.isoformat(),
"payload_override": {"task": "scheduled_team_comm", "from": "test-agent"},
},
headers=auth_headers,
)
assert resp.status_code == 200
exec_id = resp.json()["id"]

execution = (
await db_session.execute(select(Execution).where(Execution.id == uuid.UUID(exec_id)))
).scalar_one()
# Both behaviors apply.
assert abs((execution.scheduled_for - future).total_seconds()) < 1.0
assert execution.payload_override == {
"task": "scheduled_team_comm",
"from": "test-agent",
}


@pytest.mark.asyncio
async def test_fire_send_at_invalid_timestamp_returns_422(client, auth_headers):
"""Pydantic catches malformed datetime strings."""
cue_id = await _create_cue(client, auth_headers, "send-at-invalid")

resp = await client.post(
f"/v1/cues/{cue_id}/fire",
json={"send_at": "not-a-date"},
headers=auth_headers,
)
assert resp.status_code in (400, 422)


@pytest.mark.asyncio
async def test_fire_send_at_worker_transport_no_outbox(client, auth_headers, db_session):
"""Worker-transport cues don't create an outbox row, but ``scheduled_for``
on the Execution row still reflects send_at so worker pull endpoints
can filter by it (not done here — just pin the Execution shape)."""
create = await client.post(
"/v1/cues",
json={
"name": "send-at-worker",
"schedule": {"type": "recurring", "cron": "0 * * * *"},
"transport": "worker",
"payload": {"task": "scheduled_worker_task"},
},
headers=auth_headers,
)
assert create.status_code == 201, create.text
cue_id = create.json()["id"]

future = datetime.now(timezone.utc) + timedelta(minutes=30)
resp = await client.post(
f"/v1/cues/{cue_id}/fire",
json={"send_at": future.isoformat()},
headers=auth_headers,
)
assert resp.status_code == 200
exec_id = resp.json()["id"]

execution = (
await db_session.execute(select(Execution).where(Execution.id == uuid.UUID(exec_id)))
).scalar_one()
assert abs((execution.scheduled_for - future).total_seconds()) < 1.0

# No outbox row for worker transport.
outbox = (
await db_session.execute(
select(DispatchOutbox).where(DispatchOutbox.execution_id == uuid.UUID(exec_id))
)
).scalar_one_or_none()
assert outbox is None