11package io .modelcontextprotocol .server .transport ;
22
33import java .io .IOException ;
4+ import java .time .Duration ;
45import java .util .concurrent .ConcurrentHashMap ;
56
67import com .fasterxml .jackson .core .type .TypeReference ;
1112import io .modelcontextprotocol .spec .McpServerTransport ;
1213import io .modelcontextprotocol .spec .McpServerTransportProvider ;
1314import io .modelcontextprotocol .util .Assert ;
15+ import io .modelcontextprotocol .util .KeepAliveScheduler ;
16+
1417import org .slf4j .Logger ;
1518import org .slf4j .LoggerFactory ;
1619import reactor .core .Exceptions ;
@@ -109,6 +112,12 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
109112 */
110113 private volatile boolean isClosing = false ;
111114
115+ /**
116+ * Keep-alive scheduler for managing session pings. Activated if keepAliveInterval is
117+ * set. Disabled by default.
118+ */
119+ private KeepAliveScheduler keepAliveScheduler ;
120+
112121 /**
113122 * Constructs a new WebFlux SSE server transport provider instance with the default
114123 * SSE endpoint.
@@ -118,7 +127,10 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
118127 * messages. This endpoint will be communicated to clients during SSE connection
119128 * setup. Must not be null.
120129 * @throws IllegalArgumentException if either parameter is null
130+ * @deprecated Use the builder {@link #builder()} instead for better configuration
131+ * options.
121132 */
133+ @ Deprecated
122134 public WebFluxSseServerTransportProvider (ObjectMapper objectMapper , String messageEndpoint ) {
123135 this (objectMapper , messageEndpoint , DEFAULT_SSE_ENDPOINT );
124136 }
@@ -131,7 +143,10 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
131143 * messages. This endpoint will be communicated to clients during SSE connection
132144 * setup. Must not be null.
133145 * @throws IllegalArgumentException if either parameter is null
146+ * @deprecated Use the builder {@link #builder()} instead for better configuration
147+ * options.
134148 */
149+ @ Deprecated
135150 public WebFluxSseServerTransportProvider (ObjectMapper objectMapper , String messageEndpoint , String sseEndpoint ) {
136151 this (objectMapper , DEFAULT_BASE_URL , messageEndpoint , sseEndpoint );
137152 }
@@ -145,9 +160,32 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
145160 * messages. This endpoint will be communicated to clients during SSE connection
146161 * setup. Must not be null.
147162 * @throws IllegalArgumentException if either parameter is null
163+ * @deprecated Use the builder {@link #builder()} instead for better configuration
164+ * options.
148165 */
166+ @ Deprecated
149167 public WebFluxSseServerTransportProvider (ObjectMapper objectMapper , String baseUrl , String messageEndpoint ,
150168 String sseEndpoint ) {
169+ this (objectMapper , baseUrl , messageEndpoint , sseEndpoint , null );
170+ }
171+
172+ /**
173+ * Constructs a new WebFlux SSE server transport provider instance.
174+ * @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
175+ * of MCP messages. Must not be null.
176+ * @param baseUrl webflux message base path
177+ * @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
178+ * messages. This endpoint will be communicated to clients during SSE connection
179+ * setup. Must not be null.
180+ * @param sseEndpoint The SSE endpoint path. Must not be null.
181+ * @param keepAliveInterval The interval for sending keep-alive pings to clients.
182+ * @throws IllegalArgumentException if either parameter is null
183+ * @deprecated Use the builder {@link #builder()} instead for better configuration
184+ * options.
185+ */
186+ @ Deprecated
187+ public WebFluxSseServerTransportProvider (ObjectMapper objectMapper , String baseUrl , String messageEndpoint ,
188+ String sseEndpoint , Duration keepAliveInterval ) {
151189 Assert .notNull (objectMapper , "ObjectMapper must not be null" );
152190 Assert .notNull (baseUrl , "Message base path must not be null" );
153191 Assert .notNull (messageEndpoint , "Message endpoint must not be null" );
@@ -161,6 +199,17 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseU
161199 .GET (this .sseEndpoint , this ::handleSseConnection )
162200 .POST (this .messageEndpoint , this ::handleMessage )
163201 .build ();
202+
203+ if (keepAliveInterval != null ) {
204+
205+ this .keepAliveScheduler = KeepAliveScheduler
206+ .builder (() -> (isClosing ) ? Flux .empty () : Flux .fromIterable (sessions .values ()))
207+ .initialDelay (keepAliveInterval )
208+ .interval (keepAliveInterval )
209+ .build ();
210+
211+ this .keepAliveScheduler .start ();
212+ }
164213 }
165214
166215 @ Override
@@ -209,23 +258,21 @@ public Mono<Void> notifyClients(String method, Object params) {
209258 /**
210259 * Initiates a graceful shutdown of all the sessions. This method ensures all active
211260 * sessions are properly closed and cleaned up.
212- *
213- * <p>
214- * The shutdown process:
215- * <ul>
216- * <li>Marks the transport as closing to prevent new connections</li>
217- * <li>Closes each active session</li>
218- * <li>Removes closed sessions from the sessions map</li>
219- * <li>Times out after 5 seconds if shutdown takes too long</li>
220- * </ul>
221261 * @return A Mono that completes when all sessions have been closed
222262 */
223263 @ Override
224264 public Mono <Void > closeGracefully () {
225265 return Flux .fromIterable (sessions .values ())
226266 .doFirst (() -> logger .debug ("Initiating graceful shutdown with {} active sessions" , sessions .size ()))
227267 .flatMap (McpServerSession ::closeGracefully )
228- .then ();
268+ .then ()
269+ .doOnSuccess (v -> {
270+ logger .debug ("Graceful shutdown completed" );
271+ sessions .clear ();
272+ if (this .keepAliveScheduler != null ) {
273+ this .keepAliveScheduler .shutdown ();
274+ }
275+ });
229276 }
230277
231278 /**
@@ -396,6 +443,8 @@ public static class Builder {
396443
397444 private String sseEndpoint = DEFAULT_SSE_ENDPOINT ;
398445
446+ private Duration keepAliveInterval ;
447+
399448 /**
400449 * Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
401450 * messages.
@@ -446,6 +495,17 @@ public Builder sseEndpoint(String sseEndpoint) {
446495 return this ;
447496 }
448497
498+ /**
499+ * Sets the interval for sending keep-alive pings to clients.
500+ * @param keepAliveInterval The keep-alive interval duration. If null, keep-alive
501+ * is disabled.
502+ * @return this builder instance
503+ */
504+ public Builder keepAliveInterval (Duration keepAliveInterval ) {
505+ this .keepAliveInterval = keepAliveInterval ;
506+ return this ;
507+ }
508+
449509 /**
450510 * Builds a new instance of {@link WebFluxSseServerTransportProvider} with the
451511 * configured settings.
@@ -456,7 +516,8 @@ public WebFluxSseServerTransportProvider build() {
456516 Assert .notNull (objectMapper , "ObjectMapper must be set" );
457517 Assert .notNull (messageEndpoint , "Message endpoint must be set" );
458518
459- return new WebFluxSseServerTransportProvider (objectMapper , baseUrl , messageEndpoint , sseEndpoint );
519+ return new WebFluxSseServerTransportProvider (objectMapper , baseUrl , messageEndpoint , sseEndpoint ,
520+ keepAliveInterval );
460521 }
461522
462523 }
0 commit comments