Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -808,10 +808,21 @@ Request additional information from users. This example shows an Elicitation dur

<!-- snippet-source examples/snippets/servers/elicitation.py -->
```python
"""Elicitation examples demonstrating form and URL mode elicitation.

Form mode elicitation collects structured, non-sensitive data through a schema.
URL mode elicitation directs users to external URLs for sensitive operations
like OAuth flows, credential collection, or payment processing.
"""

import uuid

from pydantic import BaseModel, Field

from mcp.server.fastmcp import Context, FastMCP
from mcp.server.session import ServerSession
from mcp.shared.exceptions import UrlElicitationRequiredError
from mcp.types import ElicitRequestURLParams

mcp = FastMCP(name="Elicitation Example")

Expand All @@ -828,7 +839,10 @@ class BookingPreferences(BaseModel):

@mcp.tool()
async def book_table(date: str, time: str, party_size: int, ctx: Context[ServerSession, None]) -> str:
"""Book a table with date availability check."""
"""Book a table with date availability check.

This demonstrates form mode elicitation for collecting non-sensitive user input.
"""
# Check if date is available
if date == "2024-12-25":
# Date unavailable - ask user for alternative
Expand All @@ -845,6 +859,54 @@ async def book_table(date: str, time: str, party_size: int, ctx: Context[ServerS

# Date available
return f"[SUCCESS] Booked for {date} at {time}"


@mcp.tool()
async def secure_payment(amount: float, ctx: Context[ServerSession, None]) -> str:
"""Process a secure payment requiring URL confirmation.

This demonstrates URL mode elicitation using ctx.elicit_url() for
operations that require out-of-band user interaction.
"""
elicitation_id = str(uuid.uuid4())

result = await ctx.elicit_url(
message=f"Please confirm payment of ${amount:.2f}",
url=f"https://payments.example.com/confirm?amount={amount}&id={elicitation_id}",
elicitation_id=elicitation_id,
)

if result.action == "accept":
# In a real app, the payment confirmation would happen out-of-band
# and you'd verify the payment status from your backend
return f"Payment of ${amount:.2f} initiated - check your browser to complete"
elif result.action == "decline":
return "Payment declined by user"
return "Payment cancelled"


@mcp.tool()
async def connect_service(service_name: str, ctx: Context[ServerSession, None]) -> str:
"""Connect to a third-party service requiring OAuth authorization.

This demonstrates the "throw error" pattern using UrlElicitationRequiredError.
Use this pattern when the tool cannot proceed without user authorization.
"""
elicitation_id = str(uuid.uuid4())

# Raise UrlElicitationRequiredError to signal that the client must complete
# a URL elicitation before this request can be processed.
# The MCP framework will convert this to a -32042 error response.
raise UrlElicitationRequiredError(
[
ElicitRequestURLParams(
mode="url",
message=f"Authorization required to connect to {service_name}",
url=f"https://{service_name}.example.com/oauth/authorize?elicit={elicitation_id}",
elicitationId=elicitation_id,
)
]
)
```

_Full example: [examples/snippets/servers/elicitation.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/elicitation.py)_
Expand Down
15 changes: 13 additions & 2 deletions examples/clients/simple-auth-client/mcp_simple_auth_client/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,15 @@ def get_state(self):
class SimpleAuthClient:
"""Simple MCP client with auth support."""

def __init__(self, server_url: str, transport_type: str = "streamable-http"):
def __init__(
self,
server_url: str,
transport_type: str = "streamable-http",
client_metadata_url: str | None = None,
):
self.server_url = server_url
self.transport_type = transport_type
self.client_metadata_url = client_metadata_url
self.session: ClientSession | None = None

async def connect(self):
Expand Down Expand Up @@ -185,12 +191,14 @@ async def _default_redirect_handler(authorization_url: str) -> None:
webbrowser.open(authorization_url)

# Create OAuth authentication handler using the new interface
# Use client_metadata_url to enable CIMD when the server supports it
oauth_auth = OAuthClientProvider(
server_url=self.server_url,
client_metadata=OAuthClientMetadata.model_validate(client_metadata_dict),
storage=InMemoryTokenStorage(),
redirect_handler=_default_redirect_handler,
callback_handler=callback_handler,
client_metadata_url=self.client_metadata_url,
)

# Create transport with auth handler based on transport type
Expand Down Expand Up @@ -334,6 +342,7 @@ async def main():
# Most MCP streamable HTTP servers use /mcp as the endpoint
server_url = os.getenv("MCP_SERVER_PORT", 8000)
transport_type = os.getenv("MCP_TRANSPORT_TYPE", "streamable-http")
client_metadata_url = os.getenv("MCP_CLIENT_METADATA_URL")
server_url = (
f"http://localhost:{server_url}/mcp"
if transport_type == "streamable-http"
Expand All @@ -343,9 +352,11 @@ async def main():
print("🚀 Simple MCP Auth Client")
print(f"Connecting to: {server_url}")
print(f"Transport type: {transport_type}")
if client_metadata_url:
print(f"Client metadata URL: {client_metadata_url}")

# Start connection flow - OAuth will be handled automatically
client = SimpleAuthClient(server_url, transport_type)
client = SimpleAuthClient(server_url, transport_type, client_metadata_url)
await client.connect()


Expand Down
72 changes: 72 additions & 0 deletions examples/servers/everything-server/mcp_everything_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,21 @@
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.fastmcp.prompts.base import UserMessage
from mcp.server.session import ServerSession
from mcp.server.streamable_http import (
EventCallback,
EventId,
EventMessage,
EventStore,
StreamId,
)
from mcp.types import (
AudioContent,
Completion,
CompletionArgument,
CompletionContext,
EmbeddedResource,
ImageContent,
JSONRPCMessage,
PromptReference,
ResourceTemplateReference,
SamplingMessage,
Expand All @@ -39,8 +47,47 @@
resource_subscriptions: set[str] = set()
watched_resource_content = "Watched resource content"


# Simple in-memory event store for SSE polling resumability (SEP-1699)
class SimpleEventStore(EventStore):
"""Simple in-memory event store for testing resumability."""

def __init__(self) -> None:
self._events: list[tuple[StreamId, EventId, JSONRPCMessage]] = []
self._event_id_counter = 0

async def store_event(self, stream_id: StreamId, message: JSONRPCMessage) -> EventId:
"""Store an event and return its ID."""
self._event_id_counter += 1
event_id = str(self._event_id_counter)
self._events.append((stream_id, event_id, message))
return event_id

async def replay_events_after(
self,
last_event_id: EventId,
send_callback: EventCallback,
) -> StreamId | None:
"""Replay events after the specified ID."""
target_stream_id = None
found = False
for stream_id, event_id, message in self._events:
if event_id == last_event_id:
target_stream_id = stream_id
found = True
continue
if found and stream_id == target_stream_id:
await send_callback(EventMessage(message=message, event_id=event_id))
return target_stream_id


# Create event store for resumability
event_store = SimpleEventStore()

mcp = FastMCP(
name="mcp-conformance-test-server",
event_store=event_store,
sse_retry_interval=3000, # 3 seconds
)


Expand Down Expand Up @@ -257,6 +304,31 @@ async def test_elicitation_sep1330_enums(ctx: Context[ServerSession, None]) -> s
return f"Elicitation not supported or error: {str(e)}"


@mcp.tool()
async def test_reconnection(ctx: Context[ServerSession, None]) -> str:
"""Tests SSE polling via server-initiated disconnect (SEP-1699)

This tool closes the SSE stream mid-call, requiring the client to reconnect
with Last-Event-ID to receive the remaining events.
"""
# Send notification before disconnect
await ctx.info("Notification before disconnect")

# Use the close_sse_stream callback if available
# This is None if not on streamable HTTP transport or no event store configured
if ctx.close_sse_stream:
# Trigger server-initiated SSE disconnect with optional retry interval
await ctx.close_sse_stream(retry_interval=3000) # 3 seconds

# Wait for client to reconnect
await asyncio.sleep(0.2)

# Send notification after disconnect (will be replayed via event store)
await ctx.info("Notification after disconnect")

return "Reconnection test completed successfully"


@mcp.tool()
def test_error_handling() -> str:
"""Tests error response handling"""
Expand Down
103 changes: 103 additions & 0 deletions examples/snippets/clients/sse_polling_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""
SSE Polling Example Client

Demonstrates client-side behavior during server-initiated SSE disconnect.

Key features:
- Automatic reconnection when server closes SSE stream
- Event replay via Last-Event-ID header (handled internally by the transport)
- Progress notifications via logging callback

This client connects to the SSE polling server and calls the `long-task` tool.
The server disconnects at 50% progress, and the client automatically reconnects
to receive remaining progress updates.

Run:
# First start the server:
uv run examples/snippets/servers/sse_polling_server.py

# Then run this client:
uv run examples/snippets/clients/sse_polling_client.py
"""

import asyncio
import logging

from mcp import ClientSession
from mcp.client.streamable_http import StreamableHTTPReconnectionOptions, streamablehttp_client
from mcp.types import LoggingMessageNotificationParams, TextContent

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


async def main() -> None:
print("SSE Polling Example Client")
print("=" * 50)
print()

# Track notifications received via the logging callback
notifications_received: list[str] = []

async def logging_callback(params: LoggingMessageNotificationParams) -> None:
"""Called when a log message notification is received from the server."""
data = params.data
if data:
data_str = str(data)
notifications_received.append(data_str)
print(f"[Progress] {data_str}")

# Configure reconnection behavior
reconnection_options = StreamableHTTPReconnectionOptions(
initial_reconnection_delay=1.0, # Start with 1 second
max_reconnection_delay=30.0, # Cap at 30 seconds
reconnection_delay_grow_factor=1.5, # Exponential backoff
max_retries=5, # Try up to 5 times
)

print("[Client] Connecting to server...")

async with streamablehttp_client(
"http://localhost:3001/mcp",
reconnection_options=reconnection_options,
) as (read_stream, write_stream, get_session_id):
# Create session with logging callback to receive progress notifications
async with ClientSession(
read_stream,
write_stream,
logging_callback=logging_callback,
) as session:
# Initialize the session
await session.initialize()
session_id = get_session_id()
print(f"[Client] Connected! Session ID: {session_id}")

# List available tools
tools = await session.list_tools()
tool_names = [t.name for t in tools.tools]
print(f"[Client] Available tools: {tool_names}")
print()

# Call the long-running task
print("[Client] Calling long-task tool...")
print("[Client] The server will disconnect at 50% and we'll auto-reconnect")
print()

# Call the tool
result = await session.call_tool("long-task", {})

print()
print("[Client] Task completed!")
if result.content and isinstance(result.content[0], TextContent):
print(f"[Result] {result.content[0].text}")
else:
print("[Result] No content")
print()
print(f"[Summary] Received {len(notifications_received)} progress notifications")


if __name__ == "__main__":
asyncio.run(main())
Loading