Skip to content

fix: handle premature stream termination for Anthropic (#1868)#2047

Open
gautamsirdeshmukh wants to merge 1 commit intostrands-agents:mainfrom
gautamsirdeshmukh:main
Open

fix: handle premature stream termination for Anthropic (#1868)#2047
gautamsirdeshmukh wants to merge 1 commit intostrands-agents:mainfrom
gautamsirdeshmukh:main

Conversation

@gautamsirdeshmukh
Copy link
Copy Markdown

Problem

The Anthropic provider's stream method tries to read event.message.usage from the last iterated stream event to extract token usage metadata. However, if the stream terminates prematurely and the last stream event's .message attribute has not yet been populated with usage, this line crashes with an AttributeError.

Solution

Instead of checking event.message.usage for the last stream event, we now call Anthropic SDK's stream.get_final_message() method, which returns a "message snapshot" accumulated from all received events rather than relying on the last event's state. This call is wrapped in a try/except/else block, so that if it fails (which is only possible when zero events were received) we log a warning instead of crashing.

Possible Concerns

One may worry that merely logging a warning when get_final_message() call fails could lead to undercounted token usage. However, this method only fails when the stream yields zero events - in which case, there is no usage data to report anyway. If one or more events were received, the Anthropic SDK guarantees that the snapshot contains usage data (initialized by the mandatory message_start event), and get_final_message() will succeed.

Related Issues

#1868

Documentation PR

N/A

Type of Change

Bug fix

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare (added a few tests to cover premature termination + empty stream cases)
  • I also performed a demo of 6 scenarios to compare behavior of old vs new code (for Sonnet and Opus)
======================================================================
  #1868 Fix: Before vs After — claude-sonnet-4-6
======================================================================

  ──────────────────────────────────────────────────────────────────
  Scenario 1: Normal completion
  Full stream with message_stop. Both paths should succeed.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ Last event type: message_stop
    ✅ event.message.usage → {'input_tokens': 100, 'output_tokens': 50}

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=50, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 50}

  ──────────────────────────────────────────────────────────────────
  Scenario 2: Premature termination (TextEvent)
  Stream dies after TextEvent. TextEvent has no .message attribute.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'text']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: text
    ❌ event.message.usage → CRASH: AttributeError: 'TextEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=25, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 25}

  ──────────────────────────────────────────────────────────────────
  Scenario 3: Premature termination (ContentBlockDeltaEvent)
  Stream dies after content_block_delta. No .message on this event type.
  Stream events: ['message_start', 'content_block_delta']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_delta
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockDeltaEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=10, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 10}

  ──────────────────────────────────────────────────────────────────
  Scenario 4: Premature termination (ContentBlockStartEvent)
  Stream dies right after content_block_start. No content delivered.
  Stream events: ['message_start', 'content_block_start']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_start
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStartEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=200, output_tokens=0, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 200, 'output_tokens': 0}

  ──────────────────────────────────────────────────────────────────
  Scenario 5: Content delivered, dies before message_stop
  Full content block delivered but stream dies before message_stop.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_stop
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStopEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=150, output_tokens=40, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 150, 'output_tokens': 40}

  ──────────────────────────────────────────────────────────────────
  Scenario 6: Empty stream (zero events)
  Immediate connection failure. No events received at all.
  Stream events: []
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: (none)
    ❌ event.message.usage → CRASH: UnboundLocalError: cannot access local variable 'event' where it is not associated with a value

  NEW (stream.get_final_message() snapshot):
    ⚠️  Events: []
    ⚠️  get_final_message() → raised exception (caught)
    ⚠️  snapshot.usage → not available (warning logged)

======================================================================
  #1868 Fix: Before vs After — claude-opus-4-6
======================================================================

  ──────────────────────────────────────────────────────────────────
  Scenario 1: Normal completion
  Full stream with message_stop. Both paths should succeed.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ Last event type: message_stop
    ✅ event.message.usage → {'input_tokens': 100, 'output_tokens': 50}

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=50, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 50}

  ──────────────────────────────────────────────────────────────────
  Scenario 2: Premature termination (TextEvent)
  Stream dies after TextEvent. TextEvent has no .message attribute.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'text']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: text
    ❌ event.message.usage → CRASH: AttributeError: 'TextEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=25, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 25}

  ──────────────────────────────────────────────────────────────────
  Scenario 3: Premature termination (ContentBlockDeltaEvent)
  Stream dies after content_block_delta. No .message on this event type.
  Stream events: ['message_start', 'content_block_delta']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_delta
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockDeltaEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=10, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 10}

  ──────────────────────────────────────────────────────────────────
  Scenario 4: Premature termination (ContentBlockStartEvent)
  Stream dies right after content_block_start. No content delivered.
  Stream events: ['message_start', 'content_block_start']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_start
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStartEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=200, output_tokens=0, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 200, 'output_tokens': 0}

  ──────────────────────────────────────────────────────────────────
  Scenario 5: Content delivered, dies before message_stop
  Full content block delivered but stream dies before message_stop.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_stop
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStopEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=150, output_tokens=40, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 150, 'output_tokens': 40}

  ──────────────────────────────────────────────────────────────────
  Scenario 6: Empty stream (zero events)
  Immediate connection failure. No events received at all.
  Stream events: []
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: (none)
    ❌ event.message.usage → CRASH: UnboundLocalError: cannot access local variable 'event' where it is not associated with a value

  NEW (stream.get_final_message() snapshot):
    ⚠️  Events: []
    ⚠️  get_final_message() → raised exception (caught)
    ⚠️  snapshot.usage → not available (warning logged)

======================================================================
  Done.
======================================================================

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 3, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.

📢 Thoughts on this report? Let us know!

@gautamsirdeshmukh
Copy link
Copy Markdown
Author

gautamsirdeshmukh commented Apr 3, 2026

Looking into why 1-3 integ tests continue to time out intermittently (this change to the Anthropic stream method has zero connection to the failed cases for multi-agent executions, concurrency for tests may need to be optimized).

Edit: Looks like #2044 is seeing the same test failure. Certainly unrelated to either change, as expected.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants