Skip to content

WIP Streamable HTTP #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open

WIP Streamable HTTP #1

wants to merge 1 commit into from

Conversation

ZachGerman
Copy link
Owner

@ZachGerman ZachGerman commented May 29, 2025

Summary by Sourcery

Introduce full support for the Streamable HTTP transport specification in the MCP protocol by adding new server and client transport classes, auto-detection logic, utility enhancements, and comprehensive test coverage. Ensure compatibility with existing HTTP+SSE transport via extended FlowSseClient.

New Features:

  • Implement Streamable HTTP transport for MCP with server provider and client transport implementations
  • Add automatic transport detection via McpTransportDetector to choose between Streamable HTTP and HTTP+SSE

Enhancements:

  • Extend FlowSseClient.subscribe to support both HTTP+SSE and Streamable HTTP modes
  • Add URI resolution overload in Utils for base URI string and path
  • Increase test logging verbosity from INFO to DEBUG via logback config

Tests:

  • Add extensive unit and integration tests for StreamableHttpServerTransportProvider, StreamableHttpClientTransport, and transport detection

Copy link

sourcery-ai bot commented May 29, 2025

Reviewer's Guide

This PR adds full support for the Streamable HTTP transport in MCP by extending the SSE client to conditionally include session headers, enhancing utilities, updating logging levels, and introducing dedicated Streamable HTTP transport implementations for both server and client (including a transport detector), along with comprehensive unit and integration tests.

File-Level Changes

Change Details Files
Extended FlowSseClient.subscribe to support Streamable HTTP with session headers
  • Added mcpSessionId parameter and conditional header checks for Accept, Content-Type, and MCP-Session-Id
  • Retained legacy HTTP+SSE behavior when sessionId is null
  • Updated HttpClientSseClientTransport.subscribe call to invoke new overload with null sessionId
FlowSseClient.java
HttpClientSseClientTransport.java
Enhanced utilities
  • Imported concurrency and time classes
  • Added resolveUri(String baseUri, String path) overload
Utils.java
Relaxed logback test configuration to DEBUG
  • Changed logger levels for main, client, spec packages and root logger to DEBUG
logback.xml
Introduced StreamableHttpServerTransportProvider
  • Implemented doGet/doPost/doDelete with session management and SSE streams
  • Managed concurrent sessions and event streams with buffering and replay
  • Provided inner transport and SSE stream classes and builder pattern
StreamableHttpServerTransportProvider.java
Added StreamableHttpClientTransport implementation
  • Built builder for baseUri, endpoint, client and request customization
  • Implemented connect, sendMessage, setupSseConnection, and closeGracefully with correct header handling and DELETE termination
  • Handled JSON and SSE responses according to status codes
StreamableHttpClientTransport.java
Created McpTransportDetector factory
  • Probed server via initialize POST to detect Streamable HTTP support
  • Returned StreamableHttpClientTransport or HttpClientSseClientTransport accordingly
McpTransportDetector.java
Defined Streamable HTTP client spec constants
  • Added REQUIRED_HEADERS, REQUIRED_ACCEPTED_CONTENT, and standard header names/values
McpStreamableHttpClient.java
Added unit tests for server transport provider
  • Verified notifyClients, graceful close, and CRUD HTTP methods behavior
  • Mocked sessions and SSE streams to test event sending and error handling
StreamableHttpServerTransportProviderTests.java
Added integration tests for server provider
  • Bootstrapped embedded Tomcat with provider
  • Executed direct HTTP initialize and tools calls to validate end-to-end flow
StreamableHttpServerTransportProviderIntegrationTests.java
Added client transport integration tests
  • Tested async and sync clients with Testcontainers against a running streamable HTTP server
  • Confirmed transport auto-configuration and message exchange
StreamableHttpMcpAsyncClientTests.java
StreamableHttpMcpSyncClientTests.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @ZachGerman - I've reviewed your changes - here's some feedback:

  • FlowSseClient.subscribe rebuilds the HttpRequest repeatedly to inspect headers; consider building it once (or extracting the headers first) to avoid repetitive builds and improve performance.
  • StreamableHttpServerTransportProvider has grown very large and mixes servlet logic, session management, and SSE streaming; refactor by extracting common functionality (header validation, SSE setup, session handling) into smaller helper classes to improve maintainability.
  • Header validation and addition logic is duplicated across client and server transports; centralize this into a shared utility or builder method to reduce duplication and ensure consistency.
Here's what I looked at during the review
  • 🟡 General issues: 2 issues found
  • 🟢 Security: all looks good
  • 🟡 Testing: 7 issues found
  • 🟡 Complexity: 3 issues found
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.


private final Sinks.Many<SseEvent> eventSink = Sinks.many().multicast().onBackpressureBuffer();

private final Map<String, SseEvent> eventHistory = new ConcurrentHashMap<>();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): eventHistory may grow unbounded and leak memory

Consider limiting the size of eventHistory or removing old entries after replay to avoid memory leaks in long-running streams.

* HTTP+SSE transport
* @throws IOException If an error occurs during transport detection
*/
private static boolean detectStreamableHttpSupport(String serverUrl) throws IOException {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Detection POST may create a server session that is never cleaned up

Issue a DELETE request to remove any session created during detection to prevent stale sessions.

Suggested implementation:

	private static boolean detectStreamableHttpSupport(String serverUrl) throws IOException {
		HttpClient httpClient = HttpClient.newHttpClient();
		FlowSseClient sseClient = new FlowSseClient(httpClient);

		// Create an initialize request to test with
		String initializeRequest = """
				{"jsonrpc":"2.0","method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test-client","version":"1.0.0"}},"id":1}
				""";

		String sessionId = null;

		try {
			// Try POST to the server URL (Streamable HTTP)
			HttpRequest request = HttpRequest.newBuilder()
				.uri(URI.create(serverUrl))
			HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
			if (response.statusCode() == 200) {
				// Check for session id in response headers or body
				// Try to extract session id from response headers (e.g., Location or Set-Cookie)
				String location = response.headers().firstValue("Location").orElse(null);
				if (location != null && location.contains("/session/")) {
					// Example: .../session/{sessionId}
					int idx = location.lastIndexOf("/session/");
					if (idx != -1) {
						sessionId = location.substring(idx + "/session/".length());
					}
				}
				// Optionally, try to extract from body if protocol returns it there
				// (Add more extraction logic as needed)

				return true;
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IOException("Interrupted during transport detection", e);
		} finally {
			// Clean up session if one was created
			if (sessionId != null) {
				try {
					HttpRequest deleteRequest = HttpRequest.newBuilder()
						.DELETE()
						.uri(URI.create(serverUrl.endsWith("/") ? serverUrl + "session/" + sessionId : serverUrl + "/session/" + sessionId))
						.build();
					httpClient.send(deleteRequest, HttpResponse.BodyHandlers.discarding());
				} catch (Exception cleanupEx) {
					// Log or ignore cleanup errors
				}
			}
		}
  • If your server returns the session ID in a different way (e.g., in the response body or a different header), you may need to adjust the extraction logic accordingly.
  • If the session endpoint path is different, update the DELETE URI construction as needed.
  • Consider logging cleanup failures if you have a logging framework.

/**
* Unit tests for {@link StreamableHttpServerTransportProvider}.
*/
class StreamableHttpServerTransportProviderTests {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider testing behavior when server is shutting down

Add tests to ensure methods like doGet and doPost return SC_SERVICE_UNAVAILABLE when isClosing is true.

}

@Test
void shouldHandlePostRequestForInitialize() throws IOException, ServletException {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Enhance POST request tests with header and body error conditions

Please add tests for missing or incorrect Accept headers (expecting SC_BAD_REQUEST) and for malformed JSON bodies to verify error handling.

}

@Test
void shouldHandleGetRequest() throws IOException, ServletException {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Add test for GET request with Last-Event-ID header

Please add a test that sets the Last-Event-ID header and verifies replayEventsAfter is called with the expected value.

Suggested implementation:

	@Test
	void shouldHandleGetRequestWithLastEventIdHeader() throws IOException, ServletException {
		HttpServletRequest request = mock(HttpServletRequest.class);
		HttpServletResponse response = mock(HttpServletResponse.class);
		StreamableHttpServerTransportProvider provider = spy(new StreamableHttpServerTransportProvider());
		String sessionId = "test-session-id";
		String lastEventId = "42";

		when(request.getMethod()).thenReturn("GET");
		when(request.getRequestURI()).thenReturn("/mcp");
		when(request.getHeader("Accept")).thenReturn("text/event-stream");
		when(request.getHeader(StreamableHttpServerTransportProvider.SESSION_ID_HEADER)).thenReturn(sessionId);
		when(request.getHeader("Last-Event-ID")).thenReturn(lastEventId);
		when(request.getHeaderNames()).thenReturn(Collections.enumeration(Collections.emptyList()));

		PrintWriter writer = new PrintWriter(new StringWriter());
		when(response.getWriter()).thenReturn(writer);

		// Act
		provider.handleRequest(request, response);

		// Assert
		verify(provider).replayEventsAfter(eq(sessionId), eq(lastEventId), any(), any());
	}

	@Test

		StringWriter stringWriter = new StringWriter();
		PrintWriter writer = new PrintWriter(stringWriter);

		when(request.getRequestURI()).thenReturn("/mcp");
		when(request.getHeader("Accept")).thenReturn("application/json, text/event-stream");
		when(request.getHeader(StreamableHttpServerTransportProvider.SESSION_ID_HEADER)).thenReturn(null);
		when(request.getHeaderNames()).thenReturn(Collections.enumeration(Collections.emptyList()));
		String initializeRequest = "{\"jsonrpc\":\"2.0\",\"method\":\"initialize\",\"params\":{\"protocolVersion\":\"2024-11-05\",\"capabilities\":{},\"clientInfo\":{\"name\":\"test-client\",\"version\":\"1.0.0\"}},\"id\":1}";
		when(request.getReader()).thenReturn(new java.io.BufferedReader(new java.io.StringReader(initializeRequest)));
		when(response.getWriter()).thenReturn(writer);
  • Ensure that StreamableHttpServerTransportProvider and its replayEventsAfter method are accessible and can be spied/mocked as shown.
  • If the method signature for replayEventsAfter is different, adjust the arguments in the verify call accordingly.
  • If the test class does not already use Mockito's spy, mock, or any, ensure the necessary static imports are present.

}

@Test
void testInitialize() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Assert InitializeResult structure and Mcp-Session-Id header in testInitialize

Add assertions to check that the response body deserializes to an InitializeResult with expected fields, and that the Mcp-Session-Id header is present and non-empty.

Suggested implementation:

	@Test
	void testInitialize() {
		System.out.println("Starting testInitialize");

		try {
			URL url = new URL("http://localhost:8080/initialize");
			HttpURLConnection connection = (HttpURLConnection) url.openConnection();
			connection.setRequestMethod("POST");
			connection.setDoOutput(true);
			connection.setRequestProperty("Content-Type", "application/json");
			connection.setConnectTimeout(5000);
			connection.setReadTimeout(5000);

			// Write request body if needed
			// OutputStream os = connection.getOutputStream();
			// os.write(...);
			// os.flush();
			// os.close();

			int responseCode = connection.getResponseCode();
			assertEquals(200, responseCode);

			// Read response body
			BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
			StringBuilder response = new StringBuilder();
			String line;
			while ((line = in.readLine()) != null) {
				response.append(line);
			}
			in.close();

			// Deserialize response to InitializeResult
			com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
			InitializeResult result = objectMapper.readValue(response.toString(), InitializeResult.class);

			// Assert expected fields in InitializeResult (adjust as needed)
			assertNotNull(result);
			// Example: assertNotNull(result.getSessionId());
			// Example: assertEquals("expectedValue", result.getSomeField());

			// Assert Mcp-Session-Id header is present and non-empty
			String mcpSessionId = connection.getHeaderField("Mcp-Session-Id");
			assertNotNull(mcpSessionId, "Mcp-Session-Id header should be present");
			assertFalse(mcpSessionId.isEmpty(), "Mcp-Session-Id header should not be empty");

		} catch (Exception e) {
			fail("Exception during testInitialize: " + e.getMessage());
		}
  • Ensure that the InitializeResult class is imported at the top of the file.
  • Adjust the assertions for InitializeResult fields to match the actual fields and expected values in your implementation.
  • If you use a different JSON library, adjust the deserialization code accordingly.

Comment on lines +172 to +175
@Test
void testToolCallSuccess() {
System.out.println("Starting testToolCallSuccess");

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Assert tools list response content in testToolCallSuccess

Add assertions to verify that the response body from the tools/list request includes the expected tool details, such as 'tool1' with its description and schema.

Suggested change
@Test
void testToolCallSuccess() {
System.out.println("Starting testToolCallSuccess");
@Test
void testToolCallSuccess() {
System.out.println("Starting testToolCallSuccess");
try {
// Prepare the request to /tools/list
URL url = new URL("http://localhost:8080/tools/list");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty("Accept", "application/json");
int responseCode = conn.getResponseCode();
assertEquals(200, responseCode, "Expected HTTP 200 from /tools/list");
BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
StringBuilder response = new StringBuilder();
String inputLine;
while ((inputLine = in.readLine()) != null) {
response.append(inputLine);
}
in.close();
String responseBody = response.toString();
System.out.println("tools/list response: " + responseBody);
// Assert that the response contains expected tool details
assertTrue(responseBody.contains("\"name\":\"tool1\""), "Response should contain tool1 name");
assertTrue(responseBody.contains("\"description\":\"A test tool\""), "Response should contain tool1 description");
assertTrue(responseBody.contains("\"schema\""), "Response should contain tool1 schema");
conn.disconnect();
} catch (Exception e) {
e.printStackTrace();
fail("Exception during testToolCallSuccess: " + e.getMessage());
}

.GET()
.build();
.GET();
if (mcpSessionId != null) { // Using StreamableHTTP Transport
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring header-setting logic to build the request only twice and apply missing headers in a single loop.

// 1. Copy & set the URI/GET only once
HttpRequest.Builder rb = this.requestBuilder.copy()
    .uri(URI.create(url))
    .header("Cache-Control", "no-cache")
    .GET();

// 2. Snapshot existing header names in one build()
Set<String> existing = rb.build().headers().map().keySet();

// 3. Prepare the defaults per transport
List<Map.Entry<String,String>> defaults;
if (mcpSessionId != null) {
    defaults = List.of(
      Map.entry(ACCEPT_HEADER_NAME,      ACCEPT_HEADER_VALUE),
      Map.entry(CONTENT_TYPE_HEADER_NAME, CONTENT_TYPE_HEADER_VALUE),
      Map.entry(MCP_SESSION_ID_HEADER_NAME, mcpSessionId)
    );
} else {
    defaults = List.of(
      Map.entry(ACCEPT_HEADER_NAME, "text/event-stream")
    );
}

// 4. Apply only the missing headers
for (var hdr : defaults) {
  if (!existing.contains(hdr.getKey())) {
    rb.header(hdr.getKey(), hdr.getValue());
  }
}

// 5. Final build
HttpRequest request = rb.build();

Benefits:

  • Builds the request only twice (once for header-snapshot, once final) instead of N times.
  • Flattens nested if-blocks into a single loop over defaults.
  • Maintains identical header logic and ordering.

* </ul>
*
*/
@WebServlet(asyncSupported = true)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring the servlet by extracting repeated logic for JSON error responses, header logging, and SSE setup into dedicated helper methods or classes.

The core servlet is still doing three very different things (HTTP dispatch, header-&­error handling, SSE wiring) with nearly identical blocks in both doGet and doPost. You can collapse each into small reusable methods (and even move SSE wiring into its own helper/manager class) without touching any of the behavior you just added.

1) Extract JSON-error sending

// replace each block like:
response.setContentType(APPLICATION_JSON);
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
response.getWriter().write(createErrorJson("foo"));
// …with:

private void sendJsonError(HttpServletResponse resp, int status, String msg) throws IOException {
  resp.setContentType(APPLICATION_JSON);
  resp.setStatus(status);
  resp.getWriter().write(createErrorJson(msg));
}

// usage:
sendJsonError(response, SC_BAD_REQUEST, "Accept header must include text/event-stream");

2) Extract header-logging

// at top of doGet/doPost:
logAllHeaders(request);

// new helper:
private void logAllHeaders(HttpServletRequest req) {
  Enumeration<String> names = req.getHeaderNames();
  while (names.hasMoreElements()) {
    String n = names.nextElement();
    logger.debug("Header: {}={}", n, req.getHeader(n));
  }
}

3) Extract SSE wiring into one method

// in both doGet() and doPost() instead of repeating ~20 lines:
startSse(request, response, sessionId, lastEventId);

// new helper:
private void startSse(HttpServletRequest req,
                      HttpServletResponse resp,
                      String sessionId,
                      @Nullable String lastEventId) throws IOException {
  resp.setContentType(TEXT_EVENT_STREAM);
  resp.setCharacterEncoding(UTF_8);
  resp.setHeader("Cache-Control", "no-cache");
  resp.setHeader("Connection", "keep-alive");
  resp.setHeader(SESSION_ID_HEADER, sessionId);

  AsyncContext asyncCtx = req.startAsync();
  asyncCtx.setTimeout(0);

  StreamableHttpSseStream stream = getOrCreateSseStream(sessionId);
  if (lastEventId != null) stream.replayEventsAfter(lastEventId);

  PrintWriter w = resp.getWriter();
  Flux<SseEvent> flux = stream.getEventFlux();
  flux.subscribe(
    event -> writeEvent(w, event, asyncCtx),
    err   -> finish(asyncCtx),
    ()    -> finish(asyncCtx)
  );
}

private void writeEvent(PrintWriter w, SseEvent e, AsyncContext ctx) {
  try {
    if (e.id()    != null) w.write("id: "    + e.id()    + "\n");
    if (e.event() != null) w.write("event: " + e.event() + "\n");
    w.write("data: " + e.data() + "\n\n");
    w.flush();
    if (w.checkError()) throw new IOException("Client disconnected");
  } catch (IOException ex) {
    logger.debug("SSE write error: {}", ex.getMessage());
    ctx.complete();
  }
}

private void finish(AsyncContext ctx) {
  try { ctx.getResponse().getWriter().close(); } catch (IOException ignored) {}
  ctx.complete();
}

By pulling out these three areas, your doGet/doPost/doDelete all become a couple of if checks plus calls to small focused helpers—drastically flattening nesting and removing duplication while preserving every bit of new behavior.

* </ul>
*
*/
public class StreamableHttpClientTransport implements McpClientTransport {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting repeated header setup, response handling, and SSE event logic into dedicated helper methods to simplify the class structure.

Here are a few low-risk refactorings that will pull repeated logic out of this 550+ line class without changing behavior:

1) Extract common header setup into a single helper.  
   Replace all these blocks:
   ```java
   if (!builder.build().headers().map().containsKey("Accept")) {
     builder.header("Accept", "application/json, text/event-stream");
   }
   if (!builder.build().headers().map().containsKey("Content-Type")) {
     builder.header("Content-Type", "application/json");
   }
   String sid = sessionId.get();
   if (sid != null && !builder.build().headers().map().containsKey(SESSION_ID_HEADER)) {
     builder.header(SESSION_ID_HEADER, sid);
   }

with:

private HttpRequest.Builder applyDefaultHeaders(HttpRequest.Builder b) {
  if (!b.build().headers().map().containsKey(ACCEPT)) {
    b.header(ACCEPT, APPLICATION_JSON + ", " + TEXT_EVENT_STREAM);
  }
  if (!b.build().headers().map().containsKey(CONTENT_TYPE)) {
    b.header(CONTENT_TYPE, APPLICATION_JSON);
  }
  String sid = sessionId.get();
  if (sid != null) {
    b.header(SESSION_ID_HEADER, sid);
  }
  return b;
}

And then in sendMessage(...):

HttpRequest.Builder builder = this.requestBuilder.copy()
    .uri(requestUri)
    .POST(HttpRequest.BodyPublishers.ofString(jsonText));
applyDefaultHeaders(builder);
HttpRequest request = builder.build();
  1. Pull response‐handling out of the inline thenAccept(...) into a private method:

    private void processServerResponse(HttpResponse<String> resp) {
      int status = resp.statusCode();
      Optional<String> newSid = resp.headers().firstValue(SESSION_ID_HEADER);
      newSid.filter(id -> sessionId.get() == null).ifPresent(id -> {
        sessionId.set(id);
        sessionLatch.countDown();
        setupSseConnection();
      });
      switch (status) {
        case 202 -> logger.debug("Server accepted the message");
        case 200 -> handle200(resp);
        default -> {
          if (status == 404 && sessionId.get() != null) {
            logger.warn("Session expired");
            sessionId.set(null);
          } else if (status >= 400) {
            logger.error("Error sending message: {} - {}", status, resp.body());
          }
        }
      }
    }
    private void handle200(HttpResponse<String> resp) { /* existing JSON vs SSE logic */ }

    Then your sendMessage becomes:

    return Mono.fromFuture(
      httpClient
        .sendAsync(request, BodyHandlers.ofString())
        .thenAccept(this::processServerResponse)
    );
  2. Move SSE‐event handling into its own small method:

    private void handleSseEvent(SseEvent event) {
      if (isClosing || !MESSAGE_EVENT_TYPE.equals(event.type())) return;
      Optional.ofNullable(event.id()).ifPresent(lastEventId::set);
      try {
        JSONRPCMessage msg = McpSchema.deserializeJsonRpcMessage(objectMapper, event.data());
        messageHandler.get()
          .apply(Mono.just(msg))
          .doOnError(e -> logger.error("Error processing SSE message", e))
          .subscribe();
      } catch (IOException e) {
        logger.error("Error processing SSE event", e);
      }
    }

    And subscribe with:

    sseClient.subscribe(getUri.toString(), sid, new SseEventHandler() {
      @Override public void onEvent(SseEvent e)   { handleSseEvent(e); }
      @Override public void onError(Throwable t) { /* unchanged reconnect logic */ }
    });

These small extractions keep every bit of functionality intact but collapse ~30 lines of duplicated/deeply-nested code into 3 helpers of ~10 lines each, reducing cognitive load and making each piece easier to test.


/**
* Implementation of the MCP Streamable HTTP transport for clients. This implementation
* follows the Streamable HTTP transport specification from protocol version 2024-11-05.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2025-03-26?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q comments. Will revise all prior to publishing. Thanks for the heads up!

URI getUri = Utils.resolveUri(this.baseUri, this.mcpEndpoint);
HttpRequest.Builder getBuilder = HttpRequest.newBuilder(getUri)
.GET()
.header("Accept", "application/json, text/event-stream")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this particular request should only have Accept: text/event-stream?

https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#listening-for-messages-from-the-server

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct! Thank you!

HttpRequest.Builder getBuilder = HttpRequest.newBuilder(getUri)
.GET()
.header("Accept", "application/json, text/event-stream")
.header("Content-Type", "application/json");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this request should not have any Content-Type specified? Shouldn't be an issue with servers on the TS SDK either, I think.

GET requests should not have any body payload, so it doesn't make sense to have a Content-Type, either.

/**
* Implementation of the MCP Streamable HTTP transport provider for servers. This
* implementation follows the Streamable HTTP transport specification from protocol
* version 2024-11-05.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong version

isInitializeRequest = true;
// For initialize requests, create a new session if one doesn't exist
if (sessionId == null) {
sessionId = UUID.randomUUID().toString();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an option to provide a consumer-defined SessionIdProvider or something here. This is a problem on the existing SSE transport too, but essentially: on a remote MCP server, I think it's probably possible to hijack sessions by just generating UUIDs until one hits.

Session IDs should be cryptographically secure, and it may not be desirable to rely on the SDK's implementation.

The TS SDK does this, for reference (uses UUID for a default convenience implementation here, too).

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add this to my list of items to tackle!

Comment on lines +25 to +30
@SuppressWarnings("resource")
GenericContainer<?> container = new GenericContainer<>("mcp/everything:latest")
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
.withExposedPorts(3001)
.withCommand("/bin/sh", "-c",
"npm install -g @modelcontextprotocol/server-everything@latest && npx --node-options=\"--inspect\" @modelcontextprotocol/server-everything streamableHttp")
.waitingFor(Wait.forHttp("/").forStatusCode(404));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mcp/everything is the official image, so we should probably just bug someone to either update it or set up CI/CD instead of doing this.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

*/
public class StreamableHttpSseStream {

private final Sinks.Many<SseEvent> eventSink = Sinks.many().multicast().onBackpressureBuffer();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's not a particular reason for this to be multicast, it should be unicast, I think. There will only ever be one stream subscriber handling flushing data back to the client on a stream.

}

/** ["Origin","Accept"] */
public static final List<String> REQUIRED_HEADERS = List.of("Origin", "Accept");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Origin shouldn't be required, here. Even though you can technically use it in a non-web context like this, I don't think it really makes sense to do so.

String newSessionId = response.headers().firstValue(SESSION_ID_HEADER).orElse(null);
if (newSessionId != null && sessionId.get() == null) {
sessionId.set(newSessionId);
sessionLatch.countDown();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this get used? Nothing appears to be awaiting this.

private final String mcpEndpoint;

/** Map of active client sessions, keyed by session ID */
private final Map<String, McpServerSession> sessions = new ConcurrentHashMap<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should pull this out to a new SessionManager that consumers can override to enable serverless sessions (also used in the SSE transport, same deal there).


private final Sinks.Many<SseEvent> eventSink = Sinks.many().multicast().onBackpressureBuffer();

private final Map<String, SseEvent> eventHistory = new ConcurrentHashMap<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably pull this out to an overridable EventStore for serverless usage, too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants