Skip to content

Adding StreamableHttpServerTransportProvider class and unit tests #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

ZachGerman
Copy link

@ZachGerman ZachGerman commented Jun 2, 2025

Not planned:

  • Backward-compatible endpoint combo

Motivation and Context

Trying to reach spec parity with TS and Python for Java. Will continue working on other aspects of this.

How Has This Been Tested?

Unit tests and integ tests using the SHTTP web client in mcp-spring.

Breaking Changes

N/A

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey 👋 Thanks for a comprehensive PR! I did my first round focusing on the main themes. Happy to offer guidance to cover the essential aspects (simple/stateful servers, multiple streams per session, lifecycle) if you'd like to push this forward.

AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0); // No timeout

StreamableHttpSseStream sseStream = getOrCreateSseStream(sessionId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the spec is being slightly violated IMO. We should aim to respect the MAY keyword to the best of our ability. The responses should go on a dedicated SSE stream if a stream is to be used, not the one opened initially with GET. The free hanging GET stream is meant for notifications or requests from the server. The SSE events that deal with the originating request should be sent over the stream associated with this request.

Here's some explanation from the specification on POST:

If the server initiates an SSE stream:
...
The server MAY send JSON-RPC requests and notifications before sending a JSON-RPC response.
These messages SHOULD relate to the originating client request.
These requests and notifications MAY be [batched](https://www.jsonrpc.org/specification#batch).
The server SHOULD NOT close the SSE stream before sending a JSON-RPC response
per each received JSON-RPC request, unless the [session](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#session-management) expires

and for GET:

If the server initiates an SSE stream:
The server MAY send JSON-RPC requests and notifications on the stream.
These requests and notifications MAY be [batched](https://www.jsonrpc.org/specification#batch).
These messages SHOULD be unrelated to any concurrently-running JSON-RPC request from the client.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment:
We can probably just add a single stream session for GET requests into the StreamableHttpSession class. I can add that to my list today.

// Create or get SSE stream for this session
StreamableHttpSseStream sseStream = getOrCreateSseStream(sessionId);
if (lastEventId != null) {
sseStream.replayEventsAfter(lastEventId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand it, in case of resumption, once the final response is streamed, the SSE stream should be closed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either client or server can close it at any time. Should we add a timer of sorts to hold the stream open and have it default to end right after stream completes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that we should close the stream on the server side once the Flux containing the json payloads completes. Not a timer though, just giving the user the ability to return a Flux that contains a set of notification/request/response payloads, where the response matching the initial request should be the last message from the Flux, followed by the onComplete signal. The other option would be for the user handling to return a Mono that signifies a non-SSE response directly to the POST message.

String lastEventId = request.getHeader(LAST_EVENT_ID_HEADER);

// Create or get SSE stream for this session
StreamableHttpSseStream sseStream = getOrCreateSseStream(sessionId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resumption needs to happen on a brand new SSE stream, otherwise the client is unable to distinguish between different streams.

@ZachGerman
Copy link
Author

Thank you very much for all of the input @chemicL! I will begin making changes accordingly this afternoon.

@ZachGerman
Copy link
Author

Today I'm targeting origin header validation and moving the dedicated GET stream to the StreamableHttpSession class, then adding an integ test for GET on /mcp to start the listening stream.
After that, I believe we should have all core functionality except sessionless and proper SSE response upgrade logic.

@sivankri
Copy link

Hi @ZachGerman I tried using your StreamableHttpServerTransportProvider.java file + java MCP SDK 0.10.0. My backend server is Jetty 12. The request from MCP Interceptor hangs in the below call.

return streamSession.handle(message).then(Mono.just(responseType)).onErrorReturn(ResponseType.IMMEDIATE);

This is the actuall line which gets blocked
--McpServerSession.java --> return var10000.flatMap(var10001::sendMessage);

Can you please check this?

@tzolov
Copy link
Contributor

tzolov commented Jun 26, 2025

Today I'm targeting origin header validation and moving the dedicated GET stream to the StreamableHttpSession class, then adding an integ test for GET on /mcp to start the listening stream. After that, I believe we should have all core functionality except sessionless and proper SSE response upgrade logic.

@ZachGerman what do you mean by origin header validation? I hope it is not overlapping with the #284

Copy link
Member

@chemicL chemicL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a few more comments.

if (sessionId == null) {
response.setContentType(APPLICATION_JSON);
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
response.getWriter().write(createErrorJson("Session ID missing in request header"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modelcontextprotocol/modelcontextprotocol#282 please check this issue. I'm leaning towards separating the JSON-RPC lifecycle from the lower level transport lifecycle. In that world, GET and POST and session concepts are not at the JSON-RPC layer, so the lifecycle does not apply. In fact, the previous SSE transport did begin with a HTTP GET request before the initialization request could have been sent. For the stateless servers it should definitely be improved in the spec to mention that initialization is not required before other requests.

// Subscribe to the SSE stream and write events to the response
sseStream.getEventFlux().doOnNext(event -> {
try {
if (event.id() != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but do we emit events with no ID ? Reading the code I don't see a situation like this. My understanding of the mentioned spec is that if the ID is missing then the client would not use this id for Last-Event-ID tracking, but we are on the server side and we always generate an ID.

// Create or get SSE stream for this session
StreamableHttpSseStream sseStream = getOrCreateSseStream(sessionId);
if (lastEventId != null) {
sseStream.replayEventsAfter(lastEventId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that we should close the stream on the server side once the Flux containing the json payloads completes. Not a timer though, just giving the user the ability to return a Flux that contains a set of notification/request/response payloads, where the response matching the initial request should be the last message from the Flux, followed by the onComplete signal. The other option would be for the user handling to return a Mono that signifies a non-SSE response directly to the POST message.

@ZachGerman
Copy link
Author

ZachGerman commented Jun 26, 2025

@sivankri: Changing the related logic after my meeting with Dariusz this morning, so (hopefully) your issue goes away with the new response type mapping.
@tzolov: It's related, but I don't think it's overlapping as I'm just adding the ability to set a list of allowed origins to the server transport provider and enforcing it if it's set.

@viyaviya
Copy link

merge commit 6c4830b is bad

@ZachGerman
Copy link
Author

ZachGerman commented Jun 27, 2025

merge commit 6c4830b is bad

Oops! Thanks for the heads up! Fixed!

@ZachGerman ZachGerman force-pushed the StreamableHttpServerTransportProvider branch from 6c4830b to 7817da6 Compare June 27, 2025 16:45
@ZachGerman
Copy link
Author

ZachGerman commented Jun 27, 2025

@sivankri the new logic added streamTools to McpServerFeatures, used in the Async constructor, which facilitates a list of tool specifications that return Flux instead of Mono via the new AsyncStreamingToolSpecification record type and uses the return type of the call to differentiate between direct-HTTP and SSE-stream responses.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants