1
+ /*
2
+ * Copyright 2024 - 2024 the original author or authors.
3
+ */
4
+ package io .modelcontextprotocol .client .transport ;
5
+
6
+ import com .fasterxml .jackson .core .type .TypeReference ;
7
+ import com .fasterxml .jackson .databind .ObjectMapper ;
8
+ import io .modelcontextprotocol .spec .McpClientTransport ;
9
+ import io .modelcontextprotocol .spec .McpError ;
10
+ import io .modelcontextprotocol .spec .McpSchema ;
11
+ import io .modelcontextprotocol .spec .McpSchema .JSONRPCMessage ;
12
+ import io .modelcontextprotocol .util .Assert ;
13
+ import org .noear .solon .net .http .HttpResponse ;
14
+ import org .noear .solon .net .http .HttpUtilsBuilder ;
15
+ import org .noear .solon .net .http .textstream .ServerSentEvent ;
16
+ import org .noear .solon .rx .SimpleSubscriber ;
17
+ import org .slf4j .Logger ;
18
+ import org .slf4j .LoggerFactory ;
19
+ import reactor .core .publisher .Mono ;
20
+
21
+ import java .io .IOException ;
22
+ import java .util .concurrent .CompletableFuture ;
23
+ import java .util .concurrent .CountDownLatch ;
24
+ import java .util .concurrent .TimeUnit ;
25
+ import java .util .concurrent .atomic .AtomicReference ;
26
+ import java .util .function .Function ;
27
+
28
+ /**
29
+ * Server-Sent Events (SSE) implementation of the
30
+ * {@link io.modelcontextprotocol.spec.McpTransport} that follows the MCP HTTP with SSE
31
+ * transport specification.
32
+ *
33
+ * <p>
34
+ * This transport establishes a bidirectional communication channel where:
35
+ * <ul>
36
+ * <li>Inbound messages are received through an SSE connection from the server</li>
37
+ * <li>Outbound messages are sent via HTTP POST requests to a server-provided
38
+ * endpoint</li>
39
+ * </ul>
40
+ *
41
+ * <p>
42
+ * The message flow follows these steps:
43
+ * <ol>
44
+ * <li>The client establishes an SSE connection to the server's /sse endpoint</li>
45
+ * <li>The server sends an 'endpoint' event containing the URI for sending messages</li>
46
+ * </ol>
47
+ *
48
+ * This implementation uses {@link HttpUtilsBuilder} for HTTP communications and supports JSON. and base JDK8
49
+ * serialization/deserialization of messages.
50
+ *
51
+ * @author Christian Tzolov
52
+ * @author noear
53
+ * @see <a href=
54
+ * "https://spec.modelcontextprotocol.io/specification/basic/transports/#http-with-sse">MCP
55
+ * HTTP with SSE Transport Specification</a>
56
+ */
57
+ public class WebRxSseClientTransport implements McpClientTransport {
58
+
59
+ private static final Logger logger = LoggerFactory .getLogger (WebRxSseClientTransport .class );
60
+
61
+ /** SSE event type for JSON-RPC messages */
62
+ private static final String MESSAGE_EVENT_TYPE = "message" ;
63
+
64
+ /** SSE event type for endpoint discovery */
65
+ private static final String ENDPOINT_EVENT_TYPE = "endpoint" ;
66
+
67
+ /** Default SSE endpoint path */
68
+ private static final String DEFAULT_SSE_ENDPOINT = "/sse" ;
69
+
70
+ /** HttpUtils instance builder */
71
+ private final HttpUtilsBuilder webBuilder ;
72
+
73
+ /** SSE endpoint path */
74
+ private final String sseEndpoint ;
75
+
76
+ /** JSON object mapper for message serialization/deserialization */
77
+ protected ObjectMapper objectMapper ;
78
+
79
+ /** Flag indicating if the transport is in closing state */
80
+ private volatile boolean isClosing = false ;
81
+
82
+ /** Latch for coordinating endpoint discovery */
83
+ private final CountDownLatch closeLatch = new CountDownLatch (1 );
84
+
85
+ /** Holds the discovered message endpoint URL */
86
+ private final AtomicReference <String > messageEndpoint = new AtomicReference <>();
87
+
88
+ /** Holds the SSE connection future */
89
+ private final AtomicReference <CompletableFuture <Void >> connectionFuture = new AtomicReference <>();
90
+
91
+ /**
92
+ * Creates a new transport instance with default HTTP client and object mapper.
93
+ * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance
94
+ */
95
+ public WebRxSseClientTransport (HttpUtilsBuilder webBuilder ) {
96
+ this (webBuilder , new ObjectMapper ());
97
+ }
98
+
99
+ /**
100
+ * Creates a new transport instance with custom HTTP client builder and object mapper.
101
+ * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance
102
+ * @param objectMapper the object mapper for JSON serialization/deserialization
103
+ * @throws IllegalArgumentException if objectMapper or clientBuilder is null
104
+ */
105
+ public WebRxSseClientTransport (HttpUtilsBuilder webBuilder , ObjectMapper objectMapper ) {
106
+ this (webBuilder , DEFAULT_SSE_ENDPOINT , objectMapper );
107
+ }
108
+
109
+ /**
110
+ * Creates a new transport instance with custom HTTP client builder and object mapper.
111
+ * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance
112
+ * @param sseEndpoint the SSE endpoint path
113
+ * @param objectMapper the object mapper for JSON serialization/deserialization
114
+ * @throws IllegalArgumentException if objectMapper or clientBuilder is null
115
+ */
116
+ public WebRxSseClientTransport (HttpUtilsBuilder webBuilder , String sseEndpoint ,
117
+ ObjectMapper objectMapper ) {
118
+ Assert .notNull (objectMapper , "ObjectMapper must not be null" );
119
+ Assert .notNull (webBuilder , "baseUri must not be empty" );
120
+ Assert .hasText (sseEndpoint , "sseEndpoint must not be empty" );
121
+ this .webBuilder = webBuilder ;
122
+ this .sseEndpoint = sseEndpoint ;
123
+ this .objectMapper = objectMapper ;
124
+ }
125
+
126
+ /**
127
+ * Creates a new builder for {@link WebRxSseClientTransport}.
128
+ * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance
129
+ * @return a new builder instance
130
+ */
131
+ public static Builder builder (HttpUtilsBuilder webBuilder ) {
132
+ return new Builder (webBuilder );
133
+ }
134
+
135
+ /**
136
+ * Builder for {@link WebRxSseClientTransport}.
137
+ */
138
+ public static class Builder {
139
+
140
+ private final HttpUtilsBuilder webBuilder ;
141
+
142
+ private String sseEndpoint = DEFAULT_SSE_ENDPOINT ;
143
+
144
+ private ObjectMapper objectMapper = new ObjectMapper ();
145
+
146
+ /**
147
+ * Creates a new builder with the specified base URI.
148
+ * @param webBuilder the HttpUtilsBuilder to use for creating the HttpUtils instance
149
+ */
150
+ public Builder (HttpUtilsBuilder webBuilder ) {
151
+ Assert .notNull (webBuilder , "webBuilder must not be empty" );
152
+ this .webBuilder = webBuilder ;
153
+ }
154
+
155
+ /**
156
+ * Sets the SSE endpoint path.
157
+ * @param sseEndpoint the SSE endpoint path
158
+ * @return this builder
159
+ */
160
+ public Builder sseEndpoint (String sseEndpoint ) {
161
+ Assert .hasText (sseEndpoint , "sseEndpoint must not be null" );
162
+ this .sseEndpoint = sseEndpoint ;
163
+ return this ;
164
+ }
165
+
166
+ /**
167
+ * Sets the object mapper for JSON serialization/deserialization.
168
+ * @param objectMapper the object mapper
169
+ * @return this builder
170
+ */
171
+ public Builder objectMapper (ObjectMapper objectMapper ) {
172
+ Assert .notNull (objectMapper , "objectMapper must not be null" );
173
+ this .objectMapper = objectMapper ;
174
+ return this ;
175
+ }
176
+
177
+ /**
178
+ * Builds a new {@link WebRxSseClientTransport} instance.
179
+ * @return a new transport instance
180
+ */
181
+ public WebRxSseClientTransport build () {
182
+ return new WebRxSseClientTransport (webBuilder , sseEndpoint , objectMapper );
183
+ }
184
+
185
+ }
186
+
187
+ /**
188
+ * Establishes the SSE connection with the server and sets up message handling.
189
+ *
190
+ * <p>
191
+ * This method:
192
+ * <ul>
193
+ * <li>Initiates the SSE connection</li>
194
+ * <li>Handles endpoint discovery events</li>
195
+ * <li>Processes incoming JSON-RPC messages</li>
196
+ * </ul>
197
+ * @param handler the function to process received JSON-RPC messages
198
+ * @return a Mono that completes when the connection is established
199
+ */
200
+ @ Override
201
+ public Mono <Void > connect (Function <Mono <JSONRPCMessage >, Mono <JSONRPCMessage >> handler ) {
202
+ CompletableFuture <Void > future = new CompletableFuture <>();
203
+ connectionFuture .set (future );
204
+
205
+ webBuilder .build (this .sseEndpoint )
206
+ .execAsSseStream ("GET" )
207
+ .subscribe (new SimpleSubscriber <ServerSentEvent >()
208
+ .doOnNext (event -> {
209
+ if (isClosing ) {
210
+ return ;
211
+ }
212
+
213
+ try {
214
+ if (ENDPOINT_EVENT_TYPE .equals (event .getEvent ())) {
215
+ String endpoint = event .data ();
216
+ messageEndpoint .set (endpoint );
217
+ closeLatch .countDown ();
218
+ future .complete (null );
219
+ } else if (MESSAGE_EVENT_TYPE .equals (event .getEvent ())) {
220
+ JSONRPCMessage message = McpSchema .deserializeJsonRpcMessage (objectMapper , event .data ());
221
+ handler .apply (Mono .just (message )).subscribe ();
222
+ } else {
223
+ logger .error ("Received unrecognized SSE event type: {}" , event .getEvent ());
224
+ }
225
+ } catch (IOException e ) {
226
+ logger .error ("Error processing SSE event" , e );
227
+ future .completeExceptionally (e );
228
+ }
229
+ }).doOnError (error -> {
230
+ if (!isClosing ) {
231
+ logger .warn ("SSE connection error" , error );
232
+ future .completeExceptionally (error );
233
+ }
234
+ }));
235
+
236
+ return Mono .fromFuture (future );
237
+ }
238
+
239
+ /**
240
+ * Sends a JSON-RPC message to the server.
241
+ *
242
+ * <p>
243
+ * This method waits for the message endpoint to be discovered before sending the
244
+ * message. The message is serialized to JSON and sent as an HTTP POST request.
245
+ * @param message the JSON-RPC message to send
246
+ * @return a Mono that completes when the message is sent
247
+ * @throws McpError if the message endpoint is not available or the wait times out
248
+ */
249
+ @ Override
250
+ public Mono <Void > sendMessage (JSONRPCMessage message ) {
251
+ if (isClosing ) {
252
+ return Mono .empty ();
253
+ }
254
+
255
+ try {
256
+ if (!closeLatch .await (10 , TimeUnit .SECONDS )) {
257
+ return Mono .error (new McpError ("Failed to wait for the message endpoint" ));
258
+ }
259
+ } catch (InterruptedException e ) {
260
+ return Mono .error (new McpError ("Failed to wait for the message endpoint" ));
261
+ }
262
+
263
+ String endpoint = messageEndpoint .get ();
264
+ if (endpoint == null ) {
265
+ return Mono .error (new McpError ("No message endpoint available" ));
266
+ }
267
+
268
+ try {
269
+ String jsonText = this .objectMapper .writeValueAsString (message );
270
+ CompletableFuture <HttpResponse > future = webBuilder .build (endpoint )
271
+ .header ("Content-Type" , "application/json" )
272
+ .bodyOfJson (jsonText )
273
+ .execAsync ("POST" );
274
+
275
+ return Mono .fromFuture (future .thenAccept (response -> {
276
+ if (response .code () != 200 && response .code () != 201 && response .code () != 202
277
+ && response .code () != 206 ) {
278
+ logger .error ("Error sending message: {}" , response .code ());
279
+ }
280
+ }));
281
+ } catch (IOException e ) {
282
+ if (!isClosing ) {
283
+ return Mono .error (new RuntimeException ("Failed to serialize message" , e ));
284
+ }
285
+ return Mono .empty ();
286
+ }
287
+ }
288
+
289
+ /**
290
+ * Gracefully closes the transport connection.
291
+ *
292
+ * <p>
293
+ * Sets the closing flag and cancels any pending connection future. This prevents new
294
+ * messages from being sent and allows ongoing operations to complete.
295
+ * @return a Mono that completes when the closing process is initiated
296
+ */
297
+ @ Override
298
+ public Mono <Void > closeGracefully () {
299
+ return Mono .fromRunnable (() -> {
300
+ isClosing = true ;
301
+ CompletableFuture <Void > future = connectionFuture .get ();
302
+ if (future != null && !future .isDone ()) {
303
+ future .cancel (true );
304
+ }
305
+ });
306
+ }
307
+
308
+ /**
309
+ * Unmarshals data to the specified type using the configured object mapper.
310
+ * @param data the data to unmarshal
311
+ * @param typeRef the type reference for the target type
312
+ * @param <T> the target type
313
+ * @return the unmarshalled object
314
+ */
315
+ @ Override
316
+ public <T > T unmarshalFrom (Object data , TypeReference <T > typeRef ) {
317
+ return this .objectMapper .convertValue (data , typeRef );
318
+ }
319
+ }
0 commit comments