
fix: unwrap workflow outputs in value_js + oauth2 in SSE server #564

Open

buger wants to merge 6 commits into main from fix/workflow-output-unwrapping

Conversation

@buger
Contributor

@buger buger commented Mar 26, 2026

Summary

  • Workflow output unwrapping bug: value_js in workflow outputs received raw ReviewSummary wrappers ({ issues: [], output: {...} }) instead of unwrapped step results. This caused all workflow tools (slack-search, slack-read-thread, discourse-read-thread, discourse-reply) to return { success: false, error: "Unknown error" } because outputs['step'].success was undefined — the actual data was nested inside .output. Added unwrapOutputs() helper (same logic as buildProviderTemplateContext) and applied it to value_js, if conditions, Liquid templates, and expression mappings.

  • OAuth2 in SSE server: executeHttpClientTool only handled bearer auth, missing oauth2_client_credentials. When the AI called http_client tools with oauth2 auth (e.g. MongoDB Atlas atlas-api), no token exchange happened, causing 401 Unauthorized.
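The unwrapping fix described in the first bullet can be sketched as follows. The helper name `unwrapOutputs` and the `ReviewSummary` shape come from this description; the real implementation may differ in detail:

```typescript
// Hypothetical shapes inferred from the PR description.
interface ReviewSummary {
  issues: unknown[];
  output?: unknown;
}

// Unwrap each step's ReviewSummary wrapper so that value_js,
// `if` conditions, and Liquid templates see the step's actual
// result instead of { issues, output }.
function unwrapOutputs(
  outputs: Record<string, ReviewSummary | unknown>
): Record<string, unknown> {
  const unwrapped: Record<string, unknown> = {};
  for (const [step, value] of Object.entries(outputs)) {
    if (value && typeof value === 'object' && 'output' in value) {
      unwrapped[step] = (value as ReviewSummary).output;
    } else {
      // Pass non-wrapper values through unchanged.
      unwrapped[step] = value;
    }
  }
  return unwrapped;
}
```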

Impact

This was causing complete tool failure in production. A traced task (e6a694b7) showed the AI calling slack-search 4x and slack-read-thread 9x over 7 minutes — every call returned "Unknown error", and the AI produced empty output {"text":""}.

Test plan

  • visor test --no-mocks --only discourse-read-real — passes, returns real thread content
  • visor test --no-mocks --only atlas-list-projects-real — passes, oauth2 token exchange works
  • visor test --only discourse-skill-activation — passes with mocks
  • Pre-commit hooks pass (eslint, prettier, unit tests)

🤖 Generated with Claude Code

…to SSE server

Two bugs fixed:

1. Workflow output value_js received raw ReviewSummary wrappers instead of
   unwrapped step outputs. This caused every workflow tool (slack-search,
   slack-read-thread, discourse-read-thread, discourse-reply) to return
   "Unknown error" because outputs['step'].success was undefined (actual
   data was nested in .output). Script steps already unwrapped correctly
   via buildProviderTemplateContext, but workflow-executor's value_js,
   if conditions, and Liquid contexts did not.

2. executeHttpClientTool in mcp-custom-sse-server only handled bearer auth,
   not oauth2_client_credentials. When the AI called http_client tools with
   oauth2 auth (e.g. MongoDB Atlas), no token exchange happened and requests
   went out without Authorization headers, causing 401 errors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@probelabs
Contributor

probelabs bot commented Mar 26, 2026

Pull Request Overview

Summary

This PR fixes two critical production bugs causing complete tool failure:

  1. Workflow output unwrapping bug: value_js expressions, if conditions, and Liquid templates in workflow outputs received raw ReviewSummary wrappers ({ issues: [], output: {...} }) instead of unwrapped step results. This caused all workflow-based MCP tools (slack-search, slack-read-thread, discourse-read-thread, discourse-reply) to return { success: false, error: "Unknown error" } because outputs['step'].success was undefined — the actual data was nested inside .output.

  2. OAuth2 in SSE server: executeHttpClientTool only handled bearer auth, missing oauth2_client_credentials. When AI called http_client tools with OAuth2 auth (e.g., MongoDB Atlas atlas-api), no token exchange happened, causing 401 Unauthorized errors.

Files Changed Analysis

Core Bug Fixes (2 files)

src/providers/mcp-custom-sse-server.ts (+5/-0)

  • Added OAuth2 client credentials support (lines 1380-1385)
  • Checks for authType === 'oauth2_client_credentials' && tool.auth.token_url
  • Imports OAuth2TokenCache singleton and calls tokenCache.getToken(tool.auth) to fetch/cache tokens
  • Sets Authorization: Bearer ${token} header
  • Complements existing bearer auth logic

Note: The PR description mentions src/workflow-executor.ts changes with unwrapOutputs() helper, but this file is NOT in the actual diff. The workflow output unwrapping fix appears to be in a different file or may have been incorrectly described.

Task Telemetry & Trace Improvements (4 files)

src/agent-protocol/trace-serializer.ts (+839/-56)

  • Added buildTraceReport() function that returns structured TraceReport with tree, task summary, and header text
  • Added extractProbeTaskSummary() to parse task telemetry from spans (created, updated, completed, batch operations)
  • Added resolveTraceSpans() to unify trace fetching from multiple backends (Grafana, Jaeger, local files)
  • Added buildTraceHeaderLines() to format task status with markers ([x], [~], [!], [-])
  • Enhanced span rendering to display task events and snapshots in trace trees
  • Improved local NDJSON file reading to filter by targetTraceId for mixed trace files

src/agent-protocol/task-live-updates.ts (+81/-5)

  • Added task summary extraction and formatting for live updates
  • Displays task status with checkboxes and scopes (Code Explorer, Search Delegate, Engineer)
  • Shows task counts and event/snapshot statistics

src/agent-protocol/task-evaluator.ts (+4/-4)

  • Updated to use buildTraceReport() instead of serializeTraceForPrompt()
  • Extracts text field from report object

src/agent-protocol/tasks-cli-handler.ts (+7/-5)

  • Updated trace handler to use buildTraceReport()
  • Displays header text with task summary before trace tree

Observability Infrastructure (9 new files)

deploy/observability/local/ directory

  • Added complete local observability stack replacing single-container LGTM:
    • docker-compose.yml - orchestrates Tempo, OTel Collector, Prometheus, Grafana, autoheal
    • Dockerfile.tempo / Dockerfile.otelcol - adds busybox for health checks
    • tempo.yaml - distributed tracing backend configuration
    • otelcol.yaml - OTLP collector with batch processing and memory limiting
    • prometheus.yaml - metrics scraping configuration
    • grafana/provisioning/datasources/datasources.yaml - auto-configures Tempo and Prometheus
    • README.md - setup and usage documentation

docs/telemetry-setup.md (+18/-2)

  • Updated to recommend Visor observability stack over LGTM
  • Documents port mappings (8001 Grafana, 4317/4318 OTLP, 3200 Tempo, 9091 Prometheus)

Other Improvements (7 files)

defaults/code-talk.yaml (+13/-0)

  • Enabled enableTasks: true for code-talk workflow
  • Added comprehensive task protocol documentation for delegates
  • Clarified task creation timing and parallel vs sequential work

src/ai-review-service.ts (+33/-2)

  • Exported createProbeTracerAdapter() for testing
  • Added recordTaskEvent() method to emit task telemetry
  • Mirrored span-worthy task events to local NDJSON fallback trace

src/agent-protocol/task-trace-resolution.ts (+1/-1)

  • Fixed primaryRef order to prefer trace_file over trace_id

src/agent-protocol/track-execution.ts (+1/-1)

  • Fixed trace reference resolution order

src/utils/worktree-manager.ts (+6/-2)

  • Enhanced removeWorktree() to load metadata for worktrees not in active list

tests/integration/slack-live-task-updates.test.ts (+6/-1)

  • Updated mock to match new primaryRef order
  • Added assertions for trace file usage

package.json / package-lock.json (+5/-5)

  • Bumped @probelabs/probe from 0.6.0-rc311 to 0.6.0-rc312

Architecture & Impact Assessment

OAuth2 Authentication Flow

Before: only bearer token auth worked.

```typescript
if (authType === 'bearer') {
  headers['Authorization'] = `Bearer ${token}`;
}
// oauth2_client_credentials → 401 Unauthorized
```

After: OAuth2 client credentials flow added.

```typescript
if (authType === 'bearer') {
  headers['Authorization'] = `Bearer ${token}`;
} else if (authType === 'oauth2_client_credentials') {
  const token = await tokenCache.getToken(tool.auth);
  headers['Authorization'] = `Bearer ${token}`;
}
```

OAuth2 flow:

```mermaid
sequenceDiagram
    participant AI as AI Provider
    participant SSE as CustomToolsSSEServer
    participant Cache as OAuth2TokenCache
    participant API as OAuth2 Server
    participant Target as Target API

    AI->>SSE: call http_client tool (oauth2)
    SSE->>Cache: getToken(config)
    alt token cached & valid
        Cache-->>SSE: cached token
    else token expired/missing
        Cache->>API: POST token_url
        API-->>Cache: access_token
        Cache-->>SSE: new token
    end
    SSE->>Target: GET api (Authorization: Bearer token)
    Target-->>SSE: 200 OK
    SSE-->>AI: tool result
```
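The cache interaction in the diagram can be sketched as a minimal client-credentials token cache. This is an illustrative stand-in for the real OAuth2TokenCache in src/utils/oauth2-token-cache.ts, assuming a standard OAuth2 token endpoint:

```typescript
// Minimal sketch of a client-credentials token cache; the real
// OAuth2TokenCache implementation may differ.
interface OAuth2Config {
  token_url: string;
  client_id: string;
  client_secret: string;
}

interface CachedToken {
  accessToken: string;
  expiresAt: number; // epoch ms
}

class SimpleTokenCache {
  private cache = new Map<string, CachedToken>();

  async getToken(auth: OAuth2Config): Promise<string> {
    const key = `${auth.token_url}|${auth.client_id}`;
    const hit = this.cache.get(key);
    // Reuse the cached token until shortly before it expires.
    if (hit && hit.expiresAt > Date.now() + 30_000) return hit.accessToken;

    const res = await fetch(auth.token_url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams({
        grant_type: 'client_credentials',
        client_id: auth.client_id,
        client_secret: auth.client_secret,
      }),
    });
    if (!res.ok) throw new Error(`token exchange failed: ${res.status}`);
    const data = (await res.json()) as { access_token: string; expires_in?: number };
    this.cache.set(key, {
      accessToken: data.access_token,
      expiresAt: Date.now() + (data.expires_in ?? 3600) * 1000,
    });
    return data.access_token;
  }
}
```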

Task Telemetry System

New task summary extraction:

  • Parses task events from OTEL spans (task.created, task.updated, task.completed, etc.)
  • Builds hierarchical scopes (Main Agent, Code Explorer, Search Delegate, Engineer)
  • Formats task status with markers: [x] completed, [~] in_progress, [!] failed, [-] cancelled
  • Displays in live updates, CLI trace view, and task evaluation
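The marker convention above can be sketched as a small formatter. The `pending` marker and the function name are assumptions not stated in the PR:

```typescript
// Status markers as described in the PR summary; `pending` -> '[ ]'
// is an assumption for the remaining state.
type TaskStatus = 'completed' | 'in_progress' | 'failed' | 'cancelled' | 'pending';

const STATUS_MARKERS: Record<TaskStatus, string> = {
  completed: '[x]',
  in_progress: '[~]',
  failed: '[!]',
  cancelled: '[-]',
  pending: '[ ]',
};

// Illustrative formatter, not the actual buildTraceHeaderLines() code.
function formatTaskLine(status: TaskStatus, title: string): string {
  return `${STATUS_MARKERS[status]} ${title}`;
}
```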

Trace report structure:

```typescript
interface TraceReport {
  traceData: ResolvedTraceData;
  tree: string;              // YAML-formatted span tree
  taskSummary: ProbeTaskSummary | null;
  headerText: string;        // Task summary header
  text: string;              // Combined header + tree
}
```

Observability Stack

Multi-container architecture:

```mermaid
graph TB
    subgraph "Visor App"
        A[OTel SDK] -->|OTLP| B[4318 HTTP / 4317 gRPC]
    end

    subgraph "Observability Stack"
        B --> C[OTel Collector]
        C -->|Traces| D[Tempo :3200]
        C -->|Metrics| E[Prometheus :9091]
        D -->|Service Map| F[Grafana :8001]
        E -->|Metrics| F
        G[Autoheal] -.->|Health Checks| C
        G -.->|Health Checks| D
        G -.->|Health Checks| E
        G -.->|Health Checks| F
    end
```

Benefits over LGTM:

  • Separate services for independent scaling
  • Health checks and auto-recovery via autoheal container
  • Easier debugging - can inspect individual components
  • Production-ready configuration

Scope Discovery & Context Expansion

Direct Impact

Critical fixes:

  • All workflow-based MCP tools (slack-search, slack-read-thread, discourse-read-thread, discourse-reply) - now work correctly
  • MongoDB Atlas API tools using OAuth2 (atlas-api) - now authenticate properly
  • Any custom http_client tools with oauth2_client_credentials auth - now supported

Indirect impact:

  • Workflow executor output computation (all workflows using value_js, if, or Liquid templates)
  • MCP SSE server tool execution
  • Task telemetry visualization in traces and live updates
  • Local development observability experience

Production Impact

Severity: Critical - complete tool failure

Evidence from PR description:

  • Traced task e6a694b7 showed AI calling slack-search 4x and slack-read-thread 9x over 7 minutes
  • Every call returned "Unknown error"
  • AI produced empty output {"text":""}

Root cause: outputs['step'].success was undefined because data was nested in outputs['step'].output.success
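In plain terms, with illustrative values rather than data from the actual trace:

```typescript
// What value_js actually received: the raw ReviewSummary wrapper.
const outputs = {
  'slack-search': { issues: [], output: { success: true, messages: ['...'] } },
};

// Buggy access path: .success on the wrapper is undefined, so the
// tool reported { success: false, error: 'Unknown error' }.
const buggy = (outputs['slack-search'] as any).success; // undefined

// Correct path after unwrapping: the data lives under .output.
const fixed = outputs['slack-search'].output.success; // true
```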

References

Code Locations

SSE server changes:

  • src/providers/mcp-custom-sse-server.ts:1380-1385 - OAuth2 client credentials

Task telemetry changes:

  • src/agent-protocol/trace-serializer.ts:1359-1495 - buildTraceReport, extractProbeTaskSummary
  • src/agent-protocol/trace-serializer.ts:343-440 - resolveTraceSpans
  • src/agent-protocol/task-live-updates.ts:660-731 - task summary formatting

Observability stack:

  • deploy/observability/local/docker-compose.yml - complete stack definition
  • deploy/observability/local/README.md - setup documentation

Related patterns (for context):

  • src/utils/template-context.ts:66-76 - buildProviderTemplateContext unwrap
  • src/failure-condition-evaluator.ts:173-183, 784-793 - condition evaluator unwrap
  • src/providers/http-client-provider.ts:176-180 - OAuth2 in HttpClientProvider
  • src/utils/oauth2-token-cache.ts - OAuth2TokenCache implementation

Documentation

  • docs/output-history.md:26-42 - forEach output unwrapping behavior
  • docs/debugging.md:352-353 - forEach output field access
  • docs/telemetry-setup.md - updated with new observability stack
Metadata
  • Review Effort: 3 / 5
  • Primary Label: bug

Powered by Visor from Probelabs

Last updated: 2026-04-01T12:09:48.688Z | Triggered by: pr_updated | Commit: 0f69987

💡 TIP: You can chat with Visor using /visor ask <your question>

@probelabs
Contributor

probelabs bot commented Mar 26, 2026

✅ Security Check Passed

No security issues found – changes LGTM.

Performance Issues (3)

  • 🟡 Warning (src/slack/client.ts:227-247)
    resolveChannelName() paginates through the full conversations.list API on first use, caching results per-process. For workspaces with thousands of channels, the initial pagination makes multiple API calls (limit=200 per page), which can cause a significant delay on first use and may hit Slack rate limits.
    💡 Suggestion: Add a configurable timeout for pagination, implement lazy loading (only paginate when actually needed), or pre-warm the cache asynchronously during initialization rather than on demand.
  • 🟡 Warning (src/agent-protocol/trace-serializer.ts:343-440)
    resolveTraceSpans() tries backends sequentially (file, Grafana, Jaeger) with HTTP requests that each have 2-10 second timeouts. In the worst case, where all backends fail or return no results, this can take 15-30 seconds before returning empty spans; slow backends block faster ones.
    💡 Suggestion: Fetch backends in parallel with Promise.race() or Promise.any() to return results from the fastest responding backend, and add a configurable overall timeout to fail fast when all backends are slow.
  • 🟡 Warning (src/agent-protocol/trace-serializer.ts:587-640)
    findTraceFile() reads the first line of every .ndjson file in the trace directory to match traceId. For directories with hundreds of trace files, this performs hundreds of file I/O operations, each creating a readline interface and file stream.
    💡 Suggestion: Maintain an index file (trace_id → file path) that is updated when traces are written, or implement a more efficient discovery mechanism using filesystem metadata or a database.

Quality Issues (14)
  • 🟢 Info (src/agent-protocol/trace-serializer.ts:1061)
    renderYamlNode() truncates task titles to 100 characters without clear justification; the magic number should be a named constant.
    💡 Suggestion: Replace 100 with a named constant such as MAX_TASK_TITLE_LENGTH and document the rationale.
  • 🟢 Info (src/agent-protocol/trace-serializer.ts:1921)
    Multiple magic numbers are used for truncation: 80 for task titles, 120 for task IDs, 100 for errors.
    💡 Suggestion: Extract these magic numbers into named constants with clear names.
  • 🟢 Info (src/providers/mcp-check-provider.ts:597)
    The stderr error handler only logs debug messages. If stderr errors occur frequently, there is no visibility into the problem and no metrics for monitoring.
    💡 Suggestion: Increment a metric or counter for stderr errors to monitor transport health.
  • 🟢 Info (src/providers/script-check-provider.ts:382)
    The stderr error handler only logs debug messages, with no tracking of how often these errors occur.
    💡 Suggestion: Add metrics or counters for stderr errors to monitor transport health.
  • 🟢 Info (src/slack/markdown.ts:135)
    The stderr and stdout error handlers are empty functions. This prevents crashes but gives no visibility into Mermaid rendering failures.
    💡 Suggestion: Log or track these errors instead of silently ignoring them.
  • 🟡 Warning (src/agent-protocol/task-live-updates.ts:260)
    The enqueue() method chains promises with .catch(() => {}), silently swallowing errors. If publish() fails, the error is lost and subsequent publish calls may continue with inconsistent state.
    💡 Suggestion: Log errors caught in .catch() instead of silently ignoring them, for debugging and monitoring.
  • 🟡 Warning (src/agent-protocol/task-live-updates.ts:283)
    The complete() and fail() methods wait for inflightTick inside try/catch without re-throwing or logging, so a failed tick is swallowed.
    💡 Suggestion: Log the error from inflightTick before continuing, to provide visibility into failures.
  • 🟡 Warning (src/agent-protocol/trace-serializer.ts:343)
    resolveTraceSpans() reads local NDJSON files line by line via readline.createInterface, parsing every line to filter by targetTraceId. This is O(n) per trace lookup and inefficient for large files.
    💡 Suggestion: Index trace files by traceId, or cache parsed spans in memory when the same file is accessed repeatedly.
  • 🟡 Warning (src/agent-protocol/trace-serializer.ts:922)
    parseTaskStatusSnapshot() uses regex to parse XML-like task status, which is fragile for nested XML and does not handle malformed input gracefully.
    💡 Suggestion: Use a proper XML parser library instead of regex, or validate the XML structure before parsing.
  • 🟡 Warning (src/scheduler/scheduler.ts:289)
    When workflow is not set (reminder mode), the code only checks that job.inputs.text exists; it does not validate that it is a non-empty string.
    💡 Suggestion: Validate that inputs.text is a non-empty string when workflow is not set.
  • 🟡 Warning (src/email/polling-runner.ts:105)
    startPeriodicStorageCleanup is called without checking for an existing cleanup timer. If startListening is called multiple times, multiple cleanup timers may be created.
    💡 Suggestion: If storageCleanupStop exists, call it before starting a new cleanup timer.
  • 🟡 Warning (src/agent-protocol/trace-serializer.ts:800)
    The PR adds extensive task telemetry parsing functions (parseTaskStatusSnapshot, summarizeTaskTelemetrySpans, buildProbeTaskScopeSummary, buildTemporalTaskScopes, extractProbeTaskSummary) without corresponding unit tests.
    💡 Suggestion: Add comprehensive unit tests covering edge cases such as malformed task status XML, missing attributes, empty task lists, and nested scopes.
  • 🟡 Warning (src/ai-review-service.ts:103)
    createProbeTracerAdapter gains a recordTaskEvent method and mirrors span-worthy task events to the NDJSON fallback trace, but no tests verify the new telemetry.
    💡 Suggestion: Add unit tests for recordTaskEvent verifying that task events are emitted as both spans and NDJSON records.
  • 🟡 Warning (src/agent-protocol/task-live-updates.ts:670)
    extractTraceSkillMetadata now includes task summary extraction, but no tests verify this new functionality.
    💡 Suggestion: Add unit tests verifying task summary extraction from trace spans that contain task telemetry events.

Powered by Visor from Probelabs

Last updated: 2026-04-01T11:59:16.050Z | Triggered by: pr_updated | Commit: 0f69987


buger and others added 5 commits March 30, 2026 18:59
- Fix on_message trigger dispatch to match normal message path:
  seed setFirstMessage so human_input checks auto-resolve and the
  full intent-router → build-config → generate-response chain runs
  with proper tool loading (Jira MCP, Slack, etc.)
- Inject trigger.inputs.text as the AI message with original Slack
  message appended, so triggers can give specific instructions
- Fix live update race condition: serialize publish() calls via a
  promise queue in SlackTaskLiveUpdateSink to prevent duplicate
  Slack messages when tick() and complete() run concurrently
- Track inflightTick promise so complete()/fail() await in-flight
  ticks before publishing the final update
- Fix self-bot message detection for bot_message subtypes by also
  checking ev.bot_id against the bot's own bot_id from auth.test
- Add resolveChannelName() to SlackClient for #channel-name support
  in scheduler output targets via conversations.list with caching
- Allow cron jobs without workflow (inputs.text as user message)
- Make StaticCronJob.workflow optional in types
- Fix workflow output warning to only fire for undefined (not null)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add debounce-manager for throttling check executions and integrate
it into level-dispatch. Supports configurable throttle settings
per check via config types.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add global uncaughtException handler that suppresses transient I/O
errors (EIO, EPIPE, ECONNRESET, ERR_STREAM_DESTROYED) from dying
child processes instead of crashing the entire visor process.

Three layers of defense:
- Global handler in child-process-error-handler.ts (imported early)
- Worktree manager skips process.exit(1) for transient I/O errors
- Stream-level error handlers on MCP transport stderr pipes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Update @probelabs/probe to v0.6.0-rc313 with enriched task telemetry
  (agent scope fields, full task state on events, task.items_json)
- Parse task.items_json from batch events for proper titles on batch
  created/updated/completed/deleted operations
- Collapse sub-agent scopes (engineer, code-explorer) that lack
  meaningful task titles into deduplicated single-line entries instead
  of showing repetitive generic "Engineer Task" items
- Preserve sub-agent task titles when they exist (from task tool snapshots)
- Group repeated sub-agent iterations under a single scope label

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
