Skip to content

Commit

Permalink
baby steps towards a Structured Concurrency API (#806)
Browse files Browse the repository at this point in the history
At the moment, `HTTPClient`'s entire API surface violates Structured
Concurrency. Both the creation & shutdown of a HTTP client as well as
making requests (#807) doesn't follow Structured Concurrency. Some of
the problems are:

1. Upon return of methods, resources are still in active use in other
threads/tasks
2. Cancellation doesn't always work

This PR is baby steps towards a Structured Concurrency API, starting
with start/shutdown of the HTTP client.

Co-authored-by: Johannes Weiss <johannes@jweiss.io>
  • Loading branch information
weissi and Johannes Weiss authored Feb 6, 2025
1 parent 81384de commit 89dc8d0
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 2 deletions.
12 changes: 12 additions & 0 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ extension HTTPClient {
/// - request: HTTP request to execute.
/// - deadline: Point in time by which the request must complete.
/// - logger: The logger to use for this request.
///
/// - warning: This method may violates Structured Concurrency because it returns a `HTTPClientResponse` that needs to be
/// streamed by the user. This means the request, the connection and other resources are still alive when the request returns.
///
/// - Returns: The response to the request. Note that the `body` of the response may not yet have been fully received.
public func execute(
_ request: HTTPClientRequest,
Expand All @@ -51,6 +55,10 @@ extension HTTPClient {
/// - request: HTTP request to execute.
/// - timeout: time the the request has to complete.
/// - logger: The logger to use for this request.
///
/// - warning: This method may violates Structured Concurrency because it returns a `HTTPClientResponse` that needs to be
/// streamed by the user. This means the request, the connection and other resources are still alive when the request returns.
///
/// - Returns: The response to the request. Note that the `body` of the response may not yet have been fully received.
public func execute(
_ request: HTTPClientRequest,
Expand All @@ -67,6 +75,8 @@ extension HTTPClient {

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClient {
/// - warning: This method may violates Structured Concurrency because it returns a `HTTPClientResponse` that needs to be
/// streamed by the user. This means the request, the connection and other resources are still alive when the request returns.
private func executeAndFollowRedirectsIfNeeded(
_ request: HTTPClientRequest,
deadline: NIODeadline,
Expand Down Expand Up @@ -116,6 +126,8 @@ extension HTTPClient {
}
}

/// - warning: This method may violates Structured Concurrency because it returns a `HTTPClientResponse` that needs to be
/// streamed by the user. This means the request, the connection and other resources are still alive when the request returns.
private func executeCancellable(
_ request: HTTPClientRequest.Prepared,
deadline: NIODeadline,
Expand Down
72 changes: 72 additions & 0 deletions Sources/AsyncHTTPClient/HTTPClient+StructuredConcurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2025 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Logging
import NIO

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClient {
#if compiler(>=6.0)
/// Start & automatically shut down a new ``HTTPClient``.
///
/// This method allows to start & automatically dispose of a ``HTTPClient`` following the principle of Structured Concurrency.
/// The ``HTTPClient`` is guaranteed to be shut down upon return, whether `body` throws or not.
///
/// This may be particularly useful if you cannot use the shared singleton (``HTTPClient/shared``).
public static func withHTTPClient<Return>(
eventLoopGroup: any EventLoopGroup = HTTPClient.defaultEventLoopGroup,
configuration: Configuration = Configuration(),
backgroundActivityLogger: Logger? = nil,
isolation: isolated (any Actor)? = #isolation,
_ body: (HTTPClient) async throws -> Return
) async throws -> Return {
let logger = (backgroundActivityLogger ?? HTTPClient.loggingDisabled)
let httpClient = HTTPClient(
eventLoopGroup: eventLoopGroup,
configuration: configuration,
backgroundActivityLogger: logger
)
return try await asyncDo {
try await body(httpClient)
} finally: { _ in
try await httpClient.shutdown()
}
}
#else
/// Start & automatically shut down a new ``HTTPClient``.
///
/// This method allows to start & automatically dispose of a ``HTTPClient`` following the principle of Structured Concurrency.
/// The ``HTTPClient`` is guaranteed to be shut down upon return, whether `body` throws or not.
///
/// This may be particularly useful if you cannot use the shared singleton (``HTTPClient/shared``).
public static func withHTTPClient<Return: Sendable>(
eventLoopGroup: any EventLoopGroup = HTTPClient.defaultEventLoopGroup,
configuration: Configuration = Configuration(),
backgroundActivityLogger: Logger? = nil,
_ body: (HTTPClient) async throws -> Return
) async throws -> Return {
let logger = (backgroundActivityLogger ?? HTTPClient.loggingDisabled)
let httpClient = HTTPClient(
eventLoopGroup: eventLoopGroup,
configuration: configuration,
backgroundActivityLogger: logger
)
return try await asyncDo {
try await body(httpClient)
} finally: { _ in
try await httpClient.shutdown()
}
}
#endif
}
11 changes: 9 additions & 2 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -885,19 +885,26 @@ extension HTTPClient {

/// Provides the result of this request.
///
/// - warning: This method may violates Structured Concurrency because doesn't respect cancellation.
///
/// - returns: The value of ``futureResult`` when it completes.
/// - throws: The error value of ``futureResult`` if it errors.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public func get() async throws -> Response {
try await self.promise.futureResult.get()
}

/// Cancels the request execution.
/// Initiate cancellation of a HTTP request.
///
/// This method will return immeidately and doesn't wait for the cancellation to complete.
public func cancel() {
self.fail(reason: HTTPClientError.cancelled)
}

/// Cancels the request execution with a custom `Error`.
/// Initiate cancellation of a HTTP request with an `error`.
///
/// This method will return immeidately and doesn't wait for the cancellation to complete.
///
/// - Parameter error: the error that is used to fail the promise
public func fail(reason error: Error) {
let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in
Expand Down
80 changes: 80 additions & 0 deletions Sources/AsyncHTTPClient/StructuredConcurrencyHelpers.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2025 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if compiler(>=6.0)
@inlinable
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func asyncDo<R>(
isolation: isolated (any Actor)? = #isolation,
_ body: () async throws -> sending R,
finally: sending @escaping ((any Error)?) async throws -> Void
) async throws -> sending R {
let result: R
do {
result = try await body()
} catch {
// `body` failed, we need to invoke `finally` with the `error`.

// This _looks_ unstructured but isn't really because we unconditionally always await the return.
// We need to have an uncancelled task here to assure this is actually running in case we hit a
// cancellation error.
try await Task {
try await finally(error)
}.value
throw error
}

// `body` succeeded, we need to invoke `finally` with `nil` (no error).

// This _looks_ unstructured but isn't really because we unconditionally always await the return.
// We need to have an uncancelled task here to assure this is actually running in case we hit a
// cancellation error.
try await Task {
try await finally(nil)
}.value
return result
}
#else
@inlinable
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func asyncDo<R: Sendable>(
_ body: () async throws -> R,
finally: @escaping @Sendable ((any Error)?) async throws -> Void
) async throws -> R {
let result: R
do {
result = try await body()
} catch {
// `body` failed, we need to invoke `finally` with the `error`.

// This _looks_ unstructured but isn't really because we unconditionally always await the return.
// We need to have an uncancelled task here to assure this is actually running in case we hit a
// cancellation error.
try await Task {
try await finally(error)
}.value
throw error
}

// `body` succeeded, we need to invoke `finally` with `nil` (no error).

// This _looks_ unstructured but isn't really because we unconditionally always await the return.
// We need to have an uncancelled task here to assure this is actually running in case we hit a
// cancellation error.
try await Task {
try await finally(nil)
}.value
return result
}
#endif
100 changes: 100 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClient+StructuredConcurrencyTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2025 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import AsyncHTTPClient
import NIO
import NIOFoundationCompat
import XCTest

final class HTTPClientStructuredConcurrencyTests: XCTestCase {
func testDoNothingWorks() async throws {
let actual = try await HTTPClient.withHTTPClient { httpClient in
"OK"
}
XCTAssertEqual("OK", actual)
}

func testShuttingDownTheClientInBodyLeadsToError() async {
do {
let actual = try await HTTPClient.withHTTPClient { httpClient in
try await httpClient.shutdown()
return "OK"
}
XCTFail("Expected error, got \(actual)")
} catch let error as HTTPClientError where error == .alreadyShutdown {
// OK
} catch {
XCTFail("unexpected error: \(error)")
}
}

func testBasicRequest() async throws {
let httpBin = HTTPBin()
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let actualBytes = try await HTTPClient.withHTTPClient { httpClient in
let response = try await httpClient.get(url: httpBin.baseURL).get()
XCTAssertEqual(response.status, .ok)
return response.body ?? ByteBuffer(string: "n/a")
}
let actual = try JSONDecoder().decode(RequestInfo.self, from: actualBytes)

XCTAssertGreaterThanOrEqual(actual.requestNumber, 0)
XCTAssertGreaterThanOrEqual(actual.connectionNumber, 0)
}

func testClientIsShutDownAfterReturn() async throws {
let leakedClient = try await HTTPClient.withHTTPClient { httpClient in
httpClient
}
do {
try await leakedClient.shutdown()
XCTFail("unexpected, shutdown should have failed")
} catch let error as HTTPClientError where error == .alreadyShutdown {
// OK
} catch {
XCTFail("unexpected error: \(error)")
}
}

func testClientIsShutDownOnThrowAlso() async throws {
struct TestError: Error {
var httpClient: HTTPClient
}

let leakedClient: HTTPClient
do {
try await HTTPClient.withHTTPClient { httpClient in
throw TestError(httpClient: httpClient)
}
XCTFail("unexpected, shutdown should have failed")
return
} catch let error as TestError {
// OK
leakedClient = error.httpClient
} catch {
XCTFail("unexpected error: \(error)")
return
}

do {
try await leakedClient.shutdown()
XCTFail("unexpected, shutdown should have failed")
} catch let error as HTTPClientError where error == .alreadyShutdown {
// OK
} catch {
XCTFail("unexpected error: \(error)")
}
}
}

0 comments on commit 89dc8d0

Please sign in to comment.