Skip to content

Commit 6f59232

Browse files
committed
migrate ktor websocket transport to new API
1 parent a180a6f commit 6f59232

File tree

12 files changed

+536
-2
lines changed

12 files changed

+536
-2
lines changed

rsocket-transports/ktor-websocket-client/api/rsocket-transport-ktor-websocket-client.api

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,24 @@
1+
public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport : io/rsocket/kotlin/transport/RSocketTransport {
2+
public static final field Factory Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport$Factory;
3+
public abstract fun target (Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
4+
public abstract fun target (Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
5+
public abstract fun target (Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
6+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
7+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
8+
}
9+
10+
public final class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
11+
}
12+
13+
public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
14+
public abstract fun httpEngine (Lio/ktor/client/engine/HttpClientEngine;Lkotlin/jvm/functions/Function1;)V
15+
public abstract fun httpEngine (Lio/ktor/client/engine/HttpClientEngineFactory;Lkotlin/jvm/functions/Function1;)V
16+
public abstract fun httpEngine (Lkotlin/jvm/functions/Function1;)V
17+
public static synthetic fun httpEngine$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder;Lio/ktor/client/engine/HttpClientEngine;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
18+
public static synthetic fun httpEngine$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder;Lio/ktor/client/engine/HttpClientEngineFactory;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
19+
public abstract fun webSocketsConfig (Lkotlin/jvm/functions/Function1;)V
20+
}
21+
122
public final class io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransportKt {
223
public static final fun WebSocketClientTransport (Lio/ktor/client/engine/HttpClientEngineFactory;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;ZLkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ClientTransport;
324
public static final fun WebSocketClientTransport (Lio/ktor/client/engine/HttpClientEngineFactory;Ljava/lang/String;ZLkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ClientTransport;

rsocket-transports/ktor-websocket-client/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ kotlin {
3030
sourceSets {
3131
commonMain.dependencies {
3232
implementation(projects.rsocketTransportKtorWebsocketInternal)
33+
implementation(projects.rsocketInternalIo)
3334
api(projects.rsocketCore)
3435
api(libs.ktor.client.core)
3536
api(libs.ktor.client.websockets)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.ktor.websocket.client
18+
19+
import io.ktor.client.*
20+
import io.ktor.client.engine.*
21+
import io.ktor.client.plugins.websocket.*
22+
import io.ktor.client.request.*
23+
import io.ktor.http.*
24+
import io.rsocket.kotlin.internal.io.*
25+
import io.rsocket.kotlin.transport.*
26+
import io.rsocket.kotlin.transport.ktor.websocket.internal.*
27+
import kotlinx.coroutines.*
28+
import kotlin.coroutines.*
29+
30+
public sealed interface KtorWebSocketClientTransport : RSocketTransport {
31+
public fun target(request: HttpRequestBuilder.() -> Unit): RSocketClientTarget
32+
public fun target(urlString: String, request: HttpRequestBuilder.() -> Unit = {}): RSocketClientTarget
33+
34+
public fun target(
35+
method: HttpMethod = HttpMethod.Get,
36+
host: String? = null,
37+
port: Int? = null,
38+
path: String? = null,
39+
request: HttpRequestBuilder.() -> Unit = {},
40+
): RSocketClientTarget
41+
42+
public companion object Factory :
43+
RSocketTransportFactory<KtorWebSocketClientTransport, KtorWebSocketClientTransportBuilder>(::KtorWebSocketClientTransportBuilderImpl)
44+
}
45+
46+
public sealed interface KtorWebSocketClientTransportBuilder : RSocketTransportBuilder<KtorWebSocketClientTransport> {
47+
public fun httpEngine(configure: HttpClientEngineConfig.() -> Unit)
48+
public fun httpEngine(engine: HttpClientEngine, configure: HttpClientEngineConfig.() -> Unit = {})
49+
public fun <T : HttpClientEngineConfig> httpEngine(factory: HttpClientEngineFactory<T>, configure: T.() -> Unit = {})
50+
51+
public fun webSocketsConfig(block: WebSockets.Config.() -> Unit)
52+
}
53+
54+
private class KtorWebSocketClientTransportBuilderImpl : KtorWebSocketClientTransportBuilder {
55+
private var httpClientFactory: HttpClientFactory = HttpClientFactory.Default
56+
private var webSocketsConfig: WebSockets.Config.() -> Unit = {}
57+
58+
override fun httpEngine(configure: HttpClientEngineConfig.() -> Unit) {
59+
this.httpClientFactory = HttpClientFactory.FromConfiguration(configure)
60+
}
61+
62+
override fun httpEngine(engine: HttpClientEngine, configure: HttpClientEngineConfig.() -> Unit) {
63+
this.httpClientFactory = HttpClientFactory.FromEngine(engine, configure)
64+
}
65+
66+
override fun <T : HttpClientEngineConfig> httpEngine(factory: HttpClientEngineFactory<T>, configure: T.() -> Unit) {
67+
this.httpClientFactory = HttpClientFactory.FromFactory(factory, configure)
68+
}
69+
70+
override fun webSocketsConfig(block: WebSockets.Config.() -> Unit) {
71+
this.webSocketsConfig = block
72+
}
73+
74+
@RSocketTransportApi
75+
override fun buildTransport(context: CoroutineContext): KtorWebSocketClientTransport {
76+
val httpClient = httpClientFactory.createHttpClient {
77+
install(WebSockets, webSocketsConfig)
78+
}
79+
// only dispatcher of a client is used - it looks like it's Dispatchers.IO now
80+
val newContext = context.supervisorContext() + (httpClient.coroutineContext[ContinuationInterceptor] ?: EmptyCoroutineContext)
81+
val newJob = newContext.job
82+
val httpClientJob = httpClient.coroutineContext.job
83+
84+
httpClientJob.invokeOnCompletion { newJob.cancel("HttpClient closed", it) }
85+
newJob.invokeOnCompletion { httpClientJob.cancel("KtorWebSocketClientTransport closed", it) }
86+
87+
return KtorWebSocketClientTransportImpl(
88+
coroutineContext = newContext,
89+
httpClient = httpClient,
90+
)
91+
}
92+
}
93+
94+
private class KtorWebSocketClientTransportImpl(
95+
override val coroutineContext: CoroutineContext,
96+
private val httpClient: HttpClient,
97+
) : KtorWebSocketClientTransport {
98+
override fun target(request: HttpRequestBuilder.() -> Unit): RSocketClientTarget = KtorWebSocketClientTargetImpl(
99+
coroutineContext = coroutineContext,
100+
httpClient = httpClient,
101+
request = request
102+
)
103+
104+
override fun target(
105+
urlString: String,
106+
request: HttpRequestBuilder.() -> Unit,
107+
): RSocketClientTarget = target(
108+
method = HttpMethod.Get, host = null, port = null, path = null,
109+
request = {
110+
url.protocol = URLProtocol.WS
111+
url.port = port
112+
113+
url.takeFrom(urlString)
114+
request()
115+
},
116+
)
117+
118+
override fun target(
119+
method: HttpMethod,
120+
host: String?,
121+
port: Int?,
122+
path: String?,
123+
request: HttpRequestBuilder.() -> Unit,
124+
): RSocketClientTarget = target {
125+
this.method = method
126+
url("ws", host, port, path)
127+
request()
128+
}
129+
}
130+
131+
private class KtorWebSocketClientTargetImpl(
132+
override val coroutineContext: CoroutineContext,
133+
private val httpClient: HttpClient,
134+
private val request: HttpRequestBuilder.() -> Unit,
135+
) : RSocketClientTarget {
136+
137+
@RSocketTransportApi
138+
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
139+
httpClient.webSocket(request) {
140+
handler.handleKtorWebSocketConnection(this)
141+
}
142+
}
143+
}
144+
145+
private sealed class HttpClientFactory {
146+
abstract fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient
147+
148+
object Default : HttpClientFactory() {
149+
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient(block)
150+
}
151+
152+
class FromConfiguration(
153+
private val configure: HttpClientEngineConfig.() -> Unit,
154+
) : HttpClientFactory() {
155+
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient {
156+
engine(configure)
157+
block()
158+
}
159+
}
160+
161+
class FromEngine(
162+
private val engine: HttpClientEngine,
163+
private val configure: HttpClientEngineConfig.() -> Unit,
164+
) : HttpClientFactory() {
165+
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient(engine) {
166+
engine(configure)
167+
block()
168+
}
169+
}
170+
171+
class FromFactory<T : HttpClientEngineConfig>(
172+
private val factory: HttpClientEngineFactory<T>,
173+
private val configure: T.() -> Unit,
174+
) : HttpClientFactory() {
175+
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient(factory) {
176+
engine(configure)
177+
block()
178+
}
179+
}
180+
}

rsocket-transports/ktor-websocket-internal/api/rsocket-transport-ktor-websocket-internal.api

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
public final class io/rsocket/kotlin/transport/ktor/websocket/internal/KtorWebSocketConnectionKt {
2+
public static final fun handleKtorWebSocketConnection (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;Lio/ktor/websocket/WebSocketSession;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3+
}
4+
15
public final class io/rsocket/kotlin/transport/ktor/websocket/internal/WebSocketConnection : io/rsocket/kotlin/Connection, kotlinx/coroutines/CoroutineScope {
26
public fun <init> (Lio/ktor/websocket/WebSocketSession;)V
37
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.ktor.websocket.internal
18+
19+
import io.ktor.utils.io.core.*
20+
import io.ktor.websocket.*
21+
import io.rsocket.kotlin.internal.io.*
22+
import io.rsocket.kotlin.transport.*
23+
import io.rsocket.kotlin.transport.internal.*
24+
import kotlinx.coroutines.*
25+
import kotlinx.coroutines.channels.*
26+
27+
@RSocketTransportApi
28+
public suspend fun RSocketConnectionHandler.handleKtorWebSocketConnection(webSocketSession: WebSocketSession): Unit = coroutineScope {
29+
val outboundQueue = PrioritizationFrameQueue(Channel.BUFFERED)
30+
31+
val senderJob = launch {
32+
while (true) webSocketSession.send(outboundQueue.dequeueFrame()?.readBytes() ?: break)
33+
}.onCompletion { outboundQueue.cancel() }
34+
35+
try {
36+
handleConnection(KtorWebSocketConnection(outboundQueue, webSocketSession.incoming))
37+
} finally {
38+
webSocketSession.incoming.cancel()
39+
outboundQueue.close()
40+
withContext(NonCancellable) {
41+
senderJob.join() // await all frames sent
42+
webSocketSession.close()
43+
webSocketSession.coroutineContext.job.join()
44+
}
45+
}
46+
}
47+
48+
@RSocketTransportApi
49+
private class KtorWebSocketConnection(
50+
private val outboundQueue: PrioritizationFrameQueue,
51+
private val inbound: ReceiveChannel<Frame>,
52+
) : RSocketSequentialConnection {
53+
override val isClosedForSend: Boolean get() = outboundQueue.isClosedForSend
54+
55+
override suspend fun sendFrame(streamId: Int, frame: ByteReadPacket) {
56+
return outboundQueue.enqueueFrame(streamId, frame)
57+
}
58+
59+
override suspend fun receiveFrame(): ByteReadPacket? {
60+
val frame = inbound.receiveCatching().getOrNull() ?: return null
61+
return ByteReadPacket(frame.data)
62+
}
63+
}

rsocket-transports/ktor-websocket-server/api/rsocket-transport-ktor-websocket-server.api

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,30 @@
1+
public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerInstance : io/rsocket/kotlin/transport/RSocketServerInstance {
2+
public abstract fun getConnectors ()Ljava/util/List;
3+
public abstract fun getPath ()Ljava/lang/String;
4+
public abstract fun getProtocol ()Ljava/lang/String;
5+
}
6+
7+
public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport : io/rsocket/kotlin/transport/RSocketTransport {
8+
public static final field Factory Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport$Factory;
9+
public abstract fun target (Lio/ktor/server/engine/EngineConnectorConfig;Ljava/lang/String;Ljava/lang/String;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
10+
public abstract fun target (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
11+
public abstract fun target (Ljava/lang/String;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
12+
public abstract fun target (Ljava/util/List;Ljava/lang/String;Ljava/lang/String;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
13+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;Lio/ktor/server/engine/EngineConnectorConfig;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
14+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
15+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;Ljava/lang/String;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
16+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
17+
}
18+
19+
public final class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
20+
}
21+
22+
public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
23+
public abstract fun httpEngine (Lio/ktor/server/engine/ApplicationEngineFactory;Lkotlin/jvm/functions/Function1;)V
24+
public static synthetic fun httpEngine$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransportBuilder;Lio/ktor/server/engine/ApplicationEngineFactory;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
25+
public abstract fun webSocketsConfig (Lkotlin/jvm/functions/Function1;)V
26+
}
27+
128
public final class io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransportKt {
229
public static final fun WebSocketServerTransport (Lio/ktor/server/engine/ApplicationEngineFactory;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ServerTransport;
330
public static final fun WebSocketServerTransport (Lio/ktor/server/engine/ApplicationEngineFactory;[Lio/ktor/server/engine/EngineConnectorConfig;Ljava/lang/String;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ServerTransport;

rsocket-transports/ktor-websocket-server/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ kotlin {
2929
sourceSets {
3030
commonMain.dependencies {
3131
implementation(projects.rsocketTransportKtorWebsocketInternal)
32+
implementation(projects.rsocketInternalIo)
3233
api(projects.rsocketCore)
3334
api(libs.ktor.server.host.common)
3435
api(libs.ktor.server.websockets)

0 commit comments

Comments
 (0)