Skip to content

Rework internals - preparation for fragmentation #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,5 @@ internal suspend fun Connection.receiveFrame(): Frame = receive().readFrame(pool

@OptIn(TransportApi::class)
internal suspend fun Connection.sendFrame(frame: Frame) {
val packet = frame.toPacket(pool)
packet.closeOnError { send(packet) }
frame.toPacket(pool).closeOnError { send(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*

@OptIn(TransportApi::class, RSocketLoggingApi::class)
public class RSocketConnector internal constructor(
Expand All @@ -43,18 +44,34 @@ public class RSocketConnector internal constructor(

private suspend fun connectOnce(transport: ClientTransport): RSocket {
val connection = transport.connect().wrapConnection()
val connectionConfig = connectionConfigProvider()

return connection.connect(isServer = false, interceptors, connectionConfig, acceptor) {
val setupFrame = SetupFrame(
version = Version.Current,
honorLease = false,
keepAlive = connectionConfig.keepAlive,
resumeToken = null,
payloadMimeType = connectionConfig.payloadMimeType,
payload = connectionConfig.setupPayload
val connectionConfig = try {
connectionConfigProvider()
} catch (cause: Throwable) {
connection.job.cancel("Connection config provider failed", cause)
throw cause
}
val setupFrame = SetupFrame(
version = Version.Current,
honorLease = false,
keepAlive = connectionConfig.keepAlive,
resumeToken = null,
payloadMimeType = connectionConfig.payloadMimeType,
payload = connectionConfig.setupPayload.copy() //copy needed, as it can be used in acceptor
)
try {
val requester = connection.connect(
isServer = false,
interceptors = interceptors,
connectionConfig = connectionConfig,
acceptor = acceptor
)
connection.sendFrame(setupFrame)
return requester
} catch (cause: Throwable) {
connectionConfig.setupPayload.release()
setupFrame.release()
connection.job.cancel("Connection establishment failed", cause)
throw cause
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ public class RSocketConnectorBuilder internal constructor() {
)

private companion object {
private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor { EmptyRSocket() }
private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor {
config.setupPayload.release()
EmptyRSocket()
}

private class EmptyRSocket : RSocket {
override val job: Job = Job()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,40 @@ public class RSocketServer internal constructor(
public fun <T> bind(
transport: ServerTransport<T>,
acceptor: ConnectionAcceptor,
): T = transport.start {
val connection = it.wrapConnection()
val setupFrame = connection.validateSetup()
connection.start(setupFrame, acceptor)
connection.job.join()
}

private suspend fun Connection.start(setupFrame: SetupFrame, acceptor: ConnectionAcceptor) {
val connectionConfig = ConnectionConfig(
keepAlive = setupFrame.keepAlive,
payloadMimeType = setupFrame.payloadMimeType,
setupPayload = setupFrame.payload
)
try {
connect(isServer = true, interceptors, connectionConfig, acceptor)
} catch (e: Throwable) {
failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor"))
}
}
): T = transport.start { it.wrapConnection().bind(acceptor).join() }

private suspend fun Connection.validateSetup(): SetupFrame {
val setupFrame = receiveFrame()
return when {
private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame().closeOnError { setupFrame ->
when {
setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}"))
setupFrame.honorLease -> failSetup(RSocketError.Setup.Unsupported("Lease is not supported"))
setupFrame.resumeToken != null -> failSetup(RSocketError.Setup.Unsupported("Resume is not supported"))
else -> setupFrame
else -> try {
connect(
isServer = true,
interceptors = interceptors,
connectionConfig = ConnectionConfig(
keepAlive = setupFrame.keepAlive,
payloadMimeType = setupFrame.payloadMimeType,
setupPayload = setupFrame.payload
),
acceptor = acceptor
)
job
} catch (e: Throwable) {
failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor"))
}
}
}

private fun Connection.wrapConnection(): Connection =
interceptors.wrapConnection(this)
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))

private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
sendFrame(ErrorFrame(0, error))
job.cancel("Connection establishment failed", error)
throw error
}

private fun Connection.wrapConnection(): Connection =
interceptors.wrapConnection(this)
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,29 @@
package io.rsocket.kotlin.internal

import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.native.concurrent.*

internal inline fun <T> Closeable.closeOnError(block: () -> T): T {
internal inline fun <T : Closeable, R> T.closeOnError(block: (T) -> R): R {
try {
return block()
return block(this)
} catch (e: Throwable) {
close()
throw e
}
}

internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
cancel(cause?.let { it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it) })
}

@SharedImmutable
private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close

@Suppress("FunctionName")
internal fun <E : Closeable> SafeChannel(capacity: Int): Channel<E> = Channel(capacity, onUndeliveredElement = onUndeliveredCloseable)

internal fun <E : Closeable> SendChannel<E>.safeOffer(element: E) {
trySend(element)
.onFailure { element.close() }
.getOrThrow() //TODO
internal fun <E : Closeable> SendChannel<E>.safeTrySend(element: E) {
trySend(element).onFailure { element.close() }
}

internal fun Channel<out Closeable>.fullClose(cause: Throwable?) {
close(cause) // close channel to provide right cause
cancel() // force call of onUndeliveredElement to release buffered elements
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,80 @@

package io.rsocket.kotlin.internal

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.frame.*
import kotlinx.coroutines.*

@OptIn(TransportApi::class)
internal suspend inline fun Connection.connect(
isServer: Boolean,
interceptors: Interceptors,
connectionConfig: ConnectionConfig,
acceptor: ConnectionAcceptor,
beforeStart: () -> Unit = {},
acceptor: ConnectionAcceptor
): RSocket {
val state = RSocketState(this, connectionConfig.keepAlive)
val requester = RSocketRequester(state, StreamId(isServer)).let(interceptors::wrapRequester)
val connectionContext = ConnectionAcceptorContext(connectionConfig, requester)
val requestHandler = with(interceptors.wrapAcceptor(acceptor)) { connectionContext.accept() }.let(interceptors::wrapResponder)
beforeStart()
state.start(requestHandler)
val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive)
val prioritizer = Prioritizer()
val streamsStorage = StreamsStorage(isServer)
val requestJob = SupervisorJob(job)

requestJob.invokeOnCompletion {
prioritizer.close(it)
streamsStorage.cleanup(it)
connectionConfig.setupPayload.release()
}

val requestScope = CoroutineScope(requestJob + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> })
val connectionScope = CoroutineScope(job + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> })

val requester = interceptors.wrapRequester(RSocketRequester(job, prioritizer, streamsStorage, requestScope))
val requestHandler = interceptors.wrapResponder(
with(interceptors.wrapAcceptor(acceptor)) {
ConnectionAcceptorContext(connectionConfig, requester).accept()
}
)

// link completing of connection and requestHandler
job.invokeOnCompletion { requestHandler.job.cancel("Connection closed", it) }
requestHandler.job.invokeOnCompletion { if (it != null) job.cancel("Request handler failed", it) }

// start keepalive ticks
connectionScope.launch {
while (isActive) {
keepAliveHandler.tick()
prioritizer.send(KeepAliveFrame(true, 0, ByteReadPacket.Empty))
}
}

// start sending frames to connection
connectionScope.launch {
while (isActive) {
sendFrame(prioritizer.receive())
}
}

// start frame handling
connectionScope.launch {
val rSocketResponder = RSocketResponder(prioritizer, requestHandler, requestScope)
while (isActive) {
receiveFrame().closeOnError { frame ->
when (frame.streamId) {
0 -> when (frame) {
is MetadataPushFrame -> rSocketResponder.handleMetadataPush(frame.metadata)
is ErrorFrame -> job.cancel("Error frame received on 0 stream", frame.throwable)
is KeepAliveFrame -> {
keepAliveHandler.mark()
if (frame.respond) prioritizer.send(KeepAliveFrame(false, 0, frame.data)) else Unit
}
is LeaseFrame -> frame.release().also { error("lease isn't implemented") }
else -> frame.release()
}
else -> streamsStorage.handleFrame(frame, rSocketResponder)
}
}
}
}

return requester
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,23 @@

package io.rsocket.kotlin.internal

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.keepalive.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*

internal class KeepAliveHandler(
private val keepAlive: KeepAlive,
private val offerFrame: (frame: Frame) -> Unit,
) {
internal class KeepAliveHandler(private val keepAlive: KeepAlive) {
private val lastMark = atomic(currentMillis()) // mark initial timestamp for keepalive

private val lastMark = atomic(currentMillis())

fun receive(frame: KeepAliveFrame) {
fun mark() {
lastMark.value = currentMillis()
if (frame.respond) {
offerFrame(KeepAliveFrame(false, 0, frame.data))
}
}

fun startIn(scope: CoroutineScope) {
scope.launch {
while (isActive) {
delay(keepAlive.intervalMillis.toLong())
if (currentMillis() - lastMark.value >= keepAlive.maxLifetimeMillis) {
//for K/N
scope.cancel("Keep alive failed", RSocketError.ConnectionError("No keep-alive for ${keepAlive.maxLifetimeMillis} ms"))
break
}
offerFrame(KeepAliveFrame(true, 0, ByteReadPacket.Empty))
}
}
// return boolean because of native
suspend fun tick() {
delay(keepAlive.intervalMillis.toLong())
if (currentMillis() - lastMark.value < keepAlive.maxLifetimeMillis) return
throw RSocketError.ConnectionError("No keep-alive for ${keepAlive.maxLifetimeMillis} ms")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,25 @@
* limitations under the License.
*/

package io.rsocket.kotlin.internal.flow
package io.rsocket.kotlin.internal

import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.payload.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

internal class LimitingFlowCollector(
private val state: RSocketState,
private val streamId: Int,
initial: Int,
) : FlowCollector<Payload> {
internal suspend inline fun Flow<Payload>.collectLimiting(limiter: Limiter, crossinline action: suspend (value: Payload) -> Unit) {
collect { payload ->
payload.closeOnError {
limiter.useRequest()
action(it)
}
}
}

//TODO revisit 2 atomics
internal class Limiter(initial: Int) {
private val requests = atomic(initial)
private val awaiter = atomic<CancellableContinuation<Unit>?>(null)

Expand All @@ -38,12 +42,7 @@ internal class LimitingFlowCollector(
awaiter.getAndSet(null)?.takeIf(CancellableContinuation<Unit>::isActive)?.resume(Unit)
}

override suspend fun emit(value: Payload): Unit = value.closeOnError {
useRequest()
state.send(NextPayloadFrame(streamId, value))
}

private suspend fun useRequest() {
suspend fun useRequest() {
if (requests.getAndDecrement() > 0) {
currentCoroutineContext().ensureActive()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private class LoggingConnection(
}

override suspend fun send(packet: ByteReadPacket) {
logger.debug { "Send: ${packet.dumpFrameToString()}" }
logger.debug { "Send: ${packet.dumpFrameToString()}" }
delegate.send(packet)
}

Expand Down
Loading