-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Add basic OpenTelemetry tracing for client and server requests #2381
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bbdd99b
e1ebbc7
7795f34
e2d4b03
0e0b746
54f8787
a700ab9
2369988
f43dc4a
0391854
1ee1dfd
d9dcc85
f79767e
8ae416e
b47c9c5
e00d336
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| """OpenTelemetry helpers for MCP.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Iterator | ||
| from contextlib import contextmanager | ||
| from typing import Any | ||
|
|
||
| from opentelemetry.context import Context | ||
| from opentelemetry.propagate import extract, inject | ||
| from opentelemetry.trace import SpanKind, get_tracer | ||
|
|
||
| _tracer = get_tracer("mcp-python-sdk") | ||
|
|
||
|
|
||
| @contextmanager | ||
| def otel_span( | ||
| name: str, | ||
| *, | ||
| kind: SpanKind, | ||
| attributes: dict[str, Any] | None = None, | ||
| context: Context | None = None, | ||
| ) -> Iterator[Any]: | ||
| """Create an OTel span.""" | ||
| with _tracer.start_as_current_span(name, kind=kind, attributes=attributes, context=context) as span: | ||
| yield span | ||
|
|
||
|
|
||
| def inject_trace_context(meta: dict[str, Any]) -> None: | ||
| """Inject W3C trace context (traceparent/tracestate) into a `_meta` dict.""" | ||
| inject(meta) | ||
|
|
||
|
|
||
| def extract_trace_context(meta: dict[str, Any]) -> Context: | ||
| """Extract W3C trace context from a `_meta` dict.""" | ||
| return extract(meta) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,11 @@ | |
|
|
||
| import anyio | ||
| from anyio.streams.memory import MemoryObjectSendStream | ||
| from opentelemetry.trace import SpanKind | ||
| from pydantic import BaseModel, TypeAdapter | ||
| from typing_extensions import Self | ||
|
|
||
| from mcp.shared._otel import inject_trace_context, otel_span | ||
| from mcp.shared._stream_protocols import ReadStream, WriteStream | ||
| from mcp.shared.exceptions import MCPError | ||
| from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage | ||
|
|
@@ -268,24 +270,36 @@ async def send_request( | |
| self._progress_callbacks[request_id] = progress_callback | ||
|
|
||
| try: | ||
| jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data) | ||
| await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata)) | ||
|
|
||
| # request read timeout takes precedence over session read timeout | ||
| timeout = request_read_timeout_seconds or self._session_read_timeout_seconds | ||
|
|
||
| try: | ||
| with anyio.fail_after(timeout): | ||
| response_or_error = await response_stream_reader.receive() | ||
| except TimeoutError: | ||
| class_name = request.__class__.__name__ | ||
| message = f"Timed out while waiting for response to {class_name}. Waited {timeout} seconds." | ||
| raise MCPError(code=REQUEST_TIMEOUT, message=message) | ||
|
|
||
| if isinstance(response_or_error, JSONRPCError): | ||
| raise MCPError.from_jsonrpc_error(response_or_error) | ||
| else: | ||
| return result_type.model_validate(response_or_error.result, by_name=False) | ||
| target = request_data.get("params", {}).get("name") | ||
| span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}" | ||
|
|
||
| with otel_span( | ||
| span_name, | ||
| kind=SpanKind.CLIENT, | ||
| attributes={"mcp.method.name": request.method, "jsonrpc.request.id": request_id}, | ||
| ): | ||
| # Inject W3C trace context into _meta (SEP-414). | ||
| meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {}) | ||
| inject_trace_context(meta) | ||
|
Comment on lines
+281
to
+283
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Nit: Extended reasoning...What the bug isAt line 280-282 of meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {})
inject_trace_context(meta)This creates both a How it manifestsConsider a {"jsonrpc": "2.0", "id": 0, "method": "ping"}After this PR, the {"jsonrpc": "2.0", "id": 0, "method": "ping", "params": {"_meta": {}}}When no OTel SDK is installed (the default case, since Why existing code doesn't prevent itThe ImpactThe practical impact is low. Suggested fixOnly create the tmp: dict[str, Any] = {}
inject_trace_context(tmp)
if tmp:
request_data.setdefault("params", {}).setdefault("_meta", {}).update(tmp)Step-by-step proof
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An empty |
||
|
|
||
| jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data) | ||
| await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata)) | ||
|
|
||
| # request read timeout takes precedence over session read timeout | ||
| timeout = request_read_timeout_seconds or self._session_read_timeout_seconds | ||
|
|
||
| try: | ||
| with anyio.fail_after(timeout): | ||
| response_or_error = await response_stream_reader.receive() | ||
| except TimeoutError: | ||
| class_name = request.__class__.__name__ | ||
| message = f"Timed out while waiting for response to {class_name}. Waited {timeout} seconds." | ||
| raise MCPError(code=REQUEST_TIMEOUT, message=message) | ||
|
|
||
| if isinstance(response_or_error, JSONRPCError): | ||
| raise MCPError.from_jsonrpc_error(response_or_error) | ||
| else: | ||
| return result_type.model_validate(response_or_error.result, by_name=False) | ||
|
|
||
| finally: | ||
| self._response_streams.pop(request_id, None) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import pytest | ||
| from logfire.testing import CaptureLogfire | ||
|
|
||
| from mcp import types | ||
| from mcp.client.client import Client | ||
| from mcp.server.mcpserver import MCPServer | ||
|
|
||
| pytestmark = pytest.mark.anyio | ||
|
|
||
|
|
||
| # Logfire warns about propagated trace context by default (distributed_tracing=None). | ||
claude[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # This is expected here since we're testing cross-boundary context propagation. | ||
| @pytest.mark.filterwarnings("ignore::RuntimeWarning") | ||
| async def test_client_and_server_spans(capfire: CaptureLogfire): | ||
| """Verify that calling a tool produces client and server spans with correct attributes.""" | ||
| server = MCPServer("test") | ||
|
|
||
| @server.tool() | ||
| def greet(name: str) -> str: | ||
| """Greet someone.""" | ||
| return f"Hello, {name}!" | ||
|
|
||
| async with Client(server) as client: | ||
| result = await client.call_tool("greet", {"name": "World"}) | ||
|
|
||
| assert isinstance(result.content[0], types.TextContent) | ||
| assert result.content[0].text == "Hello, World!" | ||
|
|
||
| spans = capfire.exporter.exported_spans_as_dict() | ||
| span_names = {s["name"] for s in spans} | ||
|
|
||
| assert "MCP send tools/call greet" in span_names | ||
| assert "MCP handle tools/call greet" in span_names | ||
|
|
||
| client_span = next(s for s in spans if s["name"] == "MCP send tools/call greet") | ||
| server_span = next(s for s in spans if s["name"] == "MCP handle tools/call greet") | ||
|
|
||
| assert client_span["attributes"]["mcp.method.name"] == "tools/call" | ||
| assert server_span["attributes"]["mcp.method.name"] == "tools/call" | ||
|
|
||
| # Server span should be in the same trace as the client span (context propagation). | ||
| assert server_span["context"]["trace_id"] == client_span["context"]["trace_id"] | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡
# pragma: no branchon line 482 is incorrect —PingRequesthasparams: RequestParams | None = None, so when a ping arrivesreq.paramsisNoneand the false branch is genuinely taken. Per CLAUDE.md, this pragma should only be used for coverage.py->exitarc misreports on nestedasync with, not for real conditional branches. Remove the pragma to avoid masking missing test coverage for theNone-params path.Extended reasoning...
What the bug is
The PR adds
# pragma: no branchto the conditionif hasattr(req, "params") and req.params is not None:atserver.py:482. This pragma tells coverage.py that the false branch of this conditional is never taken, suppressing any coverage warning for the missing branch.Why this is incorrect
PingRequestis defined asRequest[RequestParams | None, ...]withparams: RequestParams | None = None(seetypes/_types.py:589). When the server handles a ping request,req.paramsISNone, so the condition evaluates toFalseand the false branch is genuinely executed —task_metadatastaysNoneand execution falls through to constructing theServerRequestContext.Step-by-step proof
PingRequestwith defaultparams=None._handle_requestwith this request.hasattr(req, "params")isTrue(PingRequest always has aparamsattribute), butreq.params is not NoneisFalse.False, so the body (task_metadata = getattr(req.params, "task", None)) is skipped.task_metadataremainsNonefrom its initialization on line 481.# pragma: no branchtells coverage.py this false branch never happens, which is wrong.Project convention violated
The project's CLAUDE.md (lines 55-56) explicitly states that
# pragma: no branchshould only be used when coverage.py misreports the->exitarc for nestedasync withblocks on Python 3.11+. This is a genuine conditional branch, not an async-with exit arc issue.Impact
The impact is limited to test coverage reporting — this has zero runtime effect. The pragma was not present in the original code (visible in the diff) and was newly added by this PR. It could mask the fact that tests don't exercise the
None-params path through this conditional, defeating the purpose of the project's 100% branch coverage requirement.Fix
Simply remove the
# pragma: no branchcomment from line 482. If coverage then reports the false branch as uncovered, add a test that exercises aPingRequest(which hasparams=None) to cover it properly.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pre-existing pragma, not introduced by this PR.