Skip to content

Commit d583151

Browse files
authored
Add streamable HTTP transport for clients (#70)
* Rename file to Transport.swift * Rename file to StdioTransportTests.swift * Add .index-build/ to .gitignore * Implement HTTP client transport * Add test coverage for streamable HTTP client transport * Conditionally import FoundationNetworking * Conditionalize streaming for unsupported platforms * Import FoundationNetworking in tests * Add 5 minute timeout to CI jobs * Remove debug logging * Formatting * Skip SSE tests on platforms that don't support streaming
1 parent 4143f35 commit d583151

File tree

6 files changed

+726
-0
lines changed

6 files changed

+726
-0
lines changed

.github/workflows/ci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ permissions:
1212

1313
jobs:
1414
test:
15+
timeout-minutes: 5
1516
strategy:
1617
matrix:
1718
os: [macos-latest, ubuntu-latest]

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ DerivedData/
66
.swiftpm/configuration/registries.json
77
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
88
.netrc
9+
.index-build/
File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
import Foundation
2+
import Logging
3+
4+
#if canImport(FoundationNetworking)
5+
import FoundationNetworking
6+
#endif
7+
8+
public actor HTTPClientTransport: Actor, Transport {
9+
public let endpoint: URL
10+
private let session: URLSession
11+
public private(set) var sessionID: String?
12+
private let streaming: Bool
13+
private var streamingTask: Task<Void, Never>?
14+
private var lastEventID: String?
15+
public nonisolated let logger: Logger
16+
17+
private var isConnected = false
18+
private let messageStream: AsyncThrowingStream<Data, Swift.Error>
19+
private let messageContinuation: AsyncThrowingStream<Data, Swift.Error>.Continuation
20+
21+
public init(
22+
endpoint: URL,
23+
configuration: URLSessionConfiguration = .default,
24+
streaming: Bool = false,
25+
logger: Logger? = nil
26+
) {
27+
self.init(
28+
endpoint: endpoint,
29+
session: URLSession(configuration: configuration),
30+
streaming: streaming,
31+
logger: logger
32+
)
33+
}
34+
35+
internal init(
36+
endpoint: URL,
37+
session: URLSession,
38+
streaming: Bool = false,
39+
logger: Logger? = nil
40+
) {
41+
self.endpoint = endpoint
42+
self.session = session
43+
self.streaming = streaming
44+
45+
// Create message stream
46+
var continuation: AsyncThrowingStream<Data, Swift.Error>.Continuation!
47+
self.messageStream = AsyncThrowingStream { continuation = $0 }
48+
self.messageContinuation = continuation
49+
50+
self.logger =
51+
logger
52+
?? Logger(
53+
label: "mcp.transport.http.client",
54+
factory: { _ in SwiftLogNoOpLogHandler() }
55+
)
56+
}
57+
58+
/// Establishes connection with the transport
59+
public func connect() async throws {
60+
guard !isConnected else { return }
61+
isConnected = true
62+
63+
if streaming {
64+
// Start listening to server events
65+
streamingTask = Task { await startListeningForServerEvents() }
66+
}
67+
68+
logger.info("HTTP transport connected")
69+
}
70+
71+
/// Disconnects from the transport
72+
public func disconnect() async {
73+
guard isConnected else { return }
74+
isConnected = false
75+
76+
// Cancel streaming task if active
77+
streamingTask?.cancel()
78+
streamingTask = nil
79+
80+
// Cancel any in-progress requests
81+
session.invalidateAndCancel()
82+
83+
// Clean up message stream
84+
messageContinuation.finish()
85+
86+
logger.info("HTTP clienttransport disconnected")
87+
}
88+
89+
/// Sends data through an HTTP POST request
90+
public func send(_ data: Data) async throws {
91+
guard isConnected else {
92+
throw MCPError.internalError("Transport not connected")
93+
}
94+
95+
var request = URLRequest(url: endpoint)
96+
request.httpMethod = "POST"
97+
request.addValue("application/json, text/event-stream", forHTTPHeaderField: "Accept")
98+
request.addValue("application/json", forHTTPHeaderField: "Content-Type")
99+
request.httpBody = data
100+
101+
// Add session ID if available
102+
if let sessionID = sessionID {
103+
request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id")
104+
}
105+
106+
let (responseData, response) = try await session.data(for: request)
107+
108+
guard let httpResponse = response as? HTTPURLResponse else {
109+
throw MCPError.internalError("Invalid HTTP response")
110+
}
111+
112+
// Process the response based on content type and status code
113+
let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? ""
114+
115+
// Extract session ID if present
116+
if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
117+
self.sessionID = newSessionID
118+
logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"])
119+
}
120+
121+
// Handle different response types
122+
switch httpResponse.statusCode {
123+
case 200, 201, 202:
124+
// For SSE, the processing happens in the streaming task
125+
if contentType.contains("text/event-stream") {
126+
logger.debug("Received SSE response, processing in streaming task")
127+
// The streaming is handled by the SSE task if active
128+
return
129+
}
130+
131+
// For JSON responses, deliver the data directly
132+
if contentType.contains("application/json") && !responseData.isEmpty {
133+
logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"])
134+
messageContinuation.yield(responseData)
135+
}
136+
case 404:
137+
// If we get a 404 with a session ID, it means our session is invalid
138+
if sessionID != nil {
139+
logger.warning("Session has expired")
140+
sessionID = nil
141+
throw MCPError.internalError("Session expired")
142+
}
143+
throw MCPError.internalError("Endpoint not found")
144+
default:
145+
throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)")
146+
}
147+
}
148+
149+
/// Receives data in an async sequence
150+
public func receive() -> AsyncThrowingStream<Data, Swift.Error> {
151+
return AsyncThrowingStream { continuation in
152+
Task {
153+
for try await message in messageStream {
154+
continuation.yield(message)
155+
}
156+
continuation.finish()
157+
}
158+
}
159+
}
160+
161+
// MARK: - SSE
162+
163+
/// Starts listening for server events using SSE
164+
private func startListeningForServerEvents() async {
165+
guard isConnected else { return }
166+
167+
// Retry loop for connection drops
168+
while isConnected && !Task.isCancelled {
169+
do {
170+
try await connectToEventStream()
171+
} catch {
172+
if !Task.isCancelled {
173+
logger.error("SSE connection error: \(error)")
174+
// Wait before retrying
175+
try? await Task.sleep(nanoseconds: 1_000_000_000) // 1 second
176+
}
177+
}
178+
}
179+
}
180+
181+
#if canImport(FoundationNetworking)
182+
private func connectToEventStream() async throws {
183+
logger.warning("SSE is not supported on this platform")
184+
}
185+
#else
186+
/// Establishes an SSE connection to the server
187+
private func connectToEventStream() async throws {
188+
guard isConnected else { return }
189+
190+
var request = URLRequest(url: endpoint)
191+
request.httpMethod = "GET"
192+
request.addValue("text/event-stream", forHTTPHeaderField: "Accept")
193+
194+
// Add session ID if available
195+
if let sessionID = sessionID {
196+
request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id")
197+
}
198+
199+
// Add Last-Event-ID header for resumability if available
200+
if let lastEventID = lastEventID {
201+
request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID")
202+
}
203+
204+
logger.debug("Starting SSE connection")
205+
206+
// Create URLSession task for SSE
207+
let (stream, response) = try await session.bytes(for: request)
208+
209+
guard let httpResponse = response as? HTTPURLResponse else {
210+
throw MCPError.internalError("Invalid HTTP response")
211+
}
212+
213+
// Check response status
214+
guard httpResponse.statusCode == 200 else {
215+
throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)")
216+
}
217+
218+
// Extract session ID if present
219+
if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
220+
self.sessionID = newSessionID
221+
}
222+
223+
// Process the SSE stream
224+
var buffer = ""
225+
var eventType = ""
226+
var eventID: String?
227+
var eventData = ""
228+
229+
for try await byte in stream {
230+
if Task.isCancelled { break }
231+
232+
guard let char = String(bytes: [byte], encoding: .utf8) else { continue }
233+
buffer.append(char)
234+
235+
// Process complete lines
236+
while let newlineIndex = buffer.firstIndex(of: "\n") {
237+
let line = buffer[..<newlineIndex]
238+
buffer = String(buffer[buffer.index(after: newlineIndex)...])
239+
240+
// Empty line marks the end of an event
241+
if line.isEmpty || line == "\r" || line == "\n" || line == "\r\n" {
242+
if !eventData.isEmpty {
243+
// Process the event
244+
if eventType == "id" {
245+
lastEventID = eventID
246+
} else {
247+
// Default event type is "message" if not specified
248+
if let data = eventData.data(using: .utf8) {
249+
logger.debug(
250+
"SSE event received",
251+
metadata: [
252+
"type": "\(eventType.isEmpty ? "message" : eventType)",
253+
"id": "\(eventID ?? "none")",
254+
])
255+
messageContinuation.yield(data)
256+
}
257+
}
258+
259+
// Reset for next event
260+
eventType = ""
261+
eventData = ""
262+
}
263+
continue
264+
}
265+
266+
// Lines starting with ":" are comments
267+
if line.hasPrefix(":") { continue }
268+
269+
// Parse field: value format
270+
if let colonIndex = line.firstIndex(of: ":") {
271+
let field = String(line[..<colonIndex])
272+
var value = String(line[line.index(after: colonIndex)...])
273+
274+
// Trim leading space
275+
if value.hasPrefix(" ") {
276+
value = String(value.dropFirst())
277+
}
278+
279+
// Process based on field
280+
switch field {
281+
case "event":
282+
eventType = value
283+
case "data":
284+
if !eventData.isEmpty {
285+
eventData.append("\n")
286+
}
287+
eventData.append(value)
288+
case "id":
289+
if !value.contains("\0") { // ID must not contain NULL
290+
eventID = value
291+
lastEventID = value
292+
}
293+
case "retry":
294+
// Retry timing not implemented
295+
break
296+
default:
297+
// Unknown fields are ignored per SSE spec
298+
break
299+
}
300+
}
301+
}
302+
}
303+
}
304+
#endif
305+
}

0 commit comments

Comments
 (0)