Skip to content

Commit ace6e19

Browse files
feat(client): add transport abstraction for Client class
- Add transports subpackage with Transport protocol - Add HttpTransport for Streamable HTTP connections - Add SSETransport for legacy SSE connections - Move InMemoryTransport to transports subpackage - Update Client to accept Server, FastMCP, Transport, or URL string - Add _infer_transport() helper for automatic transport selection - URL strings automatically use HttpTransport (modern standard) - Add comprehensive tests for transport inference and Client usage This enables connecting to MCP servers over HTTP in addition to the existing in-memory transport for testing.
1 parent bcb07c2 commit ace6e19

File tree

11 files changed

+570
-44
lines changed

11 files changed

+570
-44
lines changed

src/mcp/client/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
"""MCP Client module."""
22

3-
from mcp.client.client import Client
3+
from mcp.client.client import Client, ClientTarget
44
from mcp.client.session import ClientSession
55

66
__all__ = [
77
"Client",
88
"ClientSession",
9+
"ClientTarget",
910
]

src/mcp/client/client.py

Lines changed: 83 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from pydantic import AnyUrl
1010

1111
import mcp.types as types
12-
from mcp.client._memory import InMemoryTransport
1312
from mcp.client.session import (
1413
ClientSession,
1514
ElicitationFnT,
@@ -18,21 +17,62 @@
1817
MessageHandlerFnT,
1918
SamplingFnT,
2019
)
20+
from mcp.client.transports import HttpTransport, InMemoryTransport, Transport
2121
from mcp.server import Server
2222
from mcp.server.fastmcp import FastMCP
2323
from mcp.shared.session import ProgressFnT
2424

2525
logger = logging.getLogger(__name__)
2626

27+
# Type alias for all accepted target types
28+
ClientTarget = Server[Any] | FastMCP | Transport | str
29+
30+
31+
def _infer_transport(
32+
target: ClientTarget,
33+
*,
34+
raise_exceptions: bool = False,
35+
) -> Transport:
36+
"""Infer the appropriate transport from the target type.
37+
38+
Args:
39+
target: The target to connect to. Can be:
40+
- Server or FastMCP instance: Uses InMemoryTransport
41+
- Transport instance: Uses the transport directly
42+
- str (URL): Uses HttpTransport (Streamable HTTP)
43+
raise_exceptions: For InMemoryTransport, whether to raise exceptions
44+
from the server. Ignored for other transport types.
45+
46+
Returns:
47+
A Transport instance ready to connect.
48+
49+
Raises:
50+
TypeError: If the target type is not recognized.
51+
"""
52+
# Already a transport - use directly
53+
if isinstance(target, Transport):
54+
return target
55+
56+
# Server or FastMCP - use in-memory transport for testing
57+
if isinstance(target, Server | FastMCP):
58+
return InMemoryTransport(target, raise_exceptions=raise_exceptions)
59+
60+
# URL string - use Streamable HTTP transport (modern standard)
61+
# Note: After type narrowing above, target is str here
62+
return HttpTransport(target)
63+
2764

2865
class Client:
2966
"""A high-level MCP client for connecting to MCP servers.
3067
31-
Currently supports in-memory transport for testing. Pass a Server or
32-
FastMCP instance directly to the constructor.
68+
Supports multiple transport types:
69+
- In-memory: Pass a Server or FastMCP instance directly (for testing)
70+
- HTTP: Pass a URL string or HttpTransport instance
71+
- SSE: Pass an SSETransport instance (legacy)
3372
34-
Example:
73+
Examples:
3574
```python
75+
# In-memory testing (recommended for unit tests)
3676
from mcp.client import Client
3777
from mcp.server.fastmcp import FastMCP
3878
@@ -44,21 +84,34 @@ def add(a: int, b: int) -> int:
4484
4585
async with Client(server) as client:
4686
result = await client.call_tool("add", {"a": 1, "b": 2})
87+
88+
# HTTP connection via URL string
89+
async with Client("http://localhost:8000/mcp") as client:
90+
result = await client.call_tool("my_tool", {...})
91+
92+
# HTTP connection with custom headers
93+
from mcp.client.transports import HttpTransport
94+
95+
transport = HttpTransport(
96+
"http://localhost:8000/mcp",
97+
headers={"Authorization": "Bearer token"},
98+
)
99+
async with Client(transport) as client:
100+
result = await client.call_tool("my_tool", {...})
101+
102+
# Legacy SSE connection
103+
from mcp.client.transports import SSETransport
104+
105+
async with Client(SSETransport("http://localhost:8000/sse")) as client:
106+
result = await client.call_tool("my_tool", {...})
47107
```
48108
"""
49109

50-
# TODO(felixweinberger): Expand to support all transport types (like FastMCP 2):
51-
# - Add ClientTransport base class with connect_session() method
52-
# - Add StreamableHttpTransport, SSETransport, StdioTransport
53-
# - Add infer_transport() to auto-detect transport from input type
54-
# - Accept URL strings, Path objects, config dicts in constructor
55-
# - Add auth support (OAuth, bearer tokens)
56-
57110
def __init__(
58111
self,
59-
server: Server[Any] | FastMCP,
112+
target: ClientTarget,
60113
*,
61-
# TODO(Marcelo): When do `raise_exceptions=True` actually raises?
114+
# TODO(Marcelo): When does `raise_exceptions=True` actually raise?
62115
raise_exceptions: bool = False,
63116
read_timeout_seconds: float | None = None,
64117
sampling_callback: SamplingFnT | None = None,
@@ -68,20 +121,24 @@ def __init__(
68121
client_info: types.Implementation | None = None,
69122
elicitation_callback: ElicitationFnT | None = None,
70123
) -> None:
71-
"""Initialize the client with a server.
124+
"""Initialize the client.
72125
73126
Args:
74-
server: The MCP server to connect to (Server or FastMCP instance)
75-
raise_exceptions: Whether to raise exceptions from the server
76-
read_timeout_seconds: Timeout for read operations
77-
sampling_callback: Callback for handling sampling requests
78-
list_roots_callback: Callback for handling list roots requests
79-
logging_callback: Callback for handling logging notifications
80-
message_handler: Callback for handling raw messages
81-
client_info: Client implementation info to send to server
82-
elicitation_callback: Callback for handling elicitation requests
127+
target: The target to connect to. Can be:
128+
- Server or FastMCP instance: Uses in-memory transport (for testing)
129+
- Transport instance: Uses the transport directly
130+
- str (URL): Uses HTTP transport (Streamable HTTP protocol)
131+
raise_exceptions: For in-memory transport, whether to raise exceptions
132+
from the server. Ignored for other transport types.
133+
read_timeout_seconds: Timeout for read operations.
134+
sampling_callback: Callback for handling sampling requests.
135+
list_roots_callback: Callback for handling list roots requests.
136+
logging_callback: Callback for handling logging notifications.
137+
message_handler: Callback for handling raw messages.
138+
client_info: Client implementation info to send to server.
139+
elicitation_callback: Callback for handling elicitation requests.
83140
"""
84-
self._server = server
141+
self._target = target
85142
self._raise_exceptions = raise_exceptions
86143
self._read_timeout_seconds = read_timeout_seconds
87144
self._sampling_callback = sampling_callback
@@ -100,8 +157,8 @@ async def __aenter__(self) -> Client:
100157
raise RuntimeError("Client is already entered; cannot reenter")
101158

102159
async with AsyncExitStack() as exit_stack:
103-
# Create transport and connect
104-
transport = InMemoryTransport(self._server, raise_exceptions=self._raise_exceptions)
160+
# Infer and connect transport
161+
transport = _infer_transport(self._target, raise_exceptions=self._raise_exceptions)
105162
read_stream, write_stream = await exit_stack.enter_async_context(transport.connect())
106163

107164
# Create session
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""Transport implementations for MCP clients.
2+
3+
This module provides transport abstractions for connecting to MCP servers
4+
using different protocols:
5+
6+
- InMemoryTransport: For testing servers without network overhead
7+
- HttpTransport: For Streamable HTTP connections (recommended for HTTP)
8+
- SSETransport: For legacy Server-Sent Events connections
9+
10+
Example:
11+
```python
12+
from mcp.client import Client
13+
from mcp.client.transports import HttpTransport, SSETransport
14+
15+
# Using Streamable HTTP (recommended)
16+
async with Client(HttpTransport("http://localhost:8000/mcp")) as client:
17+
result = await client.call_tool("my_tool", {...})
18+
19+
# Using legacy SSE
20+
async with Client(SSETransport("http://localhost:8000/sse")) as client:
21+
result = await client.call_tool("my_tool", {...})
22+
```
23+
"""
24+
25+
from mcp.client.transports.base import Transport
26+
from mcp.client.transports.http import HttpTransport
27+
from mcp.client.transports.memory import InMemoryTransport
28+
from mcp.client.transports.sse import SSETransport
29+
30+
__all__ = [
31+
"Transport",
32+
"HttpTransport",
33+
"InMemoryTransport",
34+
"SSETransport",
35+
]

src/mcp/client/transports/base.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""Base transport protocol for MCP clients."""
2+
3+
from __future__ import annotations
4+
5+
from collections.abc import AsyncIterator
6+
from contextlib import asynccontextmanager
7+
from typing import Protocol, runtime_checkable
8+
9+
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
10+
11+
from mcp.shared.message import SessionMessage
12+
13+
14+
@runtime_checkable
15+
class Transport(Protocol):
16+
"""Protocol for MCP client transports.
17+
18+
All transports must implement a connect() async context manager that yields
19+
a tuple of (read_stream, write_stream) for bidirectional communication.
20+
21+
Example:
22+
```python
23+
class MyTransport:
24+
@asynccontextmanager
25+
async def connect(self):
26+
# Set up connection...
27+
yield read_stream, write_stream
28+
# Clean up...
29+
```
30+
"""
31+
32+
@asynccontextmanager
33+
async def connect(
34+
self,
35+
) -> AsyncIterator[
36+
tuple[
37+
MemoryObjectReceiveStream[SessionMessage | Exception],
38+
MemoryObjectSendStream[SessionMessage],
39+
]
40+
]:
41+
"""Connect to the server and yield streams for communication.
42+
43+
Yields:
44+
A tuple of (read_stream, write_stream) for bidirectional communication.
45+
"""
46+
...
47+
yield # type: ignore[misc]

src/mcp/client/transports/http.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
"""Streamable HTTP transport for MCP clients."""
2+
3+
from __future__ import annotations
4+
5+
from collections.abc import AsyncIterator
6+
from contextlib import asynccontextmanager
7+
8+
import httpx
9+
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
10+
11+
from mcp.client.streamable_http import streamable_http_client
12+
from mcp.shared.message import SessionMessage
13+
14+
15+
class HttpTransport:
16+
"""Streamable HTTP transport for connecting to MCP servers over HTTP.
17+
18+
This transport uses the Streamable HTTP protocol, which is the recommended
19+
transport for HTTP-based MCP connections.
20+
21+
Example:
22+
```python
23+
from mcp.client import Client
24+
from mcp.client.transports import HttpTransport
25+
26+
# Basic usage
27+
async with Client(HttpTransport("http://localhost:8000/mcp")) as client:
28+
result = await client.call_tool("my_tool", {...})
29+
30+
# Or use the convenience URL syntax
31+
async with Client("http://localhost:8000/mcp") as client:
32+
result = await client.call_tool("my_tool", {...})
33+
34+
# With custom headers (e.g., authentication)
35+
transport = HttpTransport(
36+
"http://localhost:8000/mcp",
37+
headers={"Authorization": "Bearer token"},
38+
)
39+
async with Client(transport) as client:
40+
result = await client.call_tool("my_tool", {...})
41+
42+
# With a pre-configured httpx client
43+
http_client = httpx.AsyncClient(
44+
headers={"Authorization": "Bearer token"},
45+
timeout=30.0,
46+
)
47+
transport = HttpTransport("http://localhost:8000/mcp", httpx_client=http_client)
48+
async with Client(transport) as client:
49+
result = await client.call_tool("my_tool", {...})
50+
```
51+
"""
52+
53+
def __init__(
54+
self,
55+
url: str,
56+
*,
57+
headers: dict[str, str] | None = None,
58+
httpx_client: httpx.AsyncClient | None = None,
59+
terminate_on_close: bool = True,
60+
) -> None:
61+
"""Initialize the HTTP transport.
62+
63+
Args:
64+
url: The MCP server endpoint URL.
65+
headers: Optional headers to include in requests. For authentication,
66+
include an "Authorization" header or use httpx_client with auth
67+
configured. Ignored if httpx_client is provided.
68+
httpx_client: Optional pre-configured httpx.AsyncClient. If provided,
69+
the headers parameter is ignored. The client lifecycle is managed
70+
externally (not closed by this transport).
71+
terminate_on_close: If True, send a DELETE request to terminate the
72+
session when the context exits. Defaults to True.
73+
"""
74+
self._url = url
75+
self._headers = headers
76+
self._httpx_client = httpx_client
77+
self._terminate_on_close = terminate_on_close
78+
79+
@asynccontextmanager
80+
async def connect(
81+
self,
82+
) -> AsyncIterator[
83+
tuple[
84+
MemoryObjectReceiveStream[SessionMessage | Exception],
85+
MemoryObjectSendStream[SessionMessage],
86+
]
87+
]:
88+
"""Connect to the server and return streams for communication.
89+
90+
Yields:
91+
A tuple of (read_stream, write_stream) for bidirectional communication.
92+
"""
93+
# If headers are provided without a custom client, create one with those headers
94+
client = self._httpx_client
95+
if client is None and self._headers is not None:
96+
client = httpx.AsyncClient(headers=self._headers)
97+
98+
async with streamable_http_client(
99+
self._url,
100+
http_client=client,
101+
terminate_on_close=self._terminate_on_close,
102+
) as (read_stream, write_stream, _get_session_id):
103+
yield read_stream, write_stream

0 commit comments

Comments
 (0)