Skip to content

Commit d6ccf57

Browse files
committed
Add resource subscribe/unsubscribe support to McpAsyncServer
- Register resources/subscribe and resources/unsubscribe handlers when the subscribe server capability is enabled - Track active subscriptions in a ConcurrentHashMap keyed by resource URI - Emit notifications/resources/updated when addResource() is called for a subscribed URI; also send listChanged notification if enabled - Add ResourceSubscriptionTests covering the capability guard, subscribe/ unsubscribe happy paths, and the updated notification flow Note: as with some other notificaitons this impl. is not bound to user sessiosns Resolves: #837, #776 Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent 46bacda commit d6ccf57

File tree

2 files changed

+269
-6
lines changed

2 files changed

+269
-6
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import io.modelcontextprotocol.spec.McpSchema.CompleteResult.CompleteCompletion;
2626
import io.modelcontextprotocol.spec.McpSchema.ErrorCodes;
2727
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
28-
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
2928
import io.modelcontextprotocol.spec.McpSchema.PromptReference;
3029
import io.modelcontextprotocol.spec.McpSchema.ResourceReference;
30+
import io.modelcontextprotocol.spec.McpSchema.ResourcesUpdatedNotification;
3131
import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest;
3232
import io.modelcontextprotocol.spec.McpSchema.Tool;
3333
import io.modelcontextprotocol.spec.McpServerSession;
@@ -109,6 +109,8 @@ public class McpAsyncServer {
109109

110110
private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates = new ConcurrentHashMap<>();
111111

112+
private final Map<String, McpSchema.SubscribeRequest> resourceSubscriptions = new ConcurrentHashMap<>();
113+
112114
private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();
113115

114116
// FIXME: this field is deprecated and should be remvoed together with the
@@ -215,6 +217,11 @@ private Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
215217
requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());
216218
requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());
217219
requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());
220+
221+
if (this.serverCapabilities.resources().subscribe()) {
222+
requestHandlers.put(McpSchema.METHOD_RESOURCES_SUBSCRIBE, resourcesSubscribeHandler());
223+
requestHandlers.put(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, resourcesUnsubscribeHandler());
224+
}
218225
}
219226

220227
// Add prompts API handlers if provider exists
@@ -560,15 +567,26 @@ public Mono<Void> addResource(McpServerFeatures.AsyncResourceSpecification resou
560567
}
561568

562569
return Mono.defer(() -> {
563-
var previous = this.resources.put(resourceSpecification.resource().uri(), resourceSpecification);
570+
var resourceUri = resourceSpecification.resource().uri();
571+
572+
var previous = this.resources.put(resourceUri, resourceSpecification);
564573
if (previous != null) {
565-
logger.warn("Replace existing Resource with URI '{}'", resourceSpecification.resource().uri());
574+
logger.warn("Replace existing Resource with URI '{}'", resourceUri);
566575
}
567576
else {
568-
logger.debug("Added resource handler: {}", resourceSpecification.resource().uri());
577+
logger.debug("Added resource handler: {}", resourceUri);
578+
}
579+
580+
if (this.serverCapabilities.resources().subscribe()
581+
&& this.resourceSubscriptions.containsKey(resourceUri)) {
582+
Mono<Void> updated = this.notifyResourcesUpdated(new ResourcesUpdatedNotification(resourceUri));
583+
if (this.serverCapabilities.resources().listChanged()) {
584+
return updated.then(this.notifyResourcesListChanged());
585+
}
586+
return updated;
569587
}
570588
if (this.serverCapabilities.resources().listChanged()) {
571-
return notifyResourcesListChanged();
589+
return this.notifyResourcesListChanged();
572590
}
573591
return Mono.empty();
574592
});
@@ -600,8 +618,9 @@ public Mono<Void> removeResource(String resourceUri) {
600618
McpServerFeatures.AsyncResourceSpecification removed = this.resources.remove(resourceUri);
601619
if (removed != null) {
602620
logger.debug("Removed resource handler: {}", resourceUri);
621+
603622
if (this.serverCapabilities.resources().listChanged()) {
604-
return notifyResourcesListChanged();
623+
return this.notifyResourcesListChanged();
605624
}
606625
return Mono.empty();
607626
}
@@ -734,6 +753,33 @@ private McpRequestHandler<McpSchema.ReadResourceResult> resourcesReadRequestHand
734753
};
735754
}
736755

756+
private McpRequestHandler<Object> resourcesSubscribeHandler() {
757+
return (ex, params) -> {
758+
McpSchema.SubscribeRequest resourceSubscribeRequest = jsonMapper.convertValue(params, new TypeRef<>() {
759+
});
760+
761+
var resourceUri = resourceSubscribeRequest.uri();
762+
763+
this.resourceSubscriptions.put(resourceUri, resourceSubscribeRequest);
764+
765+
return Mono.just(Map.of());
766+
};
767+
}
768+
769+
private McpRequestHandler<Object> resourcesUnsubscribeHandler() {
770+
771+
return (ex, params) -> {
772+
McpSchema.SubscribeRequest resourceSubscribeRequest = jsonMapper.convertValue(params, new TypeRef<>() {
773+
});
774+
775+
var resourceUri = resourceSubscribeRequest.uri();
776+
777+
this.resourceSubscriptions.remove(resourceUri);
778+
779+
return Mono.just(Map.of());
780+
};
781+
}
782+
737783
private Optional<McpServerFeatures.AsyncResourceSpecification> findResourceSpecification(String uri) {
738784
var result = this.resources.values()
739785
.stream()
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.server;
6+
7+
import java.time.Duration;
8+
import java.util.List;
9+
10+
import io.modelcontextprotocol.MockMcpServerTransport;
11+
import io.modelcontextprotocol.MockMcpServerTransportProvider;
12+
import io.modelcontextprotocol.spec.McpSchema;
13+
import io.modelcontextprotocol.spec.McpSchema.ReadResourceResult;
14+
import io.modelcontextprotocol.spec.McpSchema.ServerCapabilities;
15+
import io.modelcontextprotocol.spec.ProtocolVersions;
16+
import org.junit.jupiter.api.AfterEach;
17+
import org.junit.jupiter.api.BeforeEach;
18+
import org.junit.jupiter.api.Test;
19+
import reactor.core.publisher.Mono;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.assertj.core.api.Assertions.assertThatCode;
23+
24+
/**
25+
* Tests for resource subscribe/unsubscribe support in {@link McpAsyncServer}.
26+
*
27+
* Covers the handlers registered under {@code resources/subscribe} and
28+
* {@code resources/unsubscribe}, the capability guard that keeps them absent when
29+
* {@code subscribe=false}, and the {@code notifications/resources/updated} notification
30+
* emitted by {@link McpAsyncServer#addResource} when a subscribed URI is (re-)added.
31+
*/
32+
class ResourceSubscriptionTests {
33+
34+
private static final String RESOURCE_URI = "test://resource/item";
35+
36+
private MockMcpServerTransport mockTransport;
37+
38+
private MockMcpServerTransportProvider mockTransportProvider;
39+
40+
private McpAsyncServer mcpAsyncServer;
41+
42+
@BeforeEach
43+
void setUp() {
44+
mockTransport = new MockMcpServerTransport();
45+
mockTransportProvider = new MockMcpServerTransportProvider(mockTransport);
46+
}
47+
48+
@AfterEach
49+
void tearDown() {
50+
if (mcpAsyncServer != null) {
51+
assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10)))
52+
.doesNotThrowAnyException();
53+
}
54+
}
55+
56+
// -------------------------------------------------------------------------
57+
// Capability guard
58+
// -------------------------------------------------------------------------
59+
60+
@Test
61+
void subscribeHandlerNotRegisteredWhenSubscribeCapabilityDisabled() {
62+
mcpAsyncServer = McpServer.async(mockTransportProvider)
63+
.serverInfo("test-server", "1.0.0")
64+
.capabilities(ServerCapabilities.builder().resources(false, false).build())
65+
.build();
66+
67+
initializeSession();
68+
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));
69+
70+
McpSchema.JSONRPCMessage response = mockTransport.getLastSentMessage();
71+
assertThat(response).isInstanceOf(McpSchema.JSONRPCResponse.class);
72+
McpSchema.JSONRPCResponse jsonResponse = (McpSchema.JSONRPCResponse) response;
73+
assertThat(jsonResponse.error()).isNotNull();
74+
assertThat(jsonResponse.error().code()).isEqualTo(McpSchema.ErrorCodes.METHOD_NOT_FOUND);
75+
}
76+
77+
// -------------------------------------------------------------------------
78+
// Subscribe / unsubscribe happy paths
79+
// -------------------------------------------------------------------------
80+
81+
@Test
82+
void subscribeRequestReturnsEmptyResult() {
83+
mcpAsyncServer = McpServer.async(mockTransportProvider)
84+
.serverInfo("test-server", "1.0.0")
85+
.capabilities(ServerCapabilities.builder().resources(true, false).build())
86+
.build();
87+
88+
initializeSession();
89+
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));
90+
91+
McpSchema.JSONRPCMessage response = mockTransport.getLastSentMessage();
92+
assertThat(response).isInstanceOf(McpSchema.JSONRPCResponse.class);
93+
McpSchema.JSONRPCResponse jsonResponse = (McpSchema.JSONRPCResponse) response;
94+
assertThat(jsonResponse.id()).isEqualTo("req-1");
95+
assertThat(jsonResponse.error()).isNull();
96+
}
97+
98+
// -------------------------------------------------------------------------
99+
// addResource notification behaviour
100+
// -------------------------------------------------------------------------
101+
102+
@Test
103+
void addSubscribedResourceSendsResourcesUpdatedNotification() {
104+
mcpAsyncServer = McpServer.async(mockTransportProvider)
105+
.serverInfo("test-server", "1.0.0")
106+
.capabilities(ServerCapabilities.builder().resources(true, false).build())
107+
.build();
108+
109+
initializeSession();
110+
// Client subscribes first
111+
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));
112+
mockTransport.clearSentMessages();
113+
114+
mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5));
115+
116+
List<McpSchema.JSONRPCMessage> sent = mockTransport.getAllSentMessages();
117+
assertThat(sent).hasSize(1);
118+
assertThat(sent.get(0)).isInstanceOf(McpSchema.JSONRPCNotification.class);
119+
McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent.get(0);
120+
assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED);
121+
}
122+
123+
@Test
124+
void addSubscribedResourceWithListChangedSendsBothNotifications() {
125+
mcpAsyncServer = McpServer.async(mockTransportProvider)
126+
.serverInfo("test-server", "1.0.0")
127+
.capabilities(ServerCapabilities.builder().resources(true, true).build())
128+
.build();
129+
130+
initializeSession();
131+
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));
132+
mockTransport.clearSentMessages();
133+
134+
mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5));
135+
136+
List<McpSchema.JSONRPCMessage> sent = mockTransport.getAllSentMessages();
137+
assertThat(sent).hasSize(2);
138+
139+
List<String> methods = sent.stream()
140+
.filter(m -> m instanceof McpSchema.JSONRPCNotification)
141+
.map(m -> ((McpSchema.JSONRPCNotification) m).method())
142+
.toList();
143+
assertThat(methods).containsExactly(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
144+
McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED);
145+
}
146+
147+
@Test
148+
void addUnsubscribedResourceDoesNotSendResourcesUpdatedNotification() {
149+
mcpAsyncServer = McpServer.async(mockTransportProvider)
150+
.serverInfo("test-server", "1.0.0")
151+
.capabilities(ServerCapabilities.builder().resources(true, true).build())
152+
.build();
153+
154+
initializeSession();
155+
// No subscribe call — resource URI is not in the subscription map
156+
mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5));
157+
158+
List<McpSchema.JSONRPCMessage> sent = mockTransport.getAllSentMessages();
159+
assertThat(sent).hasSize(1);
160+
McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent.get(0);
161+
assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED);
162+
}
163+
164+
@Test
165+
void addResourceAfterUnsubscribeDoesNotSendResourcesUpdatedNotification() {
166+
mcpAsyncServer = McpServer.async(mockTransportProvider)
167+
.serverInfo("test-server", "1.0.0")
168+
.capabilities(ServerCapabilities.builder().resources(true, false).build())
169+
.build();
170+
171+
initializeSession();
172+
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));
173+
mockTransportProvider.simulateIncomingMessage(unsubscribeRequest("req-2", RESOURCE_URI));
174+
mockTransport.clearSentMessages();
175+
176+
mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5));
177+
178+
// No notifications expected: not subscribed, listChanged=false
179+
assertThat(mockTransport.getAllSentMessages()).isEmpty();
180+
}
181+
182+
// -------------------------------------------------------------------------
183+
// Helpers
184+
// -------------------------------------------------------------------------
185+
186+
/**
187+
* Performs the MCP initialization handshake so that the session's exchangeSink is
188+
* populated and subsequent request handlers can be invoked.
189+
*/
190+
private void initializeSession() {
191+
mockTransportProvider.simulateIncomingMessage(new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
192+
McpSchema.METHOD_INITIALIZE, "init-req", new McpSchema.InitializeRequest(
193+
ProtocolVersions.MCP_2025_11_25, null, new McpSchema.Implementation("test-client", "1.0.0"))));
194+
195+
mockTransportProvider.simulateIncomingMessage(new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
196+
McpSchema.METHOD_NOTIFICATION_INITIALIZED, null));
197+
198+
mockTransport.clearSentMessages();
199+
}
200+
201+
private static McpSchema.JSONRPCRequest subscribeRequest(String id, String uri) {
202+
return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_SUBSCRIBE, id,
203+
new McpSchema.SubscribeRequest(uri));
204+
}
205+
206+
private static McpSchema.JSONRPCRequest unsubscribeRequest(String id, String uri) {
207+
return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, id,
208+
new McpSchema.SubscribeRequest(uri));
209+
}
210+
211+
private static McpServerFeatures.AsyncResourceSpecification resourceSpec(String uri) {
212+
McpSchema.Resource resource = McpSchema.Resource.builder().uri(uri).name("test-resource").build();
213+
return new McpServerFeatures.AsyncResourceSpecification(resource,
214+
(exchange, req) -> Mono.just(new ReadResourceResult(List.of())));
215+
}
216+
217+
}

0 commit comments

Comments
 (0)