|
| 1 | +From 860ce1e0f2fb76266c27afc1708ae01637a88192 Mon Sep 17 00:00:00 2001 |
| 2 | +From: Andrew Druk <adruk@readdle.com> |
| 3 | +Date: Mon, 4 Aug 2025 14:53:38 +0300 |
| 4 | +Subject: [PATCH] Fix WebSocket buffered read Add support for fragmented |
| 5 | + messages |
| 6 | + |
| 7 | +Buffered socket reads could result in incomplete frame parsing due to incorrect assumptions about TCP delivery. This patch introduces proper accumulation of partial reads. |
| 8 | + |
| 9 | +Also adds handling for fragmented WebSocket messages split across multiple frames. |
| 10 | + |
| 11 | +(cherry picked from commit 940dd090506142e4198a11f24fe883bbe26166f0) |
| 12 | +--- |
| 13 | + .../WebSocket/WebSocketURLProtocol.swift | 25 +++- |
| 14 | + .../URLSession/libcurl/EasyHandle.swift | 4 +- |
| 15 | + Tests/Foundation/HTTPServer.swift | 99 ++++++++++++++-- |
| 16 | + Tests/Foundation/TestURLSession.swift | 107 +++++++++--------- |
| 17 | + 4 files changed, 169 insertions(+), 66 deletions(-) |
| 18 | + |
| 19 | +diff --git a/Sources/FoundationNetworking/URLSession/WebSocket/WebSocketURLProtocol.swift b/Sources/FoundationNetworking/URLSession/WebSocket/WebSocketURLProtocol.swift |
| 20 | +index 8216f23d..e35612dd 100644 |
| 21 | +--- a/Sources/FoundationNetworking/URLSession/WebSocket/WebSocketURLProtocol.swift |
| 22 | ++++ b/Sources/FoundationNetworking/URLSession/WebSocket/WebSocketURLProtocol.swift |
| 23 | +@@ -17,6 +17,9 @@ import Foundation |
| 24 | + import Dispatch |
| 25 | + |
| 26 | + internal class _WebSocketURLProtocol: _HTTPURLProtocol { |
| 27 | ++ |
| 28 | ++ private var messageData = Data() |
| 29 | ++ |
| 30 | + public required init(task: URLSessionTask, cachedResponse: CachedURLResponse?, client: URLProtocolClient?) { |
| 31 | + super.init(task: task, cachedResponse: nil, client: client) |
| 32 | + } |
| 33 | +@@ -118,14 +121,14 @@ internal class _WebSocketURLProtocol: _HTTPURLProtocol { |
| 34 | + lastRedirectBody = redirectBody |
| 35 | + } |
| 36 | + |
| 37 | +- let flags = easyHandle.getWebSocketFlags() |
| 38 | ++ let (offset, bytesLeft, flags) = easyHandle.getWebSocketMeta() |
| 39 | + |
| 40 | +- notifyTask(aboutReceivedData: data, flags: flags) |
| 41 | ++ notifyTask(aboutReceivedData: data, offset: offset, bytesLeft: bytesLeft, flags: flags) |
| 42 | + internalState = .transferInProgress(ts) |
| 43 | + return .proceed |
| 44 | + } |
| 45 | + |
| 46 | +- fileprivate func notifyTask(aboutReceivedData data: Data, flags: _EasyHandle.WebSocketFlags) { |
| 47 | ++ fileprivate func notifyTask(aboutReceivedData data: Data, offset: Int64, bytesLeft: Int64, flags: _EasyHandle.WebSocketFlags) { |
| 48 | + guard let t = self.task else { |
| 49 | + fatalError("Cannot notify") |
| 50 | + } |
| 51 | +@@ -159,10 +162,21 @@ internal class _WebSocketURLProtocol: _HTTPURLProtocol { |
| 52 | + } else if flags.contains(.pong) { |
| 53 | + task.noteReceivedPong() |
| 54 | + } else if flags.contains(.binary) { |
| 55 | +- let message = URLSessionWebSocketTask.Message.data(data) |
| 56 | ++ if bytesLeft > 0 || flags.contains(.cont) { |
| 57 | ++ messageData.append(data) |
| 58 | ++ return |
| 59 | ++ } |
| 60 | ++ messageData.append(data) |
| 61 | ++ let message = URLSessionWebSocketTask.Message.data(messageData) |
| 62 | + task.appendReceivedMessage(message) |
| 63 | ++ messageData = Data() // Reset for the next message |
| 64 | + } else if flags.contains(.text) { |
| 65 | +- guard let utf8 = String(data: data, encoding: .utf8) else { |
| 66 | ++ if bytesLeft > 0 || flags.contains(.cont) { |
| 67 | ++ messageData.append(data) |
| 68 | ++ return |
| 69 | ++ } |
| 70 | ++ messageData.append(data) |
| 71 | ++ guard let utf8 = String(data: messageData, encoding: .utf8) else { |
| 72 | + NSLog("Invalid utf8 message received from server \(data)") |
| 73 | + let error = NSError(domain: NSURLErrorDomain, code: NSURLErrorBadServerResponse, |
| 74 | + userInfo: [ |
| 75 | +@@ -175,6 +189,7 @@ internal class _WebSocketURLProtocol: _HTTPURLProtocol { |
| 76 | + } |
| 77 | + let message = URLSessionWebSocketTask.Message.string(utf8) |
| 78 | + task.appendReceivedMessage(message) |
| 79 | ++ messageData = Data() // Reset for the next message |
| 80 | + } else { |
| 81 | + NSLog("Unexpected message received from server \(data) \(flags)") |
| 82 | + let error = NSError(domain: NSURLErrorDomain, code: NSURLErrorBadServerResponse, |
| 83 | +diff --git a/Sources/FoundationNetworking/URLSession/libcurl/EasyHandle.swift b/Sources/FoundationNetworking/URLSession/libcurl/EasyHandle.swift |
| 84 | +index cfb9a9e5..c2ae72e6 100644 |
| 85 | +--- a/Sources/FoundationNetworking/URLSession/libcurl/EasyHandle.swift |
| 86 | ++++ b/Sources/FoundationNetworking/URLSession/libcurl/EasyHandle.swift |
| 87 | +@@ -402,10 +402,10 @@ extension _EasyHandle { |
| 88 | + } |
| 89 | + |
| 90 | + // Only valid to call within a didReceive(data:size:nmemb:) call |
| 91 | +- func getWebSocketFlags() -> WebSocketFlags { |
| 92 | ++ func getWebSocketMeta() -> (Int64, Int64, WebSocketFlags) { |
| 93 | + let metadataPointer = CFURLSessionEasyHandleWebSocketsMetadata(rawHandle) |
| 94 | + let flags = WebSocketFlags(rawValue: metadataPointer.pointee.flags) |
| 95 | +- return flags |
| 96 | ++ return (metadataPointer.pointee.offset, metadataPointer.pointee.bytesLeft, flags) |
| 97 | + } |
| 98 | + |
| 99 | + func receiveWebSocketsData() throws -> (Data, WebSocketFlags) { |
| 100 | +diff --git a/Tests/Foundation/HTTPServer.swift b/Tests/Foundation/HTTPServer.swift |
| 101 | +index ba5782b9..a4a4a38b 100644 |
| 102 | +--- a/Tests/Foundation/HTTPServer.swift |
| 103 | ++++ b/Tests/Foundation/HTTPServer.swift |
| 104 | +@@ -919,6 +919,8 @@ public class TestURLSessionServer: CustomStringConvertible { |
| 105 | + "Connection: Upgrade"] |
| 106 | + |
| 107 | + let expectFullRequestResponseTests: Bool |
| 108 | ++ let bufferedSendingTests: Bool |
| 109 | ++ let fragmentedTests: Bool |
| 110 | + let sendClosePacket: Bool |
| 111 | + let completeUpgrade: Bool |
| 112 | + |
| 113 | +@@ -926,14 +928,32 @@ public class TestURLSessionServer: CustomStringConvertible { |
| 114 | + switch uri { |
| 115 | + case "/web-socket": |
| 116 | + expectFullRequestResponseTests = true |
| 117 | ++ bufferedSendingTests = false |
| 118 | ++ fragmentedTests = false |
| 119 | ++ completeUpgrade = true |
| 120 | ++ sendClosePacket = true |
| 121 | ++ case "/web-socket/buffered-sending": |
| 122 | ++ expectFullRequestResponseTests = true |
| 123 | ++ bufferedSendingTests = true |
| 124 | ++ fragmentedTests = false |
| 125 | ++ completeUpgrade = true |
| 126 | ++ sendClosePacket = true |
| 127 | ++ case "/web-socket/fragmented": |
| 128 | ++ expectFullRequestResponseTests = true |
| 129 | ++ bufferedSendingTests = false |
| 130 | ++ fragmentedTests = true |
| 131 | + completeUpgrade = true |
| 132 | + sendClosePacket = true |
| 133 | + case "/web-socket/semi-abrupt-close": |
| 134 | + expectFullRequestResponseTests = false |
| 135 | ++ bufferedSendingTests = false |
| 136 | ++ fragmentedTests = false |
| 137 | + completeUpgrade = true |
| 138 | + sendClosePacket = false |
| 139 | + case "/web-socket/abrupt-close": |
| 140 | + expectFullRequestResponseTests = false |
| 141 | ++ bufferedSendingTests = false |
| 142 | ++ fragmentedTests = false |
| 143 | + completeUpgrade = false |
| 144 | + sendClosePacket = false |
| 145 | + default: |
| 146 | +@@ -949,6 +969,8 @@ public class TestURLSessionServer: CustomStringConvertible { |
| 147 | + } |
| 148 | + responseHeaders.append("Sec-WebSocket-Protocol: \(expectedProtocol)") |
| 149 | + expectFullRequestResponseTests = false |
| 150 | ++ bufferedSendingTests = false |
| 151 | ++ fragmentedTests = false |
| 152 | + completeUpgrade = true |
| 153 | + sendClosePacket = true |
| 154 | + } |
| 155 | +@@ -983,10 +1005,41 @@ public class TestURLSessionServer: CustomStringConvertible { |
| 156 | + NSLog("Invalid string frame") |
| 157 | + throw InternalServerError.badBody |
| 158 | + } |
| 159 | +- |
| 160 | +- // Send a string message |
| 161 | +- let sendStringFrame = Data([0x81, UInt8(stringPayload.count)]) + stringPayload |
| 162 | +- try httpServer.tcpSocket.writeRawData(sendStringFrame) |
| 163 | ++ |
| 164 | ++ if bufferedSendingTests { |
| 165 | ++ // Send a string message in chunks of 2 bytes |
| 166 | ++ let sendStringFrame = Data([0x81, UInt8(stringPayload.count)]) + stringPayload |
| 167 | ++ let bufferSize = 2 // Let's assume the server has a buffer size of 2 bytes |
| 168 | ++ for i in stride(from: 0, to: sendStringFrame.count, by: bufferSize) { |
| 169 | ++ let end = min(i + bufferSize, sendStringFrame.count) |
| 170 | ++ let chunk = sendStringFrame.subdata(in: i..<end) |
| 171 | ++ try httpServer.tcpSocket.writeRawData(chunk) |
| 172 | ++ Thread.sleep(forTimeInterval: 0.1) // Sleep to simulate buffered sending |
| 173 | ++ } |
| 174 | ++ } |
| 175 | ++ else if fragmentedTests { |
| 176 | ++ // Send a string message fragmented by 1 byte |
| 177 | ++ for (i, byte) in stringPayload.enumerated() { |
| 178 | ++ var frame = Data() |
| 179 | ++ let isFirst = i == 0 |
| 180 | ++ let isLast = i == stringPayload.count - 1 |
| 181 | ++ |
| 182 | ++ let finBit: UInt8 = isLast ? 0x80 : 0x00 |
| 183 | ++ let opcode: UInt8 = isFirst ? 0x1 : 0x0 // 0x1 = text, 0x0 = continuation |
| 184 | ++ let header: UInt8 = finBit | opcode |
| 185 | ++ |
| 186 | ++ frame.append(header) |
| 187 | ++ frame.append(0x01) // payload length 1, unmasked |
| 188 | ++ frame.append(byte) |
| 189 | ++ |
| 190 | ++ try httpServer.tcpSocket.writeRawData(frame) |
| 191 | ++ } |
| 192 | ++ } |
| 193 | ++ else { |
| 194 | ++ // Send a string message |
| 195 | ++ let sendStringFrame = Data([0x81, UInt8(stringPayload.count)]) + stringPayload |
| 196 | ++ try httpServer.tcpSocket.writeRawData(sendStringFrame) |
| 197 | ++ } |
| 198 | + |
| 199 | + // Receive a data message |
| 200 | + guard let dataFrame = try httpServer.tcpSocket.readData(), |
| 201 | +@@ -996,10 +1049,40 @@ public class TestURLSessionServer: CustomStringConvertible { |
| 202 | + NSLog("Invalid data frame") |
| 203 | + throw InternalServerError.badBody |
| 204 | + } |
| 205 | +- |
| 206 | +- // Send a data message |
| 207 | +- let sendDataFrame = Data([0x82, UInt8(dataPayload.count)]) + dataPayload |
| 208 | +- try httpServer.tcpSocket.writeRawData(sendDataFrame) |
| 209 | ++ |
| 210 | ++ if bufferedSendingTests { |
| 211 | ++ let sendDataFrame = Data([0x82, UInt8(dataPayload.count)]) + dataPayload |
| 212 | ++ let bufferSize = 2 // Let's assume the server has a buffer size of 2 bytes |
| 213 | ++ for i in stride(from: 0, to: sendDataFrame.count, by: bufferSize) { |
| 214 | ++ let end = min(i + bufferSize, sendDataFrame.count) |
| 215 | ++ let chunk = sendDataFrame.subdata(in: i..<end) |
| 216 | ++ try httpServer.tcpSocket.writeRawData(chunk) |
| 217 | ++ Thread.sleep(forTimeInterval: 0.1) // Sleep to simulate buffered sending |
| 218 | ++ } |
| 219 | ++ } |
| 220 | ++ else if fragmentedTests { |
| 221 | ++ // Send a data message fragmented by 1 byte |
| 222 | ++ for (i, byte) in dataPayload.enumerated() { |
| 223 | ++ var frame = Data() |
| 224 | ++ let isFirst = i == 0 |
| 225 | ++ let isLast = i == dataPayload.count - 1 |
| 226 | ++ |
| 227 | ++ let finBit: UInt8 = isLast ? 0x80 : 0x00 |
| 228 | ++ let opcode: UInt8 = isFirst ? 0x2 : 0x0 // 0x2 = text, 0x0 = continuation |
| 229 | ++ let header: UInt8 = finBit | opcode |
| 230 | ++ |
| 231 | ++ frame.append(header) |
| 232 | ++ frame.append(0x01) // payload length 1, unmasked |
| 233 | ++ frame.append(byte) |
| 234 | ++ |
| 235 | ++ try httpServer.tcpSocket.writeRawData(frame) |
| 236 | ++ } |
| 237 | ++ } |
| 238 | ++ else { |
| 239 | ++ // Send a data message |
| 240 | ++ let sendDataFrame = Data([0x82, UInt8(dataPayload.count)]) + dataPayload |
| 241 | ++ try httpServer.tcpSocket.writeRawData(sendDataFrame) |
| 242 | ++ } |
| 243 | + |
| 244 | + // Receive a ping |
| 245 | + guard let pingFrame = try httpServer.tcpSocket.readData(), |
| 246 | +diff --git a/Tests/Foundation/TestURLSession.swift b/Tests/Foundation/TestURLSession.swift |
| 247 | +index fa200abc..07a33a67 100644 |
| 248 | +--- a/Tests/Foundation/TestURLSession.swift |
| 249 | ++++ b/Tests/Foundation/TestURLSession.swift |
| 250 | +@@ -2182,59 +2182,64 @@ final class TestURLSession: LoopbackServerTest, @unchecked Sendable { |
| 251 | + print("libcurl lacks WebSockets support, skipping \(#function)") |
| 252 | + return |
| 253 | + } |
| 254 | +- |
| 255 | +- let urlString = "ws://127.0.0.1:\(TestURLSession.serverPort)/web-socket" |
| 256 | +- let url = try XCTUnwrap(URL(string: urlString)) |
| 257 | +- let request = URLRequest(url: url) |
| 258 | +- |
| 259 | +- let delegate = SessionDelegate(with: expectation(description: "\(urlString): Connect")) |
| 260 | +- let task = delegate.runWebSocketTask(with: request, timeoutInterval: 4) |
| 261 | +- |
| 262 | +- // We interleave sending and receiving, as the test HTTPServer implementation is barebones, and can't handle receiving more than one frame at a time. So, this back-and-forth acts as a gating mechanism |
| 263 | +- try await task.send(.string("Hello")) |
| 264 | +- |
| 265 | +- let stringMessage = try await task.receive() |
| 266 | +- switch stringMessage { |
| 267 | +- case .string(let str): |
| 268 | +- XCTAssert(str == "Hello") |
| 269 | +- default: |
| 270 | +- XCTFail("Unexpected String Message") |
| 271 | +- } |
| 272 | +- |
| 273 | +- try await task.send(.data(Data([0x20, 0x22, 0x10, 0x03]))) |
| 274 | +- |
| 275 | +- let dataMessage = try await task.receive() |
| 276 | +- switch dataMessage { |
| 277 | +- case .data(let data): |
| 278 | +- XCTAssert(data == Data([0x20, 0x22, 0x10, 0x03])) |
| 279 | +- default: |
| 280 | +- XCTFail("Unexpected Data Message") |
| 281 | +- } |
| 282 | +- |
| 283 | +- do { |
| 284 | +- try await task.sendPing() |
| 285 | +- // Server hasn't closed the connection yet |
| 286 | +- } catch { |
| 287 | +- // Server closed the connection before we could process the pong |
| 288 | +- let urlError = try XCTUnwrap(error as? URLError) |
| 289 | +- XCTAssertEqual(urlError._nsError.code, NSURLErrorNetworkConnectionLost) |
| 290 | +- } |
| 291 | + |
| 292 | +- await fulfillment(of: [delegate.expectation], timeout: 50) |
| 293 | +- |
| 294 | +- do { |
| 295 | +- _ = try await task.receive() |
| 296 | +- XCTFail("Expected to throw when receiving on closed task") |
| 297 | +- } catch { |
| 298 | +- let urlError = try XCTUnwrap(error as? URLError) |
| 299 | +- XCTAssertEqual(urlError._nsError.code, NSURLErrorNetworkConnectionLost) |
| 300 | ++ func testWebSocket(withURL urlString: String) async throws -> Void { |
| 301 | ++ let url = try XCTUnwrap(URL(string: urlString)) |
| 302 | ++ let request = URLRequest(url: url) |
| 303 | ++ |
| 304 | ++ let delegate = SessionDelegate(with: expectation(description: "\(urlString): Connect")) |
| 305 | ++ let task = delegate.runWebSocketTask(with: request, timeoutInterval: 4) |
| 306 | ++ |
| 307 | ++ // We interleave sending and receiving, as the test HTTPServer implementation is barebones, and can't handle receiving more than one frame at a time. So, this back-and-forth acts as a gating mechanism |
| 308 | ++ try await task.send(.string("Hello")) |
| 309 | ++ |
| 310 | ++ let stringMessage = try await task.receive() |
| 311 | ++ switch stringMessage { |
| 312 | ++ case .string(let str): |
| 313 | ++ XCTAssert(str == "Hello") |
| 314 | ++ default: |
| 315 | ++ XCTFail("Unexpected String Message") |
| 316 | ++ } |
| 317 | ++ |
| 318 | ++ try await task.send(.data(Data([0x20, 0x22, 0x10, 0x03]))) |
| 319 | ++ |
| 320 | ++ let dataMessage = try await task.receive() |
| 321 | ++ switch dataMessage { |
| 322 | ++ case .data(let data): |
| 323 | ++ XCTAssert(data == Data([0x20, 0x22, 0x10, 0x03])) |
| 324 | ++ default: |
| 325 | ++ XCTFail("Unexpected Data Message") |
| 326 | ++ } |
| 327 | ++ |
| 328 | ++ do { |
| 329 | ++ try await task.sendPing() |
| 330 | ++ // Server hasn't closed the connection yet |
| 331 | ++ } catch { |
| 332 | ++ // Server closed the connection before we could process the pong |
| 333 | ++ let urlError = try XCTUnwrap(error as? URLError) |
| 334 | ++ XCTAssertEqual(urlError._nsError.code, NSURLErrorNetworkConnectionLost) |
| 335 | ++ } |
| 336 | ++ |
| 337 | ++ await fulfillment(of: [delegate.expectation], timeout: 50) |
| 338 | ++ |
| 339 | ++ do { |
| 340 | ++ _ = try await task.receive() |
| 341 | ++ XCTFail("Expected to throw when receiving on closed task") |
| 342 | ++ } catch { |
| 343 | ++ let urlError = try XCTUnwrap(error as? URLError) |
| 344 | ++ XCTAssertEqual(urlError._nsError.code, NSURLErrorNetworkConnectionLost) |
| 345 | ++ } |
| 346 | ++ |
| 347 | ++ let callbacks = [ "urlSession(_:webSocketTask:didOpenWithProtocol:)", |
| 348 | ++ "urlSession(_:webSocketTask:didCloseWith:reason:)", |
| 349 | ++ "urlSession(_:task:didCompleteWithError:)" ] |
| 350 | ++ XCTAssertEqual(delegate.callbacks.count, callbacks.count) |
| 351 | ++ XCTAssertEqual(delegate.callbacks, callbacks, "Callbacks for \(#function)") |
| 352 | + } |
| 353 | +- |
| 354 | +- let callbacks = [ "urlSession(_:webSocketTask:didOpenWithProtocol:)", |
| 355 | +- "urlSession(_:webSocketTask:didCloseWith:reason:)", |
| 356 | +- "urlSession(_:task:didCompleteWithError:)" ] |
| 357 | +- XCTAssertEqual(delegate.callbacks.count, callbacks.count) |
| 358 | +- XCTAssertEqual(delegate.callbacks, callbacks, "Callbacks for \(#function)") |
| 359 | ++ |
| 360 | ++ try await testWebSocket(withURL: "ws://127.0.0.1:\(TestURLSession.serverPort)/web-socket") |
| 361 | ++ try await testWebSocket(withURL: "ws://127.0.0.1:\(TestURLSession.serverPort)/web-socket/buffered-sending") |
| 362 | ++ try await testWebSocket(withURL: "ws://127.0.0.1:\(TestURLSession.serverPort)/web-socket/fragmented") |
| 363 | + } |
| 364 | + |
| 365 | + func test_webSocketShared() async throws { |
| 366 | +-- |
| 367 | +2.46.0 |
| 368 | + |
0 commit comments