Skip to content

Implement MCP Streamable HTTP Server #228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
3562dbe
Add mcp http streamable server
tendant Apr 29, 2025
d82bf78
update client
tendant Apr 29, 2025
5f5303c
Add minimal server and client
tendant Apr 29, 2025
508c5ad
Merge branch 'mark3labs:main' into http-streamable
tendant May 2, 2025
b464eae
Add session tools
tendant May 1, 2025
650f3c9
update session to new session tools
tendant May 1, 2025
51e0674
wip: fix test case
tendant May 2, 2025
4e91225
clean up output
tendant May 2, 2025
e160f19
Add StatelessMode
tendant May 2, 2025
dfab0e0
refactor w.(http.Flusher).Flush() out of go func().
tendant May 7, 2025
ae8be03
fix(server/sse): potential goroutine leak in Heartbeat sender (#236)
cryo-zd May 3, 2025
48c8bb6
Fix stdio test compilation issues in CI (#240)
ezynda3 May 4, 2025
12c57bb
refactor(server/sse): rename WithBasePath to WithStaticBasePath for c…
rwjblue-glean May 4, 2025
701927b
fix(MCPServer): Session tool handler not used due to variable shadowi…
cryo-zd May 4, 2025
ff9a85b
test: build mockstdio_server with isolated cache to prevent flaky CI …
rwjblue-glean May 4, 2025
5734235
fix: Use detached context for SSE message handling (#244)
yash025 May 5, 2025
a12f1cd
Format
ezynda3 May 6, 2025
55897af
support audio content type (#250)
dugenkui03 May 8, 2025
556b070
refactor(server): extract shared HTTP transport configuration options…
rwjblue-glean May 8, 2025
d8c570f
Update matching for Accept Header
tendant May 7, 2025
cfbbab2
clean up unused code
tendant May 7, 2025
61dd0fb
refactor to use StreamableHTTPOption
tendant May 8, 2025
5a4c9a5
Update to use new configuration
tendant May 8, 2025
91461cd
Merge branch 'main' into http-streamable
tendant May 8, 2025
8bd2b46
clean up code and add last event id
tendant May 11, 2025
635bcc3
Add test for origin validation
tendant May 11, 2025
829c0a8
update handling of notification during request processing
tendant May 11, 2025
e016e85
Update closing of sse stream.
tendant May 11, 2025
11618e0
fix go goroutine on every call issue
tendant May 11, 2025
59d251a
Fixed Listener Leak in Start Method
tendant May 11, 2025
ce991c3
Fixed Concurrency Issue in SetSessionTools
tendant May 11, 2025
86feb0d
Removed Unused `lastEventID` Field
tendant May 11, 2025
2821ef0
Update Stream Management & clean up unused code
tendant May 11, 2025
a863825
fix tests
tendant May 12, 2025
6f9af6c
Update settings
tendant May 12, 2025
3628ca2
Update eventId
tendant May 12, 2025
5f92546
Fix for Notification Handler
tendant May 12, 2025
f3dbeb0
clean up unused code, base path logic and cors headers
tendant May 12, 2025
de2a2a0
Close session channels on DELETE to avoid goroutine leaks
tendant May 12, 2025
4b79c50
Update event ID for Get
tendant May 12, 2025
c7740fd
Improved Origin Validation Security
tendant May 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions README-streamable-http.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# MCP Streamable HTTP Implementation

This is an implementation of the [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) Streamable HTTP transport for Go. It follows the [MCP Streamable HTTP transport specification](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports).

## Features

- Implementation of the MCP Streamable HTTP transport specification
- Support for both client and server sides
- Session management with unique session IDs
- Support for SSE (Server-Sent Events) streaming
- Support for direct JSON responses
- Basic resumability with event IDs
- Support for notifications
- Support for session termination
- Origin header validation for security

## Current Limitations

- Limited batching support
- Basic resumability support (improved but not complete)
- No support for server -> client requests
- Limited support for continuously listening for server notifications

## Server Implementation

The server implementation is in `server/streamable_http.go`. It provides the Streamable HTTP transport for the server side.

### Key Components

- `StreamableHTTPServer`: The main server implementation that handles HTTP requests and responses
- `streamableHTTPSession`: Represents an active session with a client
- `EventStore`: Interface for storing and retrieving events for resumability
- `InMemoryEventStore`: A simple in-memory implementation of the EventStore interface

### Server Options

- `WithSessionIDGenerator`: Sets a custom session ID generator
- `WithStatelessMode`: Enables stateless mode (no sessions)
- `WithEnableJSONResponse`: Enables direct JSON responses instead of SSE streams
- `WithEventStore`: Sets a custom event store for resumability
- `WithStreamableHTTPContextFunc`: Sets a function to customize the context

## Client Implementation

The client implementation is in `client/transport/streamable_http.go`. It provides the Streamable HTTP transport for the client side.

### Client Options

- `WithHTTPHeaders`: Sets custom HTTP headers for all requests
- `WithHTTPTimeout`: Sets the timeout for HTTP requests and streams

## Usage

For complete examples, see:
- Server example: `examples/streamable_http_server/main.go`
- Client example: `examples/streamable_http_client/main.go`
- Complete client example: `examples/streamable_http_client_complete/main.go`

## Protocol Details

The Streamable HTTP transport follows the MCP Streamable HTTP transport specification:

1. **Session Management**: Sessions are created during initialization and maintained through a session ID header.
2. **SSE Streaming**: Server-Sent Events (SSE) are used for streaming responses and notifications.
3. **Direct JSON Responses**: For simple requests, direct JSON responses can be used instead of SSE.
4. **Resumability**: Events can be stored and replayed if a client reconnects with a Last-Event-ID header.
5. **Session Termination**: Sessions can be explicitly terminated with a DELETE request.
6. **Multiple Sessions**: The server supports multiple concurrent independent sessions.

## HTTP Methods

- **POST**: Used for sending JSON-RPC requests and notifications
- **GET**: Used for establishing a standalone SSE stream for receiving notifications
- **DELETE**: Used for terminating a session

## HTTP Headers

- **Mcp-Session-Id**: Used to identify a session
- **Accept**: Used to indicate support for SSE (`text/event-stream`)
- **Last-Event-Id**: Used for resumability
- **Origin**: Validated by the server for security

## Implementation Notes

- The server implementation supports both stateful and stateless modes.
- In stateful mode, a session ID is generated and maintained for each client.
- In stateless mode, no session ID is generated, and no session state is maintained.
- The client implementation supports reconnecting and resuming after disconnection.
- The server implementation supports multiple concurrent clients.
- Each client instance typically manages a single session at a time.
56 changes: 46 additions & 10 deletions client/transport/streamable_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,18 @@ func WithHTTPTimeout(timeout time.Duration) StreamableHTTPCOption {
//
// https://modelcontextprotocol.io/specification/2025-03-26/basic/transports
//
// The current implementation does not support the following features:
// - batching
// - continuously listening for server notifications when no request is in flight
// (https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#listening-for-messages-from-the-server)
// - resuming stream
// (https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#resumability-and-redelivery)
// - server -> client request
// Current limitations:
// - Limited batching support
// - Basic resumability support (improved but not complete)
// - No support for server -> client requests
// - Limited support for continuously listening for server notifications
type StreamableHTTP struct {
baseURL *url.URL
httpClient *http.Client
headers map[string]string

sessionID atomic.Value // string
sessionID atomic.Value // string
lastEventID atomic.Value // string for resumability

notificationHandler func(mcp.JSONRPCNotification)
notifyMu sync.RWMutex
Expand All @@ -75,7 +74,8 @@ func NewStreamableHTTP(baseURL string, options ...StreamableHTTPCOption) (*Strea
headers: make(map[string]string),
closed: make(chan struct{}),
}
smc.sessionID.Store("") // set initial value to simplify later usage
smc.sessionID.Store("") // set initial value to simplify later usage
smc.lastEventID.Store("") // initialize lastEventID

for _, opt := range options {
opt(smc)
Expand Down Expand Up @@ -166,10 +166,20 @@ func (c *StreamableHTTP) SendRequest(
// Set headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json, text/event-stream")

// Add session ID if available
sessionID := c.sessionID.Load()
if sessionID != "" {
req.Header.Set(headerKeySessionID, sessionID.(string))
}

// Add Last-Event-Id header for resumability if available
lastEventID := c.lastEventID.Load()
if lastEventID != nil && lastEventID.(string) != "" {
req.Header.Set("Last-Event-Id", lastEventID.(string))
}

// Add custom headers
for k, v := range c.headers {
req.Header.Set(k, v)
}
Expand Down Expand Up @@ -294,7 +304,7 @@ func (c *StreamableHTTP) readSSE(ctx context.Context, reader io.ReadCloser, hand
defer reader.Close()

br := bufio.NewReader(reader)
var event, data string
var event, data, id string

for {
select {
Expand Down Expand Up @@ -325,8 +335,13 @@ func (c *StreamableHTTP) readSSE(ctx context.Context, reader io.ReadCloser, hand
// Empty line means end of event
if event != "" && data != "" {
handler(event, data)
// Store the last event ID for resumability if present
if id != "" {
c.lastEventID.Store(id)
}
event = ""
data = ""
id = ""
}
continue
}
Expand All @@ -335,6 +350,8 @@ func (c *StreamableHTTP) readSSE(ctx context.Context, reader io.ReadCloser, hand
event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
} else if strings.HasPrefix(line, "data:") {
data = strings.TrimSpace(strings.TrimPrefix(line, "data:"))
} else if strings.HasPrefix(line, "id:") {
id = strings.TrimSpace(strings.TrimPrefix(line, "id:"))
}
}
}
Expand All @@ -357,9 +374,19 @@ func (c *StreamableHTTP) SendNotification(ctx context.Context, notification mcp.
// Set headers
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json, text/event-stream")

// Add session ID if available
if sessionID := c.sessionID.Load(); sessionID != "" {
req.Header.Set(headerKeySessionID, sessionID.(string))
}

// Add Last-Event-Id header for resumability if available
lastEventID := c.lastEventID.Load()
if lastEventID != nil && lastEventID.(string) != "" {
req.Header.Set("Last-Event-Id", lastEventID.(string))
}

// Add custom headers
for k, v := range c.headers {
req.Header.Set(k, v)
}
Expand Down Expand Up @@ -392,3 +419,12 @@ func (c *StreamableHTTP) SetNotificationHandler(handler func(mcp.JSONRPCNotifica
func (c *StreamableHTTP) GetSessionId() string {
return c.sessionID.Load().(string)
}

// GetLastEventId returns the last event ID for resumability
func (c *StreamableHTTP) GetLastEventId() string {
lastEventID := c.lastEventID.Load()
if lastEventID == nil {
return ""
}
return lastEventID.(string)
}
71 changes: 71 additions & 0 deletions examples/minimal_client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"time"

"github.com/mark3labs/mcp-go/client/transport"
)

func main() {
// Create a new Streamable HTTP transport with a longer timeout
trans, err := transport.NewStreamableHTTP("http://localhost:8080/mcp",
transport.WithHTTPTimeout(30*time.Second))
if err != nil {
fmt.Printf("Failed to create transport: %v\n", err)
os.Exit(1)
}
defer trans.Close()

// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Initialize the connection
fmt.Println("Initializing connection...")
initRequest := transport.JSONRPCRequest{
JSONRPC: "2.0",
ID: 1,
Method: "initialize",
}

initResponse, err := trans.SendRequest(ctx, initRequest)
if err != nil {
fmt.Printf("Failed to initialize: %v\n", err)
os.Exit(1)
}

// Print the initialization response
initResponseJSON, _ := json.MarshalIndent(initResponse, "", " ")
fmt.Printf("Initialization response: %s\n", initResponseJSON)
fmt.Printf("Session ID: %s\n", trans.GetSessionId())

// Call the echo tool
fmt.Println("\nCalling echo tool...")
echoRequest := transport.JSONRPCRequest{
JSONRPC: "2.0",
ID: 2,
Method: "tools/call",
Params: map[string]interface{}{
"name": "echo",
"arguments": map[string]interface{}{
"message": "Hello from minimal client!",
},
},
}

echoResponse, err := trans.SendRequest(ctx, echoRequest)
if err != nil {
fmt.Printf("Failed to call echo tool: %v\n", err)
os.Exit(1)
}

// Print the echo response
echoResponseJSON, _ := json.MarshalIndent(echoResponse, "", " ")
fmt.Printf("Echo response: %s\n", echoResponseJSON)

fmt.Println("\nTest completed successfully!")
}
83 changes: 83 additions & 0 deletions examples/minimal_server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)

func main() {
// Create a new MCP server
mcpServer := server.NewMCPServer("minimal-server", "1.0.0")

// Add a simple echo tool
mcpServer.AddTool(
mcp.Tool{
Name: "echo",
Description: "Echoes back the input",
InputSchema: mcp.ToolInputSchema{
Type: "object",
Properties: map[string]interface{}{
"message": map[string]interface{}{
"type": "string",
"description": "The message to echo",
},
},
Required: []string{"message"},
},
},
func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Extract the message from the request
message, ok := request.Params.Arguments["message"].(string)
if !ok {
return nil, fmt.Errorf("message must be a string")
}

// Create the result
result := &mcp.CallToolResult{
Result: mcp.Result{},
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("Echo: %s", message),
},
},
}

return result, nil
},
)

// Create a new Streamable HTTP server with direct JSON responses
streamableServer := server.NewStreamableHTTPServer(mcpServer,
server.WithEnableJSONResponse(true),
)

// Start the server in a goroutine
go func() {
log.Println("Starting Minimal Streamable HTTP server on :8080...")
if err := streamableServer.Start(":8080"); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}()

// Wait for interrupt signal to gracefully shutdown the server
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

log.Println("Shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := streamableServer.Shutdown(ctx); err != nil {
log.Fatalf("Server shutdown failed: %v", err)
}
log.Println("Server exited properly")
}
Loading