Skip to content

Commit 773fede

Browse files
committed
migrate nodejs tcp transport to new API
1 parent 6f59232 commit 773fede

File tree

5 files changed

+279
-2
lines changed

5 files changed

+279
-2
lines changed

rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,18 @@ internal fun ByteReadPacket.withLength(): ByteReadPacket = buildPacket {
2424
writePacket(this@withLength)
2525
}
2626

27-
internal class FrameWithLengthAssembler(private val onFrame: (frame: ByteReadPacket) -> Unit) {
28-
private var expectedFrameLength = 0 //TODO atomic for native
27+
internal class FrameWithLengthAssembler(private val onFrame: (frame: ByteReadPacket) -> Unit) : Closeable {
28+
private var closed = false
29+
private var expectedFrameLength = 0
2930
private val packetBuilder: BytePacketBuilder = BytePacketBuilder()
31+
32+
override fun close() {
33+
packetBuilder.close()
34+
closed = true
35+
}
36+
3037
inline fun write(write: BytePacketBuilder.() -> Unit) {
38+
if (closed) return
3139
packetBuilder.write()
3240
loop()
3341
}
@@ -39,6 +47,7 @@ internal class FrameWithLengthAssembler(private val onFrame: (frame: ByteReadPac
3947
expectedFrameLength = it.readInt24()
4048
if (it.remaining >= expectedFrameLength) build(it) // if has length and frame
4149
}
50+
4251
packetBuilder.size < expectedFrameLength -> return // not enough bytes to read frame
4352
else -> withTemp { build(it) } // enough bytes to read frame
4453
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.nodejs.tcp
18+
19+
import io.rsocket.kotlin.internal.io.*
20+
import io.rsocket.kotlin.transport.*
21+
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
22+
import kotlinx.coroutines.*
23+
import kotlin.coroutines.*
24+
25+
public sealed interface NodejsTcpClientTransport : RSocketTransport {
26+
public fun target(host: String, port: Int): RSocketClientTarget
27+
28+
public companion object Factory :
29+
RSocketTransportFactory<NodejsTcpClientTransport, NodejsTcpClientTransportBuilder>(::NodejsTcpClientTransportBuilderImpl)
30+
}
31+
32+
public sealed interface NodejsTcpClientTransportBuilder : RSocketTransportBuilder<NodejsTcpClientTransport> {
33+
public fun dispatcher(context: CoroutineContext)
34+
public fun inheritDispatcher(): Unit = dispatcher(EmptyCoroutineContext)
35+
}
36+
37+
private class NodejsTcpClientTransportBuilderImpl : NodejsTcpClientTransportBuilder {
38+
private var dispatcher: CoroutineContext = Dispatchers.Default
39+
40+
override fun dispatcher(context: CoroutineContext) {
41+
check(context[Job] == null) { "Dispatcher shouldn't contain job" }
42+
this.dispatcher = context
43+
}
44+
45+
@RSocketTransportApi
46+
override fun buildTransport(context: CoroutineContext): NodejsTcpClientTransport = NodejsTcpClientTransportImpl(
47+
coroutineContext = context.supervisorContext() + dispatcher,
48+
)
49+
}
50+
51+
private class NodejsTcpClientTransportImpl(
52+
override val coroutineContext: CoroutineContext,
53+
) : NodejsTcpClientTransport {
54+
override fun target(host: String, port: Int): RSocketClientTarget = NodejsTcpClientTargetImpl(
55+
coroutineContext = coroutineContext.supervisorContext(),
56+
host = host,
57+
port = port
58+
)
59+
}
60+
61+
private class NodejsTcpClientTargetImpl(
62+
override val coroutineContext: CoroutineContext,
63+
private val host: String,
64+
private val port: Int,
65+
) : RSocketClientTarget {
66+
@RSocketTransportApi
67+
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
68+
val socket = connect(port, host)
69+
handler.handleNodejsTcpConnection(socket)
70+
}
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.nodejs.tcp
18+
19+
import io.ktor.utils.io.core.*
20+
import io.ktor.utils.io.js.*
21+
import io.rsocket.kotlin.internal.io.*
22+
import io.rsocket.kotlin.transport.*
23+
import io.rsocket.kotlin.transport.internal.*
24+
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
25+
import kotlinx.coroutines.*
26+
import kotlinx.coroutines.channels.*
27+
import org.khronos.webgl.*
28+
29+
@RSocketTransportApi
30+
internal suspend fun RSocketConnectionHandler.handleNodejsTcpConnection(socket: Socket): Unit = coroutineScope {
31+
val outboundQueue = PrioritizationFrameQueue(Channel.BUFFERED)
32+
val inbound = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)
33+
34+
val closed = CompletableDeferred<Unit>()
35+
val frameAssembler = FrameWithLengthAssembler { inbound.trySend(it) }
36+
socket.on(
37+
onData = { frameAssembler.write { writeFully(it.buffer) } },
38+
onError = { closed.completeExceptionally(it) },
39+
onClose = {
40+
frameAssembler.close()
41+
if (!it) closed.complete(Unit)
42+
}
43+
)
44+
45+
val writerJob = launch {
46+
while (true) socket.writeFrame(outboundQueue.dequeueFrame() ?: break)
47+
}.onCompletion { outboundQueue.cancel() }
48+
49+
try {
50+
handleConnection(NodejsTcpConnection(outboundQueue, inbound))
51+
} finally {
52+
inbound.cancel()
53+
outboundQueue.close() // will cause `writerJob` completion
54+
// even if it was cancelled, we still need to close socket and await it closure
55+
withContext(NonCancellable) {
56+
writerJob.join()
57+
// close socket
58+
socket.destroy()
59+
closed.join()
60+
}
61+
}
62+
}
63+
64+
@RSocketTransportApi
65+
private class NodejsTcpConnection(
66+
private val outboundQueue: PrioritizationFrameQueue,
67+
private val inbound: ReceiveChannel<ByteReadPacket>,
68+
) : RSocketSequentialConnection {
69+
override val isClosedForSend: Boolean get() = outboundQueue.isClosedForSend
70+
override suspend fun sendFrame(streamId: Int, frame: ByteReadPacket) {
71+
return outboundQueue.enqueueFrame(streamId, frame)
72+
}
73+
74+
override suspend fun receiveFrame(): ByteReadPacket? {
75+
return inbound.receiveCatching().getOrNull()
76+
}
77+
}
78+
79+
private fun Socket.writeFrame(frame: ByteReadPacket) {
80+
val packet = buildPacket {
81+
writeInt24(frame.remaining.toInt())
82+
writePacket(frame)
83+
}
84+
write(Uint8Array(packet.readArrayBuffer()))
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.nodejs.tcp
18+
19+
import io.rsocket.kotlin.internal.io.*
20+
import io.rsocket.kotlin.transport.*
21+
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
22+
import kotlinx.coroutines.*
23+
import kotlin.coroutines.*
24+
25+
public sealed interface NodejsTcpServerInstance : RSocketServerInstance {
26+
public val host: String
27+
public val port: Int
28+
}
29+
30+
public sealed interface NodejsTcpServerTransport : RSocketTransport {
31+
public fun target(host: String, port: Int): RSocketServerTarget<NodejsTcpServerInstance>
32+
33+
public companion object Factory :
34+
RSocketTransportFactory<NodejsTcpServerTransport, NodejsTcpServerTransportBuilder>({ NodejsTcpServerTransportBuilderImpl })
35+
}
36+
37+
public sealed interface NodejsTcpServerTransportBuilder : RSocketTransportBuilder<NodejsTcpServerTransport> {
38+
public fun dispatcher(context: CoroutineContext)
39+
public fun inheritDispatcher(): Unit = dispatcher(EmptyCoroutineContext)
40+
}
41+
42+
private object NodejsTcpServerTransportBuilderImpl : NodejsTcpServerTransportBuilder {
43+
private var dispatcher: CoroutineContext = Dispatchers.Default
44+
45+
override fun dispatcher(context: CoroutineContext) {
46+
check(context[Job] == null) { "Dispatcher shouldn't contain job" }
47+
this.dispatcher = context
48+
}
49+
50+
@RSocketTransportApi
51+
override fun buildTransport(context: CoroutineContext): NodejsTcpServerTransport = NodejsTcpServerTransportImpl(
52+
coroutineContext = context.supervisorContext() + dispatcher,
53+
)
54+
}
55+
56+
private class NodejsTcpServerTransportImpl(
57+
override val coroutineContext: CoroutineContext,
58+
) : NodejsTcpServerTransport {
59+
override fun target(host: String, port: Int): RSocketServerTarget<NodejsTcpServerInstance> = NodejsTcpServerTargetImpl(
60+
coroutineContext = coroutineContext.supervisorContext(),
61+
host = host,
62+
port = port
63+
)
64+
}
65+
66+
private class NodejsTcpServerTargetImpl(
67+
override val coroutineContext: CoroutineContext,
68+
private val host: String,
69+
private val port: Int,
70+
) : RSocketServerTarget<NodejsTcpServerInstance> {
71+
72+
@RSocketTransportApi
73+
override suspend fun startServer(handler: RSocketConnectionHandler): NodejsTcpServerInstance {
74+
currentCoroutineContext().ensureActive()
75+
coroutineContext.ensureActive()
76+
77+
val serverJob = launch {
78+
val handlerScope = CoroutineScope(coroutineContext.supervisorContext())
79+
val server = createServer(port, host, {
80+
coroutineContext.job.cancel("Server closed")
81+
}) {
82+
handlerScope.launch { handler.handleNodejsTcpConnection(it) }
83+
}
84+
try {
85+
awaitCancellation()
86+
} finally {
87+
suspendCoroutine { cont -> server.close { cont.resume(Unit) } }
88+
}
89+
}
90+
91+
return NodejsTcpServerInstanceImpl(
92+
coroutineContext = coroutineContext + serverJob,
93+
host = host,
94+
port = port
95+
)
96+
}
97+
}
98+
99+
@RSocketTransportApi
100+
private class NodejsTcpServerInstanceImpl(
101+
override val coroutineContext: CoroutineContext,
102+
override val host: String,
103+
override val port: Int,
104+
) : NodejsTcpServerInstance

rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,11 @@ class TcpTransportTest : TransportTest() {
3434
server.close()
3535
}
3636
}
37+
38+
class NodejsTcpTransportTest : TransportTest() {
39+
override suspend fun before() {
40+
val port = PortProvider.next()
41+
startServer(NodejsTcpServerTransport(testContext).target("127.0.0.1", port))
42+
client = connectClient(NodejsTcpClientTransport(testContext).target("127.0.0.1", port))
43+
}
44+
}

0 commit comments

Comments
 (0)