Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 61 additions & 12 deletions cueapi/resources/cues.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,29 +225,78 @@ def fire(
*,
payload_override: Optional[Dict[str, Any]] = None,
merge_strategy: Optional[str] = None,
send_at: Optional[Union[str, datetime]] = None,
exit_criteria: Optional[List[str]] = None,
idempotency_key: Optional[str] = None,
) -> Dict[str, Any]:
"""Fire an existing cue immediately, optionally overriding its payload.
"""Fire an existing cue, optionally overriding payload + scheduling.

For ad-hoc one-shot triggers and for using cues as a messaging channel
between agents (carry message/instruction/task/reply_cue_id in
payload_override).
``POST /v1/cues/{cue_id}/fire``. Returns the created execution
dict (not a Cue) — fire creates an execution row, not a new cue.

Useful for ad-hoc one-shot triggers and for using cues as a
messaging channel between agents (carry message/instruction/task/
reply_cue_id in ``payload_override``).

Args:
cue_id: The cue ID to fire.
payload_override: Override the cue's default payload for this fire
only. Persisted on the resulting execution row, never on the
cue itself.
merge_strategy: How payload_override combines with the cue's stored
payload. "merge" (server default) shallow-merges with override
wins on key collisions. "replace" uses override as the final
payload, ignoring cue.payload.
payload_override: Override the cue's default payload for this
fire only. Persisted on the resulting execution row, never
on the cue itself.
merge_strategy: How ``payload_override`` combines with the
cue's stored payload. ``"merge"`` (server default) shallow-
merges with override wins on key collisions. ``"replace"``
uses override as the final payload, ignoring ``cue.payload``.
send_at: Optional ISO 8601 timestamp (or ``datetime``) to
delay this fire. If omitted, the execution is scheduled
immediately. Per-fire scheduling landed in cueapi #618.
exit_criteria: Optional list of required-assertion keys for
§14 work-verification-light (cueapi #632). When non-null,
the receiver MUST report values for every key under
``outcome.assertions``; missing keys mark the execution
``verification_failed``. Empty list (``[]``) explicitly
opts out of cue-level required_assertions for this fire.
None = use cue-level (existing behavior). Max 20 keys.
idempotency_key: Optional opaque caller-supplied dedup key
(cueapi #683, ≤256 chars). Same key on the same cue
within 24h returns the cached execution without firing
again (matched by SHA-256 fingerprint of the canonicalized
body). Same key + DIFFERENT body in the window returns
409 ``idempotency_key_conflict``. Sent as a body field
(NOT the ``Idempotency-Key`` header — server-side cues
fire diverges from messaging-primitive convention here;
Phase 2 spec puts it in the body).

Returns:
The execution dict (id, scheduled_for, status, etc.).
The execution dict (id, scheduled_for, status, triggered_by,
etc.).

Examples:
>>> exec = client.cues.fire("cue_abc123")
>>> exec = client.cues.fire(
... "cue_abc123",
... payload_override={"task": "manual-trigger"},
... send_at="2026-05-07T12:00:00Z",
... exit_criteria=["task_completed", "result_valid"],
... idempotency_key="ci-run-456",
... )
"""
body: Dict[str, Any] = {}
if payload_override is not None:
body["payload_override"] = payload_override
if merge_strategy is not None:
body["merge_strategy"] = merge_strategy
if send_at is not None:
body["send_at"] = (
send_at.isoformat() if isinstance(send_at, datetime) else send_at
)
if exit_criteria is not None:
body["exit_criteria"] = exit_criteria
# idempotency_key is a body field on cues fire (server's
# FireRequest schema), unlike messaging-primitive idempotency
# which uses the Idempotency-Key header. Server-side
# inconsistency that the SDK has to live with.
if idempotency_key is not None:
body["idempotency_key"] = idempotency_key

return self._client._post(f"/v1/cues/{cue_id}/fire", json=body)
86 changes: 86 additions & 0 deletions tests/test_cues.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,89 @@ def test_delete(self, client):
client.cues.delete(cue.id)
with pytest.raises(CueNotFoundError):
client.cues.get(cue.id)


class TestCueFire:
"""Tests for the fire() method (manual trigger / per-fire override).

All tests run against a real cue (the ``cue`` fixture creates one for
each test). Fire creates an execution row; we don't poll the
execution status here — the worker/poller pipeline isn't exercised
by the SDK suite. We just verify the fire response shape and HTTP
success.
"""

def test_fire_no_args(self, client, cue):
"""Bare fire() — no payload override, no scheduling, immediate."""
execution = client.cues.fire(cue.id)
# Server returns the created execution dict
assert "id" in execution
assert execution["cue_id"] == cue.id
# Triggered manually should set triggered_by accordingly
assert execution.get("triggered_by") in ("manual_fire", "manual")
# Default scheduling is immediate (or close to it)
assert "scheduled_for" in execution

def test_fire_with_payload_override(self, client, cue):
"""Fire with payload_override — execution carries the override."""
execution = client.cues.fire(
cue.id,
payload_override={"task": "manual", "trigger": "test"},
)
assert "id" in execution
# Default merge_strategy is server-side merge — we don't assert
# on the merged result here (that's a server test); just verify
# the call shape was accepted.

def test_fire_with_merge_strategy_replace(self, client, cue):
"""Replace strategy — payload_override fully replaces stored payload."""
execution = client.cues.fire(
cue.id,
payload_override={"action": "replace-test"},
merge_strategy="replace",
)
assert "id" in execution

def test_fire_with_send_at(self, client, cue):
"""send_at delays this fire to a specific timestamp (cueapi #618)."""
future = "2030-01-01T12:00:00Z"
execution = client.cues.fire(cue.id, send_at=future)
assert "id" in execution
# Server reflects the requested scheduled_for
# (allow some tolerance — server may normalize the timestamp)
assert "scheduled_for" in execution

@pytest.mark.xfail(
reason=(
"Staging replay-on-same-key behavior unverified. First run "
"(2026-05-07 sha 3934502) returned distinct execution IDs even "
"with idempotency_key correctly in the body. Could be staging "
"migration 052 not applied yet, deploy race vs the cueapi #683 "
"rollout, or a server-side bug. SDK wire-shape is correct "
"(verified by inspection vs FireRequest schema). Remove the "
"xfail after Backlog row 'Verify staging fire idempotency "
"deployment' resolves."
),
strict=False,
)
def test_fire_with_idempotency_key(self, client, cue):
"""idempotency_key replays the same fire (cueapi #683).

SDK puts the key in the BODY (server's FireRequest schema; cues
fire diverges from the messaging-primitive's Idempotency-Key
HEADER convention).
"""
import uuid

key = f"sdk-test-{uuid.uuid4().hex[:8]}"
first = client.cues.fire(cue.id, idempotency_key=key)
second = client.cues.fire(cue.id, idempotency_key=key)
# Same key + same body → server should return the SAME execution
assert first["id"] == second["id"]

def test_fire_returns_dict_not_cue(self, client, cue):
"""Sanity: fire returns the execution dict (not a typed Cue)."""
result = client.cues.fire(cue.id)
# Not a Cue object — fire creates an execution, not a new cue
assert not isinstance(result, Cue)
assert isinstance(result, dict)
Loading