Skip to content

Commit 1d18a57

Browse files
committed
extract standalone get into a method which can be called by a client
1 parent 15c10a5 commit 1d18a57

File tree

2 files changed

+84
-66
lines changed

2 files changed

+84
-66
lines changed

src/client/streamableHttp.test.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,10 @@ describe("StreamableHTTPClientTransport", () => {
161161
statusText: "Method Not Allowed"
162162
});
163163

164+
// We expect the 405 error to be caught and handled gracefully
165+
// This should not throw an error that breaks the transport
164166
await transport.start();
167+
await expect(transport.openSseStream()).rejects.toThrow('Failed to open SSE stream: Method Not Allowed');
165168

166169
// Check that GET was attempted
167170
expect(global.fetch).toHaveBeenCalledWith(
@@ -206,6 +209,7 @@ describe("StreamableHTTPClientTransport", () => {
206209
transport.onmessage = messageSpy;
207210

208211
await transport.start();
212+
await transport.openSseStream();
209213

210214
// Give time for the SSE event to be processed
211215
await new Promise(resolve => setTimeout(resolve, 50));
@@ -233,7 +237,7 @@ describe("StreamableHTTPClientTransport", () => {
233237

234238
(global.fetch as jest.Mock)
235239
.mockResolvedValueOnce({
236-
ok: true,
240+
ok: true,
237241
status: 200,
238242
headers: new Headers({ "content-type": "text/event-stream" }),
239243
body: makeStream("request1")
@@ -255,16 +259,21 @@ describe("StreamableHTTPClientTransport", () => {
255259
]);
256260

257261
// Give time for SSE processing
258-
await new Promise(resolve => setTimeout(resolve, 50));
262+
await new Promise(resolve => setTimeout(resolve, 100));
259263

260264
// Both streams should have delivered their messages
261265
expect(messageSpy).toHaveBeenCalledTimes(2);
262-
expect(messageSpy).toHaveBeenCalledWith(
263-
expect.objectContaining({ result: { id: "request1" }, id: "request1" })
264-
);
265-
expect(messageSpy).toHaveBeenCalledWith(
266-
expect.objectContaining({ result: { id: "request2" }, id: "request2" })
267-
);
266+
267+
// Verify received messages without assuming specific order
268+
expect(messageSpy.mock.calls.some(call => {
269+
const msg = call[0];
270+
return msg.id === "request1" && msg.result?.id === "request1";
271+
})).toBe(true);
272+
273+
expect(messageSpy.mock.calls.some(call => {
274+
const msg = call[0];
275+
return msg.id === "request2" && msg.result?.id === "request2";
276+
})).toBe(true);
268277
});
269278

270279
it("should include last-event-id header when resuming a broken connection", async () => {
@@ -286,6 +295,7 @@ describe("StreamableHTTPClientTransport", () => {
286295
});
287296

288297
await transport.start();
298+
await transport.openSseStream();
289299
await new Promise(resolve => setTimeout(resolve, 50));
290300

291301
// Now simulate attempting to reconnect
@@ -296,7 +306,7 @@ describe("StreamableHTTPClientTransport", () => {
296306
body: null
297307
});
298308

299-
await transport.start();
309+
await transport.openSseStream();
300310

301311
// Check that Last-Event-ID was included
302312
const calls = (global.fetch as jest.Mock).mock.calls;

src/client/streamableHttp.ts

Lines changed: 65 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import { Transport } from "../shared/transport.js";
22
import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
33
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
4-
import { type ErrorEvent } from "eventsource";
54
import { EventSourceMessage, EventSourceParserStream } from 'eventsource-parser/stream';
65
export class StreamableHTTPError extends Error {
76
constructor(
87
public readonly code: number | undefined,
98
message: string | undefined,
10-
public readonly event: ErrorEvent,
119
) {
1210
super(`Streamable HTTP error: ${message}`);
1311
}
@@ -83,7 +81,7 @@ export class StreamableHTTPClientTransport implements Transport {
8381
throw new UnauthorizedError();
8482
}
8583

86-
return await this._startOrAuth();
84+
return await this._startOrAuthStandaloneSSE();
8785
}
8886

8987
private async _commonHeaders(): Promise<HeadersInit> {
@@ -102,7 +100,7 @@ export class StreamableHTTPClientTransport implements Transport {
102100
return headers;
103101
}
104102

105-
private async _startOrAuth(): Promise<void> {
103+
private async _startOrAuthStandaloneSSE(): Promise<void> {
106104
try {
107105
// Try to open an initial SSE stream with GET to listen for server messages
108106
// This is optional according to the spec - server may not support it
@@ -121,32 +119,76 @@ export class StreamableHTTPClientTransport implements Transport {
121119
signal: this._abortController?.signal,
122120
});
123121

124-
if (response.status === 405 || response.status === 404) {
125-
// Server doesn't support GET for SSE, which is allowed by the spec
126-
// We'll rely on SSE responses to POST requests for communication
127-
return;
128-
}
129-
130122
if (!response.ok) {
131123
if (response.status === 401 && this._authProvider) {
132124
// Need to authenticate
133125
return await this._authThenStart();
134126
}
135127

136-
const error = new Error(`Failed to open SSE stream: ${response.status} ${response.statusText}`);
128+
const error = new StreamableHTTPError(
129+
response.status,
130+
`Failed to open SSE stream: ${response.statusText}`,
131+
);
137132
this.onerror?.(error);
138133
throw error;
139134
}
140135

141136
// Successful connection, handle the SSE stream as a standalone listener
142-
const streamId = `initial-${Date.now()}`;
137+
const streamId = `standalone-sse-${Date.now()}`;
143138
this._handleSseStream(response.body, streamId);
144139
} catch (error) {
145140
this.onerror?.(error as Error);
146141
throw error;
147142
}
148143
}
149144

145+
private _handleSseStream(stream: ReadableStream<Uint8Array> | null, streamId: string): void {
146+
if (!stream) {
147+
return;
148+
}
149+
150+
// Create a pipeline: binary stream -> text decoder -> SSE parser
151+
const eventStream = stream
152+
.pipeThrough(new TextDecoderStream())
153+
.pipeThrough(new EventSourceParserStream());
154+
155+
const reader = eventStream.getReader();
156+
this._activeStreams.set(streamId, reader);
157+
158+
const processStream = async () => {
159+
try {
160+
while (true) {
161+
const { done, value: event } = await reader.read();
162+
if (done) {
163+
this._activeStreams.delete(streamId);
164+
break;
165+
}
166+
167+
// Update last event ID if provided
168+
if (event.id) {
169+
this._lastEventId = event.id;
170+
}
171+
172+
// Handle message events (default event type is undefined per docs)
173+
// or explicit 'message' event type
174+
if (!event.event || event.event === 'message') {
175+
try {
176+
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
177+
this.onmessage?.(message);
178+
} catch (error) {
179+
this.onerror?.(error as Error);
180+
}
181+
}
182+
}
183+
} catch (error) {
184+
this._activeStreams.delete(streamId);
185+
this.onerror?.(error as Error);
186+
}
187+
};
188+
189+
processStream();
190+
}
191+
150192
async start() {
151193
if (this._activeStreams.size > 0) {
152194
throw new Error(
@@ -155,7 +197,6 @@ export class StreamableHTTPClientTransport implements Transport {
155197
}
156198

157199
this._abortController = new AbortController();
158-
return await this._startOrAuth();
159200
}
160201

161202
/**
@@ -271,50 +312,17 @@ export class StreamableHTTPClientTransport implements Transport {
271312
}
272313
}
273314

274-
private _handleSseStream(stream: ReadableStream<Uint8Array> | null, streamId: string): void {
275-
if (!stream) {
276-
return;
315+
/**
316+
* Opens SSE stream to receive messages from the server.
317+
*
318+
* This allows the server to push messages to the client without requiring the client
319+
* to first send a request via HTTP POST. Some servers may not support this feature.
320+
* If authentication is required but fails, this method will throw an UnauthorizedError.
321+
*/
322+
async openSseStream(): Promise<void> {
323+
if (!this._abortController) {
324+
this._abortController = new AbortController();
277325
}
278-
279-
// Create a pipeline: binary stream -> text decoder -> SSE parser
280-
const eventStream = stream
281-
.pipeThrough(new TextDecoderStream())
282-
.pipeThrough(new EventSourceParserStream());
283-
284-
const reader = eventStream.getReader();
285-
this._activeStreams.set(streamId, reader);
286-
287-
const processStream = async () => {
288-
try {
289-
while (true) {
290-
const { done, value: event } = await reader.read();
291-
if (done) {
292-
this._activeStreams.delete(streamId);
293-
break;
294-
}
295-
296-
// Update last event ID if provided
297-
if (event.id) {
298-
this._lastEventId = event.id;
299-
}
300-
301-
// Handle message events (default event type is undefined per docs)
302-
// or explicit 'message' event type
303-
if (!event.event || event.event === 'message') {
304-
try {
305-
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
306-
this.onmessage?.(message);
307-
} catch (error) {
308-
this.onerror?.(error as Error);
309-
}
310-
}
311-
}
312-
} catch (error) {
313-
this._activeStreams.delete(streamId);
314-
this.onerror?.(error as Error);
315-
}
316-
};
317-
318-
processStream();
326+
await this._startOrAuthStandaloneSSE();
319327
}
320328
}

0 commit comments

Comments
 (0)