-
Notifications
You must be signed in to change notification settings - Fork 0
WIP Streamable HTTP #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Reviewer's GuideThis PR adds full support for the Streamable HTTP transport in MCP by extending the SSE client to conditionally include session headers, enhancing utilities, updating logging levels, and introducing dedicated Streamable HTTP transport implementations for both server and client (including a transport detector), along with comprehensive unit and integration tests. File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @ZachGerman - I've reviewed your changes - here's some feedback:
- FlowSseClient.subscribe rebuilds the HttpRequest repeatedly to inspect headers; consider building it once (or extracting the headers first) to avoid repetitive builds and improve performance.
- StreamableHttpServerTransportProvider has grown very large and mixes servlet logic, session management, and SSE streaming; refactor by extracting common functionality (header validation, SSE setup, session handling) into smaller helper classes to improve maintainability.
- Header validation and addition logic is duplicated across client and server transports; centralize this into a shared utility or builder method to reduce duplication and ensure consistency.
Here's what I looked at during the review
- 🟡 General issues: 2 issues found
- 🟢 Security: all looks good
- 🟡 Testing: 7 issues found
- 🟡 Complexity: 3 issues found
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
||
private final Sinks.Many<SseEvent> eventSink = Sinks.many().multicast().onBackpressureBuffer(); | ||
|
||
private final Map<String, SseEvent> eventHistory = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): eventHistory may grow unbounded and leak memory
Consider limiting the size of eventHistory or removing old entries after replay to avoid memory leaks in long-running streams.
* HTTP+SSE transport | ||
* @throws IOException If an error occurs during transport detection | ||
*/ | ||
private static boolean detectStreamableHttpSupport(String serverUrl) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Detection POST may create a server session that is never cleaned up
Issue a DELETE request to remove any session created during detection to prevent stale sessions.
Suggested implementation:
private static boolean detectStreamableHttpSupport(String serverUrl) throws IOException {
HttpClient httpClient = HttpClient.newHttpClient();
FlowSseClient sseClient = new FlowSseClient(httpClient);
// Create an initialize request to test with
String initializeRequest = """
{"jsonrpc":"2.0","method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test-client","version":"1.0.0"}},"id":1}
""";
String sessionId = null;
try {
// Try POST to the server URL (Streamable HTTP)
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(serverUrl))
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
// Check for session id in response headers or body
// Try to extract session id from response headers (e.g., Location or Set-Cookie)
String location = response.headers().firstValue("Location").orElse(null);
if (location != null && location.contains("/session/")) {
// Example: .../session/{sessionId}
int idx = location.lastIndexOf("/session/");
if (idx != -1) {
sessionId = location.substring(idx + "/session/".length());
}
}
// Optionally, try to extract from body if protocol returns it there
// (Add more extraction logic as needed)
return true;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted during transport detection", e);
} finally {
// Clean up session if one was created
if (sessionId != null) {
try {
HttpRequest deleteRequest = HttpRequest.newBuilder()
.DELETE()
.uri(URI.create(serverUrl.endsWith("/") ? serverUrl + "session/" + sessionId : serverUrl + "/session/" + sessionId))
.build();
httpClient.send(deleteRequest, HttpResponse.BodyHandlers.discarding());
} catch (Exception cleanupEx) {
// Log or ignore cleanup errors
}
}
}
- If your server returns the session ID in a different way (e.g., in the response body or a different header), you may need to adjust the extraction logic accordingly.
- If the session endpoint path is different, update the DELETE URI construction as needed.
- Consider logging cleanup failures if you have a logging framework.
/** | ||
* Unit tests for {@link StreamableHttpServerTransportProvider}. | ||
*/ | ||
class StreamableHttpServerTransportProviderTests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): Consider testing behavior when server is shutting down
Add tests to ensure methods like doGet
and doPost
return SC_SERVICE_UNAVAILABLE
when isClosing
is true.
} | ||
|
||
@Test | ||
void shouldHandlePostRequestForInitialize() throws IOException, ServletException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): Enhance POST request tests with header and body error conditions
Please add tests for missing or incorrect Accept
headers (expecting SC_BAD_REQUEST
) and for malformed JSON bodies to verify error handling.
} | ||
|
||
@Test | ||
void shouldHandleGetRequest() throws IOException, ServletException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): Add test for GET request with Last-Event-ID header
Please add a test that sets the Last-Event-ID
header and verifies replayEventsAfter
is called with the expected value.
Suggested implementation:
@Test
void shouldHandleGetRequestWithLastEventIdHeader() throws IOException, ServletException {
HttpServletRequest request = mock(HttpServletRequest.class);
HttpServletResponse response = mock(HttpServletResponse.class);
StreamableHttpServerTransportProvider provider = spy(new StreamableHttpServerTransportProvider());
String sessionId = "test-session-id";
String lastEventId = "42";
when(request.getMethod()).thenReturn("GET");
when(request.getRequestURI()).thenReturn("/mcp");
when(request.getHeader("Accept")).thenReturn("text/event-stream");
when(request.getHeader(StreamableHttpServerTransportProvider.SESSION_ID_HEADER)).thenReturn(sessionId);
when(request.getHeader("Last-Event-ID")).thenReturn(lastEventId);
when(request.getHeaderNames()).thenReturn(Collections.enumeration(Collections.emptyList()));
PrintWriter writer = new PrintWriter(new StringWriter());
when(response.getWriter()).thenReturn(writer);
// Act
provider.handleRequest(request, response);
// Assert
verify(provider).replayEventsAfter(eq(sessionId), eq(lastEventId), any(), any());
}
@Test
StringWriter stringWriter = new StringWriter();
PrintWriter writer = new PrintWriter(stringWriter);
when(request.getRequestURI()).thenReturn("/mcp");
when(request.getHeader("Accept")).thenReturn("application/json, text/event-stream");
when(request.getHeader(StreamableHttpServerTransportProvider.SESSION_ID_HEADER)).thenReturn(null);
when(request.getHeaderNames()).thenReturn(Collections.enumeration(Collections.emptyList()));
String initializeRequest = "{\"jsonrpc\":\"2.0\",\"method\":\"initialize\",\"params\":{\"protocolVersion\":\"2024-11-05\",\"capabilities\":{},\"clientInfo\":{\"name\":\"test-client\",\"version\":\"1.0.0\"}},\"id\":1}";
when(request.getReader()).thenReturn(new java.io.BufferedReader(new java.io.StringReader(initializeRequest)));
when(response.getWriter()).thenReturn(writer);
- Ensure that
StreamableHttpServerTransportProvider
and itsreplayEventsAfter
method are accessible and can be spied/mocked as shown. - If the method signature for
replayEventsAfter
is different, adjust the arguments in theverify
call accordingly. - If the test class does not already use Mockito's
spy
,mock
, orany
, ensure the necessary static imports are present.
} | ||
|
||
@Test | ||
void testInitialize() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): Assert InitializeResult structure and Mcp-Session-Id header in testInitialize
Add assertions to check that the response body deserializes to an InitializeResult
with expected fields, and that the Mcp-Session-Id
header is present and non-empty.
Suggested implementation:
@Test
void testInitialize() {
System.out.println("Starting testInitialize");
try {
URL url = new URL("http://localhost:8080/initialize");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", "application/json");
connection.setConnectTimeout(5000);
connection.setReadTimeout(5000);
// Write request body if needed
// OutputStream os = connection.getOutputStream();
// os.write(...);
// os.flush();
// os.close();
int responseCode = connection.getResponseCode();
assertEquals(200, responseCode);
// Read response body
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
StringBuilder response = new StringBuilder();
String line;
while ((line = in.readLine()) != null) {
response.append(line);
}
in.close();
// Deserialize response to InitializeResult
com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
InitializeResult result = objectMapper.readValue(response.toString(), InitializeResult.class);
// Assert expected fields in InitializeResult (adjust as needed)
assertNotNull(result);
// Example: assertNotNull(result.getSessionId());
// Example: assertEquals("expectedValue", result.getSomeField());
// Assert Mcp-Session-Id header is present and non-empty
String mcpSessionId = connection.getHeaderField("Mcp-Session-Id");
assertNotNull(mcpSessionId, "Mcp-Session-Id header should be present");
assertFalse(mcpSessionId.isEmpty(), "Mcp-Session-Id header should not be empty");
} catch (Exception e) {
fail("Exception during testInitialize: " + e.getMessage());
}
- Ensure that the
InitializeResult
class is imported at the top of the file. - Adjust the assertions for
InitializeResult
fields to match the actual fields and expected values in your implementation. - If you use a different JSON library, adjust the deserialization code accordingly.
@Test | ||
void testToolCallSuccess() { | ||
System.out.println("Starting testToolCallSuccess"); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): Assert tools list response content in testToolCallSuccess
Add assertions to verify that the response body from the tools/list
request includes the expected tool details, such as 'tool1' with its description and schema.
@Test | |
void testToolCallSuccess() { | |
System.out.println("Starting testToolCallSuccess"); | |
@Test | |
void testToolCallSuccess() { | |
System.out.println("Starting testToolCallSuccess"); | |
try { | |
// Prepare the request to /tools/list | |
URL url = new URL("http://localhost:8080/tools/list"); | |
HttpURLConnection conn = (HttpURLConnection) url.openConnection(); | |
conn.setRequestMethod("GET"); | |
conn.setRequestProperty("Accept", "application/json"); | |
int responseCode = conn.getResponseCode(); | |
assertEquals(200, responseCode, "Expected HTTP 200 from /tools/list"); | |
BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8)); | |
StringBuilder response = new StringBuilder(); | |
String inputLine; | |
while ((inputLine = in.readLine()) != null) { | |
response.append(inputLine); | |
} | |
in.close(); | |
String responseBody = response.toString(); | |
System.out.println("tools/list response: " + responseBody); | |
// Assert that the response contains expected tool details | |
assertTrue(responseBody.contains("\"name\":\"tool1\""), "Response should contain tool1 name"); | |
assertTrue(responseBody.contains("\"description\":\"A test tool\""), "Response should contain tool1 description"); | |
assertTrue(responseBody.contains("\"schema\""), "Response should contain tool1 schema"); | |
conn.disconnect(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
fail("Exception during testToolCallSuccess: " + e.getMessage()); | |
} |
.GET() | ||
.build(); | ||
.GET(); | ||
if (mcpSessionId != null) { // Using StreamableHTTP Transport |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider refactoring header-setting logic to build the request only twice and apply missing headers in a single loop.
// 1. Copy & set the URI/GET only once
HttpRequest.Builder rb = this.requestBuilder.copy()
.uri(URI.create(url))
.header("Cache-Control", "no-cache")
.GET();
// 2. Snapshot existing header names in one build()
Set<String> existing = rb.build().headers().map().keySet();
// 3. Prepare the defaults per transport
List<Map.Entry<String,String>> defaults;
if (mcpSessionId != null) {
defaults = List.of(
Map.entry(ACCEPT_HEADER_NAME, ACCEPT_HEADER_VALUE),
Map.entry(CONTENT_TYPE_HEADER_NAME, CONTENT_TYPE_HEADER_VALUE),
Map.entry(MCP_SESSION_ID_HEADER_NAME, mcpSessionId)
);
} else {
defaults = List.of(
Map.entry(ACCEPT_HEADER_NAME, "text/event-stream")
);
}
// 4. Apply only the missing headers
for (var hdr : defaults) {
if (!existing.contains(hdr.getKey())) {
rb.header(hdr.getKey(), hdr.getValue());
}
}
// 5. Final build
HttpRequest request = rb.build();
Benefits:
- Builds the request only twice (once for header-snapshot, once final) instead of N times.
- Flattens nested
if
-blocks into a single loop over defaults. - Maintains identical header logic and ordering.
* </ul> | ||
* | ||
*/ | ||
@WebServlet(asyncSupported = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider refactoring the servlet by extracting repeated logic for JSON error responses, header logging, and SSE setup into dedicated helper methods or classes.
The core servlet is still doing three very different things (HTTP dispatch, header-&error handling, SSE wiring) with nearly identical blocks in both doGet and doPost. You can collapse each into small reusable methods (and even move SSE wiring into its own helper/manager class) without touching any of the behavior you just added.
1) Extract JSON-error sending
// replace each block like:
response.setContentType(APPLICATION_JSON);
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
response.getWriter().write(createErrorJson("foo"));
// …with:
private void sendJsonError(HttpServletResponse resp, int status, String msg) throws IOException {
resp.setContentType(APPLICATION_JSON);
resp.setStatus(status);
resp.getWriter().write(createErrorJson(msg));
}
// usage:
sendJsonError(response, SC_BAD_REQUEST, "Accept header must include text/event-stream");
2) Extract header-logging
// at top of doGet/doPost:
logAllHeaders(request);
// new helper:
private void logAllHeaders(HttpServletRequest req) {
Enumeration<String> names = req.getHeaderNames();
while (names.hasMoreElements()) {
String n = names.nextElement();
logger.debug("Header: {}={}", n, req.getHeader(n));
}
}
3) Extract SSE wiring into one method
// in both doGet() and doPost() instead of repeating ~20 lines:
startSse(request, response, sessionId, lastEventId);
// new helper:
private void startSse(HttpServletRequest req,
HttpServletResponse resp,
String sessionId,
@Nullable String lastEventId) throws IOException {
resp.setContentType(TEXT_EVENT_STREAM);
resp.setCharacterEncoding(UTF_8);
resp.setHeader("Cache-Control", "no-cache");
resp.setHeader("Connection", "keep-alive");
resp.setHeader(SESSION_ID_HEADER, sessionId);
AsyncContext asyncCtx = req.startAsync();
asyncCtx.setTimeout(0);
StreamableHttpSseStream stream = getOrCreateSseStream(sessionId);
if (lastEventId != null) stream.replayEventsAfter(lastEventId);
PrintWriter w = resp.getWriter();
Flux<SseEvent> flux = stream.getEventFlux();
flux.subscribe(
event -> writeEvent(w, event, asyncCtx),
err -> finish(asyncCtx),
() -> finish(asyncCtx)
);
}
private void writeEvent(PrintWriter w, SseEvent e, AsyncContext ctx) {
try {
if (e.id() != null) w.write("id: " + e.id() + "\n");
if (e.event() != null) w.write("event: " + e.event() + "\n");
w.write("data: " + e.data() + "\n\n");
w.flush();
if (w.checkError()) throw new IOException("Client disconnected");
} catch (IOException ex) {
logger.debug("SSE write error: {}", ex.getMessage());
ctx.complete();
}
}
private void finish(AsyncContext ctx) {
try { ctx.getResponse().getWriter().close(); } catch (IOException ignored) {}
ctx.complete();
}
By pulling out these three areas, your doGet/doPost/doDelete all become a couple of if
checks plus calls to small focused helpers—drastically flattening nesting and removing duplication while preserving every bit of new behavior.
* </ul> | ||
* | ||
*/ | ||
public class StreamableHttpClientTransport implements McpClientTransport { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider extracting repeated header setup, response handling, and SSE event logic into dedicated helper methods to simplify the class structure.
Here are a few low-risk refactorings that will pull repeated logic out of this 550+ line class without changing behavior:
1) Extract common header setup into a single helper.
Replace all these blocks:
```java
if (!builder.build().headers().map().containsKey("Accept")) {
builder.header("Accept", "application/json, text/event-stream");
}
if (!builder.build().headers().map().containsKey("Content-Type")) {
builder.header("Content-Type", "application/json");
}
String sid = sessionId.get();
if (sid != null && !builder.build().headers().map().containsKey(SESSION_ID_HEADER)) {
builder.header(SESSION_ID_HEADER, sid);
}
with:
private HttpRequest.Builder applyDefaultHeaders(HttpRequest.Builder b) {
if (!b.build().headers().map().containsKey(ACCEPT)) {
b.header(ACCEPT, APPLICATION_JSON + ", " + TEXT_EVENT_STREAM);
}
if (!b.build().headers().map().containsKey(CONTENT_TYPE)) {
b.header(CONTENT_TYPE, APPLICATION_JSON);
}
String sid = sessionId.get();
if (sid != null) {
b.header(SESSION_ID_HEADER, sid);
}
return b;
}
And then in sendMessage(...)
:
HttpRequest.Builder builder = this.requestBuilder.copy()
.uri(requestUri)
.POST(HttpRequest.BodyPublishers.ofString(jsonText));
applyDefaultHeaders(builder);
HttpRequest request = builder.build();
-
Pull response‐handling out of the inline
thenAccept(...)
into a private method:private void processServerResponse(HttpResponse<String> resp) { int status = resp.statusCode(); Optional<String> newSid = resp.headers().firstValue(SESSION_ID_HEADER); newSid.filter(id -> sessionId.get() == null).ifPresent(id -> { sessionId.set(id); sessionLatch.countDown(); setupSseConnection(); }); switch (status) { case 202 -> logger.debug("Server accepted the message"); case 200 -> handle200(resp); default -> { if (status == 404 && sessionId.get() != null) { logger.warn("Session expired"); sessionId.set(null); } else if (status >= 400) { logger.error("Error sending message: {} - {}", status, resp.body()); } } } } private void handle200(HttpResponse<String> resp) { /* existing JSON vs SSE logic */ }
Then your
sendMessage
becomes:return Mono.fromFuture( httpClient .sendAsync(request, BodyHandlers.ofString()) .thenAccept(this::processServerResponse) );
-
Move SSE‐event handling into its own small method:
private void handleSseEvent(SseEvent event) { if (isClosing || !MESSAGE_EVENT_TYPE.equals(event.type())) return; Optional.ofNullable(event.id()).ifPresent(lastEventId::set); try { JSONRPCMessage msg = McpSchema.deserializeJsonRpcMessage(objectMapper, event.data()); messageHandler.get() .apply(Mono.just(msg)) .doOnError(e -> logger.error("Error processing SSE message", e)) .subscribe(); } catch (IOException e) { logger.error("Error processing SSE event", e); } }
And subscribe with:
sseClient.subscribe(getUri.toString(), sid, new SseEventHandler() { @Override public void onEvent(SseEvent e) { handleSseEvent(e); } @Override public void onError(Throwable t) { /* unchanged reconnect logic */ } });
These small extractions keep every bit of functionality intact but collapse ~30 lines of duplicated/deeply-nested code into 3 helpers of ~10 lines each, reducing cognitive load and making each piece easier to test.
|
||
/** | ||
* Implementation of the MCP Streamable HTTP transport for clients. This implementation | ||
* follows the Streamable HTTP transport specification from protocol version 2024-11-05. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2025-03-26?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q comments. Will revise all prior to publishing. Thanks for the heads up!
URI getUri = Utils.resolveUri(this.baseUri, this.mcpEndpoint); | ||
HttpRequest.Builder getBuilder = HttpRequest.newBuilder(getUri) | ||
.GET() | ||
.header("Accept", "application/json, text/event-stream") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this particular request should only have Accept: text/event-stream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct! Thank you!
HttpRequest.Builder getBuilder = HttpRequest.newBuilder(getUri) | ||
.GET() | ||
.header("Accept", "application/json, text/event-stream") | ||
.header("Content-Type", "application/json"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this request should not have any Content-Type
specified? Shouldn't be an issue with servers on the TS SDK either, I think.
GET
requests should not have any body payload, so it doesn't make sense to have a Content-Type
, either.
/** | ||
* Implementation of the MCP Streamable HTTP transport provider for servers. This | ||
* implementation follows the Streamable HTTP transport specification from protocol | ||
* version 2024-11-05. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong version
isInitializeRequest = true; | ||
// For initialize requests, create a new session if one doesn't exist | ||
if (sessionId == null) { | ||
sessionId = UUID.randomUUID().toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add an option to provide a consumer-defined SessionIdProvider
or something here. This is a problem on the existing SSE transport too, but essentially: on a remote MCP server, I think it's probably possible to hijack sessions by just generating UUIDs until one hits.
Session IDs should be cryptographically secure, and it may not be desirable to rely on the SDK's implementation.
The TS SDK does this, for reference (uses UUID for a default convenience implementation here, too).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add this to my list of items to tackle!
@SuppressWarnings("resource") | ||
GenericContainer<?> container = new GenericContainer<>("mcp/everything:latest") | ||
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) | ||
.withExposedPorts(3001) | ||
.withCommand("/bin/sh", "-c", | ||
"npm install -g @modelcontextprotocol/server-everything@latest && npx --node-options=\"--inspect\" @modelcontextprotocol/server-everything streamableHttp") | ||
.waitingFor(Wait.forHttp("/").forStatusCode(404)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mcp/everything
is the official image, so we should probably just bug someone to either update it or set up CI/CD instead of doing this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
*/ | ||
public class StreamableHttpSseStream { | ||
|
||
private final Sinks.Many<SseEvent> eventSink = Sinks.many().multicast().onBackpressureBuffer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there's not a particular reason for this to be multicast, it should be unicast, I think. There will only ever be one stream subscriber handling flushing data back to the client on a stream.
} | ||
|
||
/** ["Origin","Accept"] */ | ||
public static final List<String> REQUIRED_HEADERS = List.of("Origin", "Accept"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Origin
shouldn't be required, here. Even though you can technically use it in a non-web context like this, I don't think it really makes sense to do so.
String newSessionId = response.headers().firstValue(SESSION_ID_HEADER).orElse(null); | ||
if (newSessionId != null && sessionId.get() == null) { | ||
sessionId.set(newSessionId); | ||
sessionLatch.countDown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this get used? Nothing appears to be await
ing this.
private final String mcpEndpoint; | ||
|
||
/** Map of active client sessions, keyed by session ID */ | ||
private final Map<String, McpServerSession> sessions = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should pull this out to a new SessionManager
that consumers can override to enable serverless sessions (also used in the SSE transport, same deal there).
|
||
private final Sinks.Many<SseEvent> eventSink = Sinks.many().multicast().onBackpressureBuffer(); | ||
|
||
private final Map<String, SseEvent> eventHistory = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably pull this out to an overridable EventStore
for serverless usage, too.
5f8e64e
to
006b6f8
Compare
Summary by Sourcery
Introduce full support for the Streamable HTTP transport specification in the MCP protocol by adding new server and client transport classes, auto-detection logic, utility enhancements, and comprehensive test coverage. Ensure compatibility with existing HTTP+SSE transport via extended FlowSseClient.
New Features:
Enhancements:
Tests: