Skip to content

async/await support #1701

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var targets: [PackageDescription.Target] = [
"CNIODarwin",
"CNIOWindows",
"NIOConcurrencyHelpers"]),
.target(name: "_NIOConcurrency",
dependencies: ["NIO"]),
.target(name: "NIOFoundationCompat", dependencies: ["NIO"]),
.target(name: "CNIOAtomics", dependencies: []),
.target(name: "CNIOSHA1", dependencies: []),
Expand Down Expand Up @@ -65,6 +67,8 @@ var targets: [PackageDescription.Target] = [
dependencies: ["NIO", "NIOHTTP1"]),
.target(name: "NIOCrashTester",
dependencies: ["NIO", "NIOHTTP1", "NIOWebSocket", "NIOFoundationCompat"]),
.target(name: "NIOAsyncAwaitDemo",
dependencies: ["NIO", "NIOHTTP1", "_NIOConcurrency"]),
.testTarget(name: "NIOTests",
dependencies: ["NIO", "NIOFoundationCompat", "NIOTestUtils", "NIOConcurrencyHelpers"]),
.testTarget(name: "NIOConcurrencyHelpersTests",
Expand All @@ -86,6 +90,7 @@ let package = Package(
products: [
.library(name: "NIO", targets: ["NIO"]),
.library(name: "_NIO1APIShims", targets: ["_NIO1APIShims"]),
.library(name: "_NIOConcurrency", targets: ["_NIOConcurrency"]),
.library(name: "NIOTLS", targets: ["NIOTLS"]),
.library(name: "NIOHTTP1", targets: ["NIOHTTP1"]),
.library(name: "NIOConcurrencyHelpers", targets: ["NIOConcurrencyHelpers"]),
Expand Down
43 changes: 43 additions & 0 deletions Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO
import NIOHTTP1

#if compiler(>=5.4) // we cannot write this on one line with `&&` because Swift 5.0 doesn't like it...
#if compiler(>=5.4) && $AsyncAwait
struct AsyncChannelIO<Request, Response> {
let channel: Channel

init(_ channel: Channel) {
self.channel = channel
}

func start() async throws -> AsyncChannelIO<Request, Response> {
try await channel.pipeline.addHandler(RequestResponseHandler<HTTPRequestHead, NIOHTTPClientResponseFull>()).get()
return self
}

func sendRequest(_ request: Request) async throws -> Response {
let responsePromise: EventLoopPromise<Response> = channel.eventLoop.makePromise()
try await self.channel.writeAndFlush((request, responsePromise)).get()
return try await responsePromise.futureResult.get()
}

func close() async throws {
try await self.channel.close()
}
}
#endif
#endif
132 changes: 132 additions & 0 deletions Sources/NIOAsyncAwaitDemo/FullRequestResponse.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

// THIS FILE IS MOSTLY COPIED FROM swift-nio-extras

import NIO
import NIOHTTP1

public final class MakeFullRequestHandler: ChannelOutboundHandler {
public typealias OutboundOut = HTTPClientRequestPart
public typealias OutboundIn = HTTPRequestHead

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let req = self.unwrapOutboundIn(data)

context.write(self.wrapOutboundOut(.head(req)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
}
}

/// `RequestResponseHandler` receives a `Request` alongside an `EventLoopPromise<Response>` from the `Channel`'s
/// outbound side. It will fulfill the promise with the `Response` once it's received from the `Channel`'s inbound
/// side.
///
/// `RequestResponseHandler` does support pipelining `Request`s and it will send them pipelined further down the
/// `Channel`. Should `RequestResponseHandler` receive an error from the `Channel`, it will fail all promises meant for
/// the outstanding `Reponse`s and close the `Channel`. All requests enqueued after an error occured will be immediately
/// failed with the first error the channel received.
///
/// `RequestResponseHandler` requires that the `Response`s arrive on `Channel` in the same order as the `Request`s
/// were submitted.
public final class RequestResponseHandler<Request, Response>: ChannelDuplexHandler {
public typealias InboundIn = Response
public typealias InboundOut = Never
public typealias OutboundIn = (Request, EventLoopPromise<Response>)
public typealias OutboundOut = Request

private enum State {
case operational
case error(Error)

var isOperational: Bool {
switch self {
case .operational:
return true
case .error:
return false
}
}
}

private var state: State = .operational
private var promiseBuffer: CircularBuffer<EventLoopPromise<Response>>


/// Create a new `RequestResponseHandler`.
///
/// - parameters:
/// - initialBufferCapacity: `RequestResponseHandler` saves the promises for all outstanding responses in a
/// buffer. `initialBufferCapacity` is the initial capacity for this buffer. You usually do not need to set
/// this parameter unless you intend to pipeline very deeply and don't want the buffer to resize.
public init(initialBufferCapacity: Int = 4) {
self.promiseBuffer = CircularBuffer(initialCapacity: initialBufferCapacity)
}

public func channelInactive(context: ChannelHandlerContext) {
switch self.state {
case .error:
// We failed any outstanding promises when we entered the error state and will fail any
// new promises in write.
assert(self.promiseBuffer.count == 0)
case .operational:
let promiseBuffer = self.promiseBuffer
self.promiseBuffer.removeAll()
promiseBuffer.forEach { promise in
promise.fail(ChannelError.eof)
}
}
context.fireChannelInactive()
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
guard self.state.isOperational else {
// we're in an error state, ignore further responses
assert(self.promiseBuffer.count == 0)
return
}

let response = self.unwrapInboundIn(data)
let promise = self.promiseBuffer.removeFirst()

promise.succeed(response)
}

public func errorCaught(context: ChannelHandlerContext, error: Error) {
guard self.state.isOperational else {
assert(self.promiseBuffer.count == 0)
return
}
self.state = .error(error)
let promiseBuffer = self.promiseBuffer
self.promiseBuffer.removeAll()
context.close(promise: nil)
promiseBuffer.forEach {
$0.fail(error)
}
}

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let (request, responsePromise) = self.unwrapOutboundIn(data)
switch self.state {
case .error(let error):
assert(self.promiseBuffer.count == 0)
responsePromise.fail(error)
promise?.fail(error)
case .operational:
self.promiseBuffer.append(responsePromise)
context.write(self.wrapOutboundOut(request), promise: promise)
}
}
}
77 changes: 77 additions & 0 deletions Sources/NIOAsyncAwaitDemo/main.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO
import _NIOConcurrency
import NIOHTTP1
import Dispatch

#if compiler(>=5.4) // we cannot write this on one line with `&&` because Swift 5.0 doesn't like it...
#if compiler(>=5.4) && $AsyncAwait

import _Concurrency

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
try! group.syncShutdownGracefully()
}

func makeHTTPChannel(host: String, port: Int) async throws -> AsyncChannelIO<HTTPRequestHead, NIOHTTPClientResponseFull> {
let channel = try await ClientBootstrap(group: group).connect(host: host, port: port).get()
try await channel.pipeline.addHTTPClientHandlers().get()
try await channel.pipeline.addHandler(NIOHTTPClientResponseAggregator(maxContentLength: 1_000_000))
try await channel.pipeline.addHandler(MakeFullRequestHandler())
return try await AsyncChannelIO<HTTPRequestHead, NIOHTTPClientResponseFull>(channel).start()
}

func main() async {
do {
let channel = try await makeHTTPChannel(host: "httpbin.org", port: 80)
print("OK, connected to \(channel)")

print("Sending request 1", terminator: "")
let response1 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1,
method: .GET,
uri: "/base64/SGVsbG8gV29ybGQsIGZyb20gSFRUUEJpbiEgCg==",
headers: ["host": "httpbin.org"]))
print(", response:", String(buffer: response1.body ?? ByteBuffer()))

print("Sending request 2", terminator: "")
let response2 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1,
method: .GET,
uri: "/get",
headers: ["host": "httpbin.org"]))
print(", response:", String(buffer: response2.body ?? ByteBuffer()))

try await channel.close()
print("all, done")
} catch {
print("ERROR: \(error)")
}
}

let dg = DispatchGroup()
dg.enter()
let task = Task.runDetached {
await main()
dg.leave()
}
dg.wait()
#else
print("ERROR: This demo only works with async/await enabled (NIO.System.hasAsyncAwaitSupport = \(NIO.System.hasAsyncAwaitSupport))")
print("Try: swift run -Xswiftc -Xfrontend -Xswiftc -enable-experimental-concurrency NIOAsyncAwaitDemo")
#endif
#else
print("ERROR: Concurrency only supported on Swift > 5.4.")
#endif
Loading