Skip to content

Commit 4b651b8

Browse files
feat: add closeStandaloneSSEStream for GET stream polling (#1203)
1 parent 5ceabfb commit 4b651b8

File tree

4 files changed

+310
-16
lines changed

4 files changed

+310
-16
lines changed

src/server/streamableHttp.test.ts

Lines changed: 271 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1738,7 +1738,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
17381738
expect(text).not.toContain('retry:');
17391739
});
17401740

1741-
it('should close POST SSE stream when closeSseStream is called', async () => {
1741+
it('should close POST SSE stream when extra.closeSSEStream is called', async () => {
17421742
const result = await createTestServer({
17431743
sessionIdGenerator: () => randomUUID(),
17441744
eventStore: createEventStore(),
@@ -1749,15 +1749,21 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
17491749
baseUrl = result.baseUrl;
17501750
mcpServer = result.mcpServer;
17511751

1752-
// Track tool execution state
1752+
// Track when stream close is called and tool completes
1753+
let streamCloseCalled = false;
17531754
let toolResolve: () => void;
1754-
const toolPromise = new Promise<void>(resolve => {
1755+
const toolCompletePromise = new Promise<void>(resolve => {
17551756
toolResolve = resolve;
17561757
});
17571758

1758-
// Register a blocking tool
1759-
mcpServer.tool('blocking-tool', 'A blocking tool', {}, async () => {
1760-
await toolPromise;
1759+
// Register a tool that closes its own SSE stream via extra callback
1760+
mcpServer.tool('close-stream-tool', 'Closes its own stream', {}, async (_args, extra) => {
1761+
// Close the SSE stream for this request
1762+
extra.closeSSEStream?.();
1763+
streamCloseCalled = true;
1764+
1765+
// Wait before returning so we can observe the stream closure
1766+
await toolCompletePromise;
17611767
return { content: [{ type: 'text', text: 'Done' }] };
17621768
});
17631769

@@ -1771,7 +1777,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
17711777
jsonrpc: '2.0',
17721778
id: 100,
17731779
method: 'tools/call',
1774-
params: { name: 'blocking-tool', arguments: {} }
1780+
params: { name: 'close-stream-tool', arguments: {} }
17751781
};
17761782

17771783
const postResponse = await fetch(baseUrl, {
@@ -1792,8 +1798,9 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
17921798
// Read the priming event
17931799
await reader!.read();
17941800

1795-
// Close the SSE stream
1796-
transport.closeSSEStream(100);
1801+
// Wait a moment for the tool to call closeSSEStream
1802+
await new Promise(resolve => setTimeout(resolve, 100));
1803+
expect(streamCloseCalled).toBe(true);
17971804

17981805
// Stream should now be closed
17991806
const { done } = await reader!.read();
@@ -1916,6 +1923,261 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
19161923
// Verify closeSSEStream callback was NOT provided
19171924
expect(receivedCloseSSEStream).toBeUndefined();
19181925
});
1926+
1927+
it('should provide closeStandaloneSSEStream callback in extra when eventStore is configured', async () => {
1928+
const result = await createTestServer({
1929+
sessionIdGenerator: () => randomUUID(),
1930+
eventStore: createEventStore(),
1931+
retryInterval: 1000
1932+
});
1933+
server = result.server;
1934+
transport = result.transport;
1935+
baseUrl = result.baseUrl;
1936+
mcpServer = result.mcpServer;
1937+
1938+
// Track whether closeStandaloneSSEStream callback was provided
1939+
let receivedCloseStandaloneSSEStream: (() => void) | undefined;
1940+
1941+
// Register a tool that captures the extra.closeStandaloneSSEStream callback
1942+
mcpServer.tool('test-standalone-callback-tool', 'Test tool', {}, async (_args, extra) => {
1943+
receivedCloseStandaloneSSEStream = extra.closeStandaloneSSEStream;
1944+
return { content: [{ type: 'text', text: 'Done' }] };
1945+
});
1946+
1947+
// Initialize to get session ID
1948+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1949+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1950+
expect(sessionId).toBeDefined();
1951+
1952+
// Call the tool
1953+
const toolCallRequest: JSONRPCMessage = {
1954+
jsonrpc: '2.0',
1955+
id: 203,
1956+
method: 'tools/call',
1957+
params: { name: 'test-standalone-callback-tool', arguments: {} }
1958+
};
1959+
1960+
const postResponse = await fetch(baseUrl, {
1961+
method: 'POST',
1962+
headers: {
1963+
'Content-Type': 'application/json',
1964+
Accept: 'text/event-stream, application/json',
1965+
'mcp-session-id': sessionId,
1966+
'mcp-protocol-version': '2025-03-26'
1967+
},
1968+
body: JSON.stringify(toolCallRequest)
1969+
});
1970+
1971+
expect(postResponse.status).toBe(200);
1972+
1973+
// Read all events to completion
1974+
const reader = postResponse.body?.getReader();
1975+
while (true) {
1976+
const { done } = await reader!.read();
1977+
if (done) break;
1978+
}
1979+
1980+
// Verify closeStandaloneSSEStream callback was provided
1981+
expect(receivedCloseStandaloneSSEStream).toBeDefined();
1982+
expect(typeof receivedCloseStandaloneSSEStream).toBe('function');
1983+
});
1984+
1985+
it('should close standalone GET SSE stream when extra.closeStandaloneSSEStream is called', async () => {
1986+
const result = await createTestServer({
1987+
sessionIdGenerator: () => randomUUID(),
1988+
eventStore: createEventStore(),
1989+
retryInterval: 1000
1990+
});
1991+
server = result.server;
1992+
transport = result.transport;
1993+
baseUrl = result.baseUrl;
1994+
mcpServer = result.mcpServer;
1995+
1996+
// Register a tool that closes the standalone SSE stream via extra callback
1997+
mcpServer.tool('close-standalone-stream-tool', 'Closes standalone stream', {}, async (_args, extra) => {
1998+
extra.closeStandaloneSSEStream?.();
1999+
return { content: [{ type: 'text', text: 'Stream closed' }] };
2000+
});
2001+
2002+
// Initialize to get session ID
2003+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
2004+
sessionId = initResponse.headers.get('mcp-session-id') as string;
2005+
expect(sessionId).toBeDefined();
2006+
2007+
// Open a standalone GET SSE stream
2008+
const sseResponse = await fetch(baseUrl, {
2009+
method: 'GET',
2010+
headers: {
2011+
Accept: 'text/event-stream',
2012+
'mcp-session-id': sessionId,
2013+
'mcp-protocol-version': '2025-03-26'
2014+
}
2015+
});
2016+
expect(sseResponse.status).toBe(200);
2017+
2018+
const getReader = sseResponse.body?.getReader();
2019+
2020+
// Send a notification to confirm GET stream is established
2021+
await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Stream established' });
2022+
2023+
// Read the notification to confirm stream is working
2024+
const { value } = await getReader!.read();
2025+
const text = new TextDecoder().decode(value);
2026+
expect(text).toContain('id: ');
2027+
expect(text).toContain('Stream established');
2028+
2029+
// Call the tool that closes the standalone SSE stream
2030+
const toolCallRequest: JSONRPCMessage = {
2031+
jsonrpc: '2.0',
2032+
id: 300,
2033+
method: 'tools/call',
2034+
params: { name: 'close-standalone-stream-tool', arguments: {} }
2035+
};
2036+
2037+
const postResponse = await fetch(baseUrl, {
2038+
method: 'POST',
2039+
headers: {
2040+
'Content-Type': 'application/json',
2041+
Accept: 'text/event-stream, application/json',
2042+
'mcp-session-id': sessionId,
2043+
'mcp-protocol-version': '2025-03-26'
2044+
},
2045+
body: JSON.stringify(toolCallRequest)
2046+
});
2047+
expect(postResponse.status).toBe(200);
2048+
2049+
// Read the POST response to completion
2050+
const postReader = postResponse.body?.getReader();
2051+
while (true) {
2052+
const { done } = await postReader!.read();
2053+
if (done) break;
2054+
}
2055+
2056+
// GET stream should now be closed - use a race with timeout to avoid hanging
2057+
const readPromise = getReader!.read();
2058+
const timeoutPromise = new Promise<{ done: boolean; value: undefined }>((_, reject) =>
2059+
setTimeout(() => reject(new Error('Stream did not close in time')), 1000)
2060+
);
2061+
2062+
const { done } = await Promise.race([readPromise, timeoutPromise]);
2063+
expect(done).toBe(true);
2064+
});
2065+
2066+
it('should allow client to reconnect after standalone SSE stream is closed via extra.closeStandaloneSSEStream', async () => {
2067+
const result = await createTestServer({
2068+
sessionIdGenerator: () => randomUUID(),
2069+
eventStore: createEventStore(),
2070+
retryInterval: 1000
2071+
});
2072+
server = result.server;
2073+
transport = result.transport;
2074+
baseUrl = result.baseUrl;
2075+
mcpServer = result.mcpServer;
2076+
2077+
// Register a tool that closes the standalone SSE stream
2078+
mcpServer.tool('close-standalone-for-reconnect', 'Closes standalone stream', {}, async (_args, extra) => {
2079+
extra.closeStandaloneSSEStream?.();
2080+
return { content: [{ type: 'text', text: 'Stream closed' }] };
2081+
});
2082+
2083+
// Initialize to get session ID
2084+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
2085+
sessionId = initResponse.headers.get('mcp-session-id') as string;
2086+
expect(sessionId).toBeDefined();
2087+
2088+
// Open a standalone GET SSE stream
2089+
const sseResponse = await fetch(baseUrl, {
2090+
method: 'GET',
2091+
headers: {
2092+
Accept: 'text/event-stream',
2093+
'mcp-session-id': sessionId,
2094+
'mcp-protocol-version': '2025-03-26'
2095+
}
2096+
});
2097+
expect(sseResponse.status).toBe(200);
2098+
2099+
const getReader = sseResponse.body?.getReader();
2100+
2101+
// Send a notification to get an event ID
2102+
await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Initial message' });
2103+
2104+
// Read the notification to get the event ID
2105+
const { value } = await getReader!.read();
2106+
const text = new TextDecoder().decode(value);
2107+
const idMatch = text.match(/id: ([^\n]+)/);
2108+
expect(idMatch).toBeTruthy();
2109+
const lastEventId = idMatch![1];
2110+
2111+
// Call the tool to close the standalone SSE stream
2112+
const toolCallRequest: JSONRPCMessage = {
2113+
jsonrpc: '2.0',
2114+
id: 301,
2115+
method: 'tools/call',
2116+
params: { name: 'close-standalone-for-reconnect', arguments: {} }
2117+
};
2118+
2119+
const postResponse = await fetch(baseUrl, {
2120+
method: 'POST',
2121+
headers: {
2122+
'Content-Type': 'application/json',
2123+
Accept: 'text/event-stream, application/json',
2124+
'mcp-session-id': sessionId,
2125+
'mcp-protocol-version': '2025-03-26'
2126+
},
2127+
body: JSON.stringify(toolCallRequest)
2128+
});
2129+
expect(postResponse.status).toBe(200);
2130+
2131+
// Read the POST response to completion
2132+
const postReader = postResponse.body?.getReader();
2133+
while (true) {
2134+
const { done } = await postReader!.read();
2135+
if (done) break;
2136+
}
2137+
2138+
// Wait for GET stream to close - use a race with timeout
2139+
const readPromise = getReader!.read();
2140+
const timeoutPromise = new Promise<{ done: boolean; value: undefined }>((_, reject) =>
2141+
setTimeout(() => reject(new Error('Stream did not close in time')), 1000)
2142+
);
2143+
const { done } = await Promise.race([readPromise, timeoutPromise]);
2144+
expect(done).toBe(true);
2145+
2146+
// Send a notification while client is disconnected
2147+
await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'Missed while disconnected' });
2148+
2149+
// Client reconnects with Last-Event-ID
2150+
const reconnectResponse = await fetch(baseUrl, {
2151+
method: 'GET',
2152+
headers: {
2153+
Accept: 'text/event-stream',
2154+
'mcp-session-id': sessionId,
2155+
'mcp-protocol-version': '2025-03-26',
2156+
'last-event-id': lastEventId
2157+
}
2158+
});
2159+
expect(reconnectResponse.status).toBe(200);
2160+
2161+
// Read the replayed notification
2162+
const reconnectReader = reconnectResponse.body?.getReader();
2163+
let allText = '';
2164+
const readWithTimeout = async () => {
2165+
const timeout = setTimeout(() => reconnectReader!.cancel(), 2000);
2166+
try {
2167+
while (!allText.includes('Missed while disconnected')) {
2168+
const { value, done } = await reconnectReader!.read();
2169+
if (done) break;
2170+
allText += new TextDecoder().decode(value);
2171+
}
2172+
} finally {
2173+
clearTimeout(timeout);
2174+
}
2175+
};
2176+
await readWithTimeout();
2177+
2178+
// Verify we received the notification that was sent while disconnected
2179+
expect(allText).toContain('Missed while disconnected');
2180+
});
19192181
});
19202182

19212183
// Test onsessionclosed callback

src/server/streamableHttp.ts

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -651,13 +651,17 @@ export class StreamableHTTPServerTransport implements Transport {
651651
for (const message of messages) {
652652
// Build closeSSEStream callback for requests when eventStore is configured
653653
let closeSSEStream: (() => void) | undefined;
654+
let closeStandaloneSSEStream: (() => void) | undefined;
654655
if (isJSONRPCRequest(message) && this._eventStore) {
655656
closeSSEStream = () => {
656657
this.closeSSEStream(message.id);
657658
};
659+
closeStandaloneSSEStream = () => {
660+
this.closeStandaloneSSEStream();
661+
};
658662
}
659663

660-
this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream });
664+
this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream, closeStandaloneSSEStream });
661665
}
662666
// The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses
663667
// This will be handled by the send() method when responses are ready
@@ -814,6 +818,18 @@ export class StreamableHTTPServerTransport implements Transport {
814818
}
815819
}
816820

821+
/**
822+
* Close the standalone GET SSE stream, triggering client reconnection.
823+
* Use this to implement polling behavior for server-initiated notifications.
824+
*/
825+
closeStandaloneSSEStream(): void {
826+
const stream = this._streamMapping.get(this._standaloneSseStreamId);
827+
if (stream) {
828+
stream.end();
829+
this._streamMapping.delete(this._standaloneSseStreamId);
830+
}
831+
}
832+
817833
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
818834
let requestId = options?.relatedRequestId;
819835
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
@@ -829,19 +845,21 @@ export class StreamableHTTPServerTransport implements Transport {
829845
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
830846
throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request');
831847
}
832-
const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
833-
if (standaloneSse === undefined) {
834-
// The spec says the server MAY send messages on the stream, so it's ok to discard if no stream
835-
return;
836-
}
837848

838849
// Generate and store event ID if event store is provided
850+
// Store even if stream is disconnected so events can be replayed on reconnect
839851
let eventId: string | undefined;
840852
if (this._eventStore) {
841853
// Stores the event and gets the generated event ID
842854
eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message);
843855
}
844856

857+
const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
858+
if (standaloneSse === undefined) {
859+
// Stream is disconnected - event is stored for replay, nothing more to do
860+
return;
861+
}
862+
845863
// Send the message to the standalone SSE stream
846864
this.writeSSEEvent(standaloneSse, message, eventId);
847865
return;

0 commit comments

Comments
 (0)