Skip to content

Commit 4b6201a

Browse files
committed
Attempting to structure the client API with sync and async abstractions
1 parent 5906625 commit 4b6201a

File tree

9 files changed

+521
-230
lines changed

9 files changed

+521
-230
lines changed

src/main/java/spring/ai/mcp/McpApplication.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import com.fasterxml.jackson.databind.ObjectMapper;
77
import spring.ai.mcp.client.McpClient;
8+
import spring.ai.mcp.client.McpSyncClient;
89
import spring.ai.mcp.client.stdio.StdioServerParameters;
910
import spring.ai.mcp.client.stdio.StdioServerTransport;
1011

@@ -41,7 +42,7 @@ public CommandLineRunner test(ChatClient.Builder chatClientBuilder, List<McpFunc
4142
}
4243

4344
@Bean
44-
public List<McpFunctionCallback> functionCallbacks(McpClient mcpClient) {
45+
public List<McpFunctionCallback> functionCallbacks(McpSyncClient mcpClient) {
4546

4647
return mcpClient.listTools(null)
4748
.tools()
@@ -51,16 +52,17 @@ public List<McpFunctionCallback> functionCallbacks(McpClient mcpClient) {
5152
}
5253

5354
@Bean(destroyMethod = "close")
54-
public McpClient clientSession() {
55+
public McpSyncClient clientSession() {
5556

5657
var stdioParams = StdioServerParameters.builder("npx")
5758
.args("-y", "@modelcontextprotocol/server-filesystem",
58-
"/Users/christiantzolov/Dev/projects/demo/mcp-server/dir")
59+
"dir")
5960
.build();
6061

61-
McpClient mcpClient = null;
62+
McpSyncClient mcpClient = null;
6263
try {
63-
mcpClient = new McpClient(new StdioServerTransport(stdioParams, Duration.ofMillis(100)),
64+
mcpClient = McpClient.sync(new StdioServerTransport(stdioParams,
65+
Duration.ofMillis(100)),
6466
Duration.ofSeconds(10), new ObjectMapper());
6567

6668
var init = mcpClient.initialize();

src/main/java/spring/ai/mcp/McpFunctionCallback.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Map;
1919

2020
import spring.ai.mcp.client.McpClient;
21+
import spring.ai.mcp.client.McpSyncClient;
2122
import spring.ai.mcp.spec.McpSchema.CallToolRequest;
2223
import spring.ai.mcp.spec.McpSchema.CallToolResult;
2324
import spring.ai.mcp.spec.McpSchema.Tool;
@@ -32,11 +33,12 @@
3233

3334
public class McpFunctionCallback implements FunctionCallback {
3435

35-
private final McpClient mcpClient;
36+
// TODO: revisit function calling as well to handle the async case
37+
private final McpSyncClient mcpClient;
3638

3739
private final Tool tool;
3840

39-
public McpFunctionCallback(McpClient clientSession, Tool tool) {
41+
public McpFunctionCallback(McpSyncClient clientSession, Tool tool) {
4042
this.mcpClient = clientSession;
4143
this.tool = tool;
4244
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package spring.ai.mcp.client;
2+
3+
import java.time.Duration;
4+
import java.util.concurrent.CompletableFuture;
5+
6+
import com.fasterxml.jackson.core.type.TypeReference;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import reactor.core.publisher.Mono;
9+
import spring.ai.mcp.spec.McpSchema;
10+
import spring.ai.mcp.spec.McpSession;
11+
import spring.ai.mcp.spec.McpTransport;
12+
13+
public class McpAsyncClient extends McpAsyncSession {
14+
15+
public McpAsyncClient(McpTransport transport) {
16+
this(transport, Duration.ofSeconds(10), new ObjectMapper());
17+
}
18+
19+
public McpAsyncClient(McpTransport transport, Duration requestTimeout, ObjectMapper objectMapper) {
20+
super(requestTimeout, objectMapper, transport);
21+
}
22+
23+
24+
/**
25+
* The initialization phase MUST be the first interaction between client and server.
26+
* During this phase, the client and server:
27+
* <ul>
28+
* <li>Establish protocol version compatibility</li>
29+
* <li>Exchange and negotiate capabilities</li>
30+
* <li>Share implementation details</li>
31+
* </ul>
32+
* <br/>
33+
* The client MUST initiate this phase by sending an initialize request containing:
34+
* <ul>
35+
* <li>The protocol version the client supports</li>
36+
* <li>The client's capabilities</li>
37+
* <li>Client implementation information</li>
38+
* </ul>
39+
*
40+
* The server MUST respond with its own capabilities and information:
41+
* {@link McpSchema.ServerCapabilities}. <br/>
42+
* After successful initialization, the client MUST send an initialized notification
43+
* to indicate it is ready to begin normal operations.
44+
*
45+
* <br/>
46+
*
47+
* <a href=
48+
* "https://github.com/modelcontextprotocol/specification/blob/main/docs/specification/basic/lifecycle.md#initialization">Initialization
49+
* Spec</a>
50+
* @return the initialize result.
51+
*/
52+
public Mono<McpSchema.InitializeResult> initialize() {
53+
McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(// @formatter:off
54+
McpSchema.LATEST_PROTOCOL_VERSION,
55+
new McpSchema.ClientCapabilities(null, new McpSchema.ClientCapabilities.RootCapabilities(true), null),
56+
new McpSchema.Implementation("mcp-java-client", "0.0.1")); // @formatter:on
57+
58+
Mono<McpSchema.InitializeResult> result =
59+
this.sendRequest("initialize", initializeRequest, new TypeReference<McpSchema.InitializeResult>() {});
60+
61+
return result.flatMap(initializeResult -> {
62+
if (initializeResult.protocolVersion() != McpSchema.LATEST_PROTOCOL_VERSION) {
63+
return Mono.error(
64+
new McpError("Unsupported protocol version from the server: "
65+
+ initializeResult.protocolVersion()));
66+
} else {
67+
return this.sendNotification("notifications/initialized", null)
68+
.thenReturn(initializeResult);
69+
}
70+
});
71+
}
72+
73+
/**
74+
* Send a roots/list_changed notification.
75+
*/
76+
public Mono<Void> sendRootsListChanged() {
77+
return this.sendNotification("notifications/roots/list_changed");
78+
}
79+
80+
/**
81+
* Send a synchronous ping request.
82+
*/
83+
public Mono<Void> ping() {
84+
return this.sendRequest("ping", null,
85+
new TypeReference<Void>() {});
86+
}
87+
88+
// --------------------------
89+
// Tools
90+
// --------------------------
91+
/**
92+
* Send a tools/call request.
93+
* @param callToolRequest the call tool request.
94+
* @return the call tool result.
95+
*/
96+
public Mono<McpSchema.CallToolResult> callTool(McpSchema.CallToolRequest callToolRequest) {
97+
return this.sendRequest("tools/call", callToolRequest,
98+
new TypeReference<McpSchema.CallToolResult>() {});
99+
}
100+
101+
/**
102+
* Send a tools/list request.
103+
* @return the list of tools result.
104+
*/
105+
public Mono<McpSchema.ListToolsResult> listTools(String cursor) {
106+
return this.sendRequest("tools/list", new McpSchema.PaginatedRequest(cursor),
107+
new TypeReference<McpSchema.ListToolsResult>() {});
108+
}
109+
110+
// --------------------------
111+
// Resources
112+
// --------------------------
113+
114+
/**
115+
* Send a resources/list request.
116+
* @param cursor the cursor
117+
* @return the list of resources result.
118+
*/
119+
public Mono<McpSchema.ListResourcesResult> listResources(String cursor) {
120+
return this.sendRequest("resources/list", new McpSchema.PaginatedRequest(cursor),
121+
new TypeReference<McpSchema.ListResourcesResult>() {});
122+
}
123+
124+
/**
125+
* Send a resources/read request.
126+
* @param resource the resource to read
127+
* @return the resource content.
128+
*/
129+
public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.Resource resource) {
130+
return this.readResource(new McpSchema.ReadResourceRequest(resource.uri()));
131+
}
132+
133+
/**
134+
* Send a resources/read request.
135+
* @param readResourceRequest the read resource request.
136+
* @return the resource content.
137+
*/
138+
public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.ReadResourceRequest readResourceRequest) {
139+
return this.sendRequest("resources/read", readResourceRequest,
140+
new TypeReference<McpSchema.ReadResourceResult>() {});
141+
}
142+
143+
/**
144+
* Resource templates allow servers to expose parameterized resources using URI
145+
* templates. Arguments may be auto-completed through the completion API.
146+
*
147+
* Request a list of resource templates the server has.
148+
* @param cursor the cursor
149+
* @return the list of resource templates result.
150+
*/
151+
public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates(String cursor) {
152+
return this.sendRequest("resources/templates/list", new McpSchema.PaginatedRequest(cursor),
153+
new TypeReference<McpSchema.ListResourceTemplatesResult>() {});
154+
}
155+
156+
/**
157+
* List Changed Notification. When the list of available resources changes, servers
158+
* that declared the listChanged capability SHOULD send a notification:
159+
*/
160+
public Mono<Void> sendResourcesListChanged() {
161+
return this.sendNotification("notifications/resources/list_changed");
162+
}
163+
164+
/**
165+
* Subscriptions. The protocol supports optional subscriptions to resource changes.
166+
* Clients can subscribe to specific resources and receive notifications when they
167+
* change.
168+
*
169+
* Send a resources/subscribe request.
170+
* @param subscribeRequest the subscribe request contains the uri of the resource to
171+
* subscribe to.
172+
*/
173+
public Mono<Void> subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
174+
return this.sendRequest("resources/subscribe", subscribeRequest,
175+
new TypeReference<Void>() {});
176+
}
177+
178+
/**
179+
* Send a resources/unsubscribe request.
180+
* @param unsubscribeRequest the unsubscribe request contains the uri of the resource
181+
* to unsubscribe from.
182+
*/
183+
public Mono<Void> unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
184+
return this.sendRequest("resources/unsubscribe", unsubscribeRequest,
185+
new TypeReference<Void>() {});
186+
}
187+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package spring.ai.mcp.client;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.UUID;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import com.fasterxml.jackson.core.type.TypeReference;
11+
import com.fasterxml.jackson.databind.ObjectMapper;
12+
import reactor.core.publisher.Mono;
13+
import reactor.core.publisher.MonoSink;
14+
import reactor.core.publisher.Sinks;
15+
import spring.ai.mcp.spec.McpSchema;
16+
import spring.ai.mcp.spec.McpSession;
17+
import spring.ai.mcp.spec.McpTransport;
18+
19+
public class McpAsyncSession {
20+
21+
private final ConcurrentHashMap<Object, MonoSink<McpSchema.JSONRPCResponse>> pendingResponses
22+
= new ConcurrentHashMap<>();
23+
24+
private final Duration requestTimeout;
25+
26+
private final ObjectMapper objectMapper;
27+
28+
private final McpTransport transport;
29+
30+
public McpAsyncSession(Duration requestTimeout,
31+
ObjectMapper objectMapper,
32+
McpTransport transport) {
33+
this.requestTimeout = requestTimeout;
34+
this.objectMapper = objectMapper;
35+
this.transport = transport;
36+
37+
this.transport.setMessageHandler(message -> {
38+
switch (message) {
39+
case McpSchema.JSONRPCResponse response -> {
40+
var sink = pendingResponses.remove(response.id());
41+
if (sink == null) {
42+
System.out.println("Unexpected response for unkown id " + response.id());
43+
} else {
44+
sink.success(response);
45+
}
46+
}
47+
case McpSchema.JSONRPCRequest request -> {
48+
System.out.println("Client does not yet support server requests");
49+
}
50+
case McpSchema.JSONRPCNotification notification -> {
51+
System.out.println("Notifications not yet supported");
52+
}
53+
}
54+
});
55+
56+
this.transport.setErrorHandler(error -> System.out.println("Error received " + error));
57+
58+
this.transport.start();
59+
}
60+
61+
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
62+
String requestId = UUID.randomUUID().toString();
63+
64+
return Mono.<McpSchema.JSONRPCResponse>create(sink -> {
65+
this.pendingResponses.put(requestId, sink);
66+
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method, requestId, requestParams);
67+
try {
68+
// TODO convert the transport to non-blocking as well
69+
this.transport.sendMessage(jsonrpcRequest);
70+
} catch (Exception e) {
71+
sink.error(e);
72+
}
73+
})
74+
.timeout(this.requestTimeout)
75+
.handle((jsonRpcResponse, s) -> {
76+
if (jsonRpcResponse.error() != null) {
77+
s.error(new McpError(jsonRpcResponse.error()));
78+
} else {
79+
s.next(this.objectMapper.convertValue(jsonRpcResponse.result(), typeRef));
80+
}
81+
});
82+
}
83+
84+
public static class McpError extends RuntimeException {
85+
86+
public McpError(Object error) {
87+
super(error.toString());
88+
}
89+
}
90+
91+
public Mono<Void> sendNotification(String method) {
92+
return sendNotification(method, null);
93+
}
94+
95+
public Mono<Void> sendNotification(String method, Map<String, Object> params) {
96+
McpSchema.JSONRPCNotification
97+
jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION, method, params);
98+
try {
99+
// TODO: make it non-blocking
100+
this.transport.sendMessage(jsonrpcNotification);
101+
}
102+
catch (Exception e) {
103+
return Mono.error(new McpError(e));
104+
}
105+
return Mono.empty();
106+
}
107+
108+
public Mono<Void> closeGracefully(Duration timeout) {
109+
// TODO handle the timeout in transport
110+
return Mono.fromRunnable(this.transport::close);
111+
}
112+
}

0 commit comments

Comments
 (0)