|
1 | 1 | /* |
2 | | - * Copyright 2024 - 2024 the original author or authors. |
| 2 | + * Copyright 2024 - 2025 the original author or authors. |
3 | 3 | */ |
4 | 4 | package io.modelcontextprotocol.client.transport; |
5 | 5 |
|
@@ -102,6 +102,11 @@ public class HttpClientSseClientTransport implements McpClientTransport { |
102 | 102 | */ |
103 | 103 | protected final Sinks.One<String> messageEndpointSink = Sinks.one(); |
104 | 104 |
|
| 105 | + /** |
| 106 | + * Customizer to modify requests before they are executed. |
| 107 | + */ |
| 108 | + private final AsyncHttpRequestCustomizer httpRequestCustomizer; |
| 109 | + |
105 | 110 | /** |
106 | 111 | * Creates a new transport instance with default HTTP client and object mapper. |
107 | 112 | * @param baseUri the base URI of the MCP server |
@@ -172,18 +177,38 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques |
172 | 177 | * @param objectMapper the object mapper for JSON serialization/deserialization |
173 | 178 | * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null |
174 | 179 | */ |
| 180 | + @Deprecated(forRemoval = true) |
175 | 181 | HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, |
176 | 182 | String sseEndpoint, ObjectMapper objectMapper) { |
| 183 | + this(httpClient, requestBuilder, baseUri, sseEndpoint, objectMapper, AsyncHttpRequestCustomizer.NOOP); |
| 184 | + } |
| 185 | + |
| 186 | + /** |
| 187 | + * Creates a new transport instance with custom HTTP client builder, object mapper, |
| 188 | + * and headers. |
| 189 | + * @param httpClient the HTTP client to use |
| 190 | + * @param requestBuilder the HTTP request builder to use |
| 191 | + * @param baseUri the base URI of the MCP server |
| 192 | + * @param sseEndpoint the SSE endpoint path |
| 193 | + * @param objectMapper the object mapper for JSON serialization/deserialization |
| 194 | + * @param httpRequestCustomizer customizer for the requestBuilder before executing |
| 195 | + * requests |
| 196 | + * @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null |
| 197 | + */ |
| 198 | + HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, |
| 199 | + String sseEndpoint, ObjectMapper objectMapper, AsyncHttpRequestCustomizer httpRequestCustomizer) { |
177 | 200 | Assert.notNull(objectMapper, "ObjectMapper must not be null"); |
178 | 201 | Assert.hasText(baseUri, "baseUri must not be empty"); |
179 | 202 | Assert.hasText(sseEndpoint, "sseEndpoint must not be empty"); |
180 | 203 | Assert.notNull(httpClient, "httpClient must not be null"); |
181 | 204 | Assert.notNull(requestBuilder, "requestBuilder must not be null"); |
| 205 | + Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null"); |
182 | 206 | this.baseUri = URI.create(baseUri); |
183 | 207 | this.sseEndpoint = sseEndpoint; |
184 | 208 | this.objectMapper = objectMapper; |
185 | 209 | this.httpClient = httpClient; |
186 | 210 | this.requestBuilder = requestBuilder; |
| 211 | + this.httpRequestCustomizer = httpRequestCustomizer; |
187 | 212 | } |
188 | 213 |
|
189 | 214 | /** |
@@ -213,6 +238,8 @@ public static class Builder { |
213 | 238 | private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() |
214 | 239 | .header("Content-Type", "application/json"); |
215 | 240 |
|
| 241 | + private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP; |
| 242 | + |
216 | 243 | /** |
217 | 244 | * Creates a new builder instance. |
218 | 245 | */ |
@@ -310,31 +337,66 @@ public Builder objectMapper(ObjectMapper objectMapper) { |
310 | 337 | return this; |
311 | 338 | } |
312 | 339 |
|
| 340 | + /** |
| 341 | + * Sets the customizer for {@link HttpRequest.Builder}, to modify requests before |
| 342 | + * executing them. |
| 343 | + * <p> |
| 344 | + * This overrides the customizer from |
| 345 | + * {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}. |
| 346 | + * <p> |
| 347 | + * Do NOT use a blocking {@link SyncHttpRequestCustomizer} in a non-blocking |
| 348 | + * context. Use {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)} |
| 349 | + * instead. |
| 350 | + * @param syncHttpRequestCustomizer the request customizer |
| 351 | + * @return this builder |
| 352 | + */ |
| 353 | + public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) { |
| 354 | + this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer); |
| 355 | + return this; |
| 356 | + } |
| 357 | + |
| 358 | + /** |
| 359 | + * Sets the customizer for {@link HttpRequest.Builder}, to modify requests before |
| 360 | + * executing them. |
| 361 | + * <p> |
| 362 | + * This overrides the customizer from |
| 363 | + * {@link #httpRequestCustomizer(SyncHttpRequestCustomizer)}. |
| 364 | + * <p> |
| 365 | + * Do NOT use a blocking implementation in a non-blocking context. |
| 366 | + * @param asyncHttpRequestCustomizer the request customizer |
| 367 | + * @return this builder |
| 368 | + */ |
| 369 | + public Builder asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) { |
| 370 | + this.httpRequestCustomizer = asyncHttpRequestCustomizer; |
| 371 | + return this; |
| 372 | + } |
| 373 | + |
313 | 374 | /** |
314 | 375 | * Builds a new {@link HttpClientSseClientTransport} instance. |
315 | 376 | * @return a new transport instance |
316 | 377 | */ |
317 | 378 | public HttpClientSseClientTransport build() { |
318 | 379 | return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint, |
319 | | - objectMapper); |
| 380 | + objectMapper, httpRequestCustomizer); |
320 | 381 | } |
321 | 382 |
|
322 | 383 | } |
323 | 384 |
|
324 | 385 | @Override |
325 | 386 | public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) { |
| 387 | + var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint); |
326 | 388 |
|
327 | | - return Mono.create(sink -> { |
328 | | - |
329 | | - HttpRequest request = requestBuilder.copy() |
330 | | - .uri(Utils.resolveUri(this.baseUri, this.sseEndpoint)) |
| 389 | + return Mono.defer(() -> { |
| 390 | + var builder = requestBuilder.copy() |
| 391 | + .uri(uri) |
331 | 392 | .header("Accept", "text/event-stream") |
332 | 393 | .header("Cache-Control", "no-cache") |
333 | | - .GET() |
334 | | - .build(); |
335 | | - |
| 394 | + .GET(); |
| 395 | + return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null)); |
| 396 | + }).flatMap(requestBuilder -> Mono.create(sink -> { |
336 | 397 | Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient |
337 | | - .sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink)) |
| 398 | + .sendAsync(requestBuilder.build(), |
| 399 | + responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink)) |
338 | 400 | .exceptionallyCompose(e -> { |
339 | 401 | sseSink.error(e); |
340 | 402 | return CompletableFuture.failedFuture(e); |
@@ -397,7 +459,7 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) { |
397 | 459 | .subscribe(); |
398 | 460 |
|
399 | 461 | this.sseSubscription.set(connection); |
400 | | - }); |
| 462 | + })); |
401 | 463 | } |
402 | 464 |
|
403 | 465 | /** |
@@ -453,13 +515,13 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) { |
453 | 515 |
|
454 | 516 | private Mono<HttpResponse<String>> sendHttpPost(final String endpoint, final String body) { |
455 | 517 | final URI requestUri = Utils.resolveUri(baseUri, endpoint); |
456 | | - final HttpRequest request = this.requestBuilder.copy() |
457 | | - .uri(requestUri) |
458 | | - .POST(HttpRequest.BodyPublishers.ofString(body)) |
459 | | - .build(); |
460 | | - |
461 | | - // TODO: why discard the body? |
462 | | - return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())); |
| 518 | + return Mono.defer(() -> { |
| 519 | + var builder = this.requestBuilder.copy().uri(requestUri).POST(HttpRequest.BodyPublishers.ofString(body)); |
| 520 | + return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body)); |
| 521 | + }).flatMap(customizedBuilder -> { |
| 522 | + var request = customizedBuilder.build(); |
| 523 | + return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())); |
| 524 | + }); |
463 | 525 | } |
464 | 526 |
|
465 | 527 | /** |
|
0 commit comments