Skip to content

Commit 5ccdfc0

Browse files
authored
feat: add task-based stdio examples (#839)
* feat: add task-based stdio examples * docs: add Tasks section to Chinese README
1 parent d83b156 commit 5ccdfc0

10 files changed

Lines changed: 322 additions & 0 deletions

File tree

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ For the full MCP specification, see [modelcontextprotocol.io](https://modelconte
3131
- [Completions](#completions)
3232
- [Notifications](#notifications)
3333
- [Subscriptions](#subscriptions)
34+
- [Tasks](#tasks-long-running-tool-invocations)
3435
- [Examples](#examples)
3536
- [OAuth Support](#oauth-support)
3637
- [Related Resources](#related-resources)
@@ -954,6 +955,28 @@ impl ClientHandler for MyClient {
954955

955956
---
956957

958+
## Tasks (long-running tool invocations)
959+
960+
`rmcp` supports the [task-based tool invocation](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks)
961+
flow defined in SEP-1319. Annotate a tool with `execution(task_support = "required" | "optional")`
962+
and add `#[task_handler]` to your `ServerHandler` impl — `enqueue_task`, `tasks/list`, `tasks/get`,
963+
`tasks/result`, and `tasks/cancel` are generated for you on top of an `OperationProcessor`.
964+
965+
```rust, ignore
966+
#[tool(
967+
description = "Sum two numbers after a 2-second delay",
968+
execution(task_support = "required")
969+
)]
970+
async fn slow_sum(/* ... */) -> Result<CallToolResult, McpError> { /* ... */ }
971+
972+
#[tool_handler]
973+
#[task_handler]
974+
impl ServerHandler for TaskDemo {}
975+
```
976+
977+
See [`servers_task_stdio`](examples/servers/src/task_stdio.rs) and the matching
978+
[`clients_task_stdio`](examples/clients/src/task_stdio.rs) for a runnable end-to-end example.
979+
957980
## Examples
958981

959982
See [examples](examples/README.md).

docs/readme/README.zh-cn.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
- [补全](#补全)
3232
- [通知](#通知)
3333
- [订阅](#订阅)
34+
- [任务](#任务长时间运行的工具调用)
3435
- [示例](#示例)
3536
- [OAuth 支持](#oauth-支持)
3637
- [相关资源](#相关资源)
@@ -954,6 +955,26 @@ impl ClientHandler for MyClient {
954955

955956
---
956957

958+
## 任务(长时间运行的工具调用)
959+
960+
`rmcp` 支持 SEP-1319 中定义的[基于任务的工具调用](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks)流程。为工具添加 `execution(task_support = "required" | "optional")` 注解,并在 `ServerHandler` 实现上添加 `#[task_handler]` —— `enqueue_task``tasks/list``tasks/get``tasks/result``tasks/cancel` 将在 `OperationProcessor` 之上自动生成。
961+
962+
```rust, ignore
963+
#[tool(
964+
description = "Sum two numbers after a 2-second delay",
965+
execution(task_support = "required")
966+
)]
967+
async fn slow_sum(/* ... */) -> Result<CallToolResult, McpError> { /* ... */ }
968+
969+
#[tool_handler]
970+
#[task_handler]
971+
impl ServerHandler for TaskDemo {}
972+
```
973+
974+
完整的端到端示例请参阅 [`servers_task_stdio`](../../examples/servers/src/task_stdio.rs) 及对应的 [`clients_task_stdio`](../../examples/clients/src/task_stdio.rs)
975+
976+
---
977+
957978
## 示例
958979

959980
查看 [examples](../../examples/README.md)

examples/clients/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,7 @@ path = "src/progress_client.rs"
6161
[[example]]
6262
name = "clients_client_credentials"
6363
path = "src/auth/client_credentials.rs"
64+
65+
[[example]]
66+
name = "clients_task_stdio"
67+
path = "src/task_stdio.rs"

examples/clients/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ A client demonstrating how to use the sampling tool.
5959
- Retrieves server information and list of available tools
6060
- Calls the `ask_llm` tool
6161

62+
### Task Standard I/O Client (`task_stdio.rs`)
63+
64+
A client that exercises the task lifecycle against `servers_task_stdio`
65+
(per [SEP-1319](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks)).
66+
67+
- Spawns `servers_task_stdio` as a child process over stdio
68+
- Calls `quick_echo` synchronously
69+
- Calls `slow_sum` as a task via `CallToolRequestParams::with_task(...)`, polls `tasks/get` until completion, then fetches the result via `tasks/result`
70+
6271
### Progress Test Client (`progress_client.rs`)
6372

6473
A client that communicates with an MCP server using progress notifications.
@@ -91,6 +100,9 @@ cargo run -p mcp-client-examples --example clients_oauth_client
91100

92101
# Run the sampling standard I/O client example
93102
cargo run -p mcp-client-examples --example clients_sampling_stdio
103+
104+
# Run the task-based invocation client (drives servers_task_stdio)
105+
cargo run -p mcp-client-examples --example clients_task_stdio
94106
```
95107

96108
## Dependencies

examples/clients/src/task_stdio.rs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
//! Client for the task-demo server in `examples/servers/src/task_stdio.rs`.
2+
//!
3+
//! Walks through the task lifecycle (SEP-1319):
4+
//! 1. Call a regular tool (`quick_echo`) — synchronous response.
5+
//! 2. Call a task-required tool (`slow_sum`) by attaching `task: {}` to
6+
//! the `tools/call` request. The server returns a `Task` with a `task_id`.
7+
//! 3. Poll `tasks/get` until status becomes `Completed`.
8+
//! 4. Fetch the underlying `CallToolResult` via `tasks/result`.
9+
10+
use anyhow::{Result, anyhow};
11+
use rmcp::{
12+
ServiceExt,
13+
model::{
14+
CallToolRequestParams, CallToolResult, ClientRequest, GetTaskInfoParams,
15+
GetTaskResultParams, JsonObject, Request, ServerResult, TaskStatus,
16+
},
17+
object,
18+
transport::{ConfigureCommandExt, TokioChildProcess},
19+
};
20+
use tokio::process::Command;
21+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
22+
23+
#[tokio::main]
24+
async fn main() -> Result<()> {
25+
tracing_subscriber::registry()
26+
.with(
27+
tracing_subscriber::EnvFilter::try_from_default_env()
28+
.unwrap_or_else(|_| format!("info,{}=debug", env!("CARGO_CRATE_NAME")).into()),
29+
)
30+
.with(tracing_subscriber::fmt::layer())
31+
.init();
32+
33+
// Spawn the task-demo server as a child process over stdio.
34+
let client = ()
35+
.serve(TokioChildProcess::new(Command::new("cargo").configure(
36+
|cmd| {
37+
cmd.arg("run")
38+
.arg("-q")
39+
.arg("-p")
40+
.arg("mcp-server-examples")
41+
.arg("--example")
42+
.arg("servers_task_stdio");
43+
},
44+
))?)
45+
.await?;
46+
47+
// 1) Synchronous call. `quick_echo` has the default task_support = forbidden.
48+
let echo = client
49+
.call_tool(
50+
CallToolRequestParams::new("quick_echo")
51+
.with_arguments(object!({ "message": "hi from rmcp" })),
52+
)
53+
.await?;
54+
tracing::info!("quick_echo -> {echo:#?}");
55+
56+
// 2) Task call. `slow_sum` is task_support = required, so we MUST attach a
57+
// `task` object. An empty object is fine — clients can stash arbitrary
58+
// metadata here that the server-side `OperationDescriptor` will keep.
59+
let create = client
60+
.send_request(ClientRequest::CallToolRequest(Request::new(
61+
CallToolRequestParams::new("slow_sum")
62+
.with_arguments(object!({ "a": 40, "b": 2 }))
63+
.with_task(JsonObject::new()),
64+
)))
65+
.await?;
66+
let ServerResult::CreateTaskResult(create) = create else {
67+
return Err(anyhow!("expected CreateTaskResult, got {create:?}"));
68+
};
69+
let task_id = create.task.task_id.clone();
70+
tracing::info!(
71+
"slow_sum enqueued as task {task_id} (status = {:?})",
72+
create.task.status
73+
);
74+
75+
// 3) Poll `tasks/get` until the server reports a terminal status.
76+
let final_status = loop {
77+
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
78+
79+
let info = client
80+
.send_request(ClientRequest::GetTaskInfoRequest(Request::new(
81+
GetTaskInfoParams {
82+
meta: None,
83+
task_id: task_id.clone(),
84+
},
85+
)))
86+
.await?;
87+
let ServerResult::GetTaskResult(info) = info else {
88+
return Err(anyhow!("expected GetTaskResult, got {info:?}"));
89+
};
90+
tracing::info!("status = {:?}", info.task.status);
91+
92+
match info.task.status {
93+
TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled => {
94+
break info.task.status;
95+
}
96+
_ => {}
97+
}
98+
};
99+
100+
if final_status != TaskStatus::Completed {
101+
return Err(anyhow!("task ended in {final_status:?}"));
102+
}
103+
104+
// 4) Fetch the payload. The server-side handler returns a serialized
105+
// `CallToolResult`. On the wire the response is just a JSON value, and
106+
// `ServerResult` is `#[serde(untagged)]`, so the client decodes it as
107+
// whichever variant the JSON shape matches first — a `CallToolResult`
108+
// here. (For a non-tool task the same value would surface as
109+
// `ServerResult::CustomResult` and need manual `serde_json::from_value`.)
110+
let payload = client
111+
.send_request(ClientRequest::GetTaskResultRequest(Request::new(
112+
GetTaskResultParams {
113+
meta: None,
114+
task_id: task_id.clone(),
115+
},
116+
)))
117+
.await?;
118+
let call_result: CallToolResult = match payload {
119+
ServerResult::CallToolResult(r) => r,
120+
ServerResult::CustomResult(c) => serde_json::from_value(c.0)?,
121+
other => return Err(anyhow!("unexpected task result: {other:?}")),
122+
};
123+
tracing::info!("slow_sum result -> {call_result:#?}");
124+
125+
client.cancel().await?;
126+
Ok(())
127+
}

examples/servers/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,7 @@ path = "src/calculator_stdio.rs"
109109
[[example]]
110110
name = "elicitation_enum_select"
111111
path = "src/elicitation_enum_inference.rs"
112+
113+
[[example]]
114+
name = "servers_task_stdio"
115+
path = "src/task_stdio.rs"

examples/servers/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ A server demonstrating the prompt framework capabilities.
6262
- Uses standard I/O transport
6363
- Good example of prompt implementation patterns
6464

65+
### Task Demo Server (`task_stdio.rs`)
66+
67+
A minimal stdio server demonstrating task-based tool invocation per
68+
[SEP-1319](https://modelcontextprotocol.io/specification/2025-11-25/basic/utilities/tasks).
69+
70+
- `slow_sum` is declared with `execution(task_support = "required")`, so clients MUST invoke it as a task
71+
- `quick_echo` is a regular synchronous tool for contrast
72+
- Wires up `enqueue_task` / `tasks/get` / `tasks/result` / `tasks/cancel` via `#[task_handler]`
73+
- Pair with `examples/clients/src/task_stdio.rs` to see the full lifecycle (create → poll → fetch result)
74+
6575
### Progress Demo Server (`progress_demo.rs`)
6676

6777
A server that demonstrates progress notifications during long-running operations.

examples/servers/src/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ pub mod calculator;
22
pub mod counter;
33
pub mod generic_service;
44
pub mod progress_demo;
5+
pub mod task_demo;
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
//! Minimal example of a tool that supports task-based invocation (SEP-1319).
2+
//!
3+
//! - `slow_sum` is marked `task_support = "required"`, so the client MUST invoke
4+
//! it as a task. The server enqueues the call into an `OperationProcessor`,
5+
//! returns a task id immediately, and the client polls `tasks/get` and
6+
//! fetches the payload via `tasks/result`.
7+
//! - `quick_echo` is a regular synchronous tool for contrast (the default,
8+
//! `task_support = "forbidden"`).
9+
//!
10+
//! See `examples/clients/src/task_stdio.rs` for the matching client.
11+
12+
#![allow(dead_code)]
13+
14+
use std::sync::Arc;
15+
16+
use rmcp::{
17+
ErrorData as McpError, ServerHandler,
18+
handler::server::{router::tool::ToolRouter, wrapper::Parameters},
19+
model::{CallToolResult, Content},
20+
schemars, task_handler,
21+
task_manager::OperationProcessor,
22+
tool, tool_handler, tool_router,
23+
};
24+
use tokio::sync::Mutex;
25+
26+
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
27+
pub struct SumArgs {
28+
pub a: i32,
29+
pub b: i32,
30+
}
31+
32+
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
33+
pub struct EchoArgs {
34+
pub message: String,
35+
}
36+
37+
/// Server state. The `processor` field is required by `#[task_handler]`:
38+
/// the macro generates `enqueue_task` / `tasks/*` handlers that submit and
39+
/// poll operations through it.
40+
#[derive(Clone)]
41+
pub struct TaskDemo {
42+
tool_router: ToolRouter<TaskDemo>,
43+
processor: Arc<Mutex<OperationProcessor>>,
44+
}
45+
46+
impl Default for TaskDemo {
47+
fn default() -> Self {
48+
Self::new()
49+
}
50+
}
51+
52+
#[tool_router]
53+
impl TaskDemo {
54+
pub fn new() -> Self {
55+
Self {
56+
tool_router: Self::tool_router(),
57+
processor: Arc::new(Mutex::new(OperationProcessor::new())),
58+
}
59+
}
60+
61+
/// Long-running tool. The `execution(task_support = "required")` attribute
62+
/// tells clients they MUST call this tool as a task; the server returns
63+
/// `-32601` if they don't.
64+
#[tool(
65+
description = "Sum two numbers after a 2-second delay",
66+
execution(task_support = "required")
67+
)]
68+
async fn slow_sum(
69+
&self,
70+
Parameters(SumArgs { a, b }): Parameters<SumArgs>,
71+
) -> Result<CallToolResult, McpError> {
72+
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
73+
Ok(CallToolResult::success(vec![Content::text(
74+
(a + b).to_string(),
75+
)]))
76+
}
77+
78+
/// Synchronous tool with the default `task_support = "forbidden"`.
79+
#[tool(description = "Echo a message back immediately")]
80+
async fn quick_echo(
81+
&self,
82+
Parameters(EchoArgs { message }): Parameters<EchoArgs>,
83+
) -> Result<CallToolResult, McpError> {
84+
Ok(CallToolResult::success(vec![Content::text(message)]))
85+
}
86+
}
87+
88+
/// `#[task_handler]` reads `self.processor` (configurable via the macro's
89+
/// `processor = ...` argument) and synthesizes `enqueue_task`, `list_tasks`,
90+
/// `get_task_info`, `get_task_result`, and `cancel_task` for us.
91+
#[tool_handler]
92+
#[task_handler]
93+
impl ServerHandler for TaskDemo {}

examples/servers/src/task_stdio.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use anyhow::Result;
2+
use common::task_demo::TaskDemo;
3+
use rmcp::{ServiceExt, transport::stdio};
4+
use tracing_subscriber::{self, EnvFilter};
5+
mod common;
6+
7+
/// Stdio server demonstrating task-based tool invocation.
8+
///
9+
/// Run a matching client with:
10+
/// cargo run -p mcp-client-examples --example clients_task_stdio
11+
#[tokio::main]
12+
async fn main() -> Result<()> {
13+
tracing_subscriber::fmt()
14+
.with_env_filter(EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into()))
15+
.with_writer(std::io::stderr)
16+
.with_ansi(false)
17+
.init();
18+
19+
tracing::info!("Starting task-demo MCP server");
20+
21+
let service = TaskDemo::new().serve(stdio()).await.inspect_err(|e| {
22+
tracing::error!("serving error: {e:?}");
23+
})?;
24+
25+
service.waiting().await?;
26+
Ok(())
27+
}

0 commit comments

Comments
 (0)