Skip to content

improves transport api and lifecycle #178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.*

@OptIn(ExperimentalStreamsApi::class)
@OptIn(ExperimentalStreamsApi::class, DelicateCoroutinesApi::class)
class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
private val requestStrategy = PrefetchStrategy(64, 0)

private val benchJob = Job()
lateinit var client: RSocket
lateinit var server: Job

lateinit var payload: Payload
lateinit var payloadsFlow: Flow<Payload>
Expand All @@ -40,9 +40,7 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
override fun setup() {
payload = createPayload(payloadSize)
payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } }

val localServer = LocalServer()
server = RSocketServer().bind(localServer) {
val server = RSocketServer().bindIn(CoroutineScope(benchJob + Dispatchers.Unconfined), LocalServerTransport()) {
RSocketRequestHandler {
requestResponse {
it.release()
Expand All @@ -59,14 +57,14 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
}
}
client = runBlocking {
RSocketConnector().connect(localServer)
RSocketConnector().connect(server)
}
}

override fun cleanup() {
runBlocking {
client.job.runCatching { cancelAndJoin() }
server.runCatching { cancelAndJoin() }
client.coroutineContext.job.cancelAndJoin()
benchJob.cancelAndJoin()
}
}

Expand Down
3 changes: 1 addition & 2 deletions examples/interactions/src/jvmMain/kotlin/ReconnectExample.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import kotlinx.coroutines.flow.*

@TransportApi
fun main(): Unit = runBlocking {
val server = LocalServer()
RSocketServer().bind(server) {
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
RSocketRequestHandler {
requestStream { requestPayload ->
val data = requestPayload.data.readText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import kotlinx.coroutines.flow.*

@TransportApi
fun main(): Unit = runBlocking {
val server = LocalServer()
RSocketServer().bind(server) {
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
RSocketRequestHandler {
requestStream { requestPayload ->
val data = requestPayload.data.readText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import kotlinx.coroutines.flow.*


fun main(): Unit = runBlocking {
val server = LocalServer()
RSocketServer().bind(server) {
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
RSocketRequestHandler {
requestChannel { init, request ->
println("Init with: ${init.data.readText()}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import io.rsocket.kotlin.transport.local.*
import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
val server = LocalServer()
RSocketServer().bind(server) {
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
RSocketRequestHandler {
requestResponse {
val data = it.data.readText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import io.rsocket.kotlin.transport.local.*
import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
val server = LocalServer()
RSocketServer().bind(server) {
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
RSocketRequestHandler {
requestResponse {
val data = it.data.readText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
val server = LocalServer()
RSocketServer().bind(server) {
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
RSocketRequestHandler {
requestStream {
val data = it.data.readText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import io.rsocket.kotlin.transport.local.*
import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
val server = LocalServer()
RSocketServer().bind(server) {
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
RSocketRequestHandler {
requestResponse {
val clientRequest = it.data.readText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ import kotlinx.coroutines.flow.*


fun main(): Unit = runBlocking {

val server = LocalServer()
RSocketServer().bind(server) {
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
val data = config.setupPayload.metadata?.readText() ?: error("Empty metadata")
RSocketRequestHandler {
when (data) {
Expand All @@ -43,8 +41,8 @@ fun main(): Unit = runBlocking {

suspend fun client1() {
val rSocketClient = RSocketConnector().connect(server)
rSocketClient.job.join()
println("Client 1 canceled: ${rSocketClient.job.isCancelled}")
rSocketClient.coroutineContext.job.join()
println("Client 1 canceled: ${rSocketClient.coroutineContext.job.isCancelled}")
try {
rSocketClient.requestResponse(Payload.Empty)
} catch (e: Throwable) {
Expand Down
8 changes: 4 additions & 4 deletions examples/multiplatform-chat/src/clientMain/kotlin/Api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

import io.ktor.client.*
import io.ktor.client.features.websocket.*
import io.ktor.network.selector.*
import io.ktor.util.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.ktor.*
import io.rsocket.kotlin.transport.ktor.client.*
import kotlinx.coroutines.*

class Api(rSocket: RSocket) {
private val proto = ConfiguredProtoBuf
Expand All @@ -42,9 +41,10 @@ suspend fun connectToApiUsingWS(name: String): Api {
return Api(client.rSocket(port = 9000))
}

@OptIn(InternalAPI::class)
suspend fun connectToApiUsingTCP(name: String): Api {
val transport = TcpClientTransport(SelectorManager(), "0.0.0.0", 8000)
val transport = TcpClientTransport("0.0.0.0", 8000, CoroutineExceptionHandler { coroutineContext, throwable ->
println("FAIL: $coroutineContext, $throwable")
})
return Api(connector(name).connect(transport))
}

Expand Down
6 changes: 2 additions & 4 deletions examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
*/

import io.ktor.application.*
import io.ktor.network.selector.*
import io.ktor.routing.*
import io.ktor.server.cio.*
import io.ktor.server.engine.*
import io.ktor.util.*
import io.ktor.websocket.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
Expand All @@ -31,7 +29,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.serialization.*

@OptIn(ExperimentalSerializationApi::class, ExperimentalMetadataApi::class, InternalAPI::class)
@OptIn(ExperimentalSerializationApi::class, ExperimentalMetadataApi::class, DelicateCoroutinesApi::class)
fun main() {
val proto = ConfiguredProtoBuf
val users = Users()
Expand Down Expand Up @@ -97,7 +95,7 @@ fun main() {
}

//start TCP server
rSocketServer.bind(TcpServerTransport(ActorSelectorManager(Dispatchers.IO), port = 9000), acceptor)
rSocketServer.bind(TcpServerTransport(port = 8000), acceptor)

//start WS server
embeddedServer(CIO, port = 9000) {
Expand Down
11 changes: 4 additions & 7 deletions playground/src/commonMain/kotlin/TCP.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,18 @@
* limitations under the License.
*/

import io.ktor.network.selector.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.ktor.*
import kotlin.coroutines.*


suspend fun runTcpClient(dispatcher: CoroutineContext) {
val transport = TcpClientTransport(SelectorManager(dispatcher), "0.0.0.0", 4444)
suspend fun runTcpClient() {
val transport = TcpClientTransport("0.0.0.0", 4444)
RSocketConnector().connect(transport).doSomething()
}

//to test nodejs tcp server
suspend fun testNodeJsServer(dispatcher: CoroutineContext) {
val transport = TcpClientTransport(SelectorManager(dispatcher), "127.0.0.1", 9000)
suspend fun testNodeJsServer() {
val transport = TcpClientTransport("127.0.0.1", 9000)
val client = RSocketConnector().connect(transport)

val response = client.requestResponse(buildPayload { data("Hello from JVM") })
Expand Down
6 changes: 2 additions & 4 deletions playground/src/jvmMain/kotlin/TcpClientApp.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
* limitations under the License.
*/

import kotlinx.coroutines.*
suspend fun main(): Unit = runTcpClient()

suspend fun main(): Unit = runTcpClient(Dispatchers.IO)

//suspend fun main(): Unit = testNodeJsServer(Dispatchers.IO)
//suspend fun main(): Unit = testNodeJsServer()
5 changes: 2 additions & 3 deletions playground/src/jvmMain/kotlin/TcpServerApp.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
* limitations under the License.
*/

import io.ktor.network.selector.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.transport.ktor.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

suspend fun runTcpServer(dispatcher: CoroutineContext) {
val transport = TcpServerTransport(SelectorManager(dispatcher), "0.0.0.0", 4444)
RSocketServer().bind(transport, rSocketAcceptor).join()
val transport = TcpServerTransport("0.0.0.0", 4444)
RSocketServer().bindIn(CoroutineScope(dispatcher), transport, rSocketAcceptor).handlerJob.join()
}

suspend fun main(): Unit = runTcpServer(Dispatchers.IO)
5 changes: 1 addition & 4 deletions playground/src/nativeMain/kotlin/TcpApp.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
* limitations under the License.
*/

import io.ktor.util.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

@OptIn(InternalAPI::class)
fun main() {
runBlocking {
runTcpClient(EmptyCoroutineContext)
runTcpClient()
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,15 @@ import kotlinx.coroutines.*
* That interface isn't stable for inheritance.
*/
@TransportApi
public interface Connection {
public val job: Job

public interface Connection : CoroutineScope {
public val pool: ObjectPool<ChunkBuffer> get() = ChunkBuffer.Pool

public suspend fun send(packet: ByteReadPacket)
public suspend fun receive(): ByteReadPacket
}

@OptIn(TransportApi::class)
internal suspend fun Connection.receiveFrame(): Frame = receive().readFrame(pool)
internal suspend inline fun <T> Connection.receiveFrame(block: (frame: Frame) -> T): T = receive().readFrame(pool).closeOnError(block)

@OptIn(TransportApi::class)
internal suspend fun Connection.sendFrame(frame: Frame) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

public interface RSocket {
public val job: Job
public interface RSocket : CoroutineScope {

public suspend fun metadataPush(metadata: ByteReadPacket) {
metadata.release()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.ktor.utils.io.core.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

public class RSocketRequestHandlerBuilder internal constructor() {
private var metadataPush: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)? = null
Expand Down Expand Up @@ -53,19 +54,29 @@ public class RSocketRequestHandlerBuilder internal constructor() {
requestChannel = block
}

internal fun build(job: Job): RSocket =
RSocketRequestHandler(job, metadataPush, fireAndForget, requestResponse, requestStream, requestChannel)
internal fun build(parentContext: CoroutineContext): RSocket =
RSocketRequestHandler(
parentContext + Job(parentContext[Job]),
metadataPush,
fireAndForget,
requestResponse,
requestStream,
requestChannel
)
}

@Suppress("FunctionName")
public fun RSocketRequestHandler(parentJob: Job? = null, configure: RSocketRequestHandlerBuilder.() -> Unit): RSocket {
public fun RSocketRequestHandler(
parentContext: CoroutineContext = EmptyCoroutineContext,
configure: RSocketRequestHandlerBuilder.() -> Unit
): RSocket {
val builder = RSocketRequestHandlerBuilder()
builder.configure()
return builder.build(Job(parentJob))
return builder.build(parentContext)
}

private class RSocketRequestHandler(
override val job: Job,
override val coroutineContext: CoroutineContext,
private val metadataPush: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)? = null,
private val fireAndForget: (suspend RSocket.(payload: Payload) -> Unit)? = null,
private val requestResponse: (suspend RSocket.(payload: Payload) -> Payload)? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ public class RSocketConnector internal constructor(
) {

public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) {
null -> connectOnce(transport)
else -> ReconnectableRSocket(
logger = loggerFactory.logger("io.rsocket.kotlin.connection"),
connect = { connectOnce(transport) },
predicate = reconnectPredicate
//TODO current coroutineContext job is overriden by transport coroutineContext jov
null -> withContext(transport.coroutineContext) { connectOnce(transport) }
else -> connectWithReconnect(
transport.coroutineContext,
loggerFactory.logger("io.rsocket.kotlin.connection"),
{ connectOnce(transport) },
reconnectPredicate,
)
}

Expand All @@ -48,7 +50,7 @@ public class RSocketConnector internal constructor(
val connectionConfig = try {
connectionConfigProvider()
} catch (cause: Throwable) {
connection.job.cancel("Connection config provider failed", cause)
connection.cancel("Connection config provider failed", cause)
throw cause
}
val setupFrame = SetupFrame(
Expand All @@ -60,7 +62,8 @@ public class RSocketConnector internal constructor(
payload = connectionConfig.setupPayload.copy() //copy needed, as it can be used in acceptor
)
try {
val requester = connection.connect(
val requester = connect(
connection = connection,
isServer = false,
maxFragmentSize = maxFragmentSize,
interceptors = interceptors,
Expand All @@ -72,7 +75,7 @@ public class RSocketConnector internal constructor(
} catch (cause: Throwable) {
connectionConfig.setupPayload.release()
setupFrame.release()
connection.job.cancel("Connection establishment failed", cause)
connection.cancel("Connection establishment failed", cause)
throw cause
}
}
Expand Down
Loading