Skip to content

Commit fefd925

Browse files
committed
async/await support
Motivation: The async/await proposal has entered the review phase, so we may want to start looking into NIO with async/await. Modifications: - Add `EventLoopFuture.get() throws await` - Add `ChannelOutboundInvoker` `async` methods Result: If async/await were actually implemented, you could use NIO together with async/wait. So far, you can only compile your projects.
1 parent 1376c7d commit fefd925

File tree

5 files changed

+444
-0
lines changed

5 files changed

+444
-0
lines changed

Package.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ var targets: [PackageDescription.Target] = [
2323
"CNIODarwin",
2424
"CNIOWindows",
2525
"NIOConcurrencyHelpers"]),
26+
.target(name: "_NIOConcurrency",
27+
dependencies: ["NIO"]),
2628
.target(name: "NIOFoundationCompat", dependencies: ["NIO"]),
2729
.target(name: "CNIOAtomics", dependencies: []),
2830
.target(name: "CNIOSHA1", dependencies: []),
@@ -65,6 +67,8 @@ var targets: [PackageDescription.Target] = [
6567
dependencies: ["NIO", "NIOHTTP1"]),
6668
.target(name: "NIOCrashTester",
6769
dependencies: ["NIO", "NIOHTTP1", "NIOWebSocket", "NIOFoundationCompat"]),
70+
.target(name: "NIOAsyncAwaitDemo",
71+
dependencies: ["NIO", "NIOHTTP1", "_NIOConcurrency"]),
6872
.testTarget(name: "NIOTests",
6973
dependencies: ["NIO", "NIOFoundationCompat", "NIOTestUtils", "NIOConcurrencyHelpers"]),
7074
.testTarget(name: "NIOConcurrencyHelpersTests",
@@ -86,6 +90,7 @@ let package = Package(
8690
products: [
8791
.library(name: "NIO", targets: ["NIO"]),
8892
.library(name: "_NIO1APIShims", targets: ["_NIO1APIShims"]),
93+
.library(name: "_NIOConcurrency", targets: ["_NIOConcurrency"]),
8994
.library(name: "NIOTLS", targets: ["NIOTLS"]),
9095
.library(name: "NIOHTTP1", targets: ["NIOHTTP1"]),
9196
.library(name: "NIOConcurrencyHelpers", targets: ["NIOConcurrencyHelpers"]),
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
import NIOHTTP1
17+
18+
#if compiler(>=5.4) // we cannot write this on one line with `&&` because Swift 5.0 doesn't like it...
19+
#if compiler(>=5.4) && $AsyncAwait
20+
struct AsyncChannelIO<Request, Response> {
21+
let channel: Channel
22+
23+
init(_ channel: Channel) {
24+
self.channel = channel
25+
}
26+
27+
func start() async throws -> AsyncChannelIO<Request, Response> {
28+
try await channel.pipeline.addHandler(RequestResponseHandler<HTTPRequestHead, NIOHTTPClientResponseFull>()).get()
29+
return self
30+
}
31+
32+
func sendRequest(_ request: Request) async throws -> Response {
33+
let responsePromise: EventLoopPromise<Response> = channel.eventLoop.makePromise()
34+
try await self.channel.writeAndFlush((request, responsePromise)).get()
35+
return try await responsePromise.futureResult.get()
36+
}
37+
38+
func close() async throws {
39+
try await self.channel.close()
40+
}
41+
}
42+
#endif
43+
#endif
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
// THIS FILE IS MOSTLY COPIED FROM swift-nio-extras
16+
17+
import NIO
18+
import NIOHTTP1
19+
20+
public final class MakeFullRequestHandler: ChannelOutboundHandler {
21+
public typealias OutboundOut = HTTPClientRequestPart
22+
public typealias OutboundIn = HTTPRequestHead
23+
24+
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
25+
let req = self.unwrapOutboundIn(data)
26+
27+
context.write(self.wrapOutboundOut(.head(req)), promise: nil)
28+
context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
29+
}
30+
}
31+
32+
/// `RequestResponseHandler` receives a `Request` alongside an `EventLoopPromise<Response>` from the `Channel`'s
33+
/// outbound side. It will fulfill the promise with the `Response` once it's received from the `Channel`'s inbound
34+
/// side.
35+
///
36+
/// `RequestResponseHandler` does support pipelining `Request`s and it will send them pipelined further down the
37+
/// `Channel`. Should `RequestResponseHandler` receive an error from the `Channel`, it will fail all promises meant for
38+
/// the outstanding `Reponse`s and close the `Channel`. All requests enqueued after an error occured will be immediately
39+
/// failed with the first error the channel received.
40+
///
41+
/// `RequestResponseHandler` requires that the `Response`s arrive on `Channel` in the same order as the `Request`s
42+
/// were submitted.
43+
public final class RequestResponseHandler<Request, Response>: ChannelDuplexHandler {
44+
public typealias InboundIn = Response
45+
public typealias InboundOut = Never
46+
public typealias OutboundIn = (Request, EventLoopPromise<Response>)
47+
public typealias OutboundOut = Request
48+
49+
private enum State {
50+
case operational
51+
case error(Error)
52+
53+
var isOperational: Bool {
54+
switch self {
55+
case .operational:
56+
return true
57+
case .error:
58+
return false
59+
}
60+
}
61+
}
62+
63+
private var state: State = .operational
64+
private var promiseBuffer: CircularBuffer<EventLoopPromise<Response>>
65+
66+
67+
/// Create a new `RequestResponseHandler`.
68+
///
69+
/// - parameters:
70+
/// - initialBufferCapacity: `RequestResponseHandler` saves the promises for all outstanding responses in a
71+
/// buffer. `initialBufferCapacity` is the initial capacity for this buffer. You usually do not need to set
72+
/// this parameter unless you intend to pipeline very deeply and don't want the buffer to resize.
73+
public init(initialBufferCapacity: Int = 4) {
74+
self.promiseBuffer = CircularBuffer(initialCapacity: initialBufferCapacity)
75+
}
76+
77+
public func channelInactive(context: ChannelHandlerContext) {
78+
switch self.state {
79+
case .error:
80+
// We failed any outstanding promises when we entered the error state and will fail any
81+
// new promises in write.
82+
assert(self.promiseBuffer.count == 0)
83+
case .operational:
84+
let promiseBuffer = self.promiseBuffer
85+
self.promiseBuffer.removeAll()
86+
promiseBuffer.forEach { promise in
87+
promise.fail(ChannelError.eof)
88+
}
89+
}
90+
context.fireChannelInactive()
91+
}
92+
93+
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
94+
guard self.state.isOperational else {
95+
// we're in an error state, ignore further responses
96+
assert(self.promiseBuffer.count == 0)
97+
return
98+
}
99+
100+
let response = self.unwrapInboundIn(data)
101+
let promise = self.promiseBuffer.removeFirst()
102+
103+
promise.succeed(response)
104+
}
105+
106+
public func errorCaught(context: ChannelHandlerContext, error: Error) {
107+
guard self.state.isOperational else {
108+
assert(self.promiseBuffer.count == 0)
109+
return
110+
}
111+
self.state = .error(error)
112+
let promiseBuffer = self.promiseBuffer
113+
self.promiseBuffer.removeAll()
114+
context.close(promise: nil)
115+
promiseBuffer.forEach {
116+
$0.fail(error)
117+
}
118+
}
119+
120+
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
121+
let (request, responsePromise) = self.unwrapOutboundIn(data)
122+
switch self.state {
123+
case .error(let error):
124+
assert(self.promiseBuffer.count == 0)
125+
responsePromise.fail(error)
126+
promise?.fail(error)
127+
case .operational:
128+
self.promiseBuffer.append(responsePromise)
129+
context.write(self.wrapOutboundOut(request), promise: promise)
130+
}
131+
}
132+
}

Sources/NIOAsyncAwaitDemo/main.swift

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
import _NIOConcurrency
17+
import NIOHTTP1
18+
import Dispatch
19+
20+
#if compiler(>=5.4) // we cannot write this on one line with `&&` because Swift 5.0 doesn't like it...
21+
#if compiler(>=5.4) && $AsyncAwait
22+
23+
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
24+
defer {
25+
try! group.syncShutdownGracefully()
26+
}
27+
28+
func makeHTTPChannel(host: String, port: Int) async throws -> AsyncChannelIO<HTTPRequestHead, NIOHTTPClientResponseFull> {
29+
let channel = try await ClientBootstrap(group: group).connect(host: host, port: port).get()
30+
try await channel.pipeline.addHTTPClientHandlers().get()
31+
try await channel.pipeline.addHandler(NIOHTTPClientResponseAggregator(maxContentLength: 1_000_000))
32+
try await channel.pipeline.addHandler(MakeFullRequestHandler())
33+
return try await AsyncChannelIO<HTTPRequestHead, NIOHTTPClientResponseFull>(channel).start()
34+
}
35+
36+
func main() async {
37+
do {
38+
let channel = try await makeHTTPChannel(host: "httpbin.org", port: 80)
39+
print("OK, connected to \(channel)")
40+
41+
print("Sending request 1", terminator: "")
42+
let response1 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1,
43+
method: .GET,
44+
uri: "/base64/SGVsbG8gV29ybGQsIGZyb20gSFRUUEJpbiEgCg==",
45+
headers: ["host": "httpbin.org"]))
46+
print(", response:", String(buffer: response1.body ?? ByteBuffer()))
47+
48+
print("Sending request 2", terminator: "")
49+
let response2 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1,
50+
method: .GET,
51+
uri: "/get",
52+
headers: ["host": "httpbin.org"]))
53+
print(", response:", String(buffer: response2.body ?? ByteBuffer()))
54+
55+
try await channel.close()
56+
print("all, done")
57+
} catch {
58+
print("ERROR: \(error)")
59+
}
60+
}
61+
62+
let dg = DispatchGroup()
63+
dg.enter()
64+
let task = Task.runDetached {
65+
await main()
66+
dg.leave()
67+
}
68+
dg.wait()
69+
#else
70+
print("ERROR: This demo only works with async/await enabled (NIO.System.hasAsyncAwaitSupport = \(NIO.System.hasAsyncAwaitSupport))")
71+
print("Try: swift run -Xswiftc -Xfrontend -Xswiftc -enable-experimental-concurrency NIOAsyncAwaitDemo")
72+
#endif
73+
#else
74+
print("ERROR: Concurrency only supported on Swift > 5.4.")
75+
#endif

0 commit comments

Comments
 (0)