Skip to content

Commit 971122b

Browse files
committed
add streamable http servlet keep alive support
Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent 09a29ee commit 971122b

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,8 @@ public Mono<Void> closeGracefully() {
370370
logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size());
371371

372372
return Flux.fromIterable(sessions.values()).flatMap(McpServerSession::closeGracefully).then().doOnSuccess(v -> {
373-
logger.debug("Graceful shutdown completed");
374373
sessions.clear();
374+
logger.debug("Graceful shutdown completed");
375375
if (this.keepAliveScheduler != null) {
376376
this.keepAliveScheduler.shutdown();
377377
}

mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.io.BufferedReader;
88
import java.io.IOException;
99
import java.io.PrintWriter;
10+
import java.time.Duration;
1011
import java.util.ArrayList;
1112
import java.util.List;
1213
import java.util.concurrent.ConcurrentHashMap;
@@ -28,12 +29,14 @@
2829
import io.modelcontextprotocol.spec.McpStreamableServerTransport;
2930
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
3031
import io.modelcontextprotocol.util.Assert;
32+
import io.modelcontextprotocol.util.KeepAliveScheduler;
3133
import jakarta.servlet.AsyncContext;
3234
import jakarta.servlet.ServletException;
3335
import jakarta.servlet.annotation.WebServlet;
3436
import jakarta.servlet.http.HttpServlet;
3537
import jakarta.servlet.http.HttpServletRequest;
3638
import jakarta.servlet.http.HttpServletResponse;
39+
import reactor.core.publisher.Flux;
3740
import reactor.core.publisher.Mono;
3841

3942
/**
@@ -110,6 +113,12 @@ public class HttpServletStreamableServerTransportProvider extends HttpServlet
110113
*/
111114
private volatile boolean isClosing = false;
112115

116+
/**
117+
* Keep-alive scheduler for managing session pings. Activated if keepAliveInterval is
118+
* set. Disabled by default.
119+
*/
120+
private KeepAliveScheduler keepAliveScheduler;
121+
113122
/**
114123
* Constructs a new HttpServletStreamableServerTransportProvider instance.
115124
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
@@ -121,7 +130,8 @@ public class HttpServletStreamableServerTransportProvider extends HttpServlet
121130
* @throws IllegalArgumentException if any parameter is null
122131
*/
123132
private HttpServletStreamableServerTransportProvider(ObjectMapper objectMapper, String mcpEndpoint,
124-
boolean disallowDelete, McpTransportContextExtractor<HttpServletRequest> contextExtractor) {
133+
boolean disallowDelete, McpTransportContextExtractor<HttpServletRequest> contextExtractor,
134+
Duration keepAliveInterval) {
125135
Assert.notNull(objectMapper, "ObjectMapper must not be null");
126136
Assert.notNull(mcpEndpoint, "MCP endpoint must not be null");
127137
Assert.notNull(contextExtractor, "Context extractor must not be null");
@@ -130,6 +140,18 @@ private HttpServletStreamableServerTransportProvider(ObjectMapper objectMapper,
130140
this.mcpEndpoint = mcpEndpoint;
131141
this.disallowDelete = disallowDelete;
132142
this.contextExtractor = contextExtractor;
143+
144+
if (keepAliveInterval != null) {
145+
146+
this.keepAliveScheduler = KeepAliveScheduler
147+
.builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(sessions.values()))
148+
.initialDelay(keepAliveInterval)
149+
.interval(keepAliveInterval)
150+
.build();
151+
152+
this.keepAliveScheduler.start();
153+
}
154+
133155
}
134156

135157
@Override
@@ -187,6 +209,12 @@ public Mono<Void> closeGracefully() {
187209

188210
this.sessions.clear();
189211
logger.debug("Graceful shutdown completed");
212+
}).then().doOnSuccess(v -> {
213+
sessions.clear();
214+
logger.debug("Graceful shutdown completed");
215+
if (this.keepAliveScheduler != null) {
216+
this.keepAliveScheduler.shutdown();
217+
}
190218
});
191219
}
192220

@@ -737,6 +765,8 @@ public static class Builder {
737765

738766
private McpTransportContextExtractor<HttpServletRequest> contextExtractor = (serverRequest, context) -> context;
739767

768+
private Duration keepAliveInterval;
769+
740770
/**
741771
* Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
742772
* messages.
@@ -784,6 +814,18 @@ public Builder contextExtractor(McpTransportContextExtractor<HttpServletRequest>
784814
return this;
785815
}
786816

817+
/**
818+
* Sets the keep-alive interval for the transport. If set, a keep-alive scheduler
819+
* will be activated to periodically ping active sessions.
820+
* @param keepAliveInterval The interval for keep-alive pings. If null, no
821+
* keep-alive will be scheduled.
822+
* @return this builder instance
823+
*/
824+
public Builder keepAliveInterval(Duration keepAliveInterval) {
825+
this.keepAliveInterval = keepAliveInterval;
826+
return this;
827+
}
828+
787829
/**
788830
* Builds a new instance of {@link HttpServletStreamableServerTransportProvider}
789831
* with the configured settings.
@@ -795,7 +837,7 @@ public HttpServletStreamableServerTransportProvider build() {
795837
Assert.notNull(this.mcpEndpoint, "MCP endpoint must be set");
796838

797839
return new HttpServletStreamableServerTransportProvider(this.objectMapper, this.mcpEndpoint,
798-
this.disallowDelete, this.contextExtractor);
840+
this.disallowDelete, this.contextExtractor, this.keepAliveInterval);
799841
}
800842

801843
}

0 commit comments

Comments
 (0)