Skip to content

Propagate contextvars.Context through anyio streams without modifying SessionMessage#2298

Merged
Kludex merged 16 commits intomainfrom
try-no-context-field
Mar 31, 2026
Merged

Propagate contextvars.Context through anyio streams without modifying SessionMessage#2298
Kludex merged 16 commits intomainfrom
try-no-context-field

Conversation

@Kludex
Copy link
Copy Markdown
Member

@Kludex Kludex commented Mar 16, 2026

Summary

MCP server handlers currently lose the sender's contextvars.Context. When a client sets context variables (e.g. OpenTelemetry trace/span IDs) before sending a request, the server handler runs in the receive loop's context instead, breaking trace propagation.

This PR propagates contextvars.Context through anyio memory streams without adding any field to SessionMessage, addressing Kludex's review comment on the previous approach (#1996).

How it works

  • ContextSendStream / ContextReceiveStream wrappers capture the sender's context at send() time and expose it via last_context on the receive side
  • BaseSession._receive_loop reads last_context and passes it to RequestResponder, which carries it to the server handler boundary
  • lowlevel/server.py uses context.run(tg.start_soon, handler) so handler tasks inherit the sender's context
  • Client transport post_writer functions restore sender context the same way
  • Protocol-based ReadStream / WriteStream replace concrete MemoryObjectSendStream / MemoryObjectReceiveStream types, so both raw and context-aware streams are accepted
  • create_context_streams[T](n) mirrors anyio's bracket syntax with proper generic propagation

Additional changes

  • Switch coverage config from exclude_lines to exclude_also to inherit coverage.py's built-in defaults (which handle ... stubs and pragma: no cover)

Fixes #1969
Part of #421

Test plan

  • test_context_propagation: sets a contextvar via a context manager (which resets on exit), calls a tool, verifies the server handler sees the sender's value. Fails on main, passes with this PR.
  • All existing tests pass
  • pyright passes with 0 errors

Kludex added 2 commits March 16, 2026 08:49
… SessionMessage

Introduce context-aware stream wrappers (ContextSendStream / ContextReceiveStream)
that capture the sender's contextvars.Context at send() time and expose it on the
receive side via last_context. This enables OpenTelemetry trace propagation,
per-request auth via ContextVars, and other context-dependent use cases across the
anyio memory stream boundary - without adding any field to SessionMessage.

Key changes:
- New _context_streams module with ContextSendStream, ContextReceiveStream, and
  create_context_streams factory (mirrors anyio's bracket syntax API)
- Protocol-based ReadStream/WriteStream in _transport.py, replacing concrete
  MemoryObjectReceiveStream/MemoryObjectSendStream in all parameter types
- All transport stream creation sites use create_context_streams
- BaseSession._receive_loop and client post_writers restore sender context
  via ctx.run(tg.start_soon, handler, message)
- RequestResponder carries context for the session-to-server handler boundary

Github-Issue:#1996
Comment on lines +116 to +129
class _CreateContextStreams:
"""Callable that supports ``create_context_streams[T](n)`` bracket syntax.

Matches anyio's ``create_memory_object_stream`` API style.
"""

def __getitem__(self, _item: Any) -> _CreateContextStreams:
return self

def __call__(self, max_buffer_size: float = 0) -> tuple[ContextSendStream[Any], ContextReceiveStream[Any]]:
return _create_context_streams(max_buffer_size)


create_context_streams = _CreateContextStreams()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Nit: T_Item TypeVar at line 23 is defined but never used (only T is used throughout). Additionally, _CreateContextStreams.__getitem__ returns unparameterized self and __call__ returns tuple[ContextSendStream[Any], ContextReceiveStream[Any]], so create_context_streams[SessionMessage](0) silently discards the type parameter — a static type safety regression from the original anyio.create_memory_object_stream[T]() which properly propagates T to type checkers.

Extended reasoning...

Unused TypeVar

T_Item = TypeVar("T_Item") is defined at line 23 of _context_streams.py but is never referenced anywhere in the file. Only T is used in Generic[T] for ContextSendStream, ContextReceiveStream, and _Envelope. This is leftover dead code from an earlier iteration that should be removed.

Type parameter silently discarded

The _CreateContextStreams class (lines 116-129) implements __getitem__ and __call__ to mimic anyio's create_memory_object_stream[T](n) bracket syntax. However, the type parameter is completely ignored:

def __getitem__(self, _item: Any) -> _CreateContextStreams:
    return self  # type parameter discarded

def __call__(self, ...) -> tuple[ContextSendStream[Any], ContextReceiveStream[Any]]:
    return _create_context_streams(max_buffer_size)  # always returns Any

__getitem__ returns the unparameterized self (not a generic alias), and __call__ always returns Any-typed streams. This means create_context_streams[SessionMessage](0) and create_context_streams[int](0) return identical types to static type checkers.

Concrete proof

Consider the call site in sse.py:

read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0)

Step-by-step:

  1. create_context_streams is an instance of _CreateContextStreams
  2. create_context_streams[SessionMessage | Exception] calls __getitem__(SessionMessage | Exception) → returns self (same _CreateContextStreams instance, no type info retained)
  3. (0) calls __call__(0) → returns tuple[ContextSendStream[Any], ContextReceiveStream[Any]]

The SessionMessage | Exception type parameter is silently lost. In contrast, anyio's create_memory_object_stream is a class inheriting from tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]], so create_memory_object_stream[SessionMessage](0) correctly returns tuple[MemoryObjectSendStream[SessionMessage], MemoryObjectReceiveStream[SessionMessage]] to type checkers.

Impact

This is purely a static typing concern — there is zero runtime impact, and pyright passes with 0 errors because Any is compatible with everything. The bracket syntax creates a false impression of type safety. Call sites happen to work because variables are assigned to concretely typed fields, providing sufficient type context.

Fix

Remove the unused T_Item TypeVar. For the type safety regression, either use @overload declarations or make _CreateContextStreams properly generic (e.g., using __class_getitem__ to return a properly parameterized callable, similar to anyio's approach of inheriting from tuple).

Kludex added 2 commits March 16, 2026 09:13
… widening

- Replace task group + ctx.run(tg.start_soon, ...) with direct
  await sender_ctx.run(handler, msg) to avoid ExceptionGroup wrapping
  that would prevent ClosedResourceError from being caught
- Move ReadStream/WriteStream protocols to mcp.shared._stream_protocols
  so shared/server modules don't depend on client internals
- Restore write stream type narrowing in MessageStream (SessionMessage
  only, not SessionMessage | Exception)
- Remove unused T_Item TypeVar
ctx.run(async_fn) only sets context during coroutine creation, not
execution. For session._receive_loop, pass sender_ctx explicitly to
the handler which stores it on RequestResponder.context. For client
post_writers (sse/streamable_http), restore ctx.run(tg.start_soon, ...)
so the httpx calls actually run in the sender's context.
Copy link
Copy Markdown
Contributor

@claude claude bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All my previous bug reports have been addressed by commits 32565b3 and b321dd2 — nice work. However, this is a large PR (18 files) introducing new core abstractions for context propagation that warrants human review of the overall design before merging.

Extended reasoning...

Overview

This PR adds contextvars.Context propagation through anyio memory streams to support OpenTelemetry trace propagation from client to server handlers. It introduces two new modules (_context_streams.py and _stream_protocols.py), modifies BaseSession._receive_loop and RequestResponder to carry context, updates all transport stream creation sites (stdio, SSE, WebSocket, StreamableHTTP), and changes type annotations from concrete MemoryObjectSendStream/MemoryObjectReceiveStream to protocol-based ReadStream/WriteStream.

Security risks

No security concerns identified. The changes are purely about context propagation for observability — no auth, crypto, or permission logic is touched.

Level of scrutiny

This PR touches core infrastructure code in shared/session.py (the _receive_loop and RequestResponder), all transport implementations, and introduces new abstractions that every transport now depends on. The design decisions — using envelope-based context capture in streams, protocol-based stream typing, explicit context parameter passing — are significant architectural choices that should be reviewed by a maintainer familiar with the codebase. A human should verify that the OTel propagation chain actually works end-to-end and that the new stream protocols are the right abstraction for the long term.

Other factors

All previously identified bugs have been fixed in follow-up commits. The author addressed feedback about ExceptionGroup wrapping, Context.run() with async functions, reverse dependencies, type widening, and unused TypeVars. Tests pass and pyright reports 0 errors. The code quality is solid, but the scope and architectural impact warrant human oversight.

Comment on lines +19 to +51
class ReadStream(Protocol[T_co]): # pragma: no branch
"""Protocol for reading items from a stream.

Consumers that need the sender's context should use
``getattr(stream, 'last_context', None)``.
"""

async def receive(self) -> T_co: ... # pragma: no branch
async def aclose(self) -> None: ... # pragma: no branch
def __aiter__(self) -> ReadStream[T_co]: ... # pragma: no branch
async def __anext__(self) -> T_co: ... # pragma: no branch
async def __aenter__(self) -> Self: ... # pragma: no branch
async def __aexit__( # pragma: no branch
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None: ...


@runtime_checkable
class WriteStream(Protocol[T_contra]): # pragma: no branch
"""Protocol for writing items to a stream."""

async def send(self, item: T_contra, /) -> None: ... # pragma: no branch
async def aclose(self) -> None: ... # pragma: no branch
async def __aenter__(self) -> Self: ... # pragma: no branch
async def __aexit__( # pragma: no branch
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None: ...
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are there a bunch of no branch pragmas here

- Remove `# pragma: no branch` and `@runtime_checkable` from stream protocols
- Replace `_CreateContextStreams` with a proper generic class inheriting from
  `tuple[ContextSendStream[T], ContextReceiveStream[T]]`, matching anyio's
  `create_memory_object_stream` pattern so bracket syntax propagates types
- Update `MessageStream` type alias to match actual stream creation
await self._handle_incoming(message)
continue

sender_ctx: contextvars.Context | None = getattr(self._read_stream, "last_context", None)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you using getattr? We shoudn't be using it!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea!

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I think we should.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point is that we have this "special" stream now, but the type should be ReadStream (to not be a breaking change).

Kludex added 7 commits March 31, 2026 10:16
# Conflicts:
#	src/mcp/client/sse.py
#	src/mcp/client/streamable_http.py
#	src/mcp/server/lowlevel/server.py
Coverage branch tracking reports missing arcs on Protocol stub methods
(def -> exit) because they are never called at runtime.
…l/python-sdk into try-no-context-field

# Conflicts:
#	pyproject.toml
No longer needed now that exclude_also inherits coverage.py's built-in
default pattern which handles ... stubs including branch arcs.
Copy link
Copy Markdown
Contributor

@maxisbey maxisbey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a unit test which actually tests contextvar propogation?

Kludex added 3 commits March 31, 2026 11:44
The test sets a contextvar via a context manager that resets it on exit,
so only a properly snapshotted context will preserve the value into the
server handler. This test fails on main and passes with context streams.
@Kludex Kludex merged commit e6235d1 into main Mar 31, 2026
29 checks passed
@Kludex Kludex deleted the try-no-context-field branch March 31, 2026 16:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Propagate ContextVars to Transport Layer in MCP Clients

2 participants