Skip to content

Split ktor transports implementation #219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" }
ktor-client-okhttp = { module = "io.ktor:ktor-client-okhttp", version.ref = "ktor" }
ktor-client-darwin = { module = "io.ktor:ktor-client-darwin", version.ref = "ktor" }
ktor-server-core = { module = "io.ktor:ktor-server-core", version.ref = "ktor" }
ktor-server-host-common = { module = "io.ktor:ktor-server-host-common", version.ref = "ktor" }
ktor-server-websockets = { module = "io.ktor:ktor-server-websockets", version.ref = "ktor" }
ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor" }
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@
* limitations under the License.
*/

package io.rsocket.kotlin.transport.ktor.websocket.server

import io.ktor.server.application.*
import io.ktor.server.routing.*
import io.rsocket.kotlin.*
import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class)
public fun Route.rSocket(
path: String? = null,
protocol: String? = null,
acceptor: ConnectionAcceptor,
) {
val serverTransport = serverTransport(path, protocol)
val server = application.plugin(RSocketSupport).server
plugins {
rsocket.template.library
}

server.bind(serverTransport, acceptor)
kotlin {
configureCommon {
main {
dependencies {
api(projects.rsocketCore)
api(projects.rsocketTransportKtor.rsocketTransportKtorWebsocket)
//TODO ContentNegotiation will be here later
}
}
}
configureJvm()
configureJs()
configureNative()
}

description = "Ktor RSocket integration"
35 changes: 35 additions & 0 deletions rsocket-ktor/rsocket-ktor-client/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
rsocket.template.library
}

kotlin {
configureCommon {
main {
dependencies {
api(projects.rsocketKtor)
api(libs.ktor.client.websockets)
}
}
}
configureJvm()
configureJs()
configureNative()
}

description = "Ktor Client RSocket plugin"
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.ktor.client

import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.*
import kotlin.coroutines.*

public suspend fun HttpClient.rSocket(
request: HttpRequestBuilder.() -> Unit,
): RSocket = plugin(RSocketSupport).run {
connector.connect(KtorClientTransport(this@rSocket, request, bufferPool))
}

public suspend fun HttpClient.rSocket(
urlString: String,
secure: Boolean = false,
request: HttpRequestBuilder.() -> Unit = {},
): RSocket = rSocket {
url {
this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS
this.port = protocol.defaultPort
takeFrom(urlString)
}
request()
}

public suspend fun HttpClient.rSocket(
host: String? = null,
port: Int? = null,
path: String? = null,
secure: Boolean = false,
request: HttpRequestBuilder.() -> Unit = {},
): RSocket = rSocket {
url {
this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS
this.port = protocol.defaultPort
set(host = host, port = port, path = path)
}
request()
}

private class KtorClientTransport(
private val client: HttpClient,
private val request: HttpRequestBuilder.() -> Unit,
private val pool: ObjectPool<ChunkBuffer>
) : ClientTransport {
override val coroutineContext: CoroutineContext get() = client.coroutineContext

@TransportApi
override suspend fun connect(): Connection = WebSocketConnection(client.webSocketSession(request), pool)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,34 @@
* limitations under the License.
*/

package io.rsocket.kotlin.transport.ktor.websocket.client
package io.rsocket.kotlin.ktor.client

import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.plugins.websocket.*
import io.ktor.util.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.core.*

public class RSocketSupport(
public class RSocketSupport private constructor(
internal val connector: RSocketConnector,
internal val bufferPool: ObjectPool<ChunkBuffer>
) {

public class Config internal constructor() {
public var bufferPool: ObjectPool<ChunkBuffer> = ChunkBuffer.Pool
public var connector: RSocketConnector = RSocketConnector()
public fun connector(block: RSocketConnectorBuilder.() -> Unit) {
connector = RSocketConnector(block)
}
}

public companion object Feature : HttpClientPlugin<Config, RSocketSupport> {
public companion object Plugin : HttpClientPlugin<Config, RSocketSupport> {
override val key: AttributeKey<RSocketSupport> = AttributeKey("RSocket")
override fun prepare(block: Config.() -> Unit): RSocketSupport {
val connector = Config().apply(block).connector
return RSocketSupport(connector)
override fun prepare(block: Config.() -> Unit): RSocketSupport = Config().run {
block()
RSocketSupport(connector, bufferPool)
}

override fun install(plugin: RSocketSupport, scope: HttpClient) {
Expand Down
42 changes: 42 additions & 0 deletions rsocket-ktor/rsocket-ktor-server/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
rsocket.template.library
}

kotlin {
configureCommon {
main {
dependencies {
api(projects.rsocketKtor)
api(libs.ktor.server.websockets)
}
}
test {
dependencies {
implementation(projects.rsocketKtor.rsocketKtorClient)
implementation(projects.rsocketTransportTests) //port provider
implementation(libs.ktor.client.cio)
implementation(libs.ktor.server.cio)
}
}
}
configureJvm()
configureNative(NativeTargets.Nix)
}

description = "Ktor Server RSocket Plugin"
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,37 @@
* limitations under the License.
*/

package io.rsocket.kotlin.transport.ktor.websocket.server
package io.rsocket.kotlin.ktor.server

import io.ktor.server.application.*
import io.ktor.server.websocket.*
import io.ktor.util.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.core.*

public class RSocketSupport(
public class RSocketSupport private constructor(
internal val server: RSocketServer,
internal val bufferPool: ObjectPool<ChunkBuffer>
) {
public class Config internal constructor() {
public var bufferPool: ObjectPool<ChunkBuffer> = ChunkBuffer.Pool
public var server: RSocketServer = RSocketServer()
public fun server(block: RSocketServerBuilder.() -> Unit) {
server = RSocketServer(block)
}
}

public companion object Feature : BaseApplicationPlugin<Application, Config, RSocketSupport> {
override val key: AttributeKey<RSocketSupport> = AttributeKey("RSocket")
override fun install(pipeline: Application, configure: Config.() -> Unit): RSocketSupport {
pipeline.pluginOrNull(WebSockets)
?: error("RSocket require WebSockets to work. You must install WebSockets plugin first.")
val server = Config().apply(configure).server
return RSocketSupport(server)

return Config().run {
configure()
RSocketSupport(server, bufferPool)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.ktor.server

import io.ktor.server.application.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.*
import kotlinx.coroutines.*

public fun Route.rSocket(
path: String? = null,
protocol: String? = null,
acceptor: ConnectionAcceptor,
): Unit = application.plugin(RSocketSupport).run {
server.bindIn(application, KtorServerTransport(this@rSocket, path, protocol, bufferPool), acceptor)
}

private class KtorServerTransport(
private val route: Route,
private val path: String?,
private val protocol: String?,
private val pool: ObjectPool<ChunkBuffer>
) : ServerTransport<Unit> {
@TransportApi
override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit) {
val handler: suspend DefaultWebSocketServerSession.() -> Unit = {
val connection = WebSocketConnection(this, pool)
accept(connection)
}
when (path) {
null -> route.webSocket(protocol, handler)
else -> route.webSocket(path, protocol, handler)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import io.ktor.server.engine.*
import io.ktor.server.routing.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.ktor.client.*
import io.rsocket.kotlin.ktor.server.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.test.*
import io.rsocket.kotlin.transport.ktor.websocket.client.*
import io.rsocket.kotlin.transport.tests.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
Expand All @@ -33,8 +34,8 @@ import io.ktor.client.engine.cio.CIO as ClientCIO
import io.ktor.client.plugins.websocket.WebSockets as ClientWebSockets
import io.ktor.server.cio.CIO as ServerCIO
import io.ktor.server.websocket.WebSockets as ServerWebSockets
import io.rsocket.kotlin.transport.ktor.websocket.client.RSocketSupport as ClientRSocketSupport
import io.rsocket.kotlin.transport.ktor.websocket.server.RSocketSupport as ServerRSocketSupport
import io.rsocket.kotlin.ktor.client.RSocketSupport as ClientRSocketSupport
import io.rsocket.kotlin.ktor.server.RSocketSupport as ServerRSocketSupport

class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck {
private val port = PortProvider.next()
Expand Down
Loading