Skip to content

Commit 1985e47

Browse files
authored
WebSocket Transport (#43)
1 parent 6b01c58 commit 1985e47

File tree

17 files changed

+414
-207
lines changed

17 files changed

+414
-207
lines changed

Package.swift

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ let package = Package(
1717
.library(name: "RSocketNIOChannel", targets: ["RSocketNIOChannel"]),
1818

1919
// Transport protocol
20-
.library(name: "RSocketWebSocketTransport", targets: ["RSocketWebSocketTransport"]),
20+
.library(name: "RSocketWSTransport", targets: ["RSocketWSTransport"]),
2121
.library(name: "RSocketTCPTransport", targets: ["RSocketTCPTransport"]),
2222

2323
// Examples
@@ -60,7 +60,7 @@ let package = Package(
6060
]),
6161

6262
// Transport protocol
63-
.target(name: "RSocketWebSocketTransport", dependencies: [
63+
.target(name: "RSocketWSTransport", dependencies: [
6464
"RSocketCore",
6565
.product(name: "NIO", package: "swift-nio"),
6666
.product(name: "NIOHTTP1", package: "swift-nio"),
@@ -94,14 +94,17 @@ let package = Package(
9494
"ReactiveSwift",
9595
.product(name: "NIO", package: "swift-nio"),
9696
]),
97+
.testTarget(name: "RSocketWSTransportTests", dependencies: [
98+
"RSocketWSTransport"
99+
]),
97100

98101
// Examples
99102
.target(
100103
name: "TimerClientExample",
101104
dependencies: [
102105
"RSocketCore",
103106
"RSocketNIOChannel",
104-
"RSocketWebSocketTransport",
107+
"RSocketWSTransport",
105108
"RSocketReactiveSwift",
106109
.product(name: "ReactiveSwift", package: "ReactiveSwift"),
107110
.product(name: "ArgumentParser", package: "swift-argument-parser"),
@@ -113,7 +116,7 @@ let package = Package(
113116
dependencies: [
114117
"RSocketCore",
115118
"RSocketNIOChannel",
116-
"RSocketWebSocketTransport",
119+
"RSocketWSTransport",
117120
"RSocketReactiveSwift",
118121
.product(name: "ReactiveSwift", package: "ReactiveSwift"),
119122
.product(name: "ArgumentParser", package: "swift-argument-parser"),

Sources/Examples/TimerClient/main.swift

Lines changed: 30 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import ReactiveSwift
44
import RSocketCore
55
import RSocketNIOChannel
66
import RSocketReactiveSwift
7-
import RSocketWebSocketTransport
7+
import RSocketWSTransport
88

99
func route(_ route: String) -> Data {
1010
let encodedRoute = Data(route.utf8)
@@ -14,56 +14,48 @@ func route(_ route: String) -> Data {
1414
return encodedRouteLength + encodedRoute
1515
}
1616

17+
extension URL: ExpressibleByArgument {
18+
public init?(argument: String) {
19+
guard let url = URL(string: argument) else { return nil }
20+
self = url
21+
}
22+
public var defaultValueDescription: String { description }
23+
}
24+
1725
/// the server-side code can be found here -> https://github.com/rsocket/rsocket-demo/tree/master/src/main/kotlin/io/rsocket/demo/timer
1826
struct TimerClientExample: ParsableCommand {
1927
static var configuration = CommandConfiguration(
2028
abstract: "connects to an RSocket endpoint using WebSocket transport, requests a stream at the route `timer` and logs all events."
2129
)
2230

2331
@Option
24-
var host = "demo.rsocket.io/rsocket"
25-
26-
@Option
27-
var port = 80
32+
var url = URL(string: "wss://demo.rsocket.io/rsocket")!
2833

2934
@Option(help: "maximum number of responses that are taken before it cancels the stream")
3035
var limit = 10000
3136

32-
mutating func run() throws {
37+
func run() throws {
3338
let bootstrap = ClientBootstrap(
34-
config: ClientSetupConfig(
35-
timeBetweenKeepaliveFrames: 0,
36-
maxLifetime: 30_000,
37-
metadataEncodingMimeType: "message/x.rsocket.routing.v0",
38-
dataEncodingMimeType: "application/json"
39-
),
40-
transport: WSTransport(),
41-
timeout: .seconds(30)
39+
config: ClientSetupConfig(
40+
timeBetweenKeepaliveFrames: 30_000,
41+
maxLifetime: 60_000,
42+
metadataEncodingMimeType: "message/x.rsocket.routing.v0",
43+
dataEncodingMimeType: "application/json"
44+
),
45+
transport: WSTransport()
4246
)
43-
44-
let clientProducer = bootstrap.connect(host: host, port: port)
45-
46-
let clientProperty = Property<ReactiveSwiftClient?>(initial: nil, then: clientProducer.flatMapError { _ in
47-
.empty
48-
})
49-
50-
let streamSemaphore = DispatchSemaphore(value: 0)
51-
clientProperty
52-
.producer
53-
.skipNil()
54-
.flatMap(.latest) {
55-
$0.requester.requestStream(payload: Payload(
56-
metadata: route("timer"),
57-
data: Data()
58-
))
59-
}
60-
.map() { $0.data }
61-
.logEvents(identifier: "route.timer")
62-
.take(first: limit)
63-
.on(disposed: { streamSemaphore.signal() })
64-
.start()
65-
66-
streamSemaphore.wait()
47+
48+
let client = try bootstrap.connect(to: .init(url: url)).first()!.get()
49+
50+
try client.requester.requestStream(payload: Payload(
51+
metadata: route("timer"),
52+
data: Data()
53+
))
54+
.map() { String.init(decoding: $0.data, as: UTF8.self) }
55+
.logEvents(identifier: "route.timer")
56+
.take(first: limit)
57+
.wait()
58+
.get()
6759
}
6860
}
6961

Sources/Examples/TwitterClient/main.swift

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import ReactiveSwift
44
import RSocketCore
55
import RSocketNIOChannel
66
import RSocketReactiveSwift
7-
import RSocketWebSocketTransport
7+
import RSocketWSTransport
88

99
func route(_ route: String) -> Data {
1010
let encodedRoute = Data(route.utf8)
@@ -14,58 +14,56 @@ func route(_ route: String) -> Data {
1414
return encodedRouteLength + encodedRoute
1515
}
1616

17+
extension URL: ExpressibleByArgument {
18+
public init?(argument: String) {
19+
guard let url = URL(string: argument) else { return nil }
20+
self = url
21+
}
22+
public var defaultValueDescription: String { description }
23+
}
24+
1725
/// the server-side code can be found here -> https://github.com/rsocket/rsocket-demo/tree/master/src/main/kotlin/io/rsocket/demo/twitter
1826
struct TwitterClientExample: ParsableCommand {
1927
static var configuration = CommandConfiguration(
2028
abstract: "connects to an RSocket endpoint using WebSocket transport, requests a stream at the route `searchTweets` to search for tweets that match the `searchString` and logs all events."
2129
)
2230

2331
@Argument(help: "used to find tweets that match the given string")
24-
var searchString = "RSocket"
25-
26-
@Option
27-
var host = "demo.rsocket.io/rsocket"
32+
var searchString = "spring"
2833

2934
@Option
30-
var port = 80
35+
var url = URL(string: "wss://demo.rsocket.io/rsocket")!
3136

3237
@Option(help: "maximum number of tweets that are taken before it cancels the stream")
3338
var limit = 1000
3439

35-
mutating func run() throws {
40+
func run() throws {
3641
let bootstrap = ClientBootstrap(
37-
config: ClientSetupConfig(
38-
timeBetweenKeepaliveFrames: 0,
39-
maxLifetime: 30_000,
40-
metadataEncodingMimeType: "message/x.rsocket.routing.v0",
41-
dataEncodingMimeType: "application/json"
42-
),
43-
transport: WSTransport(),
44-
timeout: .seconds(30)
42+
config: ClientSetupConfig(
43+
timeBetweenKeepaliveFrames: 30_000,
44+
maxLifetime: 60_000,
45+
metadataEncodingMimeType: "message/x.rsocket.routing.v0",
46+
dataEncodingMimeType: "application/json"
47+
),
48+
transport: WSTransport()
4549
)
50+
51+
let client = try bootstrap.connect(to: .init(url: url)).first()!.get()
4652

47-
let clientProducer = bootstrap.connect(host: host, port: port)
48-
49-
let clientProperty = Property<ReactiveSwiftClient?>(initial: nil, then: clientProducer.flatMapError { _ in
50-
.empty
51-
})
52-
53-
let streamSemaphore = DispatchSemaphore(value: 0)
54-
let searchString = self.searchString
55-
clientProperty.producer
56-
.skipNil()
57-
.flatMap(.latest) {
58-
$0.requester.requestStream(payload: Payload(
59-
metadata: route("searchTweets"),
60-
data: Data(searchString.utf8)
61-
))
62-
}
63-
.logEvents(identifier: "route.searchTweets")
64-
.take(first: limit)
65-
.on(disposed: { streamSemaphore.signal() })
66-
.start()
67-
68-
streamSemaphore.wait();
53+
try client.requester.requestStream(payload: Payload(
54+
metadata: route("searchTweets"),
55+
data: Data(searchString.utf8)
56+
))
57+
.attemptMap { payload -> String in
58+
// pretty print json
59+
let json = try JSONSerialization.jsonObject(with: payload.data, options: [])
60+
let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted])
61+
return String(decoding: data, as: UTF8.self)
62+
}
63+
.logEvents(identifier: "route.searchTweets")
64+
.take(first: limit)
65+
.wait()
66+
.get()
6967
}
7068
}
7169

Sources/Examples/VanillaClient/main.swift

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,21 @@ struct VanillaClientExample: ParsableCommand {
1717
@Option
1818
var port = 7000
1919

20-
mutating func run() throws {
20+
func run() throws {
2121
let bootstrap = ClientBootstrap(
22-
config: ClientSetupConfig(
23-
timeBetweenKeepaliveFrames: 0,
24-
maxLifetime: 30_000,
25-
metadataEncodingMimeType: "application/octet-stream",
26-
dataEncodingMimeType: "application/octet-stream"
27-
),
28-
transport: TCPTransport(),
29-
timeout: .seconds(30)
22+
config: ClientSetupConfig(
23+
timeBetweenKeepaliveFrames: 30_000,
24+
maxLifetime: 60_000,
25+
metadataEncodingMimeType: "application/octet-stream",
26+
dataEncodingMimeType: "application/octet-stream"
27+
),
28+
transport: TCPTransport()
3029
)
30+
31+
let client = try bootstrap.connect(to: .init(host: host, port: port)).first()!.get()
3132

32-
let clientProducer = bootstrap.connect(host: host, port: port)
33-
34-
let client: Property<ReactiveSwiftClient?> = Property(initial: nil, then: clientProducer.flatMapError { _ in
35-
.empty
36-
})
37-
38-
let streamProducer: SignalProducer<Payload, Swift.Error> = client.producer.skipNil().flatMap(.latest) {
39-
$0.requester.requestStream(payload: .empty)
40-
}
41-
let requestProducer: SignalProducer<Payload, Swift.Error> = client.producer.skipNil().flatMap(.latest) {
42-
$0.requester.requestResponse(payload: Payload(data: Data("HelloWorld".utf8)))
43-
}
33+
let streamProducer = client.requester.requestStream(payload: .empty)
34+
let requestProducer = client.requester.requestResponse(payload: Payload(data: Data("HelloWorld".utf8)))
4435

4536
streamProducer.logEvents(identifier: "stream1").take(first: 1).start()
4637
streamProducer.logEvents(identifier: "stream3").take(first: 10).start()

Sources/RSocketCore/Channel Handler/SetupWriter.swift

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,18 @@ internal final class SetupWriter: ChannelInboundHandler, RemovableChannelHandler
2727
self.setup = config
2828
self.connectedPromise = connectedPromise
2929
}
30-
30+
31+
func handlerAdded(context: ChannelHandlerContext) {
32+
if context.channel.isActive {
33+
onActive(context: context)
34+
}
35+
}
36+
3137
func channelActive(context: ChannelHandlerContext) {
38+
onActive(context: context)
39+
}
40+
41+
private func onActive(context: ChannelHandlerContext) {
3242
context.writeAndFlush(self.wrapOutboundOut(SetupFrameBody(
3343
honorsLease: false,
3444
version: .current,
@@ -39,7 +49,6 @@ internal final class SetupWriter: ChannelInboundHandler, RemovableChannelHandler
3949
dataEncodingMimeType: setup.dataEncodingMimeType,
4050
payload: setup.payload
4151
).asFrame()), promise: nil)
42-
context.fireChannelActive()
4352
context.channel.pipeline.removeHandler(context: context).eventLoop.assertInEventLoop()
4453
connectedPromise?.succeed(())
4554
}

Sources/RSocketCore/Client/ClientBootstrap.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,9 @@ import NIO
1919
public protocol ClientBootstrap {
2020
associatedtype Client
2121
associatedtype Responder
22-
func connect(host: String, port: Int, responder: Responder?) -> EventLoopFuture<Client>
22+
associatedtype Transport: TransportChannelHandler
23+
func connect(
24+
to endpoint: Transport.Endpoint,
25+
responder: Responder?
26+
) -> EventLoopFuture<Client>
2327
}

Sources/RSocketCore/Client/TransportChannelHandler.swift

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,21 @@
1515
*/
1616

1717
import NIO
18+
import Foundation
19+
20+
public protocol Endpoint {
21+
var host: String { get }
22+
var port: Int { get }
23+
24+
/// if true but TLS is not configured, connecting to this endpoint will fail
25+
var requiresTLS: Bool { get }
26+
}
1827

1928
public protocol TransportChannelHandler {
29+
associatedtype Endpoint: RSocketCore.Endpoint
2030
func addChannelHandler(
2131
channel: Channel,
22-
host: String,
23-
port: Int,
32+
endpoint: Endpoint,
2433
upgradeComplete: @escaping () -> EventLoopFuture<Void>
2534
) -> EventLoopFuture<Void>
2635
}

0 commit comments

Comments
 (0)