Skip to content

Commit 2715ab8

Browse files
committed
fix: handle concurrent sendMessage calls in StdioServerTransportProvider
1 parent 46bacda commit 2715ab8

File tree

2 files changed

+45
-6
lines changed

2 files changed

+45
-6
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,8 @@ public StdioMcpSessionTransport() {
147147
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
148148

149149
return Mono.zip(inboundReady.asMono(), outboundReady.asMono()).then(Mono.defer(() -> {
150-
if (outboundSink.tryEmitNext(message).isSuccess()) {
151-
return Mono.empty();
152-
}
153-
else {
154-
return Mono.error(new RuntimeException("Failed to enqueue message"));
155-
}
150+
outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(java.time.Duration.ofSeconds(1)));
151+
return Mono.<Void>empty();
156152
}));
157153
}
158154

mcp-test/src/test/java/io/modelcontextprotocol/server/transport/StdioServerTransportProviderTests.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,4 +220,47 @@ void shouldHandleSessionClose() throws Exception {
220220
verify(mockSession).closeGracefully();
221221
}
222222

223+
@Test
224+
void shouldHandleConcurrentMessages() throws Exception {
225+
java.io.PipedOutputStream pipedOut = new java.io.PipedOutputStream();
226+
java.io.PipedInputStream pipedIn = new java.io.PipedInputStream(pipedOut);
227+
228+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
229+
transportProvider = new StdioServerTransportProvider(McpJsonDefaults.getMapper(), pipedIn, outputStream);
230+
231+
McpServerSession.Factory realSessionFactory = transport -> {
232+
McpServerSession session = mock(McpServerSession.class);
233+
when(session.handle(any())).thenAnswer(invocation -> {
234+
McpSchema.JSONRPCMessage incomingMessage = invocation.getArgument(0);
235+
// Simulate async tool call processing with a delay
236+
return Mono.delay(java.time.Duration.ofMillis(50))
237+
.then(transport.sendMessage(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION,
238+
((McpSchema.JSONRPCRequest) incomingMessage).id(), Map.of("result", "ok"), null)));
239+
});
240+
when(session.closeGracefully()).thenReturn(Mono.empty());
241+
return session;
242+
};
243+
244+
// Set session factory
245+
transportProvider.setSessionFactory(realSessionFactory);
246+
247+
String jsonMessage1 = "{\"jsonrpc\":\"2.0\",\"method\":\"test1\",\"params\":{},\"id\":1}\n";
248+
String jsonMessage2 = "{\"jsonrpc\":\"2.0\",\"method\":\"test2\",\"params\":{},\"id\":2}\n";
249+
pipedOut.write(jsonMessage1.getBytes(StandardCharsets.UTF_8));
250+
pipedOut.write(jsonMessage2.getBytes(StandardCharsets.UTF_8));
251+
pipedOut.flush();
252+
253+
// Verify both concurrent responses complete without error
254+
StepVerifier
255+
.create(Mono.delay(java.time.Duration.ofSeconds(2))
256+
.then(Mono.fromCallable(() -> outputStream.toString(StandardCharsets.UTF_8))))
257+
.assertNext(output -> {
258+
assertThat(output).contains("\"id\":1");
259+
assertThat(output).contains("\"id\":2");
260+
})
261+
.verifyComplete();
262+
263+
pipedOut.close();
264+
}
265+
223266
}

0 commit comments

Comments
 (0)