Skip to content

Commit 1574b90

Browse files
committed
feat: W-18481168 - Build a stdio <--> SSE Proxy for MCP
1 parent 882b002 commit 1574b90

File tree

1 file changed

+217
-0
lines changed

1 file changed

+217
-0
lines changed

src/lib/proxy.ts

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
// Model Context Protocol spec 2025-03-26: stdio <-> Streamable HTTP/SSE proxy
2+
// Requires Node.js 20+ for native fetch and Headers
3+
import {createInterface} from 'readline/promises'
4+
5+
class MCPStdioToSSEProxy {
6+
private remoteUrl: URL;
7+
private sessionId: string | null = null;
8+
private handshakeState: 'awaiting-initialize' | 'awaiting-initialize-response' | 'awaiting-initialized' | 'ready' = 'awaiting-initialize';
9+
private pendingQueue: string[] = [];
10+
private rl = createInterface({input: process.stdin, crlfDelay: Number.POSITIVE_INFINITY});
11+
12+
constructor(remoteUrl: URL) {
13+
this.remoteUrl = remoteUrl
14+
}
15+
16+
private logError(msg: string) {
17+
process.stderr.write(msg + '\n')
18+
}
19+
20+
private isValidJsonRpc(msg: any): boolean {
21+
if (Array.isArray(msg)) {
22+
return msg.every(this.isValidJsonRpc.bind(this))
23+
}
24+
25+
return typeof msg === 'object' && msg !== null && msg.jsonrpc === '2.0'
26+
}
27+
28+
private async postToRemote(message: string, timeoutMs = 60000): Promise<Response> {
29+
const headers = new Headers({
30+
'Content-Type': 'application/json',
31+
Accept: 'application/json, text/event-stream',
32+
})
33+
if (this.sessionId) headers.set('Mcp-Session-Id', this.sessionId)
34+
35+
const controller = new AbortController()
36+
const timeout = setTimeout(() => controller.abort(), timeoutMs)
37+
try {
38+
const res = await fetch(this.remoteUrl, {
39+
method: 'POST',
40+
headers,
41+
body: message,
42+
signal: controller.signal,
43+
})
44+
clearTimeout(timeout)
45+
return res
46+
} catch (error: any) {
47+
clearTimeout(timeout)
48+
throw error
49+
}
50+
}
51+
52+
private async streamSSE(body: any): Promise<void> {
53+
let buffer = ''
54+
for await (const chunk of body) {
55+
buffer += chunk.toString('utf8')
56+
let idx: number
57+
while ((idx = buffer.indexOf('\n\n')) !== -1) {
58+
const eventBlock = buffer.slice(0, idx)
59+
buffer = buffer.slice(idx + 2)
60+
const lines = eventBlock.split('\n')
61+
let data = ''
62+
for (const line of lines) {
63+
if (line.startsWith('data:')) {
64+
data += line.slice(5).trim()
65+
}
66+
}
67+
68+
if (data) {
69+
process.stdout.write(data + '\n')
70+
}
71+
}
72+
}
73+
}
74+
75+
private async handleInitializeRequest(message: string, parsed: any): Promise<void> {
76+
if (!parsed || parsed.method !== 'initialize') {
77+
this.logError('First message must be an initialize request.')
78+
throw new Error('Invalid initialize request')
79+
}
80+
81+
this.handshakeState = 'awaiting-initialize-response'
82+
const res = await this.postToRemote(message)
83+
if (!this.sessionId && res.headers.has('mcp-session-id')) {
84+
this.sessionId = res.headers.get('mcp-session-id')
85+
}
86+
87+
const contentType = res.headers.get('content-type') || ''
88+
if (contentType.startsWith('text/event-stream')) {
89+
await this.streamSSE(res.body)
90+
} else if (res.body) {
91+
const text = await res.text()
92+
if (text.trim()) process.stdout.write(text.trim() + '\n')
93+
}
94+
95+
this.handshakeState = 'awaiting-initialized'
96+
}
97+
98+
private async handleInitializedNotification(message: string, parsed: any): Promise<void> {
99+
if (!parsed || parsed.method !== 'notifications/initialized') {
100+
this.logError('Second message must be notifications/initialized.')
101+
throw new Error('Invalid initialized notification')
102+
}
103+
104+
const res = await this.postToRemote(message)
105+
if (res.status === 404) {
106+
this.sessionId = null
107+
this.handshakeState = 'awaiting-initialize'
108+
this.logError('Session lost after initialized. Restarting handshake.')
109+
return
110+
}
111+
112+
this.handshakeState = 'ready'
113+
// Flush any queued messages
114+
for (const queued of this.pendingQueue) {
115+
await this.forwardMessage(queued)
116+
}
117+
118+
this.pendingQueue = []
119+
}
120+
121+
private async handleHandshake(message: string, parsed: any): Promise<void> {
122+
try {
123+
if (this.handshakeState === 'awaiting-initialize') {
124+
await this.handleInitializeRequest(message, parsed)
125+
} else if (this.handshakeState === 'awaiting-initialized') {
126+
await this.handleInitializedNotification(message, parsed)
127+
}
128+
} catch (error: any) {
129+
this.logError('Handshake error: ' + (error && error.message ? error.message : error))
130+
throw error
131+
}
132+
}
133+
134+
private async forwardMessage(message: string): Promise<void> {
135+
let parsed: unknown
136+
try {
137+
parsed = JSON.parse(message)
138+
} catch (error: any) {
139+
this.logError('Invalid JSON: ' + error.message)
140+
return
141+
}
142+
143+
if (!this.isValidJsonRpc(parsed)) {
144+
this.logError('Not a valid JSON-RPC 2.0 message.')
145+
return
146+
}
147+
148+
// Handshake enforcement
149+
if (this.handshakeState !== 'ready') {
150+
await this.handleHandshake(message, parsed)
151+
return
152+
}
153+
154+
// Normal operation
155+
try {
156+
const res = await this.postToRemote(message)
157+
if (res.status === 404) {
158+
// Session lost, restart handshake
159+
this.sessionId = null
160+
this.handshakeState = 'awaiting-initialize'
161+
this.logError('Session lost (404). Restarting handshake.')
162+
this.pendingQueue = []
163+
return
164+
}
165+
166+
const contentType = res.headers.get('content-type') || ''
167+
if (contentType.startsWith('text/event-stream')) {
168+
await this.streamSSE(res.body)
169+
return
170+
}
171+
172+
if (res.body) {
173+
const text = await res.text()
174+
if (text.trim()) process.stdout.write(text.trim() + '\n')
175+
}
176+
} catch (error: any) {
177+
if (error.name === 'AbortError') {
178+
this.logError('Request timed out.')
179+
} else {
180+
this.logError('Remote error: ' + (error && error.message ? error.message : error))
181+
}
182+
}
183+
}
184+
185+
public async run(): Promise<void> {
186+
for await (const line of this.rl) {
187+
const trimmed = line.trim()
188+
if (!trimmed) continue
189+
if (this.handshakeState === 'ready') {
190+
await this.forwardMessage(trimmed)
191+
} else if (
192+
this.handshakeState === 'awaiting-initialize' ||
193+
this.handshakeState === 'awaiting-initialize-response' ||
194+
this.handshakeState === 'awaiting-initialized'
195+
) {
196+
// During handshake, queue any extra messages
197+
if (this.handshakeState === 'awaiting-initialize') {
198+
await this.forwardMessage(trimmed)
199+
} else {
200+
this.pendingQueue.push(trimmed)
201+
}
202+
}
203+
}
204+
}
205+
206+
static async main(remoteUrl: URL) {
207+
const proxy = new MCPStdioToSSEProxy(remoteUrl)
208+
try {
209+
await proxy.run()
210+
} catch (error: any) {
211+
proxy.logError('Fatal error: ' + (error && error.message ? error.message : error))
212+
throw error
213+
}
214+
}
215+
}
216+
217+
export const main = MCPStdioToSSEProxy.main

0 commit comments

Comments
 (0)