Skip to content

Commit 6687ef5

Browse files
committed
async/await support
Motivation: The async/await proposal has entered the review phase, so we may want to start looking into NIO with async/await. Modifications: - Add `EventLoopFuture.get() throws await` - Add `ChannelOutboundInvoker` `async` methods Result: If async/await were actually implemented, you could use NIO together with async/wait. So far, you can only compile your projects.
1 parent 8f531c3 commit 6687ef5

File tree

5 files changed

+310
-0
lines changed

5 files changed

+310
-0
lines changed

Package.swift

+2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ var targets: [PackageDescription.Target] = [
6565
dependencies: ["NIO", "NIOHTTP1"]),
6666
.target(name: "NIOCrashTester",
6767
dependencies: ["NIO", "NIOHTTP1", "NIOWebSocket", "NIOFoundationCompat"]),
68+
.target(name: "NIOAsyncAwaitDemo",
69+
dependencies: ["NIO", "NIOHTTP1"]),
6870
.testTarget(name: "NIOTests",
6971
dependencies: ["NIO", "NIOFoundationCompat", "NIOTestUtils", "NIOConcurrencyHelpers"]),
7072
.testTarget(name: "NIOConcurrencyHelpersTests",

Sources/NIO/AsyncAwaitSupport.swift

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
#if compiler(>=5.4) && $AsyncAwait
2+
extension EventLoopFuture {
3+
public func get() async throws -> Value {
4+
return try await withUnsafeThrowingContinuation { cont in
5+
self.whenComplete { result in
6+
switch result {
7+
case .success(let value):
8+
cont.resume(returning: value)
9+
case .failure(let error):
10+
cont.resume(throwing: error)
11+
}
12+
}
13+
}
14+
}
15+
}
16+
17+
extension Channel {
18+
public func writeAndFlush<T>(_ any: T) async throws {
19+
try await self.writeAndFlush(any).get()
20+
}
21+
22+
/// Set `option` to `value` on this `Channel`.
23+
public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) async throws {
24+
try await self.setOption(option, value: value).get()
25+
}
26+
27+
/// Get the value of `option` for this `Channel`.
28+
public func getOption<Option: ChannelOption>(_ option: Option) async throws -> Option.Value {
29+
return try await self.getOption(option).get()
30+
}
31+
}
32+
33+
extension ChannelOutboundInvoker {
34+
/// Register on an `EventLoop` and so have all its IO handled.
35+
///
36+
/// - returns: the future which will be notified once the operation completes.
37+
public func register(file: StaticString = #file, line: UInt = #line) async throws {
38+
try await self.register(file: file, line: line).get()
39+
}
40+
41+
/// Bind to a `SocketAddress`.
42+
/// - parameters:
43+
/// - to: the `SocketAddress` to which we should bind the `Channel`.
44+
/// - returns: the future which will be notified once the operation completes.
45+
public func bind(to address: SocketAddress, file: StaticString = #file, line: UInt = #line) async throws {
46+
try await self.bind(to: address, file: file, line: line).get()
47+
}
48+
49+
/// Connect to a `SocketAddress`.
50+
/// - parameters:
51+
/// - to: the `SocketAddress` to which we should connect the `Channel`.
52+
/// - returns: the future which will be notified once the operation completes.
53+
public func connect(to address: SocketAddress, file: StaticString = #file, line: UInt = #line) async throws {
54+
try await self.connect(to: address, file: file, line: line).get()
55+
}
56+
57+
/// Shortcut for calling `write` and `flush`.
58+
///
59+
/// - parameters:
60+
/// - data: the data to write
61+
/// - returns: the future which will be notified once the `write` operation completes.
62+
public func writeAndFlush(_ data: NIOAny, file: StaticString = #file, line: UInt = #line) async throws {
63+
try await self.writeAndFlush(data, file: file, line: line).get()
64+
}
65+
66+
/// Close the `Channel` and so the connection if one exists.
67+
///
68+
/// - parameters:
69+
/// - mode: the `CloseMode` that is used
70+
/// - returns: the future which will be notified once the operation completes.
71+
public func close(mode: CloseMode = .all, file: StaticString = #file, line: UInt = #line) async throws {
72+
try await self.close(mode: mode, file: file, line: line).get()
73+
}
74+
75+
/// Trigger a custom user outbound event which will flow through the `ChannelPipeline`.
76+
///
77+
/// - parameters:
78+
/// - event: the event itself.
79+
/// - returns: the future which will be notified once the operation completes.
80+
public func triggerUserOutboundEvent(_ event: Any, file: StaticString = #file, line: UInt = #line) async throws {
81+
try await self.triggerUserOutboundEvent(event, file: file, line: line).get()
82+
}
83+
}
84+
85+
extension ChannelPipeline {
86+
public func addHandler(_ handler: ChannelHandler,
87+
name: String? = nil,
88+
position: ChannelPipeline.Position = .last) async throws {
89+
return try await self.addHandler(handler, name: name, position: position).get()
90+
}
91+
}
92+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#if compiler(>=5.4) && $AsyncAwait
2+
import NIO
3+
import NIOHTTP1
4+
5+
struct AsyncChannelIO<Request, Response> {
6+
let channel: Channel
7+
8+
init(_ channel: Channel) {
9+
self.channel = channel
10+
}
11+
12+
func start() async throws -> AsyncChannelIO<Request, Response> {
13+
try await channel.pipeline.addHandler(RequestResponseHandler<HTTPRequestHead, NIOHTTPClientResponseFull>()).get()
14+
return self
15+
}
16+
17+
func sendRequest(_ request: Request) async throws -> Response {
18+
let responsePromise: EventLoopPromise<Response> = channel.eventLoop.makePromise()
19+
try await self.channel.writeAndFlush((request, responsePromise)).get()
20+
return try await responsePromise.futureResult.get()
21+
}
22+
23+
func close() async throws {
24+
try await self.channel.close()
25+
}
26+
}
27+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2017-2019 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
16+
// THIS FILE IS MOSTLY COPIED FROM swift-nio-extras
17+
18+
import NIO
19+
import NIOHTTP1
20+
21+
public final class MakeFullRequestHandler: ChannelOutboundHandler {
22+
public typealias OutboundOut = HTTPClientRequestPart
23+
public typealias OutboundIn = HTTPRequestHead
24+
25+
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
26+
let req = self.unwrapOutboundIn(data)
27+
28+
context.write(self.wrapOutboundOut(.head(req)), promise: nil)
29+
context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
30+
}
31+
}
32+
33+
/// `RequestResponseHandler` receives a `Request` alongside an `EventLoopPromise<Response>` from the `Channel`'s
34+
/// outbound side. It will fulfill the promise with the `Response` once it's received from the `Channel`'s inbound
35+
/// side.
36+
///
37+
/// `RequestResponseHandler` does support pipelining `Request`s and it will send them pipelined further down the
38+
/// `Channel`. Should `RequestResponseHandler` receive an error from the `Channel`, it will fail all promises meant for
39+
/// the outstanding `Reponse`s and close the `Channel`. All requests enqueued after an error occured will be immediately
40+
/// failed with the first error the channel received.
41+
///
42+
/// `RequestResponseHandler` requires that the `Response`s arrive on `Channel` in the same order as the `Request`s
43+
/// were submitted.
44+
public final class RequestResponseHandler<Request, Response>: ChannelDuplexHandler {
45+
public typealias InboundIn = Response
46+
public typealias InboundOut = Never
47+
public typealias OutboundIn = (Request, EventLoopPromise<Response>)
48+
public typealias OutboundOut = Request
49+
50+
private enum State {
51+
case operational
52+
case error(Error)
53+
54+
var isOperational: Bool {
55+
switch self {
56+
case .operational:
57+
return true
58+
case .error:
59+
return false
60+
}
61+
}
62+
}
63+
64+
private var state: State = .operational
65+
private var promiseBuffer: CircularBuffer<EventLoopPromise<Response>>
66+
67+
68+
/// Create a new `RequestResponseHandler`.
69+
///
70+
/// - parameters:
71+
/// - initialBufferCapacity: `RequestResponseHandler` saves the promises for all outstanding responses in a
72+
/// buffer. `initialBufferCapacity` is the initial capacity for this buffer. You usually do not need to set
73+
/// this parameter unless you intend to pipeline very deeply and don't want the buffer to resize.
74+
public init(initialBufferCapacity: Int = 4) {
75+
self.promiseBuffer = CircularBuffer(initialCapacity: initialBufferCapacity)
76+
}
77+
78+
public func channelInactive(context: ChannelHandlerContext) {
79+
switch self.state {
80+
case .error:
81+
// We failed any outstanding promises when we entered the error state and will fail any
82+
// new promises in write.
83+
assert(self.promiseBuffer.count == 0)
84+
case .operational:
85+
let promiseBuffer = self.promiseBuffer
86+
self.promiseBuffer.removeAll()
87+
promiseBuffer.forEach { promise in
88+
promise.fail(ChannelError.eof)
89+
}
90+
}
91+
context.fireChannelInactive()
92+
}
93+
94+
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
95+
guard self.state.isOperational else {
96+
// we're in an error state, ignore further responses
97+
assert(self.promiseBuffer.count == 0)
98+
return
99+
}
100+
101+
let response = self.unwrapInboundIn(data)
102+
let promise = self.promiseBuffer.removeFirst()
103+
104+
promise.succeed(response)
105+
}
106+
107+
public func errorCaught(context: ChannelHandlerContext, error: Error) {
108+
guard self.state.isOperational else {
109+
assert(self.promiseBuffer.count == 0)
110+
return
111+
}
112+
self.state = .error(error)
113+
let promiseBuffer = self.promiseBuffer
114+
self.promiseBuffer.removeAll()
115+
context.close(promise: nil)
116+
promiseBuffer.forEach {
117+
$0.fail(error)
118+
}
119+
}
120+
121+
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
122+
let (request, responsePromise) = self.unwrapOutboundIn(data)
123+
switch self.state {
124+
case .error(let error):
125+
assert(self.promiseBuffer.count == 0)
126+
responsePromise.fail(error)
127+
promise?.fail(error)
128+
case .operational:
129+
self.promiseBuffer.append(responsePromise)
130+
context.write(self.wrapOutboundOut(request), promise: promise)
131+
}
132+
}
133+
}

Sources/NIOAsyncAwaitDemo/main.swift

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#if compiler(>=5.4) && $AsyncAwait
2+
import NIO
3+
import NIOHTTP1
4+
import Dispatch
5+
6+
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
7+
defer {
8+
try! group.syncShutdownGracefully()
9+
}
10+
11+
func makeHTTPChannel(host: String, port: Int) async throws -> AsyncChannelIO<HTTPRequestHead, NIOHTTPClientResponseFull> {
12+
let channel = try await ClientBootstrap(group: group).connect(host: host, port: port).get()
13+
try await channel.pipeline.addHTTPClientHandlers().get()
14+
try await channel.pipeline.addHandler(NIOHTTPClientResponseAggregator(maxContentLength: 1_000_000))
15+
try await channel.pipeline.addHandler(MakeFullRequestHandler())
16+
return try await AsyncChannelIO<HTTPRequestHead, NIOHTTPClientResponseFull>(channel).start()
17+
}
18+
19+
func main() async {
20+
do {
21+
let channel = try await makeHTTPChannel(host: "httpbin.org", port: 80)
22+
print("OK, connected to \(channel)")
23+
24+
print("Sending request 1", terminator: "")
25+
let response1 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1,
26+
method: .GET,
27+
uri: "/base64/SGVsbG8gV29ybGQsIGZyb20gSFRUUEJpbiEgCg==",
28+
headers: ["host": "httpbin.org"]))
29+
print(", response:", String(buffer: response1.body ?? ByteBuffer()))
30+
31+
print("Sending request 2", terminator: "")
32+
let response2 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1,
33+
method: .GET,
34+
uri: "/get",
35+
headers: ["host": "httpbin.org"]))
36+
print(", response:", String(buffer: response2.body ?? ByteBuffer()))
37+
38+
try await channel.close()
39+
print("all, done")
40+
} catch {
41+
print("ERROR: \(error)")
42+
}
43+
}
44+
45+
let dg = DispatchGroup()
46+
dg.enter()
47+
let task = Task.runDetached {
48+
await main()
49+
dg.leave()
50+
}
51+
dg.wait()
52+
#else
53+
print("ERROR: This demo only works with async/await enabled")
54+
print("Unfortunately, async/await is currency only supported on macOS.")
55+
print("Try: swift run -Xswiftc -Xfrontend -Xswiftc -enable-experimental-concurrency NIOAsyncAwaitDemo")
56+
#endif

0 commit comments

Comments
 (0)