
Commit 332fcbf

Fix/sse channel replacement conflict (#682)
* fix(streamable-http): return 409 Conflict when standalone SSE stream already active

  LocalSessionWorker::resume() unconditionally replaced self.common.tx on every GET request, orphaning the receiver the first SSE stream was reading from. All subsequent server-to-client notifications were sent to the new sender while the original client was still listening on the old, now-dead receiver; notify_tool_list_changed().await returned Ok(()) silently. This is triggered by VS Code's MCP extension, which reconnects SSE every ~5 minutes with the same session ID.

  Fix: check tx.is_closed() before replacing the common channel sender. If an active stream exists, return SessionError::Conflict, which is propagated as HTTP 409 Conflict. This matches the TypeScript SDK behavior (streamableHttp.ts:423).

  Signed-off-by: Mohammod Al Amin Ashik <maa.ashik00@gmail.com>

* fix(streamable-http): handle resume with completed request-wise channel

  When a client sends GET with Last-Event-ID from a completed POST SSE response, the request-wise channel no longer exists in tx_router. Previously this returned ChannelClosed -> 500, causing clients like Cursor to enter an infinite re-initialization loop. Now falls back to the common channel when the request-wise channel is completed, per MCP spec: "Resumption applies regardless of how the original stream was initiated (POST or GET)."

* fix: allow SSE channel replacement instead of 409 Conflict

  Per MCP spec §Streamable HTTP, "The client MAY remain connected to multiple SSE streams simultaneously." Returning 409 Conflict when a second GET arrives causes Cursor to enter an infinite re-initialization loop (~3s cycle). Instead of rejecting, replace the old common channel sender. Dropping the old sender closes the old receiver, cleanly terminating the previous SSE stream so the client can reconnect on the new stream.
  This fixes both code paths:
  - GET with Last-Event-ID from a completed POST SSE response
  - GET without Last-Event-ID (standalone stream reconnection)

* fix: skip cache replay when replacing active SSE stream

  When a client opens a new GET SSE stream while a previous one is still active, the old sender is dropped (terminating the old stream) and a new channel is created. Previously, sync() replayed all cached events to the new stream, but the client already received those events on the old stream. This caused an infinite notification loop:

  1. Client receives notifications (e.g. ResourceListChanged)
  2. Old SSE stream dies (sender replaced)
  3. Client reconnects after sse_retry (3s)
  4. sync() replays cached notifications the client already handled
  5. Client processes them again → goto 2

  Fix: check tx.is_closed() BEFORE replacing the sender. If the old stream was still alive, skip replay entirely — the client already has those events. Only replay when the old stream was genuinely dead (network failure, timeout) so the client catches up on missed events.

* fix: use shadow channels to prevent SSE reconnect loops

  When POST SSE responses include a `retry` field, the browser's EventSource automatically reconnects via GET after the stream ends. This creates multiple competing EventSource connections that each replace the common channel sender, killing the other stream's receiver. Both reconnect every sse_retry seconds, creating an infinite loop.

  Instead of always replacing the common channel, check whether the primary is still active. If so, create a "shadow" stream — an idle SSE connection kept alive by keep-alive pings that doesn't receive notifications or interfere with the primary channel.

  Also removes cache replay (sync) on common channel resume, as replaying server-initiated list_changed notifications causes clients to re-process old signals.
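The replace-vs-shadow decision described above can be modeled without any async machinery. The following is a simplified, std-only sketch; the real implementation checks `tokio::sync::mpsc::Sender::is_closed` on the cached sender, and all names here (`Resume`, `resume_or_shadow`) are illustrative rather than the crate's actual API:

```rust
const MAX_SHADOW_STREAMS: usize = 32;

#[derive(Debug, PartialEq)]
enum Resume {
    /// Old primary receiver was dropped: replace it and replay cached
    /// events from the client's last seen index.
    ReplacePrimary { replay_from: usize },
    /// Primary still has a live receiver: keep this connection open as an
    /// idle shadow stream (keep-alive pings only, no notifications).
    Shadow,
}

fn resume_or_shadow(
    primary_alive: bool,
    shadows: &mut Vec<u32>,
    stream_id: u32,
    last_index: usize,
) -> Resume {
    if primary_alive {
        // Cap shadows so misbehaving clients cannot grow the list unbounded;
        // evict the oldest when the limit is reached.
        if shadows.len() >= MAX_SHADOW_STREAMS {
            shadows.remove(0);
        }
        shadows.push(stream_id);
        Resume::Shadow
    } else {
        Resume::ReplacePrimary { replay_from: last_index }
    }
}

fn main() {
    let mut shadows: Vec<u32> = Vec::new();
    // Dead primary: the reconnecting client becomes the new primary and
    // catches up on events it missed.
    assert_eq!(
        resume_or_shadow(false, &mut shadows, 1, 7),
        Resume::ReplacePrimary { replay_from: 7 }
    );
    // Live primary: a second GET becomes a shadow instead of killing it.
    assert_eq!(resume_or_shadow(true, &mut shadows, 2, 0), Resume::Shadow);
    // Cap: after 32 more shadows, the oldest (id 2) has been evicted.
    for id in 3..35 {
        resume_or_shadow(true, &mut shadows, id, 0);
    }
    assert_eq!(shadows.len(), MAX_SHADOW_STREAMS);
    assert_eq!(shadows[0], 3);
    println!("replace-vs-shadow decision ok");
}
```

Keeping the decision pure like this is also roughly how the test suite below can exercise the eviction policy without opening real HTTP connections.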
  Signed-off-by: Myko Ash <myko@mcpmux.com>
  Signed-off-by: Mohammod Al Amin Ashik <maa.ashik00@gmail.com>

* test: comprehensive shadow channel tests (15 cases)

  Rewrite test suite for SSE channel replacement fix:
  - Shadow creation: standalone GET returns 200, multiple GETs coexist
  - Dead primary: replacement, notification delivery, repeated cycles
  - Notification routing: primary receives, shadow does not
  - Resume paths: completed request-wise, common alive/dead
  - Real scenarios: Cursor leapfrog, VS Code reconnect
  - Edge cases: invalid session, missing header, shadow cleanup

  Fix Accept header bug (was missing text/event-stream for notifications/initialized POST, causing 406 rejection).

* fix: use correct HTTP status codes for session errors per MCP spec

  MCP spec (2025-11-25) section "Session Management" requires:
  - Missing session ID header → 400 Bad Request (not 401)
  - Unknown/terminated session → 404 Not Found (not 401)

  Using 401 Unauthorized caused MCP clients (e.g. VS Code) to trigger full OAuth re-authentication on server restart, instead of simply re-initializing the session.
  Signed-off-by: Mohammod Al Amin Ashik <maa.ashik00@gmail.com>

* fix: address review feedback — remove dead Conflict variant, restore sync on resume, rename test

  - Remove unused SessionError::Conflict and dead string-matching in tower.rs (leftover from abandoned 409 approach)
  - Restore sync() replay when replacing a dead primary common channel so server-initiated requests and cached notifications are not lost on reconnect
  - Rename test from test_sse_channel_replacement_bug to test_sse_concurrent_streams per reviewer suggestion (describe what tests verify, not what triggered them)
  - Add test for cache replay on dead primary replacement
  - Use generic "MCP clients" in comments instead of specific client names

  Signed-off-by: Mohammod Al Amin Ashik <maa.ashik00@gmail.com>

* fix: use minimal buffer for shadow streams and cap at 32

  - Shadow streams only receive SSE keep-alive pings, so use capacity 1 instead of full channel_capacity
  - Cap shadow_txs at 32 to prevent unbounded growth from misbehaving clients, dropping the oldest shadow when the limit is reached
  - Add test verifying primary works after exceeding shadow limit

  Signed-off-by: Mohammod Al Amin Ashik <maa.ashik00@gmail.com>

* fix: remove redundant single-component `use reqwest` import

  Fixes clippy::single_component_path_imports lint error in test_sse_concurrent_streams.rs.

---------

Signed-off-by: Mohammod Al Amin Ashik <maa.ashik00@gmail.com>
Signed-off-by: Myko Ash <myko@mcpmux.com>
Co-authored-by: Mohammod Al Amin Ashik <maa.ashik00@gmail.com>
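The original orphaned-receiver bug from the first fix above is easy to reproduce in miniature with std's channels (a std-only sketch; the server actually uses tokio's bounded mpsc, and the message string is illustrative):

```rust
use std::sync::mpsc;

fn main() {
    // Channel A: the standalone SSE stream the client is actually reading.
    let (tx_a, rx_a) = mpsc::channel::<&str>();
    let mut common_tx = tx_a;

    // The stream works while tx_a is the common sender.
    common_tx.send("event: ping").unwrap();
    assert_eq!(rx_a.try_recv().unwrap(), "event: ping");

    // Pre-fix behavior: a second GET unconditionally replaces the sender.
    // tx_a is dropped here, so rx_a is now a dead end; the worker feeds
    // channel B, whose receiver nothing in this scenario ever reads.
    let (tx_b, _rx_b) = mpsc::channel::<&str>();
    common_tx = tx_b;

    // The notification "succeeds" (it lands in channel B's buffer) while the
    // client blocked on the original stream never sees it.
    common_tx.send("notifications/tools/list_changed").unwrap();
    assert!(rx_a.try_recv().is_err());
    println!("old receiver orphaned: notification silently lost");
}
```

This is why the final design only replaces the sender once its receiver is confirmed dead, and otherwise parks the new connection as a shadow.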
1 parent 3cb855b commit 332fcbf

File tree

4 files changed: +891 −37 lines


crates/rmcp/Cargo.toml

Lines changed: 5 additions & 0 deletions

@@ -241,3 +241,8 @@ required-features = [
     "transport-streamable-http-server",
 ]
 path = "tests/test_custom_headers.rs"
+
+[[test]]
+name = "test_sse_concurrent_streams"
+required-features = ["server", "client", "transport-streamable-http-server", "transport-streamable-http-client", "reqwest"]
+path = "tests/test_sse_concurrent_streams.rs"

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 91 additions & 25 deletions

@@ -293,6 +293,12 @@ pub struct LocalSessionWorker {
     tx_router: HashMap<HttpRequestId, HttpRequestWise>,
     resource_router: HashMap<ResourceKey, HttpRequestId>,
     common: CachedTx,
+    /// Shadow senders for secondary SSE streams (e.g. from POST EventSource
+    /// reconnections). These keep the HTTP connections alive via SSE keep-alive
+    /// without receiving notifications, preventing MCP clients from entering
+    /// infinite reconnect loops when multiple EventSource connections compete
+    /// to replace the common channel.
+    shadow_txs: Vec<Sender<ServerSseMessage>>,
     event_rx: Receiver<SessionEvent>,
     session_config: SessionConfig,
 }
@@ -513,36 +519,92 @@ impl LocalSessionWorker {
         &mut self,
         last_event_id: EventId,
     ) -> Result<StreamableHttpMessageReceiver, SessionError> {
+        // Clean up closed shadow senders before processing
+        self.shadow_txs.retain(|tx| !tx.is_closed());
+
         match last_event_id.http_request_id {
             Some(http_request_id) => {
-                let request_wise = self
-                    .tx_router
-                    .get_mut(&http_request_id)
-                    .ok_or(SessionError::ChannelClosed(Some(http_request_id)))?;
-                let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity);
-                let (tx, rx) = channel;
-                request_wise.tx.tx = tx;
-                let index = last_event_id.index;
-                // sync messages after index
-                request_wise.tx.sync(index).await?;
-                Ok(StreamableHttpMessageReceiver {
-                    http_request_id: Some(http_request_id),
-                    inner: rx,
-                })
+                if let Some(request_wise) = self.tx_router.get_mut(&http_request_id) {
+                    // Resume existing request-wise channel
+                    let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity);
+                    let (tx, rx) = channel;
+                    request_wise.tx.tx = tx;
+                    let index = last_event_id.index;
+                    // sync messages after index
+                    request_wise.tx.sync(index).await?;
+                    Ok(StreamableHttpMessageReceiver {
+                        http_request_id: Some(http_request_id),
+                        inner: rx,
+                    })
+                } else {
+                    // Request-wise channel completed (POST response already delivered).
+                    // The client's EventSource is reconnecting after the POST SSE stream
+                    // ended. Fall through to common channel handling below.
+                    tracing::debug!(
+                        http_request_id,
+                        "Request-wise channel completed, falling back to common channel"
+                    );
+                    self.resume_or_shadow_common(last_event_id.index).await
+                }
             }
-            None => {
-                let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity);
-                let (tx, rx) = channel;
-                self.common.tx = tx;
-                let index = last_event_id.index;
-                // sync messages after index
-                self.common.sync(index).await?;
-                Ok(StreamableHttpMessageReceiver {
-                    http_request_id: None,
-                    inner: rx,
-                })
+            None => self.resume_or_shadow_common(last_event_id.index).await,
+        }
+    }
+
+    /// Resume the common channel, or create a shadow stream if the primary is
+    /// still active.
+    ///
+    /// When the primary common channel is dead (receiver dropped), replace it
+    /// so this stream becomes the new primary notification channel. Cached
+    /// messages are replayed from `last_event_index` so the client receives
+    /// any events it missed (including server-initiated requests).
+    ///
+    /// When the primary is still active, create a "shadow" stream — an idle SSE
+    /// connection kept alive by keep-alive pings. This prevents multiple
+    /// EventSource connections (e.g. from POST response reconnections) from
+    /// killing each other by repeatedly replacing the common channel sender.
+    async fn resume_or_shadow_common(
+        &mut self,
+        last_event_index: usize,
+    ) -> Result<StreamableHttpMessageReceiver, SessionError> {
+        let is_replacing_dead_primary = self.common.tx.is_closed();
+        let capacity = if is_replacing_dead_primary {
+            self.session_config.channel_capacity
+        } else {
+            1 // Shadow streams only need keep-alive pings
+        };
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
+        if is_replacing_dead_primary {
+            // Primary common channel is dead — replace it.
+            tracing::debug!("Replacing dead common channel with new primary");
+            self.common.tx = tx;
+            // Replay cached messages from where the client left off so
+            // server-initiated requests and notifications are not lost.
+            self.common.sync(last_event_index).await?;
+        } else {
+            // Primary common channel is still active. Create a shadow stream
+            // that stays alive via SSE keep-alive but doesn't receive
+            // notifications. This prevents competing EventSource connections
+            // from killing each other's channels.
+            const MAX_SHADOW_STREAMS: usize = 32;
+
+            if self.shadow_txs.len() >= MAX_SHADOW_STREAMS {
+                tracing::warn!(
+                    shadow_count = self.shadow_txs.len(),
+                    "Shadow stream limit reached, dropping oldest"
+                );
+                self.shadow_txs.remove(0);
             }
+            tracing::debug!(
+                shadow_count = self.shadow_txs.len(),
+                "Common channel active, creating shadow stream"
+            );
+            self.shadow_txs.push(tx);
         }
+        Ok(StreamableHttpMessageReceiver {
+            http_request_id: None,
+            inner: rx,
+        })
     }

     async fn close_sse_stream(
@@ -584,6 +646,9 @@ impl LocalSessionWorker {
         let (tx, _rx) = tokio::sync::mpsc::channel(1);
         self.common.tx = tx;

+        // Also close all shadow streams
+        self.shadow_txs.clear();
+
         tracing::debug!("closed standalone SSE stream for server-initiated disconnection");
         Ok(())
     }
@@ -1036,6 +1101,7 @@ pub fn create_local_session(
         tx_router: HashMap::new(),
         resource_router: HashMap::new(),
         common,
+        shadow_txs: Vec::new(),
         event_rx,
         session_config: config.clone(),
     };
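The resume path above keys off the SSE `id` field: the server stamps each event with an id, and a reconnecting EventSource echoes the last one back in the Last-Event-ID header. A minimal formatter for such an event, following the standard SSE wire format (`retry`, `id`, `data` fields, blank-line terminator); the `"0/5"` id combining a request id and event index is purely illustrative, and rmcp's actual serialization may differ:

```rust
fn sse_event(id: &str, data: &str, retry_ms: Option<u64>) -> String {
    let mut out = String::new();
    if let Some(ms) = retry_ms {
        // Tells the browser how long to wait before auto-reconnecting.
        out.push_str(&format!("retry: {ms}\n"));
    }
    out.push_str(&format!("id: {id}\n"));
    // Multi-line payloads become one `data:` field per line.
    for line in data.lines() {
        out.push_str(&format!("data: {line}\n"));
    }
    out.push('\n'); // a blank line terminates the event
    out
}

fn main() {
    let event = sse_event("0/5", "{\"jsonrpc\":\"2.0\"}", Some(3000));
    assert_eq!(event, "retry: 3000\nid: 0/5\ndata: {\"jsonrpc\":\"2.0\"}\n\n");
    println!("{event}");
}
```

Note that the `retry` field is exactly what triggers the EventSource auto-reconnects the shadow-channel fix has to absorb.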

crates/rmcp/src/transport/streamable_http_server/tower.rs

Lines changed: 12 additions & 12 deletions

@@ -299,10 +299,10 @@ where
         .and_then(|v| v.to_str().ok())
         .map(|s| s.to_owned().into());
     let Some(session_id) = session_id else {
-        // unauthorized
+        // MCP spec: servers that require a session ID SHOULD respond with 400 Bad Request
         return Ok(Response::builder()
-            .status(http::StatusCode::UNAUTHORIZED)
-            .body(Full::new(Bytes::from("Unauthorized: Session ID is required")).boxed())
+            .status(http::StatusCode::BAD_REQUEST)
+            .body(Full::new(Bytes::from("Bad Request: Session ID is required")).boxed())
             .expect("valid response"));
     };
     // check if session exists
@@ -312,10 +312,10 @@ where
         .await
         .map_err(internal_error_response("check session"))?;
     if !has_session {
-        // unauthorized
+        // MCP spec: server MUST respond with 404 Not Found for terminated/unknown sessions
         return Ok(Response::builder()
-            .status(http::StatusCode::UNAUTHORIZED)
-            .body(Full::new(Bytes::from("Unauthorized: Session not found")).boxed())
+            .status(http::StatusCode::NOT_FOUND)
+            .body(Full::new(Bytes::from("Not Found: Session not found")).boxed())
             .expect("valid response"));
     }
     // Validate MCP-Protocol-Version header (per 2025-06-18 spec)
@@ -426,10 +426,10 @@ where
         .await
         .map_err(internal_error_response("check session"))?;
     if !has_session {
-        // unauthorized
+        // MCP spec: server MUST respond with 404 Not Found for terminated/unknown sessions
         return Ok(Response::builder()
-            .status(http::StatusCode::UNAUTHORIZED)
-            .body(Full::new(Bytes::from("Unauthorized: Session not found")).boxed())
+            .status(http::StatusCode::NOT_FOUND)
+            .body(Full::new(Bytes::from("Not Found: Session not found")).boxed())
             .expect("valid response"));
     }

@@ -629,10 +629,10 @@ where
         .and_then(|v| v.to_str().ok())
         .map(|s| s.to_owned().into());
     let Some(session_id) = session_id else {
-        // unauthorized
+        // MCP spec: servers that require a session ID SHOULD respond with 400 Bad Request
        return Ok(Response::builder()
-            .status(http::StatusCode::UNAUTHORIZED)
-            .body(Full::new(Bytes::from("Unauthorized: Session ID is required")).boxed())
+            .status(http::StatusCode::BAD_REQUEST)
+            .body(Full::new(Bytes::from("Bad Request: Session ID is required")).boxed())
             .expect("valid response"));
     };
     // Validate MCP-Protocol-Version header (per 2025-06-18 spec)
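The four hunks above all converge on the same mapping from session-validation failures to HTTP status codes. A simplified sketch of that mapping (the `SessionCheck` enum and `status_for` names are illustrative, not tower.rs's actual types):

```rust
#[derive(Debug)]
enum SessionCheck {
    /// Request arrived without an Mcp-Session-Id header.
    MissingSessionId,
    /// Header present, but the session is unknown or was terminated.
    UnknownOrTerminated,
}

// 401 would tell clients their credentials are bad and (for clients like
// VS Code) trigger full OAuth re-authentication; 400/404 instead tell them
// to fix the request or simply re-initialize the session.
fn status_for(check: &SessionCheck) -> u16 {
    match check {
        SessionCheck::MissingSessionId => 400,   // Bad Request
        SessionCheck::UnknownOrTerminated => 404, // Not Found
    }
}

fn main() {
    assert_eq!(status_for(&SessionCheck::MissingSessionId), 400);
    assert_eq!(status_for(&SessionCheck::UnknownOrTerminated), 404);
    println!("session error status mapping ok");
}
```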

0 commit comments