# feat(sdk): introduce SnapshotReplayAgent for deterministic conversation replay #3029

vivekvjnk wants to merge 1 commit
Conversation
- Add SnapshotReplayAgent to support LLM-free regression testing by replaying recorded conversation event streams.
- Implement robust session discovery that automatically resolves nested conversation subdirectories within a base persistence path.
- Support explicit session targeting via `replay_conversation_id`.
- Implement "Clean Replay Logs" by automatically stripping LLM reasoning, thoughts, and summaries from replayed events to focus on tool interaction.
- Ensure deterministic execution by re-executing tool calls against the live environment while comparing for observation drift.
- Provide comprehensive example code and technical documentation for the record-and-replay workflow.

Co-authored-by: Google Antigravity
[Automatic Post]: I have assigned @neubig as a reviewer based on the repository MAINTAINERS file. Thanks in advance for the help!
[Automatic Post]: This PR seems to be currently waiting for review. @neubig, could you please take a look when you have a chance? |
neubig left a comment
Thanks for the contribution @vivekvjnk — neat idea and a well-structured write-up! 🙌
Overall direction looks fine. Posting feedback rather than approving because there are a couple of test-isolation and example-runtime issues worth tightening before merge, plus a couple of subtle behaviors in step() and snapshot discovery to think about. Nothing scary, just stuff a reviewer is going to ping you on anyway.
This review was generated by an AI agent (OpenHands) on behalf of @xingyaoww.
```python
llm = LLM(model="test", api_key=SecretStr("test"))

# Patch the class-level helper used inside step()
ConversationState.get_unmatched_actions = staticmethod(lambda x: [])  # type: ignore[method-assign]
```
🟠 Important — test isolation leak. Assigning to a class attribute like this permanently mutates `ConversationState` for the rest of the pytest session. Other tests already call `ConversationState.get_unmatched_actions(...)` (e.g. `tests/sdk/conversation/local/test_confirmation_mode.py`), so if pytest happens to schedule them after this one, they'll see the stub and fail mysteriously. Same issue at line 159.
Use the monkeypatch fixture so it's auto-restored:

```python
def test_replay_agent_step_no_drift(tmp_path, monkeypatch):
    monkeypatch.setattr(
        ConversationState,
        "get_unmatched_actions",
        staticmethod(lambda x: []),
    )
    ...
```

That also lets you drop the `# type: ignore[method-assign]`.
```python
)
agent._initialize(conv.state)
tools = DummyTool.create()
agent._tools = {tool.name: tool for tool in tools}
```
🟡 Suggestion — private attribute mutation. `agent._tools = {...}` reaches into a private attribute that the base `Agent` populates from `tools` + `_initialize()`. If that internal contract changes, these tests silently break (or worse, silently keep passing while not exercising what you think they are).

A cleaner option: register `DummyTool` once via the normal tool registry and pass `tools=[Tool(name=DummyTool.name)]` to the agent constructor, so `_initialize()` builds `_tools` for you, as sketched below. If you really need to inject a per-test executor, consider exposing a tiny test seam in `Agent` instead.
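A hedged sketch of that shape (the `Tool(name=...)` spec comes from the suggestion above; `register_tool` and its signature are assumptions about the SDK's registry, not verified against this PR):

```python
# Sketch only; registry and signature details are assumptions.
from openhands.sdk.tool import Tool, register_tool

register_tool(DummyTool.name, DummyTool)  # once, at module scope

agent = SnapshotReplayAgent(
    llm=llm,
    tools=[Tool(name=DummyTool.name)],  # _initialize() builds _tools itself
    replay_snapshot=events,
)
agent._initialize(conv.state)
```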
```python
log_data = json.loads(f.read().strip())
drift_data = log_data["Drift from expected observation"]
assert isinstance(drift_data, dict)
assert drift_data["Drift Present"] is True
```
🟠 Important — on-disk replay path is untested. All three tests cover only the in-memory `replay_snapshot=` route. The actual `replay_persistence` + `LocalFileStore` + `EventLog` flow (including `_resolve_persistence_path`'s subdirectory discovery) has zero coverage, even though the example and the bulk of the docstring are about that path.

A minimal end-to-end test would be: drive a tiny `LocalConversation` with a stubbed-LLM `Agent` so it emits a real `ActionEvent`/`ObservationEvent` pair to disk, then construct a `SnapshotReplayAgent(replay_persistence=tmp_path)` and assert the loader picks them up. Worth adding before this gets used in CI.
```python
else:
    # Discovery logic: find the actual conversation subdirectory
    resolved_path = (
        self._resolve_persistence_path(self.replay_persistence)
        or self.replay_persistence
    )
```
🟠 Important — non-deterministic snapshot selection. `_resolve_persistence_path` walks `os.listdir(...)` (arbitrary order) and returns the first subdir that happens to contain `events/`. If `replay_persistence` ever ends up holding more than one conversation (very common since `persistence_dir` is shared across runs), which session you replay depends on filesystem iteration order. Two reasonable fixes:

- Require `replay_conversation_id` whenever the base path contains >1 candidate, and fail loudly otherwise.
- Sort the candidates and pick deterministically (e.g. most-recently-modified) — and document it. See the sketch below.

The current behavior is going to bite someone in CI.
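For the second option, a standalone sketch of a deterministic walk (the name mirrors the PR's `_resolve_persistence_path`; this is an illustration, not the PR's code):

```python
import os


def resolve_persistence_path(base: str) -> str | None:
    # Deterministic variant of the discovery walk: collect every
    # subdirectory that contains an events/ folder, then pick the most
    # recently modified one instead of trusting os.listdir() order.
    candidates = [
        os.path.join(base, name)
        for name in sorted(os.listdir(base))
        if os.path.isdir(os.path.join(base, name, "events"))
    ]
    if not candidates:
        return None
    return max(candidates, key=os.path.getmtime)
```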
```python
elif isinstance(event, MessageEvent):
    logger.debug(f"Replaying MessageEvent: {event.llm_message}...")
    if event.source == "agent":
        # Strip reasoning from agent messages during replay
        new_msg = event.llm_message.model_copy(
            update={
                "reasoning_content": None,
                "thinking_blocks": [],
                "responses_reasoning_item": None,
            }
        )
        event = event.model_copy(update={"llm_message": new_msg})
    on_event(event)
    state.execution_status = ConversationExecutionStatus.FINISHED
```
🟠 Important — silent early termination on the first agent MessageEvent. `_prepare_replay_events` keeps all agent `MessageEvent`s in `_replay_events`, but here `step()` sets `execution_status = FINISHED` on the first one. If a snapshot has e.g. `[Action, Observation, AgentMessage, Action, Observation, AgentMessage]` (any conversation that involves an intermediate agent reply), the second `Action` and onward are never replayed, and the drift log is silently incomplete.

Either:

- Only add the last agent `MessageEvent` to `_replay_events` (and emit any earlier ones via `on_event` without flipping status), as sketched below, or
- Don't mark `FINISHED` here and let the conversation's normal end-of-script path (line 158) handle it.

Worth a unit test covering a multi-message snapshot.
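One way to gate the status flip (sketch only; `_replay_index` is a hypothetical cursor into `_replay_events`, not an attribute from this PR):

```python
# Only the final replay event may finish the conversation; intermediate
# agent messages are emitted without flipping the status.
on_event(event)
if self._replay_index == len(self._replay_events) - 1:
    state.execution_status = ConversationExecutionStatus.FINISHED
```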
```python
from openhands.sdk.agent.replay_agent import SnapshotReplayAgent
from openhands.tools.terminal import TerminalTool

from dotenv import load_dotenv
```
🟠 Important — python-dotenv isn't a project dependency. The example will `ModuleNotFoundError` for anyone running it from a clean checkout. The other `examples/01_standalone_sdk/*.py` scripts read env vars directly — please follow that pattern:

```diff
- from dotenv import load_dotenv
```

(i.e. just delete lines 8 and 10 — the `os.getenv(...)` calls below already do the right thing if env vars are set externally).
```python
#import logging module, then set logging level to DEBUG to see detailed logs during replay
import logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
```
🟡 Suggestion — module-level `logging.basicConfig(level=DEBUG)` is loud. Setting the root logger to DEBUG at import time floods the console with every dependency's debug output (litellm, httpx, etc.) and makes the example output hard to read. Consider removing it, or guarding behind something like `if os.getenv("DEBUG")` as shown below. The SDK already wires up its own logger via `get_logger`.
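For example:

```python
import logging
import os

# Opt-in verbosity: only enable DEBUG output when explicitly requested.
if os.getenv("DEBUG"):
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
```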
```python
# --- PHASE 1: RECORDING ---
print("--- Phase 1: Recording Session ---")
shapshot_agent = Agent(
```
🟡 Suggestion — typo: `shapshot_agent` → `snapshot_agent`.

```diff
- shapshot_agent = Agent(
+ snapshot_agent = Agent(
```

(also update the `agent=shapshot_agent` reference on line 52)
```python
else:
    print("\nError: Drift log was not created.")

print("\nValidation complete!")
```
🟡 Suggestion — example isn't picked up by the example-runner CI. `tests/examples/test_examples.py::_TARGET_DIRECTORIES` only globs `*.py` directly in the listed dirs, plus a few hard-coded subdir+`main.py` pairs. This file lives at `49_snapshot_replay_agent/hello_world_snapshot_replay.py`, so it'll never run in CI and is likely to bit-rot. To wire it in:

- Rename the file to `main.py` (matching `37_llm_profile_store/main.py`, `43_mixed_marketplace_skills/main.py`).
- Add `EXAMPLES_ROOT / "01_standalone_sdk" / "49_snapshot_replay_agent"` to `_TARGET_DIRECTORIES` in `tests/examples/test_examples.py`.
- Print `EXAMPLE_COST: {cost}` at the end of the script (see `02_custom_tools.py:229` for the standard pattern), which the runner asserts on.

Otherwise this example is documentation-by-trust-me. 🙂
```markdown
The replay reuses the base `Agent`'s `_parallel_executor` and `_ActionBatch` machinery. Tools that are safe to run in parallel continue to run in parallel during replay, matching the actual production concurrency profile.

### 4.5 Structured Drift Logging (JSONL)
```
🟡 Nit — duplicate section number. Both this and line 270 are labeled 4.5. This one should probably be 4.5 Structured Drift Logging → 4.6 Structured Drift Logging, and renumber the rest.
# SnapshotReplayAgent — Technical Documentation

## 1. Introduction

### The Root Problem: Testing Agentic Systems Is Expensive and Non-Deterministic
Modern AI agents powered by large language models (LLMs) pose a fundamental challenge for regression testing: each test run involves live LLM calls, which are both expensive and non-deterministic.
As a result, verifying that a refactored tool, a changed system prompt, or a new environment configuration does not break agent behavior requires either expensive end-to-end runs or brittle mocks that don't reflect real tool interactions.
The specific question this work addresses is:

> Can a recorded agent session be replayed deterministically against the live environment, without calling the LLM, so that drift in tool behavior is surfaced?

This enables a class of tests called agentic regression tests: deterministic, LLM-free, environment-aware replays that surface behavioral drift in tools.
## 2. Approach

### Why Not Mock the LLM?

The first candidate approach is to intercept LLM network calls (e.g., via LiteLLM callbacks) and return pre-recorded LLM responses. This has two structural problems: such mocks are brittle, and they don't reflect real tool interactions, since the replay never touches the live environment and tool drift goes undetected.
### Event-Sourced Replay: The Adopted Strategy

The OpenHands SDK's `Conversation` object already captures every agent interaction as a structured event stream:

- `ActionEvent`
- `ObservationEvent`
- `MessageEvent`

The entire history of a run — decisions, executions, observations — is persisted as an ordered list of these events. This is the snapshot.

The replay strategy is:

1. Load the snapshot, either persisted to disk (`persistence_dir`) or kept in memory.
2. Walk the recorded `ActionEvent`s in order, re-executing each tool call against the live environment.
3. Compare each fresh `ObservationEvent` against the corresponding recorded one.

The LLM is never called. The agent replays the recorded decisions but always collects fresh observations from the real environment.
## 3. Implementation

### 3.1 Architecture Overview

`SnapshotReplayAgent` extends the base `Agent` class, which means it inherits:

- tool resolution (`tools_map`)
- the parallel tool executor (`_parallel_executor`)
- the same `_execute_action_event()` runner used by production agents

The only thing it overrides is the `step()` method — replacing the LLM call with event-sourced replay logic.

### 3.2 Snapshot Loading (`model_post_init`)
When the agent is constructed, `model_post_init` fires and attempts to load the snapshot. The loading follows a priority chain and includes path resolution. On success, `_prepare_replay_events()` splits the loaded events into two internal structures:

- `_replay_events` — an ordered list of `ActionEvent`s and agent `MessageEvent`s that the agent will step through one at a time.
- `_expected_observations` — a dict keyed by `action_id`, mapping each original action to its recorded `ObservationEvent`(s). These are the ground truth for drift comparison.
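In sketch form (the event-module import path and the exact filtering rules are assumptions, not the PR's code):

```python
from collections import defaultdict

# Assumed import path for the SDK's event classes.
from openhands.sdk.event import ActionEvent, MessageEvent, ObservationEvent


def prepare_replay_events(events: list) -> tuple[list, dict]:
    # The ordered script the agent steps through one event at a time.
    replay_events = [
        e for e in events
        if isinstance(e, ActionEvent)
        or (isinstance(e, MessageEvent) and e.source == "agent")
    ]
    # Ground truth for drift comparison, keyed by the original action_id.
    expected_observations: dict[str, list[ObservationEvent]] = defaultdict(list)
    for e in events:
        if isinstance(e, ObservationEvent):
            expected_observations[e.action_id].append(e)
    return replay_events, expected_observations
```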
### 3.3 The `step()` Method — Replay Execution Loop

Each call to `step()` advances the replay by one event. In outline: when the script is exhausted, the conversation is marked finished; an `ActionEvent` is re-executed through `_execute_actions_with_drift_check`; an agent `MessageEvent` has its reasoning stripped and is emitted via `on_event`.

### 3.4 Action Execution with Drift Detection (`_execute_actions_with_drift_check`)

This is the core of the replay engine: it re-executes each recorded action against the live environment through the inherited executor, collects the fresh `ObservationEvent`(s), compares them to the recorded ones with `_check_drift`, and appends one entry to the drift log.
### 3.5 Drift Detection (`_check_drift`)

Two observation lists are considered drifted if they differ once per-run metadata fields are excluded from each observation.

Metadata fields excluded from comparison:

- `id`
- `timestamp`
- `action_id`
- `tool_call_id`

Everything else — the actual tool output payload — is compared directly.
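A minimal sketch of that comparison (assumes observations are Pydantic models exposing `model_dump`, and treats a length mismatch as drift):

```python
# Per-run metadata fields, as listed above.
EXCLUDED_FIELDS = {"id", "timestamp", "action_id", "tool_call_id"}


def check_drift(expected: list, actual: list) -> bool:
    def normalize(obs) -> dict:
        # Drop per-run metadata so only the tool output payload is compared
        return obs.model_dump(exclude=EXCLUDED_FIELDS)

    if len(expected) != len(actual):
        return True
    return any(normalize(e) != normalize(a) for e, a in zip(expected, actual))
```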
### 3.6 Drift Log Format

Each action produces one JSONL line in the drift log.

No drift:

```json
{
  "Action": { "tool_name": "bash", "action": { "command": "ls /app" }, ... },
  "Actual Observation": [{ "observation": { "output": "main.py tests/" }, ... }],
  "Drift from expected observation": "No drift"
}
```

Drift detected:

```json
{
  "Action": { "tool_name": "bash", "action": { "command": "cat main.py" }, ... },
  "Actual Observation": [{ "observation": { "output": "import fastapi ..." }, ... }],
  "Drift from expected observation": {
    "Drift Present": true,
    "Expected": [{ "observation": { "output": "import flask ..." }, ... }]
  }
}
```

## 4. Feature Reference
### 4.1 Dual Snapshot Source

- `replay_persistence: str`: a `persistence_dir` from a previous `Conversation` run.
- `replay_snapshot: list[Event]`: an in-memory event list.

The persistence directory takes priority. The in-memory list is used as a fallback.
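A quick usage sketch of both routes (`llm`, `tools`, and the values shown are placeholders; those arguments are inherited from the base `Agent` constructor):

```python
# Route 1: disk-backed snapshot, pointing at a previous run's persistence_dir
agent = SnapshotReplayAgent(llm=llm, tools=tools, replay_persistence="./sessions")

# Route 2: in-memory snapshot, passing the recorded events directly
agent = SnapshotReplayAgent(llm=llm, tools=tools, replay_snapshot=recorded_events)
```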
### 4.2 Graceful Fallback to Real LLM

If `replay_mode=True` but no snapshot is found (empty list or failed disk load), the agent falls back to real LLM execution and logs a warning. This means it is always safe to set `replay_mode=True` — it will never silently fail.

### 4.3 Clean Replay Logs
The agent automatically strips LLM-specific metadata (summaries, internal reasoning content, thinking blocks) from `ActionEvent`s and `MessageEvent`s before replaying them. This results in a cleaner UI/terminal output that highlights only the replayed tool actions and their live observations.

### 4.4 Session Discovery and Target IDs
The agent supports two ways to locate a specific session within a shared persistence repository:

- Automatic discovery: scan the base path for a conversation subdirectory containing an `events/` folder.
- Explicit targeting: when `replay_conversation_id` is provided, the agent calculates the exact path using the SDK's standard conversation directory logic.

### 4.5 Parallel Tool Execution Preserved
The replay reuses the base `Agent`'s `_parallel_executor` and `_ActionBatch` machinery. Tools that are safe to run in parallel continue to run in parallel during replay, matching the actual production concurrency profile.

### 4.5 Structured Drift Logging (JSONL)
The drift log is append-only JSONL (one JSON object per line, one per action). Each entry contains:

- the replayed `Action`
- the actual `ObservationEvent`(s)
- either `"No drift"` or a dict with `"Drift Present": true` and the `"Expected"` observations

This format is designed to be machine-parseable for CI dashboards and human-readable for debugging.
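As one concrete consumer, a CI gate could fail the build on any drifted entry with a few lines of stdlib Python (sketch; the log path is whatever the agent was configured to write, and the JSONL keys match section 3.6):

```python
import json


def has_drift(log_path: str) -> bool:
    # Scan the append-only JSONL drift log, one JSON object per line.
    with open(log_path) as f:
        for line in f:
            entry = json.loads(line)
            drift = entry["Drift from expected observation"]
            if isinstance(drift, dict) and drift.get("Drift Present"):
                return True
    return False
```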
### 4.6 Non-Deterministic Field Exclusion
Timestamps, UUIDs, and run-linking IDs are automatically stripped before drift comparison, eliminating false positives from fields that are expected to change between runs.
### 4.7 Iterative Refinement Support

The replay respects iterative refinement logic via `batch.finalize(check_iterative_refinement=...)`, so replays of sessions that used self-correction loops behave consistently with production runs.

## 5. Test Cases
### 5.1 Test Fixtures and Helpers

Before the tests, two shared helper functions are defined:

`_make_events() → tuple[ActionEvent, ObservationEvent]`: builds a canonical matched pair of events used across all step tests.
Why `action_event.id` for `action_id`? `ActionEvent` is a frozen Pydantic model — its `id` UUID is generated at construction time and cannot be mutated. This helper captures the ID at construction and links the observation to it correctly.

`_make_conv_mock(events) → MagicMock`: builds a minimal mock `LocalConversation` with a state holding the event list.

Why mock? `LocalConversation` and `ConversationState` require a full workspace, agent, and persistent state. For pure unit tests of the replay logic, a minimal mock is far simpler and doesn't introduce side effects.

#### Test Tool Fixtures
`DummyTool` mirrors the real tool pattern in the SDK: it has typed `Action`/`Observation` models, a `ToolExecutor` subclass, and a `create()` factory. The executor deterministically returns `"Done {value}"`.
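As a rough illustration (not the PR's verbatim fixture; the base-class names follow the description above, but their import paths and signatures are assumptions):

```python
# Hypothetical reconstruction of the DummyTool executor pattern.
class DummyAction(Action):
    value: str


class DummyObservation(Observation):
    result: str


class DummyExecutor(ToolExecutor):
    def __call__(self, action: DummyAction) -> DummyObservation:
        # Deterministic output derived from the action payload
        return DummyObservation(result=f"Done {action.value}")
```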
test_replay_agent_initializationPurpose: Verify the snapshot-loading decision logic in
model_post_init.What it tests:
replay_snapshot=[],_actual_replay_modeshould beFalse.replay_snapshot(even just aMessageEvent),_actual_replay_modeshould beTrue.Result: ✅ Passes. Confirms the guard logic that prevents silent failures when no snapshot is available.
### 5.3 Test: `test_replay_agent_step_no_drift`

Purpose: End-to-end replay step where the live tool produces the same output as the snapshot.

Scenario:

- Snapshot: `ActionEvent(dummy, value="hello")` + `ObservationEvent(result="Done hello")`
- Live: the `DummyTool` executor also returns `"Done hello"`
- Expected log entry: `"No drift"`

Key decisions:

- `ConversationState.get_unmatched_actions` is patched to return `[]` so `step()` proceeds to replay rather than flushing a pending batch.
- `agent._initialize(state)` is called to warm up internal private attributes (tool map, executor) without going through a full `LocalConversation`.
- `agent._tools` is set directly after `_initialize()` to inject `DummyTool` without having it registered in the tool spec system.

Result: ✅ Passes.
### 5.4 Test: `test_replay_agent_step_with_drift`

Purpose: Verify drift is detected and correctly logged when the live tool produces a different result than the snapshot.

Scenario:

- Snapshot: `ObservationEvent(result="Done hello")`
- Live: `DriftingDummyTool` returns `"Unexpected result"` instead
- Expected log entry: `{"Drift Present": true, "Expected": [...]}`

Key decisions:

- `DriftingDummyTool` is defined inline within the test to keep the scope contained. It inherits `DummyTool` but swaps the executor.
- The assertion targets the `"Drift from expected observation"` key specifically, validating the structured format.

Result: ✅ Passes.
### 5.5 Test Results Summary

All three tests pass:

- `test_replay_agent_initialization`
- `test_replay_agent_step_no_drift`
- `test_replay_agent_step_with_drift`

Run command:
Output:
All pre-commit checks (ruff lint, ruff format, pyright type checking, import dependency rules, tool subclass registration) also pass cleanly.
## 6. File Locations

- `openhands-sdk/openhands/sdk/agent/replay_agent.py`
- `tests/sdk/agent/test_replay_agent.py`