Skip to content

Finalize new transport API and improve stability of QUIC implementation #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,6 @@ import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.internal.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

private val RSocketSupportConfigKey = AttributeKey<RSocketSupportConfig.Internal>("RSocketSupportConfig")
Expand Down Expand Up @@ -66,9 +65,7 @@ private class RSocketSupportTarget(
override val coroutineContext: CoroutineContext get() = client.coroutineContext

@RSocketTransportApi
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
client.webSocket(request) {
handler.handleKtorWebSocketConnection(this)
}
override suspend fun connectClient(): RSocketConnection {
return KtorWebSocketConnection(client.webSocketSession(request))
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@ import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.internal.*
import kotlinx.coroutines.*

private val RSocketSupportConfigKey = AttributeKey<RSocketSupportConfig.Internal>("RSocketSupportConfig")

Expand Down Expand Up @@ -54,8 +55,8 @@ internal fun Route.rSocketHandler(acceptor: ConnectionAcceptor): suspend Default
val config = application.attributes.getOrNull(RSocketSupportConfigKey)
?: error("Plugin RSocketSupport is not installed. Consider using `install(RSocketSupport)` in server config first.")

val handler = config.server.createHandler(acceptor)
return {
handler.handleKtorWebSocketConnection(this)
config.server.acceptConnection(acceptor, KtorWebSocketConnection(this))
awaitCancellation()
}
}
19 changes: 6 additions & 13 deletions rsocket-core/api/rsocket-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ public final class io/rsocket/kotlin/core/RSocketConnectorBuilderKt {
}

public final class io/rsocket/kotlin/core/RSocketServer {
public final fun acceptConnection (Lio/rsocket/kotlin/ConnectionAcceptor;Lio/rsocket/kotlin/transport/RSocketConnection;)V
public final fun bind (Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
public final fun bindIn (Lkotlinx/coroutines/CoroutineScope;Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
public final fun createHandler (Lio/rsocket/kotlin/ConnectionAcceptor;)Lio/rsocket/kotlin/transport/RSocketConnectionHandler;
public final fun startServer (Lio/rsocket/kotlin/transport/RSocketServerTarget;Lio/rsocket/kotlin/ConnectionAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -751,31 +751,24 @@ public final class io/rsocket/kotlin/transport/ClientTransportKt {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketClientTarget : kotlinx/coroutines/CoroutineScope {
public abstract fun connectClient (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;)Lkotlinx/coroutines/Job;
public abstract fun connectClient (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketConnection {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketConnectionHandler {
public abstract fun handleConnection (Lio/rsocket/kotlin/transport/RSocketConnection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract interface class io/rsocket/kotlin/transport/RSocketConnection : kotlinx/coroutines/CoroutineScope {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection : io/rsocket/kotlin/transport/RSocketConnection {
public abstract fun acceptStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun createStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : java/lang/AutoCloseable {
public abstract fun close ()V
public abstract fun isClosedForSend ()Z
public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : kotlinx/coroutines/CoroutineScope {
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun sendFrame (Lkotlinx/io/Buffer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun setSendPriority (I)V
}

public abstract interface class io/rsocket/kotlin/transport/RSocketSequentialConnection : io/rsocket/kotlin/transport/RSocketConnection {
public abstract fun isClosedForSend ()Z
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun sendFrame (ILkotlinx/io/Buffer;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
Expand All @@ -784,7 +777,7 @@ public abstract interface class io/rsocket/kotlin/transport/RSocketServerInstanc
}

public abstract interface class io/rsocket/kotlin/transport/RSocketServerTarget : kotlinx/coroutines/CoroutineScope {
public abstract fun startServer (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun startServer (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransport : kotlinx/coroutines/CoroutineScope {
Expand All @@ -809,7 +802,7 @@ public abstract interface class io/rsocket/kotlin/transport/ServerTransport {
}

public final class io/rsocket/kotlin/transport/internal/PrioritizationFrameQueue {
public fun <init> (I)V
public fun <init> ()V
public final fun cancel ()V
public final fun close ()V
public final fun dequeueFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,23 +20,16 @@ import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import kotlinx.io.*

// send/receive setup, resume, resume ok, lease, error
@RSocketTransportApi
internal abstract class ConnectionEstablishmentContext(
private val frameCodec: FrameCodec,
protected val frameCodec: FrameCodec,
) {
protected abstract suspend fun receiveFrameRaw(): Buffer?
protected abstract suspend fun sendFrame(frame: Buffer)
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frameCodec.encodeFrame(frame))
protected abstract suspend fun receiveConnectionFrameRaw(): Buffer?
protected abstract suspend fun sendConnectionFrameRaw(frame: Buffer)

// only setup|lease|resume|resume_ok|error frames
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
expectedStreamId = 0,
frame = receiveFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
)
protected suspend fun sendFrameConnectionFrame(frame: Frame): Unit = sendConnectionFrameRaw(frameCodec.encodeFrame(frame))

suspend fun sendSetup(
version: Version,
Expand All @@ -45,5 +38,11 @@ internal abstract class ConnectionEstablishmentContext(
resumeToken: Buffer?,
payloadMimeType: PayloadMimeType,
payload: Payload,
): Unit = sendFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
): Unit = sendFrameConnectionFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))

// only setup|lease|resume|resume_ok|error frames
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
expectedStreamId = 0,
frame = receiveConnectionFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
)
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2024 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,20 +18,15 @@ package io.rsocket.kotlin.connection

import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.operation.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlinx.io.*
import kotlin.coroutines.*

@RSocketTransportApi
internal class ConnectionInbound(
// requestContext
override val coroutineContext: CoroutineContext,
private val requestsScope: CoroutineScope,
private val responder: RSocket,
private val keepAliveHandler: KeepAliveHandler,
) : CoroutineScope {
) {
fun handleFrame(frame: Frame): Unit = when (frame) {
is MetadataPushFrame -> receiveMetadataPush(frame.metadata)
is KeepAliveFrame -> receiveKeepAlive(frame.respond, frame.data, frame.lastPosition)
Expand All @@ -42,9 +37,9 @@ internal class ConnectionInbound(
}

private fun receiveMetadataPush(metadata: Buffer) {
launch {
requestsScope.launch {
responder.metadataPush(metadata)
}.invokeOnCompletion { metadata.close() }
}.invokeOnCompletion { metadata.clear() }
}

@Suppress("UNUSED_PARAMETER") // will be used later
Expand Down
Loading