-
Notifications
You must be signed in to change notification settings - Fork 820
Add resource subscribe/unsubscribe support to McpAsyncServer #838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,9 +25,9 @@ | |
| import io.modelcontextprotocol.spec.McpSchema.CompleteResult.CompleteCompletion; | ||
| import io.modelcontextprotocol.spec.McpSchema.ErrorCodes; | ||
| import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; | ||
| import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; | ||
| import io.modelcontextprotocol.spec.McpSchema.PromptReference; | ||
| import io.modelcontextprotocol.spec.McpSchema.ResourceReference; | ||
| import io.modelcontextprotocol.spec.McpSchema.ResourcesUpdatedNotification; | ||
| import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest; | ||
| import io.modelcontextprotocol.spec.McpSchema.Tool; | ||
| import io.modelcontextprotocol.spec.McpServerSession; | ||
|
|
@@ -109,6 +109,8 @@ public class McpAsyncServer { | |
|
|
||
| private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates = new ConcurrentHashMap<>(); | ||
|
|
||
| private final Map<String, McpSchema.SubscribeRequest> resourceSubscriptions = new ConcurrentHashMap<>(); | ||
|
|
||
| private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>(); | ||
|
|
||
| // FIXME: this field is deprecated and should be remvoed together with the | ||
|
|
@@ -215,6 +217,11 @@ private Map<String, McpRequestHandler<?>> prepareRequestHandlers() { | |
| requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler()); | ||
| requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler()); | ||
| requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler()); | ||
|
|
||
| if (this.serverCapabilities.resources().subscribe()) { | ||
| requestHandlers.put(McpSchema.METHOD_RESOURCES_SUBSCRIBE, resourcesSubscribeHandler()); | ||
| requestHandlers.put(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, resourcesUnsubscribeHandler()); | ||
| } | ||
| } | ||
|
|
||
| // Add prompts API handlers if provider exists | ||
|
|
@@ -560,15 +567,26 @@ public Mono<Void> addResource(McpServerFeatures.AsyncResourceSpecification resou | |
| } | ||
|
|
||
| return Mono.defer(() -> { | ||
| var previous = this.resources.put(resourceSpecification.resource().uri(), resourceSpecification); | ||
| var resourceUri = resourceSpecification.resource().uri(); | ||
|
|
||
| var previous = this.resources.put(resourceUri, resourceSpecification); | ||
| if (previous != null) { | ||
| logger.warn("Replace existing Resource with URI '{}'", resourceSpecification.resource().uri()); | ||
| logger.warn("Replace existing Resource with URI '{}'", resourceUri); | ||
| } | ||
| else { | ||
| logger.debug("Added resource handler: {}", resourceSpecification.resource().uri()); | ||
| logger.debug("Added resource handler: {}", resourceUri); | ||
| } | ||
|
|
||
| if (this.serverCapabilities.resources().subscribe() | ||
| && this.resourceSubscriptions.containsKey(resourceUri)) { | ||
| Mono<Void> updated = this.notifyResourcesUpdated(new ResourcesUpdatedNotification(resourceUri)); | ||
| if (this.serverCapabilities.resources().listChanged()) { | ||
| return updated.then(this.notifyResourcesListChanged()); | ||
| } | ||
| return updated; | ||
| } | ||
|
Comment on lines
+580
to
587
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic is unnecessary. A resource is not updated when it's added but when it's modify. This code block should be removed. |
||
| if (this.serverCapabilities.resources().listChanged()) { | ||
| return notifyResourcesListChanged(); | ||
| return this.notifyResourcesListChanged(); | ||
| } | ||
| return Mono.empty(); | ||
| }); | ||
|
|
@@ -600,8 +618,9 @@ public Mono<Void> removeResource(String resourceUri) { | |
| McpServerFeatures.AsyncResourceSpecification removed = this.resources.remove(resourceUri); | ||
| if (removed != null) { | ||
| logger.debug("Removed resource handler: {}", resourceUri); | ||
|
|
||
| if (this.serverCapabilities.resources().listChanged()) { | ||
| return notifyResourcesListChanged(); | ||
| return this.notifyResourcesListChanged(); | ||
| } | ||
| return Mono.empty(); | ||
| } | ||
|
|
@@ -734,6 +753,33 @@ private McpRequestHandler<McpSchema.ReadResourceResult> resourcesReadRequestHand | |
| }; | ||
| } | ||
|
|
||
| private McpRequestHandler<Object> resourcesSubscribeHandler() { | ||
| return (ex, params) -> { | ||
| McpSchema.SubscribeRequest resourceSubscribeRequest = jsonMapper.convertValue(params, new TypeRef<>() { | ||
| }); | ||
|
|
||
| var resourceUri = resourceSubscribeRequest.uri(); | ||
|
|
||
| this.resourceSubscriptions.put(resourceUri, resourceSubscribeRequest); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use |
||
|
|
||
| return Mono.just(Map.of()); | ||
| }; | ||
| } | ||
|
|
||
| private McpRequestHandler<Object> resourcesUnsubscribeHandler() { | ||
|
|
||
| return (ex, params) -> { | ||
| McpSchema.SubscribeRequest resourceSubscribeRequest = jsonMapper.convertValue(params, new TypeRef<>() { | ||
| }); | ||
|
|
||
| var resourceUri = resourceSubscribeRequest.uri(); | ||
|
|
||
| this.resourceSubscriptions.remove(resourceUri); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only remove the particular session, not the resource uri from the mapping - only remove it if no sessions are left that have a subscription. |
||
|
|
||
| return Mono.just(Map.of()); | ||
| }; | ||
| } | ||
|
|
||
| private Optional<McpServerFeatures.AsyncResourceSpecification> findResourceSpecification(String uri) { | ||
| var result = this.resources.values() | ||
| .stream() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,217 @@ | ||
| /* | ||
| * Copyright 2024-2024 the original author or authors. | ||
| */ | ||
|
|
||
| package io.modelcontextprotocol.server; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.List; | ||
|
|
||
| import io.modelcontextprotocol.MockMcpServerTransport; | ||
| import io.modelcontextprotocol.MockMcpServerTransportProvider; | ||
| import io.modelcontextprotocol.spec.McpSchema; | ||
| import io.modelcontextprotocol.spec.McpSchema.ReadResourceResult; | ||
| import io.modelcontextprotocol.spec.McpSchema.ServerCapabilities; | ||
| import io.modelcontextprotocol.spec.ProtocolVersions; | ||
| import org.junit.jupiter.api.AfterEach; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
| import reactor.core.publisher.Mono; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.assertj.core.api.Assertions.assertThatCode; | ||
|
|
||
| /** | ||
| * Tests for resource subscribe/unsubscribe support in {@link McpAsyncServer}. | ||
| * | ||
| * Covers the handlers registered under {@code resources/subscribe} and | ||
| * {@code resources/unsubscribe}, the capability guard that keeps them absent when | ||
| * {@code subscribe=false}, and the {@code notifications/resources/updated} notification | ||
| * emitted by {@link McpAsyncServer#addResource} when a subscribed URI is (re-)added. | ||
| */ | ||
| class ResourceSubscriptionTests { | ||
|
|
||
| private static final String RESOURCE_URI = "test://resource/item"; | ||
|
|
||
| private MockMcpServerTransport mockTransport; | ||
|
|
||
| private MockMcpServerTransportProvider mockTransportProvider; | ||
|
|
||
| private McpAsyncServer mcpAsyncServer; | ||
|
|
||
| @BeforeEach | ||
| void setUp() { | ||
| mockTransport = new MockMcpServerTransport(); | ||
| mockTransportProvider = new MockMcpServerTransportProvider(mockTransport); | ||
| } | ||
|
|
||
| @AfterEach | ||
| void tearDown() { | ||
| if (mcpAsyncServer != null) { | ||
| assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))) | ||
| .doesNotThrowAnyException(); | ||
| } | ||
| } | ||
|
|
||
| // ------------------------------------------------------------------------- | ||
| // Capability guard | ||
| // ------------------------------------------------------------------------- | ||
|
|
||
| @Test | ||
| void subscribeHandlerNotRegisteredWhenSubscribeCapabilityDisabled() { | ||
| mcpAsyncServer = McpServer.async(mockTransportProvider) | ||
| .serverInfo("test-server", "1.0.0") | ||
| .capabilities(ServerCapabilities.builder().resources(false, false).build()) | ||
| .build(); | ||
|
|
||
| initializeSession(); | ||
| mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); | ||
|
|
||
| McpSchema.JSONRPCMessage response = mockTransport.getLastSentMessage(); | ||
| assertThat(response).isInstanceOf(McpSchema.JSONRPCResponse.class); | ||
| McpSchema.JSONRPCResponse jsonResponse = (McpSchema.JSONRPCResponse) response; | ||
| assertThat(jsonResponse.error()).isNotNull(); | ||
| assertThat(jsonResponse.error().code()).isEqualTo(McpSchema.ErrorCodes.METHOD_NOT_FOUND); | ||
| } | ||
|
|
||
| // ------------------------------------------------------------------------- | ||
| // Subscribe / unsubscribe happy paths | ||
| // ------------------------------------------------------------------------- | ||
|
|
||
| @Test | ||
| void subscribeRequestReturnsEmptyResult() { | ||
| mcpAsyncServer = McpServer.async(mockTransportProvider) | ||
| .serverInfo("test-server", "1.0.0") | ||
| .capabilities(ServerCapabilities.builder().resources(true, false).build()) | ||
| .build(); | ||
|
|
||
| initializeSession(); | ||
| mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); | ||
|
|
||
| McpSchema.JSONRPCMessage response = mockTransport.getLastSentMessage(); | ||
| assertThat(response).isInstanceOf(McpSchema.JSONRPCResponse.class); | ||
| McpSchema.JSONRPCResponse jsonResponse = (McpSchema.JSONRPCResponse) response; | ||
| assertThat(jsonResponse.id()).isEqualTo("req-1"); | ||
| assertThat(jsonResponse.error()).isNull(); | ||
| } | ||
|
|
||
| // ------------------------------------------------------------------------- | ||
| // addResource notification behaviour | ||
| // ------------------------------------------------------------------------- | ||
|
|
||
| @Test | ||
| void addSubscribedResourceSendsResourcesUpdatedNotification() { | ||
| mcpAsyncServer = McpServer.async(mockTransportProvider) | ||
| .serverInfo("test-server", "1.0.0") | ||
| .capabilities(ServerCapabilities.builder().resources(true, false).build()) | ||
| .build(); | ||
|
|
||
| initializeSession(); | ||
| // Client subscribes first | ||
| mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); | ||
| mockTransport.clearSentMessages(); | ||
|
|
||
| mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5)); | ||
|
|
||
| List<McpSchema.JSONRPCMessage> sent = mockTransport.getAllSentMessages(); | ||
| assertThat(sent).hasSize(1); | ||
| assertThat(sent.get(0)).isInstanceOf(McpSchema.JSONRPCNotification.class); | ||
| McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent.get(0); | ||
| assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED); | ||
| } | ||
|
|
||
| @Test | ||
| void addSubscribedResourceWithListChangedSendsBothNotifications() { | ||
| mcpAsyncServer = McpServer.async(mockTransportProvider) | ||
| .serverInfo("test-server", "1.0.0") | ||
| .capabilities(ServerCapabilities.builder().resources(true, true).build()) | ||
| .build(); | ||
|
|
||
| initializeSession(); | ||
| mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); | ||
| mockTransport.clearSentMessages(); | ||
|
|
||
| mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5)); | ||
|
|
||
| List<McpSchema.JSONRPCMessage> sent = mockTransport.getAllSentMessages(); | ||
| assertThat(sent).hasSize(2); | ||
|
|
||
| List<String> methods = sent.stream() | ||
| .filter(m -> m instanceof McpSchema.JSONRPCNotification) | ||
| .map(m -> ((McpSchema.JSONRPCNotification) m).method()) | ||
| .toList(); | ||
| assertThat(methods).containsExactly(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, | ||
| McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED); | ||
| } | ||
|
|
||
| @Test | ||
| void addUnsubscribedResourceDoesNotSendResourcesUpdatedNotification() { | ||
| mcpAsyncServer = McpServer.async(mockTransportProvider) | ||
| .serverInfo("test-server", "1.0.0") | ||
| .capabilities(ServerCapabilities.builder().resources(true, true).build()) | ||
| .build(); | ||
|
|
||
| initializeSession(); | ||
| // No subscribe call — resource URI is not in the subscription map | ||
| mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5)); | ||
|
|
||
| List<McpSchema.JSONRPCMessage> sent = mockTransport.getAllSentMessages(); | ||
| assertThat(sent).hasSize(1); | ||
| McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent.get(0); | ||
| assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED); | ||
| } | ||
|
|
||
| @Test | ||
| void addResourceAfterUnsubscribeDoesNotSendResourcesUpdatedNotification() { | ||
| mcpAsyncServer = McpServer.async(mockTransportProvider) | ||
| .serverInfo("test-server", "1.0.0") | ||
| .capabilities(ServerCapabilities.builder().resources(true, false).build()) | ||
| .build(); | ||
|
|
||
| initializeSession(); | ||
| mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI)); | ||
| mockTransportProvider.simulateIncomingMessage(unsubscribeRequest("req-2", RESOURCE_URI)); | ||
| mockTransport.clearSentMessages(); | ||
|
|
||
| mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5)); | ||
|
|
||
| // No notifications expected: not subscribed, listChanged=false | ||
| assertThat(mockTransport.getAllSentMessages()).isEmpty(); | ||
| } | ||
|
|
||
| // ------------------------------------------------------------------------- | ||
| // Helpers | ||
| // ------------------------------------------------------------------------- | ||
|
|
||
| /** | ||
| * Performs the MCP initialization handshake so that the session's exchangeSink is | ||
| * populated and subsequent request handlers can be invoked. | ||
| */ | ||
| private void initializeSession() { | ||
| mockTransportProvider.simulateIncomingMessage(new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, | ||
| McpSchema.METHOD_INITIALIZE, "init-req", new McpSchema.InitializeRequest( | ||
| ProtocolVersions.MCP_2025_11_25, null, new McpSchema.Implementation("test-client", "1.0.0")))); | ||
|
|
||
| mockTransportProvider.simulateIncomingMessage(new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION, | ||
| McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)); | ||
|
|
||
| mockTransport.clearSentMessages(); | ||
| } | ||
|
|
||
| private static McpSchema.JSONRPCRequest subscribeRequest(String id, String uri) { | ||
| return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_SUBSCRIBE, id, | ||
| new McpSchema.SubscribeRequest(uri)); | ||
| } | ||
|
|
||
| private static McpSchema.JSONRPCRequest unsubscribeRequest(String id, String uri) { | ||
| return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, id, | ||
| new McpSchema.SubscribeRequest(uri)); | ||
| } | ||
|
|
||
| private static McpServerFeatures.AsyncResourceSpecification resourceSpec(String uri) { | ||
| McpSchema.Resource resource = McpSchema.Resource.builder().uri(uri).name("test-resource").build(); | ||
| return new McpServerFeatures.AsyncResourceSpecification(resource, | ||
| (exchange, req) -> Mono.just(new ReadResourceResult(List.of()))); | ||
| } | ||
|
|
||
| } |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SubscribeRequestis not needed right now, but perhaps the meta-information can be of use for later. For now I'd keep the field asMap<String, Set<String>>of uri -> set(session_id).io.modelcontextprotocol.server.McpAsyncServer#notifyResourcesUpdatedto filter the clients to notify about a resource update (look at uri fromio.modelcontextprotocol.spec.McpSchema.ResourcesUpdatedNotification#uri)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized this requires the
McpTransportProviderto expose an API that accepts the sessionId or a set of them. For subscription related notifications, it has to be the listening stream (because the act of subscribing which has to receive a response, so even if the request upgrades to an SSE stream, it has to be closed after the response). So it makes sense to expose it at this level.