Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletions Sources/HTTPClientConformance/HTTPClientConformance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ struct BasicConformanceTests<Client: HTTPClient & ~Copyable> {
try await testNotFound()
try await testStatusOutOfRangeButValid()
try await testStressTest()
try await testEchoInterleave()
try await testGetConvenience()
try await testPostConvenience()
try await testCancelPreHeaders()
try await testCancelPreBody()

// TODO: URLSession client hangs because of a bug where single bytes cannot be sent.
// try await testEchoInterleave()

// TODO: URLSession client hangs because of a bug where single bytes cannot be sent and requests cannot outlive responses.
// try await testSpeakInterleave()

// TODO: Writing just an empty span causes an indefinite stall. The terminating chunk (size 0) is not written out on the wire.
// try await testEmptyChunkedBody()
}
Expand Down Expand Up @@ -386,22 +391,19 @@ struct BasicConformanceTests<Client: HTTPClient & ~Copyable> {
)

// Used to ping-pong between the client-side writer and reader
let writerWaiting: Mutex<CheckedContinuation<Void, Never>?> = .init(nil)
let (writerWaiting, continuation) = AsyncStream<Void>.makeStream()

try await client.perform(
request: request,
body: .restartable { writer in
var writer = writer

for _ in 0..<1000 {
// TODO: There's a bug that prevents a single byte from being
// successfully written out as a chunk. So write 2 bytes for now.
try await writer.write("AB".utf8.span)
// Write a 1-byte chunk
try await writer.write("A".utf8.span)

// Only proceed once the client receives the echo.
await withCheckedContinuation { continuation in
writerWaiting.withLock { $0 = continuation }
}
await writerWaiting.first(where: { true })
}
return nil
}
Expand All @@ -411,18 +413,60 @@ struct BasicConformanceTests<Client: HTTPClient & ~Copyable> {
var numberOfChunks = 0
try await reader.forEach { span in
numberOfChunks += 1
#expect(span.count == 2)
#expect(span.count == 1)
#expect(span[0] == UInt8(ascii: "A"))
#expect(span[1] == UInt8(ascii: "B"))

// Unblock the writer
writerWaiting.withLock { $0!.resume() }
continuation.yield()
}
#expect(numberOfChunks == 1000)
}
}
}

func testSpeakInterleave() async throws {
let request = HTTPRequest(
method: .post,
scheme: "http",
authority: "127.0.0.1:\(port)",
path: "/speak"
)

let client = try await clientFactory()

let (stream, continuation) = AsyncStream<String>.makeStream()

try await client.perform(
request: request,
body: .restartable { writer in
var writer = writer
var iterator = stream.makeAsyncIterator()

// Wait for a chunk from the server
while let chunk = await iterator.next() {
// Write it back to the server
try await writer.write(chunk.utf8.span)
}
return nil
}
) { response, responseBodyAndTrailers in
#expect(response.status == .ok)
let _ = try await responseBodyAndTrailers.consumeAndConclude { reader in
// Read all chunks from server
try await reader.forEach { span in
let chunk = String(copying: try UTF8Span(validating: span))
#expect(chunk == "A")

// Give chunk to the writer to echo back
continuation.yield(chunk)
}

// No more chunks from server. Stop writing as well.
continuation.finish()
}
}
}

func testCancelPreHeaders() async throws {
try await withThrowingTaskGroup { group in
let client = try await clientFactory()
Expand Down
31 changes: 30 additions & 1 deletion Sources/HTTPClientConformance/TestHTTPServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,35 @@ func serve(server: NIOHTTPServer) async throws {
return nil
}
}
case "/speak":
// Send the headers for the response
let responseBodyAndTrailers = try await responseSender.send(.init(status: .ok))

// Needed since we are lacking call-once closures
var requestBodyAndTrailers = Optional(requestBodyAndTrailers)

try await responseBodyAndTrailers.produceAndConclude {
var writer = $0
let _ = try await requestBodyAndTrailers.take()!.consumeAndConclude {
var reader = $0

// Server writes 1000 1-byte chunks of "A" and expects each
// chunk to be written back by the client before proceeding
// with the next one.
for i in 0..<1000 {
// Write a single-byte chunk
try await writer.write("A".utf8.span)

// Wait for the client to write the same chunk to the request body
try await reader.read(maximumCount: 1) { span in
if span.count != 1 || span[0] != UInt8(ascii: "A") {
assertionFailure("Received unexpected span")
}
}
}
}
return nil
}
case "/stall":
// Wait for an hour (effectively never giving an answer)
try await Task.sleep(for: .seconds(60 * 60))
Expand All @@ -220,7 +249,7 @@ func serve(server: NIOHTTPServer) async throws {
try await responseBody.write([UInt8](repeating: UInt8(ascii: "A"), count: 1000).span)

// Wait for an hour (effectively never giving an answer)
try! await Task.sleep(for: .seconds(60 * 60))
try await Task.sleep(for: .seconds(60 * 60))

assertionFailure("Not expected to complete hour-long wait")

Expand Down
Loading