Skip to content

Commit 74e8117

Browse files
authored
chore: improve message dispatcher duration (#30)
* chore: update dispatcher to use Duration timeout param * chore: modified ping command to take timeout as parameter * chore: update client ping to accept timeout
1 parent af48a2b commit 74e8117

File tree

10 files changed

+65
-39
lines changed

10 files changed

+65
-39
lines changed

Makefile.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ args = [
2929
"rust-mcp-transport",
3030
]
3131

32+
33+
[tasks.check]
34+
dependencies = ["fmt", "clippy", "test", "doc-test"]
35+
3236
[tasks.clippy-fix]
3337
command = "cargo"
3438
args = ["clippy", "--fix", "--allow-dirty"]

crates/rust-mcp-sdk/src/mcp_runtimes/client_runtime.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl ClientRuntime {
5454

5555
async fn initialize_request(&self) -> SdkResult<()> {
5656
let request = InitializeRequest::new(self.client_details.clone());
57-
let result: ServerResult = self.request(request.into()).await?.try_into()?;
57+
let result: ServerResult = self.request(request.into(), None).await?.try_into()?;
5858

5959
if let ServerResult::InitializeResult(initialize_result) = result {
6060
// store server details
@@ -147,7 +147,9 @@ impl McpClient for ClientRuntime {
147147
Err(error_value) => MessageFromClient::Error(error_value),
148148
};
149149
// send the response back with corresponding request id
150-
sender.send(response, Some(jsonrpc_request.id)).await?;
150+
sender
151+
.send(response, Some(jsonrpc_request.id), None)
152+
.await?;
151153
}
152154
ServerMessage::Notification(jsonrpc_notification) => {
153155
self_ref

crates/rust-mcp-sdk/src/mcp_runtimes/server_runtime.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl McpServer for ServerRuntime {
107107

108108
// send the response back with corresponding request id
109109
sender
110-
.send(response, Some(client_jsonrpc_request.id))
110+
.send(response, Some(client_jsonrpc_request.id), None)
111111
.await?;
112112
}
113113
ClientMessage::Notification(client_jsonrpc_notification) => {

crates/rust-mcp-sdk/src/mcp_traits/mcp_client.rs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::{sync::Arc, time::Duration};
22

33
use async_trait::async_trait;
44
use rust_mcp_schema::{
@@ -170,15 +170,19 @@ pub trait McpClient: Sync + Send {
170170
/// This function sends a `RequestFromClient` message to the server, waits for the response,
171171
/// and handles the result. If the response is empty or of an invalid type, an error is returned.
172172
/// Otherwise, it returns the result from the server.
173-
async fn request(&self, request: RequestFromClient) -> SdkResult<ResultFromServer> {
173+
async fn request(
174+
&self,
175+
request: RequestFromClient,
176+
timeout: Option<Duration>,
177+
) -> SdkResult<ResultFromServer> {
174178
let sender = self.sender().await.read().await;
175179
let sender = sender
176180
.as_ref()
177181
.ok_or(schema_utils::SdkError::connection_closed())?;
178182

179183
// Send the request and receive the response.
180184
let response = sender
181-
.send(MessageFromClient::RequestFromClient(request), None)
185+
.send(MessageFromClient::RequestFromClient(request), None, timeout)
182186
.await?;
183187

184188
let server_message = response.ok_or_else(|| {
@@ -205,6 +209,7 @@ pub trait McpClient: Sync + Send {
205209
.send(
206210
MessageFromClient::NotificationFromClient(notification),
207211
None,
212+
None,
208213
)
209214
.await?;
210215
Ok(())
@@ -220,9 +225,9 @@ pub trait McpClient: Sync + Send {
220225
/// # Returns
221226
/// A `SdkResult` containing the `rust_mcp_schema::Result` if the request is successful.
222227
/// If the request or conversion fails, an error is returned.
223-
async fn ping(&self) -> SdkResult<rust_mcp_schema::Result> {
228+
async fn ping(&self, timeout: Option<Duration>) -> SdkResult<rust_mcp_schema::Result> {
224229
let ping_request = PingRequest::new(None);
225-
let response = self.request(ping_request.into()).await?;
230+
let response = self.request(ping_request.into(), timeout).await?;
226231
Ok(response.try_into()?)
227232
}
228233

@@ -231,13 +236,13 @@ pub trait McpClient: Sync + Send {
231236
params: CompleteRequestParams,
232237
) -> SdkResult<rust_mcp_schema::CompleteResult> {
233238
let request = CompleteRequest::new(params);
234-
let response = self.request(request.into()).await?;
239+
let response = self.request(request.into(), None).await?;
235240
Ok(response.try_into()?)
236241
}
237242

238243
async fn set_logging_level(&self, level: LoggingLevel) -> SdkResult<rust_mcp_schema::Result> {
239244
let request = SetLevelRequest::new(SetLevelRequestParams { level });
240-
let response = self.request(request.into()).await?;
245+
let response = self.request(request.into(), None).await?;
241246
Ok(response.try_into()?)
242247
}
243248

@@ -246,7 +251,7 @@ pub trait McpClient: Sync + Send {
246251
params: GetPromptRequestParams,
247252
) -> SdkResult<rust_mcp_schema::GetPromptResult> {
248253
let request = GetPromptRequest::new(params);
249-
let response = self.request(request.into()).await?;
254+
let response = self.request(request.into(), None).await?;
250255
Ok(response.try_into()?)
251256
}
252257

@@ -255,7 +260,7 @@ pub trait McpClient: Sync + Send {
255260
params: Option<ListPromptsRequestParams>,
256261
) -> SdkResult<rust_mcp_schema::ListPromptsResult> {
257262
let request = ListPromptsRequest::new(params);
258-
let response = self.request(request.into()).await?;
263+
let response = self.request(request.into(), None).await?;
259264
Ok(response.try_into()?)
260265
}
261266

@@ -269,7 +274,7 @@ pub trait McpClient: Sync + Send {
269274
// that excepts an empty params to be passed (like server-everything)
270275
let request =
271276
ListResourcesRequest::new(params.or(Some(ListResourcesRequestParams::default())));
272-
let response = self.request(request.into()).await?;
277+
let response = self.request(request.into(), None).await?;
273278
Ok(response.try_into()?)
274279
}
275280

@@ -278,7 +283,7 @@ pub trait McpClient: Sync + Send {
278283
params: Option<ListResourceTemplatesRequestParams>,
279284
) -> SdkResult<rust_mcp_schema::ListResourceTemplatesResult> {
280285
let request = ListResourceTemplatesRequest::new(params);
281-
let response = self.request(request.into()).await?;
286+
let response = self.request(request.into(), None).await?;
282287
Ok(response.try_into()?)
283288
}
284289

@@ -287,7 +292,7 @@ pub trait McpClient: Sync + Send {
287292
params: ReadResourceRequestParams,
288293
) -> SdkResult<rust_mcp_schema::ReadResourceResult> {
289294
let request = ReadResourceRequest::new(params);
290-
let response = self.request(request.into()).await?;
295+
let response = self.request(request.into(), None).await?;
291296
Ok(response.try_into()?)
292297
}
293298

@@ -296,7 +301,7 @@ pub trait McpClient: Sync + Send {
296301
params: SubscribeRequestParams,
297302
) -> SdkResult<rust_mcp_schema::Result> {
298303
let request = SubscribeRequest::new(params);
299-
let response = self.request(request.into()).await?;
304+
let response = self.request(request.into(), None).await?;
300305
Ok(response.try_into()?)
301306
}
302307

@@ -305,13 +310,13 @@ pub trait McpClient: Sync + Send {
305310
params: UnsubscribeRequestParams,
306311
) -> SdkResult<rust_mcp_schema::Result> {
307312
let request = UnsubscribeRequest::new(params);
308-
let response = self.request(request.into()).await?;
313+
let response = self.request(request.into(), None).await?;
309314
Ok(response.try_into()?)
310315
}
311316

312317
async fn call_tool(&self, params: CallToolRequestParams) -> SdkResult<CallToolResult> {
313318
let request = CallToolRequest::new(params);
314-
let response = self.request(request.into()).await?;
319+
let response = self.request(request.into(), None).await?;
315320
Ok(response.try_into()?)
316321
}
317322

@@ -320,7 +325,7 @@ pub trait McpClient: Sync + Send {
320325
params: Option<ListToolsRequestParams>,
321326
) -> SdkResult<rust_mcp_schema::ListToolsResult> {
322327
let request = ListToolsRequest::new(params);
323-
let response = self.request(request.into()).await?;
328+
let response = self.request(request.into(), None).await?;
324329
Ok(response.try_into()?)
325330
}
326331

crates/rust-mcp-sdk/src/mcp_traits/mcp_server.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use async_trait::async_trait;
24
use rust_mcp_schema::{
35
schema_utils::{
@@ -62,14 +64,18 @@ pub trait McpServer: Sync + Send {
6264
/// This function sends a `RequestFromServer` message to the client, waits for the response,
6365
/// and handles the result. If the response is empty or of an invalid type, an error is returned.
6466
/// Otherwise, it returns the result from the client.
65-
async fn request(&self, request: RequestFromServer) -> SdkResult<ResultFromClient> {
67+
async fn request(
68+
&self,
69+
request: RequestFromServer,
70+
timeout: Option<Duration>,
71+
) -> SdkResult<ResultFromClient> {
6672
let sender = self.sender().await;
6773
let sender = sender.read().await;
6874
let sender = sender.as_ref().unwrap();
6975

7076
// Send the request and receive the response.
7177
let response = sender
72-
.send(MessageFromServer::RequestFromServer(request), None)
78+
.send(MessageFromServer::RequestFromServer(request), None, timeout)
7379
.await?;
7480
let client_message = response.ok_or_else(|| {
7581
RpcError::internal_error()
@@ -95,6 +101,7 @@ pub trait McpServer: Sync + Send {
95101
.send(
96102
MessageFromServer::NotificationFromServer(notification),
97103
None,
104+
None,
98105
)
99106
.await?;
100107
Ok(())
@@ -110,7 +117,7 @@ pub trait McpServer: Sync + Send {
110117
params: Option<ListRootsRequestParams>,
111118
) -> SdkResult<ListRootsResult> {
112119
let request: ListRootsRequest = ListRootsRequest::new(params);
113-
let response = self.request(request.into()).await?;
120+
let response = self.request(request.into(), None).await?;
114121
ListRootsResult::try_from(response).map_err(|err| err.into())
115122
}
116123

@@ -178,9 +185,9 @@ pub trait McpServer: Sync + Send {
178185
/// # Returns
179186
/// A `SdkResult` containing the `rust_mcp_schema::Result` if the request is successful.
180187
/// If the request or conversion fails, an error is returned.
181-
async fn ping(&self) -> SdkResult<rust_mcp_schema::Result> {
188+
async fn ping(&self, timeout: Option<Duration>) -> SdkResult<rust_mcp_schema::Result> {
182189
let ping_request = PingRequest::new(None);
183-
let response = self.request(ping_request.into()).await?;
190+
let response = self.request(ping_request.into(), timeout).await?;
184191
Ok(response.try_into()?)
185192
}
186193

@@ -194,7 +201,7 @@ pub trait McpServer: Sync + Send {
194201
params: CreateMessageRequestParams,
195202
) -> SdkResult<CreateMessageResult> {
196203
let ping_request = CreateMessageRequest::new(params);
197-
let response = self.request(ping_request.into()).await?;
204+
let response = self.request(ping_request.into(), None).await?;
198205
Ok(response.try_into()?)
199206
}
200207

crates/rust-mcp-transport/src/mcp_stream.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99
collections::HashMap,
1010
pin::Pin,
1111
sync::{atomic::AtomicI64, Arc},
12+
time::Duration,
1213
};
1314
use tokio::{
1415
io::{AsyncBufReadExt, BufReader},
@@ -34,7 +35,7 @@ impl MCPStream {
3435
writable: Mutex<Pin<Box<dyn tokio::io::AsyncWrite + Send + Sync>>>,
3536
error_io: IoStream,
3637
pending_requests: Arc<Mutex<HashMap<RequestId, tokio::sync::oneshot::Sender<R>>>>,
37-
timeout_msec: u64,
38+
request_timeout: Duration,
3839
shutdown_rx: Receiver<bool>,
3940
) -> (
4041
Pin<Box<dyn Stream<Item = R> + Send>>,
@@ -62,7 +63,7 @@ impl MCPStream {
6263
pending_requests,
6364
writable,
6465
Arc::new(AtomicI64::new(0)),
65-
timeout_msec,
66+
request_timeout,
6667
);
6768

6869
(stream, sender, error_io)

crates/rust-mcp-transport/src/message_dispatcher.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub struct MessageDispatcher<R> {
2828
pending_requests: Arc<Mutex<HashMap<RequestId, oneshot::Sender<R>>>>,
2929
writable_std: Mutex<Pin<Box<dyn tokio::io::AsyncWrite + Send + Sync>>>,
3030
message_id_counter: Arc<AtomicI64>,
31-
timeout_msec: u64,
31+
request_timeout: Duration,
3232
}
3333

3434
impl<R> MessageDispatcher<R> {
@@ -38,21 +38,21 @@ impl<R> MessageDispatcher<R> {
3838
/// * `pending_requests` - A thread-safe map for storing pending request IDs and their response channels.
3939
/// * `writable_std` - A mutex-protected, pinned writer (e.g., stdout) for sending serialized messages.
4040
/// * `message_id_counter` - An atomic counter for generating unique request IDs.
41-
/// * `timeout_msec` - The timeout duration in milliseconds for awaiting responses.
41+
/// * `request_timeout` - The timeout duration in milliseconds for awaiting responses.
4242
///
4343
/// # Returns
4444
/// A new `MessageDispatcher` instance configured for MCP message handling.
4545
pub fn new(
4646
pending_requests: Arc<Mutex<HashMap<RequestId, oneshot::Sender<R>>>>,
4747
writable_std: Mutex<Pin<Box<dyn tokio::io::AsyncWrite + Send + Sync>>>,
4848
message_id_counter: Arc<AtomicI64>,
49-
timeout_msec: u64,
49+
request_timeout: Duration,
5050
) -> Self {
5151
Self {
5252
pending_requests,
5353
writable_std,
5454
message_id_counter,
55-
timeout_msec,
55+
request_timeout,
5656
}
5757
}
5858

@@ -112,6 +112,7 @@ impl McpDispatch<ServerMessage, MessageFromClient> for MessageDispatcher<ServerM
112112
&self,
113113
message: MessageFromClient,
114114
request_id: Option<RequestId>,
115+
request_timeout: Option<Duration>,
115116
) -> TransportResult<Option<ServerMessage>> {
116117
let mut writable_std = self.writable_std.lock().await;
117118

@@ -148,7 +149,7 @@ impl McpDispatch<ServerMessage, MessageFromClient> for MessageDispatcher<ServerM
148149

149150
if let Some(rx) = rx_response {
150151
// Wait for the response with timeout
151-
match await_timeout(rx, Duration::from_millis(self.timeout_msec)).await {
152+
match await_timeout(rx, request_timeout.unwrap_or(self.request_timeout)).await {
152153
Ok(response) => Ok(Some(response)),
153154
Err(error) => match error {
154155
TransportError::OneshotRecvError(_) => {
@@ -185,6 +186,7 @@ impl McpDispatch<ClientMessage, MessageFromServer> for MessageDispatcher<ClientM
185186
&self,
186187
message: MessageFromServer,
187188
request_id: Option<RequestId>,
189+
request_timeout: Option<Duration>,
188190
) -> TransportResult<Option<ClientMessage>> {
189191
let mut writable_std = self.writable_std.lock().await;
190192

@@ -220,7 +222,7 @@ impl McpDispatch<ClientMessage, MessageFromServer> for MessageDispatcher<ClientM
220222
writable_std.flush().await?;
221223

222224
if let Some(rx) = rx_response {
223-
match await_timeout(rx, Duration::from_millis(self.timeout_msec)).await {
225+
match await_timeout(rx, request_timeout.unwrap_or(self.request_timeout)).await {
224226
Ok(response) => Ok(Some(response)),
225227
Err(error) => Err(error),
226228
}

crates/rust-mcp-transport/src/transport.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::pin::Pin;
1+
use std::{pin::Pin, time::Duration};
22

33
use async_trait::async_trait;
44
use rust_mcp_schema::{schema_utils::McpMessage, RequestId};
@@ -29,12 +29,12 @@ pub struct TransportOptions {
2929
///
3030
/// This value defines the maximum amount of time to wait for a response before
3131
/// considering the request as timed out.
32-
pub timeout: u64,
32+
pub timeout: Duration,
3333
}
3434
impl Default for TransportOptions {
3535
fn default() -> Self {
3636
Self {
37-
timeout: DEFAULT_TIMEOUT_MSEC,
37+
timeout: Duration::from_millis(DEFAULT_TIMEOUT_MSEC),
3838
}
3939
}
4040
}
@@ -84,7 +84,12 @@ where
8484
/// Sends a raw message represented by type `S` and optionally includes a `request_id`.
8585
/// The `request_id` is used when sending a message in response to an MCP request.
8686
/// It should match the `request_id` of the original request.
87-
async fn send(&self, message: S, request_id: Option<RequestId>) -> TransportResult<Option<R>>;
87+
async fn send(
88+
&self,
89+
message: S,
90+
request_id: Option<RequestId>,
91+
request_timeout: Option<Duration>,
92+
) -> TransportResult<Option<R>>;
8893
}
8994

9095
/// A trait representing the transport layer for MCP.

examples/simple-mcp-client-core/src/inquiry_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ impl InquiryUtils {
204204
for ping_index in 1..=max_pings {
205205
print!("Ping the server ({} out of {})...", ping_index, max_pings);
206206
std::io::stdout().flush().unwrap();
207-
let ping_result = self.client.ping().await;
207+
let ping_result = self.client.ping(None).await;
208208
print!(
209209
"\rPing the server ({} out of {}) : {}",
210210
ping_index,

examples/simple-mcp-client/src/inquiry_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ impl InquiryUtils {
204204
for ping_index in 1..=max_pings {
205205
print!("Ping the server ({} out of {})...", ping_index, max_pings);
206206
std::io::stdout().flush().unwrap();
207-
let ping_result = self.client.ping().await;
207+
let ping_result = self.client.ping(None).await;
208208
print!(
209209
"\rPing the server ({} out of {}) : {}",
210210
ping_index,

0 commit comments

Comments
 (0)