Skip to content

Commit c1e0ead

Browse files
authored
fix: add init_timeout for streamable-http sessions (#811)
1 parent ef74147 commit c1e0ead

2 files changed

Lines changed: 89 additions & 7 deletions

File tree

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,8 @@ pub enum LocalSessionWorkerError {
930930
FailToHandleMessage(SessionError),
931931
#[error("keep alive timeout after {}ms", _0.as_millis())]
932932
KeepAliveTimeout(Duration),
933+
#[error("init timeout after {}ms", _0.as_millis())]
934+
InitTimeout(Duration),
933935
#[error("Transport closed")]
934936
TransportClosed,
935937
#[error("Tokio join error {0}")]
@@ -959,13 +961,24 @@ impl Worker for LocalSessionWorker {
959961
FromHttpService(SessionEvent),
960962
FromHandler(WorkerSendRequest<LocalSessionWorker>),
961963
}
962-
// waiting for initialize request
963-
let evt = self.event_rx.recv().await.ok_or_else(|| {
964-
WorkerQuitReason::fatal(
965-
LocalSessionWorkerError::TransportTerminated,
966-
"get initialize request",
967-
)
968-
})?;
964+
let init_timeout = self.session_config.init_timeout.unwrap_or(Duration::MAX);
965+
let evt = tokio::select! {
966+
evt = self.event_rx.recv() => evt.ok_or_else(|| {
967+
WorkerQuitReason::fatal(
968+
LocalSessionWorkerError::TransportTerminated,
969+
"get initialize request",
970+
)
971+
})?,
972+
_ = context.cancellation_token.cancelled() => {
973+
return Err(WorkerQuitReason::Cancelled);
974+
}
975+
_ = tokio::time::sleep(init_timeout) => {
976+
return Err(WorkerQuitReason::fatal(
977+
LocalSessionWorkerError::InitTimeout(init_timeout),
978+
"waiting for initialize request",
979+
));
980+
}
981+
};
969982
let SessionEvent::InitializeRequest { request, responder } = evt else {
970983
return Err(WorkerQuitReason::fatal(
971984
LocalSessionWorkerError::UnexpectedEvent(evt),
@@ -1122,13 +1135,18 @@ pub struct SessionConfig {
11221135
/// resume requests. After this duration, completed entries are evicted
11231136
/// and resume will return an error. Default is 60 seconds.
11241137
pub completed_cache_ttl: Duration,
1138+
/// Maximum duration to wait for the `initialize` request after session
1139+
/// creation. If not received within this window, the session is
1140+
/// terminated. Default is 60 seconds. Set to `None` to disable.
1141+
pub init_timeout: Option<Duration>,
11251142
}
11261143

11271144
impl SessionConfig {
11281145
pub const DEFAULT_CHANNEL_CAPACITY: usize = 16;
11291146
pub const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(300);
11301147
pub const DEFAULT_SSE_RETRY: Duration = Duration::from_secs(3);
11311148
pub const DEFAULT_COMPLETED_CACHE_TTL: Duration = Duration::from_secs(60);
1149+
pub const DEFAULT_INIT_TIMEOUT: Duration = Duration::from_secs(60);
11321150
}
11331151

11341152
impl Default for SessionConfig {
@@ -1138,6 +1156,7 @@ impl Default for SessionConfig {
11381156
keep_alive: Some(Self::DEFAULT_KEEP_ALIVE),
11391157
sse_retry: Some(Self::DEFAULT_SSE_RETRY),
11401158
completed_cache_ttl: Self::DEFAULT_COMPLETED_CACHE_TTL,
1159+
init_timeout: Some(Self::DEFAULT_INIT_TIMEOUT),
11411160
}
11421161
}
11431162
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#![cfg(all(feature = "transport-streamable-http-server", not(feature = "local")))]
2+
3+
use std::time::Duration;
4+
5+
use rmcp::{
6+
model::{ClientJsonRpcMessage, ClientRequest, PingRequest, RequestId},
7+
transport::streamable_http_server::session::{SessionManager, local::LocalSessionManager},
8+
};
9+
10+
#[tokio::test]
11+
async fn test_init_timeout_terminates_pre_init_session() -> anyhow::Result<()> {
12+
let mut manager = LocalSessionManager::default();
13+
manager.session_config.init_timeout = Some(Duration::from_millis(200));
14+
15+
// Bind the transport so its drop-guard doesn't cancel the worker — we
16+
// want termination via init_timeout, not via cancellation.
17+
let (session_id, _transport) = manager.create_session().await?;
18+
19+
tokio::time::sleep(Duration::from_millis(500)).await;
20+
21+
let message = ClientJsonRpcMessage::request(
22+
ClientRequest::PingRequest(PingRequest::default()),
23+
RequestId::Number(1),
24+
);
25+
let result = manager.initialize_session(&session_id, message).await;
26+
27+
assert!(
28+
result.is_err(),
29+
"expected worker to be dead; got: {result:?}"
30+
);
31+
32+
Ok(())
33+
}
34+
35+
#[tokio::test]
36+
async fn test_init_timeout_none_keeps_worker_alive() -> anyhow::Result<()> {
37+
let mut manager = LocalSessionManager::default();
38+
manager.session_config.init_timeout = None;
39+
40+
let (session_id, _transport) = manager.create_session().await?;
41+
42+
tokio::time::sleep(Duration::from_millis(500)).await;
43+
44+
let message = ClientJsonRpcMessage::request(
45+
ClientRequest::PingRequest(PingRequest::default()),
46+
RequestId::Number(1),
47+
);
48+
// Liveness probe: a live worker accepts the send then stalls waiting for
49+
// a handler response (none is wired up), tripping the outer timeout. A
50+
// dead worker would fail the send and return immediately.
51+
let probe = tokio::time::timeout(
52+
Duration::from_millis(200),
53+
manager.initialize_session(&session_id, message),
54+
)
55+
.await;
56+
57+
assert!(
58+
probe.is_err(),
59+
"expected worker to be alive; got: {probe:?}"
60+
);
61+
62+
Ok(())
63+
}

0 commit comments

Comments
 (0)