Skip to content

Commit 9f8cf8d

Browse files
[test] mcp in web package test
1 parent ee1fca4 commit 9f8cf8d

File tree

10 files changed

+630
-17
lines changed

10 files changed

+630
-17
lines changed

packages/web/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"@hookform/resolvers": "^3.9.0",
4343
"@iconify/react": "^5.1.0",
4444
"@iizukak/codemirror-lang-wgsl": "^0.3.0",
45+
"@modelcontextprotocol/sdk": "^1.10.2",
4546
"@radix-ui/react-alert-dialog": "^1.1.5",
4647
"@radix-ui/react-avatar": "^1.1.2",
4748
"@radix-ui/react-dialog": "^1.1.4",
@@ -128,6 +129,7 @@
128129
"react-hotkeys-hook": "^4.5.1",
129130
"react-icons": "^5.3.0",
130131
"react-resizable-panels": "^2.1.1",
132+
"redis": "^4.7.0",
131133
"server-only": "^0.0.1",
132134
"sharp": "^0.33.5",
133135
"strip-json-comments": "^5.0.1",
Lines changed: 347 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
2+
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
3+
import { IncomingHttpHeaders, IncomingMessage, ServerResponse } from "http";
4+
import { createClient } from "redis";
5+
import { Socket } from "net";
6+
import { Readable } from "stream";
7+
import { ServerOptions } from "@modelcontextprotocol/sdk/server/index.js";
8+
import { maxDuration } from "./sse/route";
9+
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
10+
import { env } from "@/env.mjs";
11+
12+
interface SerializedRequest {
13+
requestId: string;
14+
url: string;
15+
method: string;
16+
body: string;
17+
headers: IncomingHttpHeaders;
18+
}
19+
20+
export function initializeMcpApiHandler(
21+
initializeServer: (server: McpServer) => void,
22+
serverOptions: ServerOptions = {}
23+
) {
24+
const redis = createClient({
25+
url: env.REDIS_URL,
26+
});
27+
const redisPublisher = createClient({
28+
url: env.REDIS_URL,
29+
});
30+
redis.on("error", (err) => {
31+
console.error("Redis error", err);
32+
});
33+
redisPublisher.on("error", (err) => {
34+
console.error("Redis error", err);
35+
});
36+
const redisPromise = Promise.all([redis.connect(), redisPublisher.connect()]);
37+
38+
let servers: McpServer[] = [];
39+
40+
let statelessServer: McpServer;
41+
const statelessTransport = new StreamableHTTPServerTransport({
42+
sessionIdGenerator: undefined,
43+
});
44+
return async function mcpApiHandler(req: Request, res: ServerResponse) {
45+
await redisPromise;
46+
const url = new URL(req.url || "", "https://example.com");
47+
if (url.pathname === "/mcp") {
48+
if (req.method === "GET") {
49+
console.log("Received GET MCP request");
50+
res.writeHead(405).end(
51+
JSON.stringify({
52+
jsonrpc: "2.0",
53+
error: {
54+
code: -32000,
55+
message: "Method not allowed.",
56+
},
57+
id: null,
58+
})
59+
);
60+
return;
61+
}
62+
if (req.method === "DELETE") {
63+
console.log("Received DELETE MCP request");
64+
res.writeHead(405).end(
65+
JSON.stringify({
66+
jsonrpc: "2.0",
67+
error: {
68+
code: -32000,
69+
message: "Method not allowed.",
70+
},
71+
id: null,
72+
})
73+
);
74+
return;
75+
}
76+
console.log("Got new MCP connection", req.url, req.method);
77+
78+
if (!statelessServer) {
79+
statelessServer = new McpServer(
80+
{
81+
name: "mcp-typescript server on vercel",
82+
version: "0.1.0",
83+
},
84+
serverOptions
85+
);
86+
87+
initializeServer(statelessServer);
88+
await statelessServer.connect(statelessTransport);
89+
}
90+
91+
// Parse the request body
92+
let bodyContent;
93+
if (req.method === "POST") {
94+
const contentType = req.headers.get("content-type") || "";
95+
if (contentType.includes("application/json")) {
96+
bodyContent = await req.json();
97+
} else {
98+
bodyContent = await req.text();
99+
}
100+
}
101+
102+
const incomingRequest = createFakeIncomingMessage({
103+
method: req.method,
104+
url: req.url,
105+
headers: Object.fromEntries(req.headers),
106+
body: bodyContent,
107+
});
108+
await statelessTransport.handleRequest(incomingRequest, res);
109+
} else if (url.pathname === "/sse") {
110+
console.log("Got new SSE connection");
111+
112+
const transport = new SSEServerTransport("/message", res);
113+
const sessionId = transport.sessionId;
114+
const server = new McpServer(
115+
{
116+
name: "mcp-typescript server on vercel",
117+
version: "0.1.0",
118+
},
119+
serverOptions
120+
);
121+
initializeServer(server);
122+
123+
servers.push(server);
124+
125+
server.server.onclose = () => {
126+
console.log("SSE connection closed");
127+
servers = servers.filter((s) => s !== server);
128+
};
129+
130+
let logs: {
131+
type: "log" | "error";
132+
messages: string[];
133+
}[] = [];
134+
// This ensures that we logs in the context of the right invocation since the subscriber
135+
// is not itself invoked in request context.
136+
function logInContext(severity: "log" | "error", ...messages: string[]) {
137+
logs.push({
138+
type: severity,
139+
messages,
140+
});
141+
}
142+
143+
// Handles messages originally received via /message
144+
const handleMessage = async (message: string) => {
145+
console.log("Received message from Redis", message);
146+
logInContext("log", "Received message from Redis", message);
147+
const request = JSON.parse(message) as SerializedRequest;
148+
149+
// Make in IncomingMessage object because that is what the SDK expects.
150+
const req = createFakeIncomingMessage({
151+
method: request.method,
152+
url: request.url,
153+
headers: request.headers,
154+
body: request.body, // This could already be an object from earlier parsing
155+
});
156+
const syntheticRes = new ServerResponse(req);
157+
let status = 100;
158+
let body = "";
159+
syntheticRes.writeHead = (statusCode: number) => {
160+
status = statusCode;
161+
return syntheticRes;
162+
};
163+
syntheticRes.end = (b: unknown) => {
164+
body = b as string;
165+
return syntheticRes;
166+
};
167+
await transport.handlePostMessage(req, syntheticRes);
168+
169+
await redisPublisher.publish(
170+
`responses:${sessionId}:${request.requestId}`,
171+
JSON.stringify({
172+
status,
173+
body,
174+
})
175+
);
176+
177+
if (status >= 200 && status < 300) {
178+
logInContext(
179+
"log",
180+
`Request ${sessionId}:${request.requestId} succeeded: ${body}`
181+
);
182+
} else {
183+
logInContext(
184+
"error",
185+
`Message for ${sessionId}:${request.requestId} failed with status ${status}: ${body}`
186+
);
187+
}
188+
};
189+
190+
const interval = setInterval(() => {
191+
for (const log of logs) {
192+
console[log.type].call(console, ...log.messages);
193+
}
194+
logs = [];
195+
}, 100);
196+
197+
await redis.subscribe(`requests:${sessionId}`, handleMessage);
198+
console.log(`Subscribed to requests:${sessionId}`);
199+
200+
let timeout: NodeJS.Timeout;
201+
let resolveTimeout: (value: unknown) => void;
202+
const waitPromise = new Promise((resolve) => {
203+
resolveTimeout = resolve;
204+
timeout = setTimeout(() => {
205+
resolve("max duration reached");
206+
}, (maxDuration - 5) * 1000);
207+
});
208+
209+
async function cleanup() {
210+
clearTimeout(timeout);
211+
clearInterval(interval);
212+
await redis.unsubscribe(`requests:${sessionId}`, handleMessage);
213+
console.log("Done");
214+
res.statusCode = 200;
215+
res.end();
216+
}
217+
req.signal.addEventListener("abort", () =>
218+
resolveTimeout("client hang up")
219+
);
220+
221+
await server.connect(transport);
222+
const closeReason = await waitPromise;
223+
console.log(closeReason);
224+
await cleanup();
225+
} else if (url.pathname === "/message") {
226+
console.log("Received message");
227+
228+
const body = await req.text();
229+
let parsedBody;
230+
try {
231+
parsedBody = JSON.parse(body);
232+
} catch (e) {
233+
parsedBody = body;
234+
}
235+
236+
const sessionId = url.searchParams.get("sessionId") || "";
237+
if (!sessionId) {
238+
res.statusCode = 400;
239+
res.end("No sessionId provided");
240+
return;
241+
}
242+
const requestId = crypto.randomUUID();
243+
const serializedRequest: SerializedRequest = {
244+
requestId,
245+
url: req.url || "",
246+
method: req.method || "",
247+
body: parsedBody,
248+
headers: Object.fromEntries(req.headers.entries()),
249+
};
250+
251+
// Handles responses from the /sse endpoint.
252+
await redis.subscribe(
253+
`responses:${sessionId}:${requestId}`,
254+
(message) => {
255+
clearTimeout(timeout);
256+
const response = JSON.parse(message) as {
257+
status: number;
258+
body: string;
259+
};
260+
res.statusCode = response.status;
261+
res.end(response.body);
262+
}
263+
);
264+
265+
// Queue the request in Redis so that a subscriber can pick it up.
266+
// One queue per session.
267+
await redisPublisher.publish(
268+
`requests:${sessionId}`,
269+
JSON.stringify(serializedRequest)
270+
);
271+
console.log(`Published requests:${sessionId}`, serializedRequest);
272+
273+
let timeout = setTimeout(async () => {
274+
await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
275+
res.statusCode = 408;
276+
res.end("Request timed out");
277+
}, 10 * 1000);
278+
279+
res.on("close", async () => {
280+
clearTimeout(timeout);
281+
await redis.unsubscribe(`responses:${sessionId}:${requestId}`);
282+
});
283+
} else {
284+
res.statusCode = 404;
285+
res.end("Not found");
286+
}
287+
};
288+
}
289+
290+
// Define the options interface
291+
interface FakeIncomingMessageOptions {
292+
method?: string;
293+
url?: string;
294+
headers?: IncomingHttpHeaders;
295+
body?: string | Buffer | Record<string, any> | null;
296+
socket?: Socket;
297+
}
298+
299+
// Create a fake IncomingMessage
300+
function createFakeIncomingMessage(
301+
options: FakeIncomingMessageOptions = {}
302+
): IncomingMessage {
303+
const {
304+
method = "GET",
305+
url = "/",
306+
headers = {},
307+
body = null,
308+
socket = new Socket(),
309+
} = options;
310+
311+
// Create a readable stream that will be used as the base for IncomingMessage
312+
const readable = new Readable();
313+
readable._read = (): void => { }; // Required implementation
314+
315+
// Add the body content if provided
316+
if (body) {
317+
if (typeof body === "string") {
318+
readable.push(body);
319+
} else if (Buffer.isBuffer(body)) {
320+
readable.push(body);
321+
} else {
322+
// Ensure proper JSON-RPC format
323+
const bodyString = JSON.stringify(body);
324+
readable.push(bodyString);
325+
}
326+
readable.push(null); // Signal the end of the stream
327+
} else {
328+
readable.push(null); // Always end the stream even if no body
329+
}
330+
331+
// Create the IncomingMessage instance
332+
const req = new IncomingMessage(socket);
333+
334+
// Set the properties
335+
req.method = method;
336+
req.url = url;
337+
req.headers = headers;
338+
339+
// Copy over the stream methods
340+
req.push = readable.push.bind(readable);
341+
req.read = readable.read.bind(readable);
342+
// @ts-expect-error
343+
req.on = readable.on.bind(readable);
344+
req.pipe = readable.pipe.bind(readable);
345+
346+
return req;
347+
}

0 commit comments

Comments
 (0)