Skip to content

Commit 2f94434

Browse files
authored
feat: WebClient Streamable HTTP support (#292)
An implementation of Streamable HTTP Client with WebFlux WebClient. Aside from implementing the specification, several improvements have been incorporated throughout the client-side of the architecture. The changes cover: - resilience tests using toxiproxy in testcontainers - integration tests using updated everything-server with streamableHttp support - improved logging - session invalidation handling (both transport session and JSON-RPC concept of session) - implicit initialization and burst protection (in case of concurrent `Mcp(Sync|Async)Client` use - more logging, e.g. stdio process lifecycle logs Related #72, #273, #253, #107, #105 Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
1 parent 8a5a591 commit 2f94434

31 files changed

+1501
-242
lines changed

mcp-spring/mcp-spring-webflux/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@
9999
<version>${testcontainers.version}</version>
100100
<scope>test</scope>
101101
</dependency>
102+
<dependency>
103+
<groupId>org.testcontainers</groupId>
104+
<artifactId>toxiproxy</artifactId>
105+
<version>${toxiproxy.version}</version>
106+
<scope>test</scope>
107+
</dependency>
102108

103109
<dependency>
104110
<groupId>org.awaitility</groupId>

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 520 additions & 0 deletions
Large diffs are not rendered by default.

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe
190190
*/
191191
@Override
192192
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
193+
// TODO: Avoid eager connection opening and enable resilience
194+
// -> upon disconnects, re-establish connection
195+
// -> allow optimizing for eager connection start using a constructor flag
193196
Flux<ServerSentEvent<String>> events = eventStream();
194197
this.inboundSubscription = events.concatMap(event -> Mono.just(event).<JSONRPCMessage>handle((e, s) -> {
195198
if (ENDPOINT_EVENT_TYPE.equals(event.event())) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.modelcontextprotocol.client;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
5+
import io.modelcontextprotocol.spec.McpClientTransport;
6+
import org.junit.jupiter.api.Timeout;
7+
import org.springframework.web.reactive.function.client.WebClient;
8+
9+
@Timeout(15)
10+
public class WebClientStreamableHttpAsyncClientResiliencyTests extends AbstractMcpAsyncClientResiliencyTests {
11+
12+
@Override
13+
protected McpClientTransport createMcpTransport() {
14+
return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
15+
}
16+
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.modelcontextprotocol.client;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
5+
import io.modelcontextprotocol.spec.McpClientTransport;
6+
import org.junit.jupiter.api.Timeout;
7+
import org.springframework.web.reactive.function.client.WebClient;
8+
import org.testcontainers.containers.GenericContainer;
9+
import org.testcontainers.containers.wait.strategy.Wait;
10+
import org.testcontainers.images.builder.ImageFromDockerfile;
11+
12+
@Timeout(15)
13+
public class WebClientStreamableHttpAsyncClientTests extends AbstractMcpAsyncClientTests {
14+
15+
static String host = "http://localhost:3001";
16+
17+
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
18+
@SuppressWarnings("resource")
19+
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
20+
.withCommand("node dist/index.js streamableHttp")
21+
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
22+
.withExposedPorts(3001)
23+
.waitingFor(Wait.forHttp("/").forStatusCode(404));
24+
25+
@Override
26+
protected McpClientTransport createMcpTransport() {
27+
return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
28+
}
29+
30+
@Override
31+
protected void onStart() {
32+
container.start();
33+
int port = container.getMappedPort(3001);
34+
host = "http://" + container.getHost() + ":" + port;
35+
}
36+
37+
@Override
38+
public void onClose() {
39+
container.stop();
40+
}
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.modelcontextprotocol.client;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
5+
import io.modelcontextprotocol.spec.McpClientTransport;
6+
import org.junit.jupiter.api.Timeout;
7+
import org.springframework.web.reactive.function.client.WebClient;
8+
import org.testcontainers.containers.GenericContainer;
9+
import org.testcontainers.containers.wait.strategy.Wait;
10+
11+
@Timeout(15)
12+
public class WebClientStreamableHttpSyncClientTests extends AbstractMcpSyncClientTests {
13+
14+
static String host = "http://localhost:3001";
15+
16+
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
17+
@SuppressWarnings("resource")
18+
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
19+
.withCommand("node dist/index.js streamableHttp")
20+
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
21+
.withExposedPorts(3001)
22+
.waitingFor(Wait.forHttp("/").forStatusCode(404));
23+
24+
@Override
25+
protected McpClientTransport createMcpTransport() {
26+
return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
27+
}
28+
29+
@Override
30+
protected void onStart() {
31+
container.start();
32+
int port = container.getMappedPort(3001);
33+
host = "http://" + container.getHost() + ":" + port;
34+
}
35+
36+
@Override
37+
public void onClose() {
38+
container.stop();
39+
}
40+
41+
}

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ class WebFluxSseMcpAsyncClientTests extends AbstractMcpAsyncClientTests {
2626

2727
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
2828
@SuppressWarnings("resource")
29-
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1")
29+
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
30+
.withCommand("node dist/index.js sse")
3031
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
3132
.withExposedPorts(3001)
3233
.waitingFor(Wait.forHttp("/").forStatusCode(404));

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpSyncClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ class WebFluxSseMcpSyncClientTests extends AbstractMcpSyncClientTests {
2626

2727
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
2828
@SuppressWarnings("resource")
29-
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1")
29+
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
30+
.withCommand("node dist/index.js sse")
3031
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
3132
.withExposedPorts(3001)
3233
.waitingFor(Wait.forHttp("/").forStatusCode(404));

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ class WebFluxSseClientTransportTests {
4141
static String host = "http://localhost:3001";
4242

4343
@SuppressWarnings("resource")
44-
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1")
44+
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
45+
.withCommand("node dist/index.js sse")
4546
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
4647
.withExposedPorts(3001)
4748
.waitingFor(Wait.forHttp("/").forStatusCode(404));

mcp-spring/mcp-spring-webflux/src/test/resources/logback.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@
99
</appender>
1010

1111
<!-- Main MCP package -->
12-
<logger name="org.springframework.ai.mcp" level="INFO"/>
12+
<logger name="io.modelcontextprotocol" level="INFO"/>
1313

1414
<!-- Client packages -->
15-
<logger name="org.springframework.ai.mcp.client" level="INFO"/>
16-
15+
<logger name="io.modelcontextprotocol.client" level="INFO"/>
16+
1717
<!-- Spec package -->
18-
<logger name="org.springframework.ai.mcp.spec" level="INFO"/>
18+
<logger name="io.modelcontextprotocol.spec" level="INFO"/>
1919

2020

2121
<!-- Root logger -->

mcp-test/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@
6868
<artifactId>junit-jupiter</artifactId>
6969
<version>${testcontainers.version}</version>
7070
</dependency>
71+
<dependency>
72+
<groupId>org.testcontainers</groupId>
73+
<artifactId>toxiproxy</artifactId>
74+
<version>${toxiproxy.version}</version>
75+
</dependency>
7176

7277
<dependency>
7378
<groupId>org.awaitility</groupId>

0 commit comments

Comments
 (0)