Skip to content

Commit eb15c1c

Browse files
committed
netty TCP, WebSocket and QUIC transports
* QUIC implementation is still WIP
1 parent 8745aac commit eb15c1c

File tree

28 files changed

+2448
-0
lines changed

28 files changed

+2448
-0
lines changed

gradle/libs.versions.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ kotlinx-bcv = "0.14.0"
88

99
ktor = "2.3.8"
1010

11+
netty = "4.1.109.Final"
12+
netty-quic = "0.0.62.Final"
13+
14+
# for netty TLS tests
15+
bouncycastle = "1.78"
16+
1117
turbine = "1.0.0"
1218

1319
rsocket-java = "1.1.3"
@@ -37,6 +43,12 @@ ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor" }
3743
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
3844
ktor-server-jetty = { module = "io.ktor:ktor-server-jetty", version.ref = "ktor" }
3945

46+
netty-handler = { module = "io.netty:netty-handler", version.ref = "netty" }
47+
netty-codec-http = { module = "io.netty:netty-codec-http", version.ref = "netty" }
48+
netty-codec-quic = { module = "io.netty.incubator:netty-incubator-codec-native-quic", version.ref = "netty-quic" }
49+
50+
bouncycastle = { module = "org.bouncycastle:bcpkix-jdk18on", version.ref = "bouncycastle" }
51+
4052
turbine = { module = "app.cash.turbine:turbine", version.ref = "turbine" }
4153

4254
rsocket-java-core = { module = 'io.rsocket:rsocket-core', version.ref = "rsocket-java" }
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
public final class io/rsocket/kotlin/transport/netty/internal/CoroutinesKt {
2+
public static final fun awaitChannel (Lio/netty/channel/ChannelFuture;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3+
public static final fun awaitFuture (Lio/netty/util/concurrent/Future;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
4+
public static final fun callOnCancellation (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function1;)V
5+
public static final fun toByteBuf (Lio/ktor/utils/io/core/ByteReadPacket;)Lio/netty/buffer/ByteBuf;
6+
public static final fun toByteReadPacket (Lio/netty/buffer/ByteBuf;)Lio/ktor/utils/io/core/ByteReadPacket;
7+
}
8+
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import rsocketbuild.*
18+
19+
plugins {
20+
id("rsocketbuild.multiplatform-library")
21+
}
22+
23+
description = "rsocket-kotlin Netty transport utils"
24+
25+
kotlin {
26+
jvmTarget()
27+
28+
sourceSets {
29+
jvmMain.dependencies {
30+
implementation(projects.rsocketInternalIo)
31+
api(projects.rsocketCore)
32+
api(libs.netty.handler)
33+
}
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.netty.internal
18+
19+
import io.ktor.utils.io.core.*
20+
import io.netty.buffer.*
21+
import io.netty.channel.*
22+
import io.netty.util.concurrent.*
23+
import kotlinx.coroutines.*
24+
import kotlin.coroutines.*
25+
26+
@Suppress("UNCHECKED_CAST")
27+
public suspend inline fun <T> Future<T>.awaitFuture(): T = suspendCancellableCoroutine { cont ->
28+
addListener {
29+
when {
30+
it.isSuccess -> cont.resume(it.now as T)
31+
else -> cont.resumeWithException(it.cause())
32+
}
33+
}
34+
cont.invokeOnCancellation {
35+
cancel(true)
36+
}
37+
}
38+
39+
public suspend fun ChannelFuture.awaitChannel(): Channel {
40+
awaitFuture()
41+
return channel()
42+
}
43+
44+
// it should be used only for cleanup and so should not really block, only suspend
45+
public inline fun CoroutineScope.callOnCancellation(crossinline block: suspend () -> Unit) {
46+
launch(Dispatchers.Unconfined) {
47+
try {
48+
awaitCancellation()
49+
} catch (cause: Throwable) {
50+
withContext(NonCancellable) {
51+
try {
52+
block()
53+
} catch (suppressed: Throwable) {
54+
cause.addSuppressed(suppressed)
55+
}
56+
}
57+
throw cause
58+
}
59+
}
60+
}
61+
62+
// TODO: what to use: this or ByteReadPacket(msg.nioBuffer())
63+
public fun ByteBuf.toByteReadPacket(): ByteReadPacket = buildPacket { writeFully(nioBuffer()) }
64+
public fun ByteReadPacket.toByteBuf(): ByteBuf = Unpooled.wrappedBuffer(readByteBuffer())
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport : io/rsocket/kotlin/transport/RSocketTransport {
2+
public static final field Factory Lio/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport$Factory;
3+
public abstract fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/RSocketClientTarget;
4+
public abstract fun target (Ljava/net/InetSocketAddress;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
5+
}
6+
7+
public final class io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
8+
}
9+
10+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
11+
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
12+
public abstract fun channel (Lkotlin/reflect/KClass;)V
13+
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
14+
public abstract fun codec (Lkotlin/jvm/functions/Function1;)V
15+
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
16+
public abstract fun quicBootstrap (Lkotlin/jvm/functions/Function1;)V
17+
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
18+
}
19+
20+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerInstance : io/rsocket/kotlin/transport/RSocketServerInstance {
21+
public abstract fun getLocalAddress ()Ljava/net/InetSocketAddress;
22+
}
23+
24+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport : io/rsocket/kotlin/transport/RSocketTransport {
25+
public static final field Factory Lio/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport$Factory;
26+
public abstract fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/RSocketServerTarget;
27+
public abstract fun target (Ljava/net/InetSocketAddress;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
28+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport;Ljava/lang/String;IILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
29+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport;Ljava/net/InetSocketAddress;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
30+
}
31+
32+
public final class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
33+
}
34+
35+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
36+
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
37+
public abstract fun channel (Lkotlin/reflect/KClass;)V
38+
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
39+
public abstract fun codec (Lkotlin/jvm/functions/Function1;)V
40+
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
41+
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
42+
}
43+
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import rsocketbuild.*
18+
19+
plugins {
20+
id("rsocketbuild.multiplatform-library")
21+
}
22+
23+
description = "rsocket-kotlin Netty QUIC client/server transport implementation"
24+
25+
kotlin {
26+
jvmTarget()
27+
28+
sourceSets {
29+
jvmMain.dependencies {
30+
implementation(projects.rsocketTransportNettyInternal)
31+
implementation(projects.rsocketInternalIo)
32+
api(projects.rsocketCore)
33+
api(libs.netty.handler)
34+
api(libs.netty.codec.quic)
35+
}
36+
jvmTest.dependencies {
37+
implementation(projects.rsocketTransportTests)
38+
implementation(libs.bouncycastle)
39+
implementation(libs.netty.codec.quic.map {
40+
val javaOsName = System.getProperty("os.name")
41+
val javaOsArch = System.getProperty("os.arch")
42+
val suffix = when {
43+
javaOsName.contains("mac", ignoreCase = true) -> "osx"
44+
javaOsName.contains("linux", ignoreCase = true) -> "linux"
45+
javaOsName.contains("windows", ignoreCase = true) -> "windows"
46+
else -> error("Unknown os.name: $javaOsName")
47+
} + "-" + when (javaOsArch) {
48+
"x86_64", "amd64" -> "x86_64"
49+
"arm64", "aarch64" -> "aarch_64"
50+
else -> error("Unknown os.arch: $javaOsArch")
51+
}
52+
"$it:$suffix"
53+
})
54+
//implementation("ch.qos.logback:logback-classic:1.2.11")
55+
}
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.netty.quic
18+
19+
import io.netty.bootstrap.*
20+
import io.netty.channel.*
21+
import io.netty.channel.ChannelFactory
22+
import io.netty.channel.nio.*
23+
import io.netty.channel.socket.*
24+
import io.netty.channel.socket.nio.*
25+
import io.netty.incubator.codec.quic.*
26+
import io.rsocket.kotlin.internal.io.*
27+
import io.rsocket.kotlin.transport.*
28+
import io.rsocket.kotlin.transport.netty.internal.*
29+
import kotlinx.coroutines.*
30+
import java.net.*
31+
import kotlin.coroutines.*
32+
import kotlin.reflect.*
33+
34+
public sealed interface NettyQuicClientTransport : RSocketTransport {
35+
public fun target(remoteAddress: InetSocketAddress): RSocketClientTarget
36+
public fun target(host: String, port: Int): RSocketClientTarget
37+
38+
public companion object Factory :
39+
RSocketTransportFactory<NettyQuicClientTransport, NettyQuicClientTransportBuilder>(::NettyQuicClientTransportBuilderImpl)
40+
}
41+
42+
public sealed interface NettyQuicClientTransportBuilder : RSocketTransportBuilder<NettyQuicClientTransport> {
43+
public fun channel(cls: KClass<out DatagramChannel>)
44+
public fun channelFactory(factory: ChannelFactory<out DatagramChannel>)
45+
public fun eventLoopGroup(group: EventLoopGroup, manage: Boolean)
46+
47+
public fun bootstrap(block: Bootstrap.() -> Unit)
48+
public fun codec(block: QuicClientCodecBuilder.() -> Unit)
49+
public fun ssl(block: QuicSslContextBuilder.() -> Unit)
50+
public fun quicBootstrap(block: QuicChannelBootstrap.() -> Unit)
51+
}
52+
53+
private class NettyQuicClientTransportBuilderImpl : NettyQuicClientTransportBuilder {
54+
private var channelFactory: ChannelFactory<out DatagramChannel>? = null
55+
private var eventLoopGroup: EventLoopGroup? = null
56+
private var manageEventLoopGroup: Boolean = false
57+
private var bootstrap: (Bootstrap.() -> Unit)? = null
58+
private var codec: (QuicClientCodecBuilder.() -> Unit)? = null
59+
private var ssl: (QuicSslContextBuilder.() -> Unit)? = null
60+
private var quicBootstrap: (QuicChannelBootstrap.() -> Unit)? = null
61+
62+
override fun channel(cls: KClass<out DatagramChannel>) {
63+
this.channelFactory = ReflectiveChannelFactory(cls.java)
64+
}
65+
66+
override fun channelFactory(factory: ChannelFactory<out DatagramChannel>) {
67+
this.channelFactory = factory
68+
}
69+
70+
override fun eventLoopGroup(group: EventLoopGroup, manage: Boolean) {
71+
this.eventLoopGroup = group
72+
this.manageEventLoopGroup = manage
73+
}
74+
75+
override fun bootstrap(block: Bootstrap.() -> Unit) {
76+
bootstrap = block
77+
}
78+
79+
override fun codec(block: QuicClientCodecBuilder.() -> Unit) {
80+
codec = block
81+
}
82+
83+
override fun ssl(block: QuicSslContextBuilder.() -> Unit) {
84+
ssl = block
85+
}
86+
87+
override fun quicBootstrap(block: QuicChannelBootstrap.() -> Unit) {
88+
quicBootstrap = block
89+
}
90+
91+
@RSocketTransportApi
92+
override fun buildTransport(context: CoroutineContext): NettyQuicClientTransport {
93+
val codecHandler = QuicClientCodecBuilder().apply {
94+
// by default, we allow Int.MAX_VALUE of active stream
95+
initialMaxStreamsBidirectional(Int.MAX_VALUE.toLong())
96+
codec?.invoke(this)
97+
ssl?.let {
98+
sslContext(QuicSslContextBuilder.forClient().apply(it).build())
99+
}
100+
}.build()
101+
val bootstrap = Bootstrap().apply {
102+
bootstrap?.invoke(this)
103+
localAddress(0)
104+
handler(codecHandler)
105+
channelFactory(channelFactory ?: ReflectiveChannelFactory(NioDatagramChannel::class.java))
106+
group(eventLoopGroup ?: NioEventLoopGroup())
107+
}
108+
109+
return NettyQuicClientTransportImpl(
110+
coroutineContext = context.supervisorContext() + bootstrap.config().group().asCoroutineDispatcher(),
111+
bootstrap = bootstrap,
112+
quicBootstrap = quicBootstrap,
113+
manageBootstrap = manageEventLoopGroup
114+
)
115+
}
116+
}
117+
118+
private class NettyQuicClientTransportImpl(
119+
override val coroutineContext: CoroutineContext,
120+
private val bootstrap: Bootstrap,
121+
private val quicBootstrap: (QuicChannelBootstrap.() -> Unit)?,
122+
manageBootstrap: Boolean,
123+
) : NettyQuicClientTransport {
124+
init {
125+
if (manageBootstrap) callOnCancellation {
126+
bootstrap.config().group().shutdownGracefully().awaitFuture()
127+
}
128+
}
129+
130+
override fun target(remoteAddress: InetSocketAddress): NettyQuicClientTargetImpl = NettyQuicClientTargetImpl(
131+
coroutineContext = coroutineContext.supervisorContext(),
132+
bootstrap = bootstrap,
133+
quicBootstrap = quicBootstrap,
134+
remoteAddress = remoteAddress
135+
)
136+
137+
override fun target(host: String, port: Int): RSocketClientTarget = target(InetSocketAddress(host, port))
138+
}
139+
140+
private class NettyQuicClientTargetImpl(
141+
override val coroutineContext: CoroutineContext,
142+
private val bootstrap: Bootstrap,
143+
private val quicBootstrap: (QuicChannelBootstrap.() -> Unit)?,
144+
private val remoteAddress: SocketAddress,
145+
) : RSocketClientTarget {
146+
@RSocketTransportApi
147+
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
148+
QuicChannel.newBootstrap(bootstrap.bind().awaitChannel()).also { quicBootstrap?.invoke(it) }
149+
.handler(
150+
NettyQuicConnectionInitializer(handler, coroutineContext, isClient = true)
151+
).remoteAddress(remoteAddress).connect().awaitFuture()
152+
}
153+
}

0 commit comments

Comments
 (0)