Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/rmcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ name = "test_streamable_http_priming"
required-features = ["server", "client", "transport-streamable-http-server", "reqwest"]
path = "tests/test_streamable_http_priming.rs"

[[test]]
name = "test_streamable_http_json_response"
required-features = ["server", "client", "transport-streamable-http-server", "reqwest"]
path = "tests/test_streamable_http_json_response.rs"


[[test]]
name = "test_custom_request"
Expand Down
63 changes: 49 additions & 14 deletions crates/rmcp/src/transport/streamable_http_server/tower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ pub struct StreamableHttpServerConfig {
/// If true, the server will create a session for each request and keep it alive.
/// When enabled, SSE priming events are sent to enable client reconnection.
pub stateful_mode: bool,
/// When true and `stateful_mode` is false, the server returns
/// `Content-Type: application/json` directly instead of `text/event-stream`.
/// This eliminates SSE framing overhead for simple request-response tools,
/// allowed by the MCP Streamable HTTP spec (2025-06-18).
pub json_response: bool,
/// Cancellation token for the Streamable HTTP server.
///
/// When this token is cancelled, all active sessions are terminated and
Expand All @@ -51,6 +56,7 @@ impl Default for StreamableHttpServerConfig {
sse_keep_alive: Some(Duration::from_secs(15)),
sse_retry: Some(Duration::from_secs(3)),
stateful_mode: true,
json_response: false,
cancellation_token: CancellationToken::new(),
}
}
Expand Down Expand Up @@ -585,27 +591,56 @@ where
match message {
ClientJsonRpcMessage::Request(mut request) => {
request.request.extensions_mut().insert(part);
let (transport, receiver) =
let (transport, mut receiver) =
OneshotTransport::<RoleServer>::new(ClientJsonRpcMessage::Request(request));
let service = serve_directly(service, transport, None);
tokio::spawn(async move {
// on service created
let _ = service.waiting().await;
});
// Stateless mode: no priming (no session to resume)
let stream = ReceiverStream::new(receiver).map(|message| {
tracing::info!(?message);
ServerSseMessage {
event_id: None,
message: Some(Arc::new(message)),
retry: None,
if self.config.json_response {
// JSON-direct mode: await the single response and return as
// application/json, eliminating SSE framing overhead.
// Allowed by MCP Streamable HTTP spec (2025-06-18).
let cancel = self.config.cancellation_token.child_token();
match tokio::select! {
res = receiver.recv() => res,
_ = cancel.cancelled() => None,
} {
Some(message) => {
tracing::info!(?message);
let body = serde_json::to_vec(&message).map_err(|e| {
internal_error_response("serialize json response")(e)
})?;
Ok(Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, JSON_MIME_TYPE)
.body(Full::new(Bytes::from(body)).boxed())
.expect("valid response"))
}
None => Err(internal_error_response("empty response")(
std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"no response message received from handler",
),
)),
}
});
Ok(sse_stream_response(
stream,
self.config.sse_keep_alive,
self.config.cancellation_token.child_token(),
))
} else {
// SSE mode (default): original behaviour preserved unchanged
let stream = ReceiverStream::new(receiver).map(|message| {
tracing::info!(?message);
ServerSseMessage {
event_id: None,
message: Some(Arc::new(message)),
retry: None,
}
});
Ok(sse_stream_response(
stream,
self.config.sse_keep_alive,
self.config.cancellation_token.child_token(),
))
}
}
ClientJsonRpcMessage::Notification(_notification) => {
// ignore
Expand Down
1 change: 1 addition & 0 deletions crates/rmcp/tests/test_sse_concurrent_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async fn start_test_server(ct: CancellationToken, trigger: Arc<Notify>) -> Strin
sse_keep_alive: Some(Duration::from_secs(15)),
sse_retry: Some(Duration::from_secs(3)),
cancellation_token: ct.child_token(),
..Default::default()
},
);

Expand Down
155 changes: 155 additions & 0 deletions crates/rmcp/tests/test_streamable_http_json_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use rmcp::transport::streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
};
use tokio_util::sync::CancellationToken;

mod common;
use common::calculator::Calculator;

const INIT_BODY: &str = r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#;

async fn spawn_server(
config: StreamableHttpServerConfig,
) -> (reqwest::Client, String, CancellationToken) {
let ct = config.cancellation_token.clone();
let service: StreamableHttpService<Calculator, LocalSessionManager> =
StreamableHttpService::new(|| Ok(Calculator::new()), Default::default(), config);

let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = tcp_listener.local_addr().unwrap();

tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});

let client = reqwest::Client::new();
let base_url = format!("http://{addr}/mcp");
(client, base_url, ct)
}

#[tokio::test]
async fn stateless_json_response_returns_application_json() -> anyhow::Result<()> {
let ct = CancellationToken::new();
let (client, url, ct) = spawn_server(StreamableHttpServerConfig {
stateful_mode: false,
json_response: true,
sse_keep_alive: None,
cancellation_token: ct.child_token(),
..Default::default()
})
.await;

let response = client
.post(&url)
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(INIT_BODY)
.send()
.await?;

assert_eq!(response.status(), 200);

let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
assert!(
content_type.contains("application/json"),
"Expected application/json, got: {content_type}"
);

let body = response.text().await?;
let parsed: serde_json::Value = serde_json::from_str(&body)?;
assert_eq!(parsed["jsonrpc"], "2.0");
assert_eq!(parsed["id"], 1);
assert!(parsed["result"].is_object(), "Expected result object");

ct.cancel();
Ok(())
}

#[tokio::test]
async fn stateless_sse_mode_default_unchanged() -> anyhow::Result<()> {
let ct = CancellationToken::new();
let (client, url, ct) = spawn_server(StreamableHttpServerConfig {
stateful_mode: false,
json_response: false,
sse_keep_alive: None,
cancellation_token: ct.child_token(),
..Default::default()
})
.await;

let response = client
.post(&url)
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(INIT_BODY)
.send()
.await?;

assert_eq!(response.status(), 200);

let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
assert!(
content_type.contains("text/event-stream"),
"Expected text/event-stream, got: {content_type}"
);

let body = response.text().await?;
assert!(
body.contains("data:"),
"Expected SSE framing (data: prefix), got: {body}"
);

ct.cancel();
Ok(())
}

#[tokio::test]
async fn json_response_ignored_in_stateful_mode() -> anyhow::Result<()> {
let ct = CancellationToken::new();
// json_response: true has no effect when stateful_mode: true — server still uses SSE
let (client, url, ct) = spawn_server(StreamableHttpServerConfig {
stateful_mode: true,
json_response: true,
sse_keep_alive: None,
cancellation_token: ct.child_token(),
..Default::default()
})
.await;

let response = client
.post(&url)
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(INIT_BODY)
.send()
.await?;

assert_eq!(response.status(), 200);

let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
assert!(
content_type.contains("text/event-stream"),
"Stateful mode should always use SSE regardless of json_response, got: {content_type}"
);

ct.cancel();
Ok(())
}