Skip to content

Commit 36856c9

Browse files
committed
async/await prototype
Signed-off-by: David Nadoba <dnadoba@gmail.com>
1 parent 1985e47 commit 36856c9

File tree

4 files changed

+285
-3
lines changed

4 files changed

+285
-3
lines changed

Package.resolved

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@ let package = Package(
1919
// Transport protocol
2020
.library(name: "RSocketWSTransport", targets: ["RSocketWSTransport"]),
2121
.library(name: "RSocketTCPTransport", targets: ["RSocketTCPTransport"]),
22+
.library(name: "RSocketAsync", targets: ["RSocketAsync"]),
2223

2324
// Examples
2425
.executable(name: "timer-client-example", targets: ["TimerClientExample"]),
2526
.executable(name: "twitter-client-example", targets: ["TwitterClientExample"]),
2627
.executable(name: "vanilla-client-example", targets: ["VanillaClientExample"]),
28+
.executable(name: "async-twitter-client-example", targets: ["AsyncTwitterClientExample"]),
2729
],
2830
dependencies: [
2931
.package(url: "https://github.com/ReactiveCocoa/ReactiveSwift.git", from: "6.6.0"),
30-
.package(url: "https://github.com/apple/swift-nio", from: "2.26.0"),
32+
.package(url: "https://github.com/apple/swift-nio", .revision("4220c7a16a5ee0abb7da150bd3d4444940a20cc2")),
3133
.package(url: "https://github.com/apple/swift-nio-extras", from: "1.8.0"),
3234
.package(url: "https://github.com/apple/swift-nio-transport-services", from: "1.9.2"),
3335
.package(url: "https://github.com/apple/swift-nio-ssl", from: "2.10.4"),
@@ -46,6 +48,11 @@ let package = Package(
4648
"RSocketCore",
4749
.product(name: "ReactiveSwift", package: "ReactiveSwift")
4850
]),
51+
.target(name: "RSocketAsync", dependencies: [
52+
"RSocketCore",
53+
.product(name: "NIO", package: "swift-nio"),
54+
.product(name: "_NIOConcurrency", package: "swift-nio"),
55+
]),
4956

5057
// Channel
5158
.target(name: "RSocketTSChannel", dependencies: [
@@ -135,6 +142,25 @@ let package = Package(
135142
],
136143
path: "Sources/Examples/VanillaClient"
137144
),
145+
.target(
146+
name: "AsyncTwitterClientExample",
147+
dependencies: [
148+
"RSocketCore",
149+
"RSocketNIOChannel",
150+
"RSocketWSTransport",
151+
"RSocketAsync",
152+
.product(name: "ArgumentParser", package: "swift-argument-parser"),
153+
.product(name: "NIO", package: "swift-nio"),
154+
.product(name: "_NIOConcurrency", package: "swift-nio"),
155+
],
156+
path: "Sources/Examples/AsyncTwitterClient",
157+
swiftSettings: [
158+
.unsafeFlags([
159+
"-Xfrontend",
160+
"-enable-experimental-concurrency"
161+
])
162+
]
163+
),
138164
],
139165
swiftLanguageVersions: [.v5]
140166
)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#if compiler(>=5.4) && $AsyncAwait
2+
import ArgumentParser
3+
import Foundation
4+
import NIO
5+
import RSocketAsync
6+
import RSocketCore
7+
import RSocketNIOChannel
8+
import RSocketReactiveSwift
9+
import RSocketWSTransport
10+
11+
func route(_ route: String) -> Data {
12+
let encodedRoute = Data(route.utf8)
13+
precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded")
14+
let encodedRouteLength = Data([UInt8(encodedRoute.count)])
15+
16+
return encodedRouteLength + encodedRoute
17+
}
18+
19+
extension URL: ExpressibleByArgument {
20+
public init?(argument: String) {
21+
guard let url = URL(string: argument) else { return nil }
22+
self = url
23+
}
24+
public var defaultValueDescription: String { description }
25+
}
26+
27+
/// the server-side code can be found here -> https://github.com/rsocket/rsocket-demo/tree/master/src/main/kotlin/io/rsocket/demo/twitter
28+
struct TwitterClientExample: ParsableCommand {
29+
static var configuration = CommandConfiguration(
30+
abstract: "connects to an RSocket endpoint using WebSocket transport, requests a stream at the route `searchTweets` to search for tweets that match the `searchString` and logs all events."
31+
)
32+
33+
@Argument(help: "used to find tweets that match the given string")
34+
var searchString = "spring"
35+
36+
@Option
37+
var url = URL(string: "wss://demo.rsocket.io/rsocket")!
38+
39+
@Option(help: "maximum number of tweets that are taken before it cancels the stream")
40+
var limit = 1000
41+
42+
func run() throws {
43+
let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1)
44+
defer { try! eventLoop.syncShutdownGracefully() }
45+
let promise = eventLoop.next().makePromise(of: Void.self)
46+
promise.completeWithAsync {
47+
try await self.runAsync()
48+
}
49+
try promise.futureResult.wait()
50+
}
51+
func runAsync() async throws {
52+
let bootstrap = ClientBootstrap(
53+
config: ClientSetupConfig(
54+
timeBetweenKeepaliveFrames: 0,
55+
maxLifetime: 30_000,
56+
metadataEncodingMimeType: "message/x.rsocket.routing.v0",
57+
dataEncodingMimeType: "application/json"
58+
),
59+
transport: WSTransport(),
60+
timeout: .seconds(30)
61+
)
62+
let client = try await bootstrap.connect(to: .init(url: url))
63+
64+
let stream = client.requester.requestStream(payload: Payload(
65+
metadata: route("searchTweets"),
66+
data: Data(searchString.utf8)
67+
))
68+
69+
for try await payload in stream.prefix(limit) {
70+
let json = try JSONSerialization.jsonObject(with: payload.data, options: [])
71+
let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted])
72+
let string = String(decoding: data, as: UTF8.self)
73+
print(string)
74+
}
75+
}
76+
}
77+
78+
TwitterClientExample.main()
79+
#endif

Sources/RSocketAsync/AsyncAwait.swift

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
#if compiler(>=5.4) && $AsyncAwait
2+
import RSocketCore
3+
import _Concurrency
4+
import NIO
5+
import _NIOConcurrency
6+
7+
public protocol RSocket {
8+
func requestResponse(payload: Payload) async throws -> Payload
9+
func requestStream(payload: Payload) -> AsyncStreamSequence
10+
}
11+
12+
public struct RequesterAdapter: RSocket {
13+
private let requester: RSocketCore.RSocket
14+
private let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1)
15+
public init(requester: RSocketCore.RSocket) {
16+
self.requester = requester
17+
}
18+
public func requestResponse(payload: Payload) async throws -> Payload {
19+
struct RequestResponseOperator: UnidirectionalStream {
20+
var promise: EventLoopPromise<Payload>
21+
func onNext(_ payload: Payload, isCompletion: Bool) {
22+
assert(isCompletion)
23+
promise.succeed(payload)
24+
}
25+
26+
func onComplete() {
27+
assertionFailure("request response does not support \(#function)")
28+
}
29+
30+
func onRequestN(_ requestN: Int32) {
31+
assertionFailure("request response does not support \(#function)")
32+
}
33+
34+
func onCancel() {
35+
promise.fail(Error.canceled(message: "onCancel"))
36+
}
37+
38+
func onError(_ error: Error) {
39+
promise.fail(error)
40+
}
41+
42+
func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) {
43+
assertionFailure("request response does not support \(#function)")
44+
}
45+
}
46+
let promise = eventLoop.next().makePromise(of: Payload.self)
47+
let stream = RequestResponseOperator(promise: promise)
48+
_ = requester.requestResponse(payload: payload, responderStream: stream)
49+
return try await promise.futureResult.get()
50+
}
51+
52+
public func requestStream(payload: Payload) -> AsyncStreamSequence {
53+
AsyncStreamSequence(payload: payload, requester: requester, eventLoop: eventLoop.next())
54+
}
55+
}
56+
57+
public struct AsyncStreamSequence: AsyncSequence {
58+
public typealias AsyncIterator = AsyncStreamIterator
59+
60+
public typealias Element = Payload
61+
62+
fileprivate init(payload: Payload, requester: RSocketCore.RSocket, eventLoop: EventLoop) {
63+
self.payload = payload
64+
self.requester = requester
65+
self.eventLoop = eventLoop
66+
}
67+
private var payload: Payload
68+
private var requester: RSocketCore.RSocket
69+
private var eventLoop: EventLoop
70+
public func makeAsyncIterator() -> AsyncStreamIterator {
71+
let stream = AsyncStreamIterator(eventLoop: eventLoop)
72+
stream.subscription = requester.stream(payload: payload, initialRequestN: 0, responderStream: stream)
73+
return stream
74+
}
75+
}
76+
77+
public final class AsyncStreamIterator: AsyncIteratorProtocol, UnidirectionalStream {
78+
fileprivate init(
79+
eventLoop: EventLoop
80+
) {
81+
self.eventLoop = eventLoop
82+
}
83+
84+
private enum Event {
85+
case next(Payload, isCompletion: Bool)
86+
case error(Error)
87+
case complete
88+
case cancel
89+
}
90+
private var eventLoop: EventLoop
91+
private var event: EventLoopPromise<Event>? = nil
92+
private var isCompleted: Bool = false
93+
fileprivate var subscription: Subscription! = nil
94+
public func onNext(_ payload: Payload, isCompletion: Bool) {
95+
eventLoop.execute { [self] in
96+
assert(event != nil)
97+
event?.succeed(.next(payload, isCompletion: isCompletion))
98+
}
99+
100+
}
101+
102+
public func onComplete() {
103+
eventLoop.execute { [self] in
104+
assert(event != nil)
105+
event?.succeed(.complete)
106+
}
107+
}
108+
109+
public func onRequestN(_ requestN: Int32) {
110+
assertionFailure("request response does not support \(#function)")
111+
}
112+
113+
public func onCancel() {
114+
eventLoop.execute { [self] in
115+
assert(event != nil)
116+
event?.succeed(.cancel)
117+
}
118+
}
119+
120+
public func onError(_ error: Error) {
121+
eventLoop.execute { [self] in
122+
assert(event != nil)
123+
event?.succeed(.error(error))
124+
}
125+
}
126+
127+
public func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) {
128+
assertionFailure("request response does not support \(#function)")
129+
}
130+
public func next() async throws -> Payload? {
131+
let p = eventLoop.makePromise(of: Optional<Payload>.self)
132+
p.completeWithAsync { [self] in
133+
guard !isCompleted else { return nil }
134+
assert(event == nil)
135+
let promise = eventLoop.makePromise(of: Event.self)
136+
event = promise
137+
subscription.onRequestN(1)
138+
let result = try await promise.futureResult.get()
139+
event = nil
140+
switch result {
141+
case let .next(payload, isCompletion):
142+
self.isCompleted = isCompletion
143+
return payload
144+
case .complete:
145+
self.isCompleted = true
146+
return nil
147+
case .cancel:
148+
self.isCompleted = true
149+
return nil
150+
case let .error(error):
151+
self.isCompleted = true
152+
throw error
153+
}
154+
}
155+
return try await p.futureResult.get()
156+
}
157+
deinit {
158+
subscription.onCancel()
159+
}
160+
}
161+
162+
public struct AsyncClient {
163+
private let coreClient: RSocketCore.CoreClient
164+
165+
public var requester: RSocket { RequesterAdapter(requester: coreClient.requester) }
166+
167+
public init(_ coreClient: RSocketCore.CoreClient) {
168+
self.coreClient = coreClient
169+
}
170+
}
171+
172+
extension RSocketCore.ClientBootstrap where Client == CoreClient, Responder == RSocketCore.RSocket {
173+
public func connect(to endpoint: Transport.Endpoint) async throws -> AsyncClient {
174+
AsyncClient(try await connect(to: endpoint, responder: nil).get())
175+
}
176+
}
177+
#endif

0 commit comments

Comments
 (0)