Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions conformance-tests/conformance-baseline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@
# This file lists known failing scenarios that are expected to fail until fixed.
# See: https://github.com/modelcontextprotocol/conformance/blob/main/SDK_INTEGRATION.md

server:
# Resource subscription not implemented in SDK
- resources-subscribe
- resources-unsubscribe

client:
# SSE retry field handling not implemented
# - Client does not parse or respect retry: field timing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import io.modelcontextprotocol.spec.McpSchema.CompleteResult.CompleteCompletion;
import io.modelcontextprotocol.spec.McpSchema.ErrorCodes;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpSchema.PromptReference;
import io.modelcontextprotocol.spec.McpSchema.ResourceReference;
import io.modelcontextprotocol.spec.McpSchema.ResourcesUpdatedNotification;
import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest;
import io.modelcontextprotocol.spec.McpSchema.Tool;
import io.modelcontextprotocol.spec.McpServerSession;
Expand Down Expand Up @@ -109,6 +109,8 @@ public class McpAsyncServer {

private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceTemplateSpecification> resourceTemplates = new ConcurrentHashMap<>();

private final Map<String, McpSchema.SubscribeRequest> resourceSubscriptions = new ConcurrentHashMap<>();
Copy link
Member

@chemicL chemicL Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This construct should include a session ID so that we know which client to notify about which resource.
  2. The SubscribeRequest is not needed right now, but perhaps the meta-information can be of use for later. For now I'd keep the field as Map<String, Set<String>> of uri -> set(session_id).
  3. Use this map from io.modelcontextprotocol.server.McpAsyncServer#notifyResourcesUpdated to filter the clients to notify about a resource update (look at uri from io.modelcontextprotocol.spec.McpSchema.ResourcesUpdatedNotification#uri)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized this requires the McpTransportProvider to expose an API that accepts the sessionId or a set of them. For subscription related notifications, it has to be the listening stream (because the act of subscribing which has to receive a response, so even if the request upgrades to an SSE stream, it has to be closed after the response). So it makes sense to expose it at this level.


private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();

// FIXME: this field is deprecated and should be remvoed together with the
Expand Down Expand Up @@ -215,6 +217,11 @@ private Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());
requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());
requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());

if (this.serverCapabilities.resources().subscribe()) {
requestHandlers.put(McpSchema.METHOD_RESOURCES_SUBSCRIBE, resourcesSubscribeHandler());
requestHandlers.put(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, resourcesUnsubscribeHandler());
}
}

// Add prompts API handlers if provider exists
Expand Down Expand Up @@ -560,15 +567,26 @@ public Mono<Void> addResource(McpServerFeatures.AsyncResourceSpecification resou
}

return Mono.defer(() -> {
var previous = this.resources.put(resourceSpecification.resource().uri(), resourceSpecification);
var resourceUri = resourceSpecification.resource().uri();

var previous = this.resources.put(resourceUri, resourceSpecification);
if (previous != null) {
logger.warn("Replace existing Resource with URI '{}'", resourceSpecification.resource().uri());
logger.warn("Replace existing Resource with URI '{}'", resourceUri);
}
else {
logger.debug("Added resource handler: {}", resourceSpecification.resource().uri());
logger.debug("Added resource handler: {}", resourceUri);
}

if (this.serverCapabilities.resources().subscribe()
&& this.resourceSubscriptions.containsKey(resourceUri)) {
Mono<Void> updated = this.notifyResourcesUpdated(new ResourcesUpdatedNotification(resourceUri));
if (this.serverCapabilities.resources().listChanged()) {
return updated.then(this.notifyResourcesListChanged());
}
return updated;
}
Comment on lines +580 to 587
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is unnecessary. A resource is not updated when it's added but when it's modify. This code block should be removed.

if (this.serverCapabilities.resources().listChanged()) {
return notifyResourcesListChanged();
return this.notifyResourcesListChanged();
}
return Mono.empty();
});
Expand Down Expand Up @@ -600,8 +618,9 @@ public Mono<Void> removeResource(String resourceUri) {
McpServerFeatures.AsyncResourceSpecification removed = this.resources.remove(resourceUri);
if (removed != null) {
logger.debug("Removed resource handler: {}", resourceUri);

if (this.serverCapabilities.resources().listChanged()) {
return notifyResourcesListChanged();
return this.notifyResourcesListChanged();
}
return Mono.empty();
}
Expand Down Expand Up @@ -734,6 +753,33 @@ private McpRequestHandler<McpSchema.ReadResourceResult> resourcesReadRequestHand
};
}

private McpRequestHandler<Object> resourcesSubscribeHandler() {
return (ex, params) -> {
McpSchema.SubscribeRequest resourceSubscribeRequest = jsonMapper.convertValue(params, new TypeRef<>() {
});

var resourceUri = resourceSubscribeRequest.uri();

this.resourceSubscriptions.put(resourceUri, resourceSubscribeRequest);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use ex.sessionId() for the mapping.


return Mono.just(Map.of());
};
}

private McpRequestHandler<Object> resourcesUnsubscribeHandler() {

return (ex, params) -> {
McpSchema.SubscribeRequest resourceSubscribeRequest = jsonMapper.convertValue(params, new TypeRef<>() {
});

var resourceUri = resourceSubscribeRequest.uri();

this.resourceSubscriptions.remove(resourceUri);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only remove the particular session, not the resource uri from the mapping - only remove it if no sessions are left that have a subscription.


return Mono.just(Map.of());
};
}

private Optional<McpServerFeatures.AsyncResourceSpecification> findResourceSpecification(String uri) {
var result = this.resources.values()
.stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Copyright 2024-2024 the original author or authors.
*/

package io.modelcontextprotocol.server;

import java.time.Duration;
import java.util.List;

import io.modelcontextprotocol.MockMcpServerTransport;
import io.modelcontextprotocol.MockMcpServerTransportProvider;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.ReadResourceResult;
import io.modelcontextprotocol.spec.McpSchema.ServerCapabilities;
import io.modelcontextprotocol.spec.ProtocolVersions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

/**
* Tests for resource subscribe/unsubscribe support in {@link McpAsyncServer}.
*
* Covers the handlers registered under {@code resources/subscribe} and
* {@code resources/unsubscribe}, the capability guard that keeps them absent when
* {@code subscribe=false}, and the {@code notifications/resources/updated} notification
* emitted by {@link McpAsyncServer#addResource} when a subscribed URI is (re-)added.
*/
class ResourceSubscriptionTests {

private static final String RESOURCE_URI = "test://resource/item";

private MockMcpServerTransport mockTransport;

private MockMcpServerTransportProvider mockTransportProvider;

private McpAsyncServer mcpAsyncServer;

@BeforeEach
void setUp() {
mockTransport = new MockMcpServerTransport();
mockTransportProvider = new MockMcpServerTransportProvider(mockTransport);
}

@AfterEach
void tearDown() {
if (mcpAsyncServer != null) {
assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10)))
.doesNotThrowAnyException();
}
}

// -------------------------------------------------------------------------
// Capability guard
// -------------------------------------------------------------------------

@Test
void subscribeHandlerNotRegisteredWhenSubscribeCapabilityDisabled() {
mcpAsyncServer = McpServer.async(mockTransportProvider)
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().resources(false, false).build())
.build();

initializeSession();
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));

McpSchema.JSONRPCMessage response = mockTransport.getLastSentMessage();
assertThat(response).isInstanceOf(McpSchema.JSONRPCResponse.class);
McpSchema.JSONRPCResponse jsonResponse = (McpSchema.JSONRPCResponse) response;
assertThat(jsonResponse.error()).isNotNull();
assertThat(jsonResponse.error().code()).isEqualTo(McpSchema.ErrorCodes.METHOD_NOT_FOUND);
}

// -------------------------------------------------------------------------
// Subscribe / unsubscribe happy paths
// -------------------------------------------------------------------------

@Test
void subscribeRequestReturnsEmptyResult() {
mcpAsyncServer = McpServer.async(mockTransportProvider)
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().resources(true, false).build())
.build();

initializeSession();
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));

McpSchema.JSONRPCMessage response = mockTransport.getLastSentMessage();
assertThat(response).isInstanceOf(McpSchema.JSONRPCResponse.class);
McpSchema.JSONRPCResponse jsonResponse = (McpSchema.JSONRPCResponse) response;
assertThat(jsonResponse.id()).isEqualTo("req-1");
assertThat(jsonResponse.error()).isNull();
}

// -------------------------------------------------------------------------
// addResource notification behaviour
// -------------------------------------------------------------------------

@Test
void addSubscribedResourceSendsResourcesUpdatedNotification() {
mcpAsyncServer = McpServer.async(mockTransportProvider)
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().resources(true, false).build())
.build();

initializeSession();
// Client subscribes first
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));
mockTransport.clearSentMessages();

mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5));

List<McpSchema.JSONRPCMessage> sent = mockTransport.getAllSentMessages();
assertThat(sent).hasSize(1);
assertThat(sent.get(0)).isInstanceOf(McpSchema.JSONRPCNotification.class);
McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent.get(0);
assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED);
}

@Test
void addSubscribedResourceWithListChangedSendsBothNotifications() {
mcpAsyncServer = McpServer.async(mockTransportProvider)
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().resources(true, true).build())
.build();

initializeSession();
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));
mockTransport.clearSentMessages();

mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5));

List<McpSchema.JSONRPCMessage> sent = mockTransport.getAllSentMessages();
assertThat(sent).hasSize(2);

List<String> methods = sent.stream()
.filter(m -> m instanceof McpSchema.JSONRPCNotification)
.map(m -> ((McpSchema.JSONRPCNotification) m).method())
.toList();
assertThat(methods).containsExactly(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED);
}

@Test
void addUnsubscribedResourceDoesNotSendResourcesUpdatedNotification() {
mcpAsyncServer = McpServer.async(mockTransportProvider)
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().resources(true, true).build())
.build();

initializeSession();
// No subscribe call — resource URI is not in the subscription map
mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5));

List<McpSchema.JSONRPCMessage> sent = mockTransport.getAllSentMessages();
assertThat(sent).hasSize(1);
McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent.get(0);
assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED);
}

@Test
void addResourceAfterUnsubscribeDoesNotSendResourcesUpdatedNotification() {
mcpAsyncServer = McpServer.async(mockTransportProvider)
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().resources(true, false).build())
.build();

initializeSession();
mockTransportProvider.simulateIncomingMessage(subscribeRequest("req-1", RESOURCE_URI));
mockTransportProvider.simulateIncomingMessage(unsubscribeRequest("req-2", RESOURCE_URI));
mockTransport.clearSentMessages();

mcpAsyncServer.addResource(resourceSpec(RESOURCE_URI)).block(Duration.ofSeconds(5));

// No notifications expected: not subscribed, listChanged=false
assertThat(mockTransport.getAllSentMessages()).isEmpty();
}

// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------

/**
* Performs the MCP initialization handshake so that the session's exchangeSink is
* populated and subsequent request handlers can be invoked.
*/
private void initializeSession() {
mockTransportProvider.simulateIncomingMessage(new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
McpSchema.METHOD_INITIALIZE, "init-req", new McpSchema.InitializeRequest(
ProtocolVersions.MCP_2025_11_25, null, new McpSchema.Implementation("test-client", "1.0.0"))));

mockTransportProvider.simulateIncomingMessage(new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
McpSchema.METHOD_NOTIFICATION_INITIALIZED, null));

mockTransport.clearSentMessages();
}

private static McpSchema.JSONRPCRequest subscribeRequest(String id, String uri) {
return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_SUBSCRIBE, id,
new McpSchema.SubscribeRequest(uri));
}

private static McpSchema.JSONRPCRequest unsubscribeRequest(String id, String uri) {
return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, id,
new McpSchema.SubscribeRequest(uri));
}

private static McpServerFeatures.AsyncResourceSpecification resourceSpec(String uri) {
McpSchema.Resource resource = McpSchema.Resource.builder().uri(uri).name("test-resource").build();
return new McpServerFeatures.AsyncResourceSpecification(resource,
(exchange, req) -> Mono.just(new ReadResourceResult(List.of())));
}

}