Skip to content

Commit a37d53d

Browse files
simplify: remove retryInterval from closeSSEStream callback
Remove the optional retryInterval parameter from closeSSEStream callback for simplicity. The transport's default retry interval (from priming events) is sufficient. This aligns with Python SDK.
1 parent 517480e commit a37d53d

File tree

5 files changed

+10
-105
lines changed

5 files changed

+10
-105
lines changed

src/examples/server/ssePollingExample.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ server.tool(
6262
await sleep(1000);
6363

6464
// Server decides to disconnect the client to free resources
65-
// Client will reconnect via GET with Last-Event-ID after retryInterval
65+
// Client will reconnect via GET with Last-Event-ID after the transport's retryInterval
6666
// Use extra.closeSSEStream callback - available when eventStore is configured
6767
if (extra.closeSSEStream) {
6868
console.log(`[${extra.sessionId}] Closing SSE stream to trigger client polling...`);
69-
extra.closeSSEStream({ retryInterval: 2000 });
69+
extra.closeSSEStream();
7070
}
7171

7272
// Continue processing while client is disconnected

src/server/streamableHttp.test.ts

Lines changed: 2 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,7 +1815,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
18151815
mcpServer = result.mcpServer;
18161816

18171817
// Track whether closeSSEStream callback was provided
1818-
let receivedCloseSSEStream: ((options?: { retryInterval?: number }) => void) | undefined;
1818+
let receivedCloseSSEStream: (() => void) | undefined;
18191819

18201820
// Register a tool that captures the extra.closeSSEStream callback
18211821
mcpServer.tool('test-callback-tool', 'Test tool', {}, async (_args, extra) => {
@@ -1872,7 +1872,7 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
18721872
mcpServer = result.mcpServer;
18731873

18741874
// Track whether closeSSEStream callback was provided
1875-
let receivedCloseSSEStream: ((options?: { retryInterval?: number }) => void) | undefined;
1875+
let receivedCloseSSEStream: (() => void) | undefined;
18761876

18771877
// Register a tool that captures the extra.closeSSEStream callback
18781878
mcpServer.tool('test-no-callback-tool', 'Test tool', {}, async (_args, extra) => {
@@ -1916,76 +1916,6 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
19161916
// Verify closeSSEStream callback was NOT provided
19171917
expect(receivedCloseSSEStream).toBeUndefined();
19181918
});
1919-
1920-
it('should send custom retry interval when closeSSEStream is called with retryInterval', async () => {
1921-
const result = await createTestServer({
1922-
sessionIdGenerator: () => randomUUID(),
1923-
eventStore: createEventStore(),
1924-
retryInterval: 1000 // Default
1925-
});
1926-
server = result.server;
1927-
transport = result.transport;
1928-
baseUrl = result.baseUrl;
1929-
mcpServer = result.mcpServer;
1930-
1931-
// Track tool execution state
1932-
let toolResolve: () => void;
1933-
const toolPromise = new Promise<void>(resolve => {
1934-
toolResolve = resolve;
1935-
});
1936-
1937-
// Register a tool that uses closeSSEStream with custom retry interval
1938-
mcpServer.tool('custom-retry-tool', 'Test tool', {}, async (_args, extra) => {
1939-
// Use closeSSEStream with custom retry interval
1940-
extra.closeSSEStream?.({ retryInterval: 5000 });
1941-
await toolPromise;
1942-
return { content: [{ type: 'text', text: 'Done' }] };
1943-
});
1944-
1945-
// Initialize to get session ID
1946-
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1947-
sessionId = initResponse.headers.get('mcp-session-id') as string;
1948-
expect(sessionId).toBeDefined();
1949-
1950-
// Call the tool
1951-
const toolCallRequest: JSONRPCMessage = {
1952-
jsonrpc: '2.0',
1953-
id: 202,
1954-
method: 'tools/call',
1955-
params: { name: 'custom-retry-tool', arguments: {} }
1956-
};
1957-
1958-
const postResponse = await fetch(baseUrl, {
1959-
method: 'POST',
1960-
headers: {
1961-
'Content-Type': 'application/json',
1962-
Accept: 'text/event-stream, application/json',
1963-
'mcp-session-id': sessionId,
1964-
'mcp-protocol-version': '2025-03-26'
1965-
},
1966-
body: JSON.stringify(toolCallRequest)
1967-
});
1968-
1969-
expect(postResponse.status).toBe(200);
1970-
1971-
// Collect all SSE data
1972-
const reader = postResponse.body?.getReader();
1973-
let allText = '';
1974-
while (true) {
1975-
const { done, value } = await reader!.read();
1976-
if (value) {
1977-
allText += new TextDecoder().decode(value);
1978-
}
1979-
if (done) break;
1980-
}
1981-
1982-
// Verify the custom retry interval was sent
1983-
// The stream should contain "retry: 5000" (the custom value)
1984-
expect(allText).toContain('retry: 5000');
1985-
1986-
// Clean up
1987-
toolResolve!();
1988-
});
19891919
});
19901920

19911921
// Test onsessionclosed callback

src/server/streamableHttp.ts

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { IncomingMessage, ServerResponse } from 'node:http';
22
import { Transport } from '../shared/transport.js';
33
import {
4-
CloseSSEStreamOptions,
54
MessageExtraInfo,
65
RequestInfo,
76
isInitializeRequest,
@@ -651,10 +650,10 @@ export class StreamableHTTPServerTransport implements Transport {
651650
// handle each message
652651
for (const message of messages) {
653652
// Build closeSSEStream callback for requests when eventStore is configured
654-
let closeSSEStream: ((options?: CloseSSEStreamOptions) => void) | undefined;
653+
let closeSSEStream: (() => void) | undefined;
655654
if (isJSONRPCRequest(message) && this._eventStore) {
656-
closeSSEStream = (options?: CloseSSEStreamOptions) => {
657-
this.closeSSEStream(message.id, options?.retryInterval);
655+
closeSSEStream = () => {
656+
this.closeSSEStream(message.id);
658657
};
659658
}
660659

@@ -803,22 +802,13 @@ export class StreamableHTTPServerTransport implements Transport {
803802
* Close an SSE stream for a specific request, triggering client reconnection.
804803
* Use this to implement polling behavior during long-running operations -
805804
* client will reconnect after the retry interval specified in the priming event.
806-
*
807-
* @param requestId - The request ID whose stream should be closed
808-
* @param retryInterval - Optional retry interval in milliseconds to send before closing.
809-
* If provided, sends a retry field to override the transport default.
810805
*/
811-
closeSSEStream(requestId: RequestId, retryInterval?: number): void {
806+
closeSSEStream(requestId: RequestId): void {
812807
const streamId = this._requestToStreamMapping.get(requestId);
813808
if (!streamId) return;
814809

815810
const stream = this._streamMapping.get(streamId);
816811
if (stream) {
817-
// If a custom retry interval is provided, send it before closing
818-
// Use single \n (not \n\n) to avoid triggering SSE event dispatch
819-
if (retryInterval !== undefined) {
820-
stream.write(`retry: ${retryInterval}\n`);
821-
}
822812
stream.end();
823813
this._streamMapping.delete(streamId);
824814
}

src/shared/protocol.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { AnySchema, AnyObjectSchema, SchemaOutput, safeParse } from '../server/z
22
import {
33
CancelledNotificationSchema,
44
ClientCapabilities,
5-
CloseSSEStreamOptions,
65
ErrorCode,
76
isJSONRPCError,
87
isJSONRPCRequest,
@@ -160,11 +159,8 @@ export type RequestHandlerExtra<SendRequestT extends Request, SendNotificationT
160159
* Closes the SSE stream for this request, triggering client reconnection.
161160
* Only available when using StreamableHTTPServerTransport with eventStore configured.
162161
* Use this to implement polling behavior during long-running operations.
163-
*
164-
* @param options - Optional configuration for the close operation
165-
* @param options.retryInterval - Retry interval in milliseconds to suggest to clients
166162
*/
167-
closeSSEStream?: (options?: CloseSSEStreamOptions) => void;
163+
closeSSEStream?: () => void;
168164
};
169165

170166
/**

src/types.ts

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1841,17 +1841,6 @@ export interface RequestInfo {
18411841
headers: IsomorphicHeaders;
18421842
}
18431843

1844-
/**
1845-
* Options for closing an SSE stream.
1846-
*/
1847-
export interface CloseSSEStreamOptions {
1848-
/**
1849-
* Retry interval in milliseconds to suggest to clients before closing.
1850-
* When set, sends an SSE retry field to override the transport's default.
1851-
*/
1852-
retryInterval?: number;
1853-
}
1854-
18551844
/**
18561845
* Extra information about a message.
18571846
*/
@@ -1870,7 +1859,7 @@ export interface MessageExtraInfo {
18701859
* Callback to close the SSE stream for this request, triggering client reconnection.
18711860
* Only available when using StreamableHTTPServerTransport with eventStore configured.
18721861
*/
1873-
closeSSEStream?: (options?: CloseSSEStreamOptions) => void;
1862+
closeSSEStream?: () => void;
18741863
}
18751864

18761865
/* JSON-RPC types */

0 commit comments

Comments
 (0)