Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions examples/clients/sse-polling-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# MCP SSE Polling Demo Client

Demonstrates client-side auto-reconnect for the SSE polling pattern (SEP-1699).

## Features

- Connects to SSE polling demo server
- Automatically reconnects when server closes SSE stream
- Resumes from Last-Event-ID to avoid missing messages
- Respects server-provided retry interval

## Usage

```bash
# First start the server:
uv run mcp-sse-polling-demo --port 3000

# Then run this client:
uv run mcp-sse-polling-client --url http://localhost:3000/mcp

# Custom options:
uv run mcp-sse-polling-client --url http://localhost:3000/mcp --items 20 --checkpoint-every 5
```

## Options

- `--url`: Server URL (default: <http://localhost:3000/mcp>)
- `--items`: Number of items to process (default: 10)
- `--checkpoint-every`: Checkpoint interval (default: 3)
- `--log-level`: Logging level (default: DEBUG)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""SSE Polling Demo Client - demonstrates auto-reconnect for long-running tasks."""
105 changes: 105 additions & 0 deletions examples/clients/sse-polling-client/mcp_sse_polling_client/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
SSE Polling Demo Client

Demonstrates the client-side auto-reconnect for SSE polling pattern.

This client connects to the SSE Polling Demo server and calls process_batch,
which triggers periodic server-side stream closes. The client automatically
reconnects using Last-Event-ID and resumes receiving messages.

Run with:
# First start the server:
uv run mcp-sse-polling-demo --port 3000

# Then run this client:
uv run mcp-sse-polling-client --url http://localhost:3000/mcp
"""

import asyncio
import logging

import click
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

logger = logging.getLogger(__name__)


async def run_demo(url: str, items: int, checkpoint_every: int) -> None:
"""Run the SSE polling demo."""
print(f"\n{'=' * 60}")
print("SSE Polling Demo Client")
print(f"{'=' * 60}")
print(f"Server URL: {url}")
print(f"Processing {items} items with checkpoints every {checkpoint_every}")
print(f"{'=' * 60}\n")

async with streamablehttp_client(url) as (read_stream, write_stream, _):
async with ClientSession(read_stream, write_stream) as session:
# Initialize the connection
print("Initializing connection...")
await session.initialize()
print("Connected!\n")

# List available tools
tools = await session.list_tools()
print(f"Available tools: {[t.name for t in tools.tools]}\n")

# Call the process_batch tool
print(f"Calling process_batch(items={items}, checkpoint_every={checkpoint_every})...\n")
print("-" * 40)

result = await session.call_tool(
"process_batch",
{
"items": items,
"checkpoint_every": checkpoint_every,
},
)

print("-" * 40)
if result.content:
content = result.content[0]
text = getattr(content, "text", str(content))
print(f"\nResult: {text}")
else:
print("\nResult: No content")
print(f"{'=' * 60}\n")


@click.command()
@click.option(
"--url",
default="http://localhost:3000/mcp",
help="Server URL",
)
@click.option(
"--items",
default=10,
help="Number of items to process",
)
@click.option(
"--checkpoint-every",
default=3,
help="Checkpoint interval",
)
@click.option(
"--log-level",
default="INFO",
help="Logging level",
)
def main(url: str, items: int, checkpoint_every: int, log_level: str) -> None:
"""Run the SSE Polling Demo client."""
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Suppress noisy HTTP client logging
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)

asyncio.run(run_demo(url, items, checkpoint_every))


if __name__ == "__main__":
main()
36 changes: 36 additions & 0 deletions examples/clients/sse-polling-client/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[project]
name = "mcp-sse-polling-client"
version = "0.1.0"
description = "Demo client for SSE polling with auto-reconnect"
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Anthropic, PBC." }]
keywords = ["mcp", "sse", "polling", "client"]
license = { text = "MIT" }
dependencies = ["click>=8.2.0", "mcp"]

[project.scripts]
mcp-sse-polling-client = "mcp_sse_polling_client.main:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["mcp_sse_polling_client"]

[tool.pyright]
include = ["mcp_sse_polling_client"]
venvPath = "."
venv = ".venv"

[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []

[tool.ruff]
line-length = 120
target-version = "py310"

[dependency-groups]
dev = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]
57 changes: 57 additions & 0 deletions examples/servers/everything-server/mcp_everything_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.fastmcp.prompts.base import UserMessage
from mcp.server.session import ServerSession
from mcp.server.streamable_http import EventCallback, EventMessage, EventStore
from mcp.types import (
AudioContent,
Completion,
CompletionArgument,
CompletionContext,
EmbeddedResource,
ImageContent,
JSONRPCMessage,
PromptReference,
ResourceTemplateReference,
SamplingMessage,
Expand All @@ -31,6 +33,43 @@

logger = logging.getLogger(__name__)

# Type aliases for event store
StreamId = str
EventId = str


class InMemoryEventStore(EventStore):
"""Simple in-memory event store for SSE resumability testing."""

def __init__(self) -> None:
self._events: list[tuple[StreamId, EventId, JSONRPCMessage | None]] = []
self._event_id_counter = 0

async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
"""Store an event and return its ID."""
self._event_id_counter += 1
event_id = str(self._event_id_counter)
self._events.append((stream_id, event_id, message))
return event_id

async def replay_events_after(self, last_event_id: EventId, send_callback: EventCallback) -> StreamId | None:
"""Replay events after the specified ID."""
target_stream_id = None
for stream_id, event_id, _ in self._events:
if event_id == last_event_id:
target_stream_id = stream_id
break
if target_stream_id is None:
return None
last_event_id_int = int(last_event_id)
for stream_id, event_id, message in self._events:
if stream_id == target_stream_id and int(event_id) > last_event_id_int:
# Skip priming events (None message)
if message is not None:
await send_callback(EventMessage(message, event_id))
return target_stream_id


# Test data
TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="
TEST_AUDIO_BASE64 = "UklGRiYAAABXQVZFZm10IBAAAAABAAEAQB8AAAB9AAACABAAZGF0YQIAAAA="
Expand All @@ -39,8 +78,13 @@
resource_subscriptions: set[str] = set()
watched_resource_content = "Watched resource content"

# Create event store for SSE resumability (SEP-1699)
event_store = InMemoryEventStore()

mcp = FastMCP(
name="mcp-conformance-test-server",
event_store=event_store,
retry_interval=100, # 100ms retry interval for SSE polling
)


Expand Down Expand Up @@ -263,6 +307,19 @@ def test_error_handling() -> str:
raise RuntimeError("This tool intentionally returns an error for testing")


@mcp.tool()
async def test_reconnection(ctx: Context[ServerSession, None]) -> str:
"""Tests SSE polling by closing stream mid-call (SEP-1699)"""
await ctx.info("Before disconnect")

await ctx.close_sse_stream()

await asyncio.sleep(0.2) # Wait for client to reconnect

await ctx.info("After reconnect")
return "Reconnection test completed"


# Resources
@mcp.resource("test://static-text")
def static_text_resource() -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class EventEntry:

event_id: EventId
stream_id: StreamId
message: JSONRPCMessage
message: JSONRPCMessage | None


class InMemoryEventStore(EventStore):
Expand All @@ -48,7 +48,7 @@ def __init__(self, max_events_per_stream: int = 100):
# event_id -> EventEntry for quick lookup
self.event_index: dict[EventId, EventEntry] = {}

async def store_event(self, stream_id: StreamId, message: JSONRPCMessage) -> EventId:
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
"""Stores an event with a generated event ID."""
event_id = str(uuid4())
event_entry = EventEntry(event_id=event_id, stream_id=stream_id, message=message)
Expand Down Expand Up @@ -88,7 +88,9 @@ async def replay_events_after(
found_last = False
for event in stream_events:
if found_last:
await send_callback(EventMessage(event.message, event.event_id))
# Skip priming events (None message)
if event.message is not None:
await send_callback(EventMessage(event.message, event.event_id))
elif event.event_id == last_event_id:
found_last = True

Expand Down
36 changes: 36 additions & 0 deletions examples/servers/sse-polling-demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# MCP SSE Polling Demo Server

Demonstrates the SSE polling pattern with server-initiated stream close for long-running tasks (SEP-1699).

## Features

- Priming events (automatic with EventStore)
- Server-initiated stream close via `close_sse_stream()` callback
- Client auto-reconnect with Last-Event-ID
- Progress notifications during long-running tasks
- Configurable retry interval

## Usage

```bash
# Start server on default port
uv run mcp-sse-polling-demo --port 3000

# Custom retry interval (milliseconds)
uv run mcp-sse-polling-demo --port 3000 --retry-interval 100
```

## Tool: process_batch

Processes items with periodic checkpoints that trigger SSE stream closes:

- `items`: Number of items to process (1-100, default: 10)
- `checkpoint_every`: Close stream after this many items (1-20, default: 3)

## Client

Use the companion `mcp-sse-polling-client` to test:

```bash
uv run mcp-sse-polling-client --url http://localhost:3000/mcp
```
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""SSE Polling Demo Server - demonstrates close_sse_stream for long-running tasks."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Entry point for the SSE Polling Demo server."""

from .server import main

if __name__ == "__main__":
main()
Loading