Skip to content

Commit 756f89c

Browse files
committed
async/await support
Motivation: The async/await proposal has entered the review phase, so we may want to start looking into NIO with async/await. Modifications: - Add `EventLoopFuture.get() throws await` - Add `ChannelOutboundInvoker` `async` methods Result: If async/await were actually implemented, you could use NIO together with async/wait. So far, you can only compile your projects.
1 parent 9c15de3 commit 756f89c

File tree

5 files changed

+316
-1
lines changed

5 files changed

+316
-1
lines changed

Package.swift

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,23 @@
1515

1616
import PackageDescription
1717

18+
let nioSwiftSettings: [SwiftSetting]
19+
20+
#if compiler(>=5.4)
21+
nioSwiftSettings = [.unsafeFlags(["-Xfrontend", "-enable-experimental-concurrency"])]
22+
#else
23+
nioSwiftSettings = []
24+
#endif
25+
1826
var targets: [PackageDescription.Target] = [
1927
.target(name: "_NIO1APIShims",
2028
dependencies: ["NIO", "NIOHTTP1", "NIOTLS", "NIOFoundationCompat", "NIOWebSocket"]),
2129
.target(name: "NIO",
2230
dependencies: ["CNIOLinux",
2331
"CNIODarwin",
2432
"CNIOWindows",
25-
"NIOConcurrencyHelpers"]),
33+
"NIOConcurrencyHelpers"],
34+
swiftSettings: nioSwiftSettings),
2635
.target(name: "NIOFoundationCompat", dependencies: ["NIO"]),
2736
.target(name: "CNIOAtomics", dependencies: []),
2837
.target(name: "CNIOSHA1", dependencies: []),
@@ -81,6 +90,12 @@ var targets: [PackageDescription.Target] = [
8190
dependencies: ["NIO", "NIOFoundationCompat"]),
8291
]
8392

93+
#if compiler(>=5.4)
94+
targets.append(.target(name: "NIOAsyncAwaitDemo",
95+
dependencies: ["NIO", "NIOHTTP1"],
96+
swiftSettings: nioSwiftSettings))
97+
#endif
98+
8499
let package = Package(
85100
name: "swift-nio",
86101
products: [
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
#if compiler(>=5.4)
2+
extension EventLoopFuture {
3+
public func get() async throws -> Value {
4+
return try await withUnsafeThrowingContinuation { cont in
5+
self.whenComplete { result in
6+
switch result {
7+
case .success(let value):
8+
cont.resume(returning: value)
9+
case .failure(let error):
10+
cont.resume(throwing: error)
11+
}
12+
}
13+
}
14+
}
15+
}
16+
17+
extension Channel {
18+
public func writeAndFlush<T>(_ any: T) async throws {
19+
try await self.writeAndFlush(any).get()
20+
}
21+
22+
/// Set `option` to `value` on this `Channel`.
23+
public func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) async throws {
24+
try await self.setOption(option, value: value).get()
25+
}
26+
27+
/// Get the value of `option` for this `Channel`.
28+
public func getOption<Option: ChannelOption>(_ option: Option) async throws -> Option.Value {
29+
return try await self.getOption(option).get()
30+
}
31+
}
32+
33+
extension ChannelOutboundInvoker {
34+
/// Register on an `EventLoop` and so have all its IO handled.
35+
///
36+
/// - returns: the future which will be notified once the operation completes.
37+
public func register(file: StaticString = #file, line: UInt = #line) async throws {
38+
try await self.register(file: file, line: line).get()
39+
}
40+
41+
/// Bind to a `SocketAddress`.
42+
/// - parameters:
43+
/// - to: the `SocketAddress` to which we should bind the `Channel`.
44+
/// - returns: the future which will be notified once the operation completes.
45+
public func bind(to address: SocketAddress, file: StaticString = #file, line: UInt = #line) async throws {
46+
try await self.bind(to: address, file: file, line: line).get()
47+
}
48+
49+
/// Connect to a `SocketAddress`.
50+
/// - parameters:
51+
/// - to: the `SocketAddress` to which we should connect the `Channel`.
52+
/// - returns: the future which will be notified once the operation completes.
53+
public func connect(to address: SocketAddress, file: StaticString = #file, line: UInt = #line) async throws {
54+
try await self.connect(to: address, file: file, line: line).get()
55+
}
56+
57+
/// Shortcut for calling `write` and `flush`.
58+
///
59+
/// - parameters:
60+
/// - data: the data to write
61+
/// - returns: the future which will be notified once the `write` operation completes.
62+
public func writeAndFlush(_ data: NIOAny, file: StaticString = #file, line: UInt = #line) async throws {
63+
try await self.writeAndFlush(data, file: file, line: line).get()
64+
}
65+
66+
/// Close the `Channel` and so the connection if one exists.
67+
///
68+
/// - parameters:
69+
/// - mode: the `CloseMode` that is used
70+
/// - returns: the future which will be notified once the operation completes.
71+
public func close(mode: CloseMode = .all, file: StaticString = #file, line: UInt = #line) async throws {
72+
try await self.close(mode: mode, file: file, line: line).get()
73+
}
74+
75+
/// Trigger a custom user outbound event which will flow through the `ChannelPipeline`.
76+
///
77+
/// - parameters:
78+
/// - event: the event itself.
79+
/// - returns: the future which will be notified once the operation completes.
80+
public func triggerUserOutboundEvent(_ event: Any, file: StaticString = #file, line: UInt = #line) async throws {
81+
try await self.triggerUserOutboundEvent(event, file: file, line: line).get()
82+
}
83+
}
84+
85+
extension ChannelPipeline {
86+
public func addHandler(_ handler: ChannelHandler,
87+
name: String? = nil,
88+
position: ChannelPipeline.Position = .last) async throws {
89+
return try await self.addHandler(handler, name: name, position: position).get()
90+
}
91+
}
92+
#endif
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import NIO
2+
import NIOHTTP1
3+
4+
struct AsyncChannelIO<Request, Response> {
5+
let channel: Channel
6+
7+
init(_ channel: Channel) {
8+
self.channel = channel
9+
}
10+
11+
func start() async throws -> AsyncChannelIO<Request, Response> {
12+
try await channel.pipeline.addHandler(RequestResponseHandler<HTTPRequestHead, NIOHTTPClientResponseFull>()).get()
13+
return self
14+
}
15+
16+
func sendRequest(_ request: Request) async throws -> Response {
17+
let responsePromise: EventLoopPromise<Response> = channel.eventLoop.makePromise()
18+
try await self.channel.writeAndFlush((request, responsePromise)).get()
19+
return try await responsePromise.futureResult.get()
20+
}
21+
22+
func close() async throws {
23+
try await self.channel.close()
24+
}
25+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2017-2019 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
16+
// THIS FILE IS MOSTLY COPIED FROM swift-nio-extras
17+
18+
import NIO
19+
import NIOHTTP1
20+
21+
public final class MakeFullRequestHandler: ChannelOutboundHandler {
22+
public typealias OutboundOut = HTTPClientRequestPart
23+
public typealias OutboundIn = HTTPRequestHead
24+
25+
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
26+
let req = self.unwrapOutboundIn(data)
27+
28+
context.write(self.wrapOutboundOut(.head(req)), promise: nil)
29+
context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
30+
}
31+
}
32+
33+
/// `RequestResponseHandler` receives a `Request` alongside an `EventLoopPromise<Response>` from the `Channel`'s
34+
/// outbound side. It will fulfill the promise with the `Response` once it's received from the `Channel`'s inbound
35+
/// side.
36+
///
37+
/// `RequestResponseHandler` does support pipelining `Request`s and it will send them pipelined further down the
38+
/// `Channel`. Should `RequestResponseHandler` receive an error from the `Channel`, it will fail all promises meant for
39+
/// the outstanding `Reponse`s and close the `Channel`. All requests enqueued after an error occured will be immediately
40+
/// failed with the first error the channel received.
41+
///
42+
/// `RequestResponseHandler` requires that the `Response`s arrive on `Channel` in the same order as the `Request`s
43+
/// were submitted.
44+
public final class RequestResponseHandler<Request, Response>: ChannelDuplexHandler {
45+
public typealias InboundIn = Response
46+
public typealias InboundOut = Never
47+
public typealias OutboundIn = (Request, EventLoopPromise<Response>)
48+
public typealias OutboundOut = Request
49+
50+
private enum State {
51+
case operational
52+
case error(Error)
53+
54+
var isOperational: Bool {
55+
switch self {
56+
case .operational:
57+
return true
58+
case .error:
59+
return false
60+
}
61+
}
62+
}
63+
64+
private var state: State = .operational
65+
private var promiseBuffer: CircularBuffer<EventLoopPromise<Response>>
66+
67+
68+
/// Create a new `RequestResponseHandler`.
69+
///
70+
/// - parameters:
71+
/// - initialBufferCapacity: `RequestResponseHandler` saves the promises for all outstanding responses in a
72+
/// buffer. `initialBufferCapacity` is the initial capacity for this buffer. You usually do not need to set
73+
/// this parameter unless you intend to pipeline very deeply and don't want the buffer to resize.
74+
public init(initialBufferCapacity: Int = 4) {
75+
self.promiseBuffer = CircularBuffer(initialCapacity: initialBufferCapacity)
76+
}
77+
78+
public func channelInactive(context: ChannelHandlerContext) {
79+
switch self.state {
80+
case .error:
81+
// We failed any outstanding promises when we entered the error state and will fail any
82+
// new promises in write.
83+
assert(self.promiseBuffer.count == 0)
84+
case .operational:
85+
let promiseBuffer = self.promiseBuffer
86+
self.promiseBuffer.removeAll()
87+
promiseBuffer.forEach { promise in
88+
promise.fail(ChannelError.eof)
89+
}
90+
}
91+
context.fireChannelInactive()
92+
}
93+
94+
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
95+
guard self.state.isOperational else {
96+
// we're in an error state, ignore further responses
97+
assert(self.promiseBuffer.count == 0)
98+
return
99+
}
100+
101+
let response = self.unwrapInboundIn(data)
102+
let promise = self.promiseBuffer.removeFirst()
103+
104+
promise.succeed(response)
105+
}
106+
107+
public func errorCaught(context: ChannelHandlerContext, error: Error) {
108+
guard self.state.isOperational else {
109+
assert(self.promiseBuffer.count == 0)
110+
return
111+
}
112+
self.state = .error(error)
113+
let promiseBuffer = self.promiseBuffer
114+
self.promiseBuffer.removeAll()
115+
context.close(promise: nil)
116+
promiseBuffer.forEach {
117+
$0.fail(error)
118+
}
119+
}
120+
121+
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
122+
let (request, responsePromise) = self.unwrapOutboundIn(data)
123+
switch self.state {
124+
case .error(let error):
125+
assert(self.promiseBuffer.count == 0)
126+
responsePromise.fail(error)
127+
promise?.fail(error)
128+
case .operational:
129+
self.promiseBuffer.append(responsePromise)
130+
context.write(self.wrapOutboundOut(request), promise: promise)
131+
}
132+
}
133+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import NIO
2+
import NIOHTTP1
3+
import Dispatch
4+
5+
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
6+
defer {
7+
try! group.syncShutdownGracefully()
8+
}
9+
10+
func makeHTTPChannel(host: String, port: Int) async throws -> AsyncChannelIO<HTTPRequestHead, NIOHTTPClientResponseFull> {
11+
let channel = try await ClientBootstrap(group: group).connect(host: host, port: port).get()
12+
try await channel.pipeline.addHTTPClientHandlers().get()
13+
try await channel.pipeline.addHandler(NIOHTTPClientResponseAggregator(maxContentLength: 1_000_000))
14+
try await channel.pipeline.addHandler(MakeFullRequestHandler())
15+
return try await AsyncChannelIO<HTTPRequestHead, NIOHTTPClientResponseFull>(channel).start()
16+
}
17+
18+
func main() async {
19+
do {
20+
let channel = try await makeHTTPChannel(host: "httpbin.org", port: 80)
21+
print("OK, connected to \(channel)")
22+
23+
print("Sending request 1", terminator: "")
24+
let response1 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1,
25+
method: .GET,
26+
uri: "/base64/SGVsbG8gV29ybGQsIGZyb20gSFRUUEJpbiEgCg==",
27+
headers: ["host": "httpbin.org"]))
28+
print(", response:", String(buffer: response1.body ?? ByteBuffer()))
29+
30+
print("Sending request 2", terminator: "")
31+
let response2 = try await channel.sendRequest(HTTPRequestHead(version: .http1_1,
32+
method: .GET,
33+
uri: "/get",
34+
headers: ["host": "httpbin.org"]))
35+
print(", response:", String(buffer: response2.body ?? ByteBuffer()))
36+
37+
try await channel.close()
38+
print("all, done")
39+
} catch {
40+
print("ERROR: \(error)")
41+
}
42+
}
43+
44+
let dg = DispatchGroup()
45+
dg.enter()
46+
let task = Task.runDetached {
47+
await main()
48+
dg.leave()
49+
}
50+
dg.wait()

0 commit comments

Comments
 (0)