Skip to content

Commit 281fd47

Browse files
Add SSE polling support (SEP-1699) (#1654)
1 parent 2cd178a commit 281fd47

File tree

21 files changed

+1461
-45
lines changed

21 files changed

+1461
-45
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# MCP SSE Polling Demo Client
2+
3+
Demonstrates client-side auto-reconnect for the SSE polling pattern (SEP-1699).
4+
5+
## Features
6+
7+
- Connects to SSE polling demo server
8+
- Automatically reconnects when server closes SSE stream
9+
- Resumes from Last-Event-ID to avoid missing messages
10+
- Respects server-provided retry interval
11+
12+
## Usage
13+
14+
```bash
15+
# First start the server:
16+
uv run mcp-sse-polling-demo --port 3000
17+
18+
# Then run this client:
19+
uv run mcp-sse-polling-client --url http://localhost:3000/mcp
20+
21+
# Custom options:
22+
uv run mcp-sse-polling-client --url http://localhost:3000/mcp --items 20 --checkpoint-every 5
23+
```
24+
25+
## Options
26+
27+
- `--url`: Server URL (default: <http://localhost:3000/mcp>)
28+
- `--items`: Number of items to process (default: 10)
29+
- `--checkpoint-every`: Checkpoint interval (default: 3)
30+
- `--log-level`: Logging level (default: DEBUG)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""SSE Polling Demo Client - demonstrates auto-reconnect for long-running tasks."""
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""
2+
SSE Polling Demo Client
3+
4+
Demonstrates the client-side auto-reconnect for SSE polling pattern.
5+
6+
This client connects to the SSE Polling Demo server and calls process_batch,
7+
which triggers periodic server-side stream closes. The client automatically
8+
reconnects using Last-Event-ID and resumes receiving messages.
9+
10+
Run with:
11+
# First start the server:
12+
uv run mcp-sse-polling-demo --port 3000
13+
14+
# Then run this client:
15+
uv run mcp-sse-polling-client --url http://localhost:3000/mcp
16+
"""
17+
18+
import asyncio
19+
import logging
20+
21+
import click
22+
from mcp import ClientSession
23+
from mcp.client.streamable_http import streamablehttp_client
24+
25+
logger = logging.getLogger(__name__)
26+
27+
28+
async def run_demo(url: str, items: int, checkpoint_every: int) -> None:
29+
"""Run the SSE polling demo."""
30+
print(f"\n{'=' * 60}")
31+
print("SSE Polling Demo Client")
32+
print(f"{'=' * 60}")
33+
print(f"Server URL: {url}")
34+
print(f"Processing {items} items with checkpoints every {checkpoint_every}")
35+
print(f"{'=' * 60}\n")
36+
37+
async with streamablehttp_client(url) as (read_stream, write_stream, _):
38+
async with ClientSession(read_stream, write_stream) as session:
39+
# Initialize the connection
40+
print("Initializing connection...")
41+
await session.initialize()
42+
print("Connected!\n")
43+
44+
# List available tools
45+
tools = await session.list_tools()
46+
print(f"Available tools: {[t.name for t in tools.tools]}\n")
47+
48+
# Call the process_batch tool
49+
print(f"Calling process_batch(items={items}, checkpoint_every={checkpoint_every})...\n")
50+
print("-" * 40)
51+
52+
result = await session.call_tool(
53+
"process_batch",
54+
{
55+
"items": items,
56+
"checkpoint_every": checkpoint_every,
57+
},
58+
)
59+
60+
print("-" * 40)
61+
if result.content:
62+
content = result.content[0]
63+
text = getattr(content, "text", str(content))
64+
print(f"\nResult: {text}")
65+
else:
66+
print("\nResult: No content")
67+
print(f"{'=' * 60}\n")
68+
69+
70+
@click.command()
71+
@click.option(
72+
"--url",
73+
default="http://localhost:3000/mcp",
74+
help="Server URL",
75+
)
76+
@click.option(
77+
"--items",
78+
default=10,
79+
help="Number of items to process",
80+
)
81+
@click.option(
82+
"--checkpoint-every",
83+
default=3,
84+
help="Checkpoint interval",
85+
)
86+
@click.option(
87+
"--log-level",
88+
default="INFO",
89+
help="Logging level",
90+
)
91+
def main(url: str, items: int, checkpoint_every: int, log_level: str) -> None:
92+
"""Run the SSE Polling Demo client."""
93+
logging.basicConfig(
94+
level=getattr(logging, log_level.upper()),
95+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
96+
)
97+
# Suppress noisy HTTP client logging
98+
logging.getLogger("httpx").setLevel(logging.WARNING)
99+
logging.getLogger("httpcore").setLevel(logging.WARNING)
100+
101+
asyncio.run(run_demo(url, items, checkpoint_every))
102+
103+
104+
if __name__ == "__main__":
105+
main()
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[project]
2+
name = "mcp-sse-polling-client"
3+
version = "0.1.0"
4+
description = "Demo client for SSE polling with auto-reconnect"
5+
readme = "README.md"
6+
requires-python = ">=3.10"
7+
authors = [{ name = "Anthropic, PBC." }]
8+
keywords = ["mcp", "sse", "polling", "client"]
9+
license = { text = "MIT" }
10+
dependencies = ["click>=8.2.0", "mcp"]
11+
12+
[project.scripts]
13+
mcp-sse-polling-client = "mcp_sse_polling_client.main:main"
14+
15+
[build-system]
16+
requires = ["hatchling"]
17+
build-backend = "hatchling.build"
18+
19+
[tool.hatch.build.targets.wheel]
20+
packages = ["mcp_sse_polling_client"]
21+
22+
[tool.pyright]
23+
include = ["mcp_sse_polling_client"]
24+
venvPath = "."
25+
venv = ".venv"
26+
27+
[tool.ruff.lint]
28+
select = ["E", "F", "I"]
29+
ignore = []
30+
31+
[tool.ruff]
32+
line-length = 120
33+
target-version = "py310"
34+
35+
[dependency-groups]
36+
dev = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]

examples/servers/everything-server/mcp_everything_server/server.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@
1414
from mcp.server.fastmcp import Context, FastMCP
1515
from mcp.server.fastmcp.prompts.base import UserMessage
1616
from mcp.server.session import ServerSession
17+
from mcp.server.streamable_http import EventCallback, EventMessage, EventStore
1718
from mcp.types import (
1819
AudioContent,
1920
Completion,
2021
CompletionArgument,
2122
CompletionContext,
2223
EmbeddedResource,
2324
ImageContent,
25+
JSONRPCMessage,
2426
PromptReference,
2527
ResourceTemplateReference,
2628
SamplingMessage,
@@ -31,6 +33,43 @@
3133

3234
logger = logging.getLogger(__name__)
3335

36+
# Type aliases for event store
37+
StreamId = str
38+
EventId = str
39+
40+
41+
class InMemoryEventStore(EventStore):
42+
"""Simple in-memory event store for SSE resumability testing."""
43+
44+
def __init__(self) -> None:
45+
self._events: list[tuple[StreamId, EventId, JSONRPCMessage | None]] = []
46+
self._event_id_counter = 0
47+
48+
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
49+
"""Store an event and return its ID."""
50+
self._event_id_counter += 1
51+
event_id = str(self._event_id_counter)
52+
self._events.append((stream_id, event_id, message))
53+
return event_id
54+
55+
async def replay_events_after(self, last_event_id: EventId, send_callback: EventCallback) -> StreamId | None:
56+
"""Replay events after the specified ID."""
57+
target_stream_id = None
58+
for stream_id, event_id, _ in self._events:
59+
if event_id == last_event_id:
60+
target_stream_id = stream_id
61+
break
62+
if target_stream_id is None:
63+
return None
64+
last_event_id_int = int(last_event_id)
65+
for stream_id, event_id, message in self._events:
66+
if stream_id == target_stream_id and int(event_id) > last_event_id_int:
67+
# Skip priming events (None message)
68+
if message is not None:
69+
await send_callback(EventMessage(message, event_id))
70+
return target_stream_id
71+
72+
3473
# Test data
3574
TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="
3675
TEST_AUDIO_BASE64 = "UklGRiYAAABXQVZFZm10IBAAAAABAAEAQB8AAAB9AAACABAAZGF0YQIAAAA="
@@ -39,8 +78,13 @@
3978
resource_subscriptions: set[str] = set()
4079
watched_resource_content = "Watched resource content"
4180

81+
# Create event store for SSE resumability (SEP-1699)
82+
event_store = InMemoryEventStore()
83+
4284
mcp = FastMCP(
4385
name="mcp-conformance-test-server",
86+
event_store=event_store,
87+
retry_interval=100, # 100ms retry interval for SSE polling
4488
)
4589

4690

@@ -263,6 +307,19 @@ def test_error_handling() -> str:
263307
raise RuntimeError("This tool intentionally returns an error for testing")
264308

265309

310+
@mcp.tool()
311+
async def test_reconnection(ctx: Context[ServerSession, None]) -> str:
312+
"""Tests SSE polling by closing stream mid-call (SEP-1699)"""
313+
await ctx.info("Before disconnect")
314+
315+
await ctx.close_sse_stream()
316+
317+
await asyncio.sleep(0.2) # Wait for client to reconnect
318+
319+
await ctx.info("After reconnect")
320+
return "Reconnection test completed"
321+
322+
266323
# Resources
267324
@mcp.resource("test://static-text")
268325
def static_text_resource() -> str:

examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/event_store.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class EventEntry:
2424

2525
event_id: EventId
2626
stream_id: StreamId
27-
message: JSONRPCMessage
27+
message: JSONRPCMessage | None
2828

2929

3030
class InMemoryEventStore(EventStore):
@@ -48,7 +48,7 @@ def __init__(self, max_events_per_stream: int = 100):
4848
# event_id -> EventEntry for quick lookup
4949
self.event_index: dict[EventId, EventEntry] = {}
5050

51-
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage) -> EventId:
51+
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
5252
"""Stores an event with a generated event ID."""
5353
event_id = str(uuid4())
5454
event_entry = EventEntry(event_id=event_id, stream_id=stream_id, message=message)
@@ -88,7 +88,9 @@ async def replay_events_after(
8888
found_last = False
8989
for event in stream_events:
9090
if found_last:
91-
await send_callback(EventMessage(event.message, event.event_id))
91+
# Skip priming events (None message)
92+
if event.message is not None:
93+
await send_callback(EventMessage(event.message, event.event_id))
9294
elif event.event_id == last_event_id:
9395
found_last = True
9496

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# MCP SSE Polling Demo Server
2+
3+
Demonstrates the SSE polling pattern with server-initiated stream close for long-running tasks (SEP-1699).
4+
5+
## Features
6+
7+
- Priming events (automatic with EventStore)
8+
- Server-initiated stream close via `close_sse_stream()` callback
9+
- Client auto-reconnect with Last-Event-ID
10+
- Progress notifications during long-running tasks
11+
- Configurable retry interval
12+
13+
## Usage
14+
15+
```bash
16+
# Start server on default port
17+
uv run mcp-sse-polling-demo --port 3000
18+
19+
# Custom retry interval (milliseconds)
20+
uv run mcp-sse-polling-demo --port 3000 --retry-interval 100
21+
```
22+
23+
## Tool: process_batch
24+
25+
Processes items with periodic checkpoints that trigger SSE stream closes:
26+
27+
- `items`: Number of items to process (1-100, default: 10)
28+
- `checkpoint_every`: Close stream after this many items (1-20, default: 3)
29+
30+
## Client
31+
32+
Use the companion `mcp-sse-polling-client` to test:
33+
34+
```bash
35+
uv run mcp-sse-polling-client --url http://localhost:3000/mcp
36+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""SSE Polling Demo Server - demonstrates close_sse_stream for long-running tasks."""
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Entry point for the SSE Polling Demo server."""
2+
3+
from .server import main
4+
5+
if __name__ == "__main__":
6+
main()

0 commit comments

Comments
 (0)