88import java .time .Duration ;
99import java .util .UUID ;
1010import java .util .concurrent .ConcurrentHashMap ;
11+ import java .util .concurrent .locks .ReentrantLock ;
1112
1213import com .fasterxml .jackson .core .type .TypeReference ;
1314import com .fasterxml .jackson .databind .ObjectMapper ;
@@ -339,6 +340,12 @@ private class WebMvcMcpSessionTransport implements McpServerTransport {
339340
340341 private final SseBuilder sseBuilder ;
341342
343+ /**
344+ * Lock to ensure thread-safe access to the SSE builder when sending messages.
345+ * This prevents concurrent modifications that could lead to corrupted SSE events.
346+ */
347+ private final ReentrantLock sseBuilderLock = new ReentrantLock ();
348+
342349 /**
343350 * Creates a new session transport with the specified ID and SSE builder.
344351 * @param sessionId The unique identifier for this session
@@ -358,6 +365,7 @@ private class WebMvcMcpSessionTransport implements McpServerTransport {
358365 @ Override
359366 public Mono <Void > sendMessage (McpSchema .JSONRPCMessage message ) {
360367 return Mono .fromRunnable (() -> {
368+ sseBuilderLock .lock ();
361369 try {
362370 String jsonText = objectMapper .writeValueAsString (message );
363371 sseBuilder .id (sessionId ).event (MESSAGE_EVENT_TYPE ).data (jsonText );
@@ -367,6 +375,9 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
367375 logger .error ("Failed to send message to session {}: {}" , sessionId , e .getMessage ());
368376 sseBuilder .error (e );
369377 }
378+ finally {
379+ sseBuilderLock .unlock ();
380+ }
370381 });
371382 }
372383
@@ -390,13 +401,17 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
390401 public Mono <Void > closeGracefully () {
391402 return Mono .fromRunnable (() -> {
392403 logger .debug ("Closing session transport: {}" , sessionId );
404+ sseBuilderLock .lock ();
393405 try {
394406 sseBuilder .complete ();
395407 logger .debug ("Successfully completed SSE builder for session {}" , sessionId );
396408 }
397409 catch (Exception e ) {
398410 logger .warn ("Failed to complete SSE builder for session {}: {}" , sessionId , e .getMessage ());
399411 }
412+ finally {
413+ sseBuilderLock .unlock ();
414+ }
400415 });
401416 }
402417
@@ -405,13 +420,17 @@ public Mono<Void> closeGracefully() {
405420 */
406421 @ Override
407422 public void close () {
423+ sseBuilderLock .lock ();
408424 try {
409425 sseBuilder .complete ();
410426 logger .debug ("Successfully completed SSE builder for session {}" , sessionId );
411427 }
412428 catch (Exception e ) {
413429 logger .warn ("Failed to complete SSE builder for session {}: {}" , sessionId , e .getMessage ());
414430 }
431+ finally {
432+ sseBuilderLock .unlock ();
433+ }
415434 }
416435
417436 }
0 commit comments