Skip to content

Improve tests #210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from Feb 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions buildSrc/src/main/kotlin/rsocket.template.transport.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
plugins {
id("rsocket.template.library")
}

kotlin {
sourceSets {
commonTest {
dependencies {
implementation(project(":rsocket-transport-tests"))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import io.rsocket.kotlin.frame.io.*
private const val FlagsMask: Int = 1023
private const val FrameTypeShift: Int = 10

public sealed class Frame : Closeable {
public abstract val type: FrameType
public abstract val streamId: Int
public abstract val flags: Int
internal sealed class Frame : Closeable {
abstract val type: FrameType
abstract val streamId: Int
abstract val flags: Int

protected abstract fun BytePacketBuilder.writeSelf()
protected abstract fun StringBuilder.appendFlags()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package io.rsocket.kotlin.frame

import io.rsocket.kotlin.frame.io.*

public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empty) {
internal enum class FrameType(val encodedType: Int, flags: Int = Flags.Empty) {
Reserved(0x00),

//CONNECTION
Expand All @@ -32,8 +32,14 @@ public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empt
//REQUEST
RequestFnF(0x05, Flags.CanHaveData or Flags.CanHaveMetadata or Flags.Fragmentable or Flags.Request),
RequestResponse(0x04, Flags.CanHaveData or Flags.CanHaveMetadata or Flags.Fragmentable or Flags.Request),
RequestStream(0x06, Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request),
RequestChannel(0x07, Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request),
RequestStream(
0x06,
Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request
),
RequestChannel(
0x07,
Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request
),

// DURING REQUEST
RequestN(0x08),
Expand All @@ -49,11 +55,11 @@ public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empt

Extension(0x3F, Flags.CanHaveData or Flags.CanHaveMetadata);

public val hasInitialRequest: Boolean = flags check Flags.HasInitialRequest
public val isRequestType: Boolean = flags check Flags.Request
public val isFragmentable: Boolean = flags check Flags.Fragmentable
public val canHaveMetadata: Boolean = flags check Flags.CanHaveMetadata
public val canHaveData: Boolean = flags check Flags.CanHaveData
val hasInitialRequest: Boolean = flags check Flags.HasInitialRequest
val isRequestType: Boolean = flags check Flags.Request
val isFragmentable: Boolean = flags check Flags.Fragmentable
val canHaveMetadata: Boolean = flags check Flags.CanHaveMetadata
val canHaveData: Boolean = flags check Flags.CanHaveData

private object Flags {
const val Empty = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.rsocket.kotlin

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.keepalive.*
Expand All @@ -39,7 +38,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck {
GlobalScope.async { accept(connection) }
}

val deferred = RSocketServer().bind(serverTransport) {
val deferred = TestServer().bind(serverTransport) {
sendingRSocket.complete(requester)
error(errorMessage)
}
Expand Down Expand Up @@ -67,6 +66,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck {
assertFalse(sender.isActive)
expectNoEventsIn(100)
}
connection.coroutineContext.job.join()
val error = connection.coroutineContext.job.getCancellationException().cause
assertTrue(error is RSocketError.Setup.Rejected)
assertEquals(errorMessage, error.message)
Expand All @@ -77,7 +77,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck {
val connection = TestConnection()
val p = payload("setup")
assertFailsWith(IllegalStateException::class, "failed") {
RSocketConnector {
TestConnector {
connectionConfig {
setupPayload { p }
}
Expand All @@ -86,8 +86,9 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck {
assertTrue(p.data.isNotEmpty)
error("failed")
}
}.connect { connection }
}.connect(connection)
}
connection.coroutineContext.job.join()
assertTrue(p.data.isEmpty)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,41 @@
* limitations under the License.
*/

package io.rsocket.kotlin.test
package io.rsocket.kotlin

import app.cash.turbine.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.test.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*
import kotlin.test.*
import kotlin.time.*
import kotlin.time.Duration.Companion.seconds

class TestConnection : Connection {
class TestConnection : Connection, ClientTransport {
override val pool: ObjectPool<ChunkBuffer> = InUseTrackingPool
override val coroutineContext: CoroutineContext =
Job() + Dispatchers.Unconfined + CoroutineExceptionHandler { c, e -> println("$c -> $e") }
Job() + Dispatchers.Unconfined + TestExceptionHandler

private val sendChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)
private val receiveChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)

init {
coroutineContext.job.invokeOnCompletion {
sendChannel.close(it)
@Suppress("INVISIBLE_MEMBER") receiveChannel.fullClose(it)
receiveChannel.fullClose(it)
}
}

override suspend fun connect(): Connection = this

override suspend fun send(packet: ByteReadPacket) {
sendChannel.send(packet)
}
Expand All @@ -52,17 +57,21 @@ class TestConnection : Connection {
return receiveChannel.receive()
}

suspend fun sendToReceiver(vararg frames: Frame) {
suspend fun ignoreSetupFrame() {
assertEquals(FrameType.Setup, sendChannel.receive().readFrame(InUseTrackingPool).type)
}

internal suspend fun sendToReceiver(vararg frames: Frame) {
frames.forEach {
val packet = @Suppress("INVISIBLE_MEMBER") it.toPacket(InUseTrackingPool)
val packet = it.toPacket(InUseTrackingPool)
receiveChannel.send(packet)
}
}

suspend fun test(validate: suspend FlowTurbine<Frame>.() -> Unit) {
internal suspend fun test(validate: suspend FlowTurbine<Frame>.() -> Unit) {
sendChannel.consumeAsFlow().map {
@Suppress("INVISIBLE_MEMBER") it.readFrame(InUseTrackingPool)
}.test(validate = validate)
it.readFrame(InUseTrackingPool)
}.test(5.seconds, validate = validate)
}
}

Expand All @@ -76,6 +85,6 @@ suspend fun FlowTurbine<*>.expectNoEventsIn(timeMillis: Long) {
expectNoEvents()
}

suspend inline fun FlowTurbine<Frame>.awaitFrame(block: (frame: Frame) -> Unit) {
internal suspend inline fun FlowTurbine<Frame>.awaitFrame(block: (frame: Frame) -> Unit) {
block(awaitItem())
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
* limitations under the License.
*/

package io.rsocket.kotlin.test
package io.rsocket.kotlin

import io.rsocket.kotlin.test.*
import kotlinx.coroutines.*

abstract class TestWithConnection : SuspendTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import app.cash.turbine.*
import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.test.*
import io.rsocket.kotlin.transport.local.*
Expand All @@ -40,10 +39,8 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
}

private suspend fun start(handler: RSocket? = null): RSocket {
val localServer = RSocketServer {
loggerFactory = LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("SERVER |$it") }
}.bindIn(
CoroutineScope(Dispatchers.Unconfined + testJob + CoroutineExceptionHandler { c, e -> println("$c -> $e") }),
val localServer = TestServer().bindIn(
CoroutineScope(Dispatchers.Unconfined + testJob + TestExceptionHandler),
LocalServerTransport(InUseTrackingPool)
) {
handler ?: RSocketRequestHandler {
Expand All @@ -60,8 +57,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
}
}

return RSocketConnector {
loggerFactory = LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("CLIENT |$it") }
return TestConnector {
connectionConfig {
keepAlive = KeepAlive(1000.seconds, 1000.seconds)
}
Expand Down Expand Up @@ -102,8 +98,8 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
}
}

@Test //ignored on native because of bug inside native coroutines
fun testStreamResponderError() = test(ignoreNative = true) {
@Test
fun testStreamResponderError() = test {
var p: Payload? = null
val requester = start(RSocketRequestHandler {
requestStream {
Expand Down Expand Up @@ -361,13 +357,13 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
private suspend inline fun complete(sendChannel: SendChannel<Payload>, receiveChannel: ReceiveChannel<Payload>) {
sendChannel.close()
delay(100)
assertTrue(receiveChannel.isClosedForReceive)
assertTrue(receiveChannel.isClosedForReceive, "receiveChannel.isClosedForReceive=true")
}

private suspend inline fun cancel(requesterChannel: SendChannel<Payload>, responderChannel: ReceiveChannel<Payload>) {
responderChannel.cancel()
delay(100)
assertTrue(requesterChannel.isClosedForSend)
assertTrue(requesterChannel.isClosedForSend, "requesterChannel.isClosedForSend=true")
}

private suspend fun sendAndCheckReceived(
Expand All @@ -376,8 +372,8 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
payloads: List<Payload>,
) {
delay(100)
assertFalse(requesterChannel.isClosedForSend)
assertFalse(responderChannel.isClosedForReceive)
assertFalse(requesterChannel.isClosedForSend, "requesterChannel.isClosedForSend=false")
assertFalse(responderChannel.isClosedForReceive, "responderChannel.isClosedForReceive=false")
payloads.forEach { requesterChannel.send(it.copy()) } //TODO?
payloads.forEach { responderChannel.checkReceived(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {

//needed for native
private val fails = atomic(0)
private val first = atomic(true)
private val logger = DefaultLoggerFactory.logger("io.rsocket.kotlin.connection")
private val logger = PrintLogger.withLevel(LoggingLevel.DEBUG).logger("io.rsocket.kotlin.connection")

private suspend fun connectWithReconnect(
connect: suspend () -> RSocket,
Expand Down Expand Up @@ -202,7 +203,7 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {
rSocket.requestStream(Payload.Empty).collect()
}

rSocket.requestStream(Payload.Empty).test {
rSocket.requestStream(Payload.Empty).test(5.seconds) {
repeat(5) {
assertEquals(Payload.Empty, awaitItem())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.test.*
import kotlin.test.*

fun Frame.toPacketWithLength(): ByteReadPacket = buildPacket(InUseTrackingPool) {
internal fun Frame.toPacketWithLength(): ByteReadPacket = buildPacket(InUseTrackingPool) {
val packet = toPacket(InUseTrackingPool)
writeLength(packet.remaining.toInt())
writePacket(packet)
}

fun ByteReadPacket.toFrameWithLength(): Frame {
internal fun ByteReadPacket.toFrameWithLength(): Frame {
val length = readLength()
assertEquals(length, remaining.toInt())
return readFrame(InUseTrackingPool)
}

fun Frame.loopFrame(): Frame = toPacket(InUseTrackingPool).readFrame(InUseTrackingPool)
internal fun Frame.loopFrame(): Frame = toPacket(InUseTrackingPool).readFrame(InUseTrackingPool)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.rsocket.kotlin.internal

import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.payload.*
Expand All @@ -34,17 +33,13 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
override suspend fun before() {
super.before()

requester = connect(
connection = connection,
isServer = false,
maxFragmentSize = 0,
interceptors = InterceptorsBuilder().build(),
connectionConfig = ConnectionConfig(
keepAlive = KeepAlive(1000.seconds, 1000.seconds),
payloadMimeType = DefaultPayloadMimeType,
setupPayload = Payload.Empty
)
) { RSocketRequestHandler { } }
requester = TestConnector {
connectionConfig {
keepAlive = KeepAlive(1000.seconds, 1000.seconds)
}
}.connect(connection)

connection.ignoreSetupFrame()
}

@Test
Expand Down Expand Up @@ -333,8 +328,8 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
}
}

@Test //ignored on native because of coroutines bug with channels
fun testChannelRequestServerSideCancellation() = test(ignoreNative = true) {
@Test
fun testChannelRequestServerSideCancellation() = test {
var ch: SendChannel<Payload>? = null
val request = channelFlow<Payload> {
ch = this
Expand Down Expand Up @@ -366,7 +361,8 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
}
}

requester.requestChannel(payload("INIT"), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).launchIn(connection)
requester.requestChannel(payload("INIT"), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0))
.launchIn(connection)
connection.test {
awaitFrame { frame ->
assertTrue(frame is RequestFrame)
Expand Down Expand Up @@ -397,10 +393,12 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
}

@Test
fun rrTerminatedOnConnectionClose() = streamIsTerminatedOnConnectionClose { requester.requestResponse(Payload.Empty) }
fun rrTerminatedOnConnectionClose() =
streamIsTerminatedOnConnectionClose { requester.requestResponse(Payload.Empty) }

@Test
fun rsTerminatedOnConnectionClose() = streamIsTerminatedOnConnectionClose { requester.requestStream(Payload.Empty).collect() }
fun rsTerminatedOnConnectionClose() =
streamIsTerminatedOnConnectionClose { requester.requestStream(Payload.Empty).collect() }

@Test
fun rcTerminatedOnConnectionClose() =
Expand Down
Loading