diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt index 43aec89126..596aa5707e 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt @@ -36,3 +36,20 @@ public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt { public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow; } +public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2, org/reactivestreams/Subscription { + public fun (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;)V + public fun cancel ()V + public fun close (Ljava/lang/Throwable;)Z + public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel; + public fun getOnSend ()Lkotlinx/coroutines/selects/SelectClause2; + public fun invokeOnClose (Lkotlin/jvm/functions/Function1;)Ljava/lang/Void; + public synthetic fun invokeOnClose (Lkotlin/jvm/functions/Function1;)V + public fun isClosedForSend ()Z + public fun isFull ()Z + public fun offer (Ljava/lang/Object;)Z + public synthetic fun onCompleted (Ljava/lang/Object;)V + public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V + public fun request (J)V + public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt index 6534bfbbce..0e78289e0b 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt @@ -19,6 +19,19 @@ public final class kotlinx/coroutines/reactor/MonoKt { public static synthetic fun mono$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono; } +public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement { + public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key; + public fun (Lreactor/util/context/Context;)V + public final fun getContext ()Lreactor/util/context/Context; +} + +public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key { +} + +public final class kotlinx/coroutines/reactor/ReactorContextKt { + public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlin/coroutines/CoroutineContext; +} + public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { public fun (Lreactor/core/scheduler/Scheduler;)V public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 843c94c8d6..a1364af8cc 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -76,7 +76,8 @@ private const val CLOSED = -1L // closed, but have not signalled onCompleted/ private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError @Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE") -private class PublisherCoroutine( +@InternalCoroutinesApi +public class PublisherCoroutine( parentContext: CoroutineContext, private val subscriber: Subscriber ) : AbstractCoroutine(parentContext, true), ProducerScope, Subscription, SelectClause2> { diff --git a/reactive/kotlinx-coroutines-reactor/build.gradle b/reactive/kotlinx-coroutines-reactor/build.gradle index 72ef6e5623..c73716d494 100644 --- a/reactive/kotlinx-coroutines-reactor/build.gradle +++ b/reactive/kotlinx-coroutines-reactor/build.gradle @@ -12,4 +12,12 @@ tasks.withType(dokka.getClass()) { url = new URL("https://projectreactor.io/docs/core/$reactor_vesion/api/") packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL() } +} + +compileTestKotlin { + kotlinOptions.jvmTarget = "1.8" +} + +compileKotlin { + kotlinOptions.jvmTarget = "1.8" } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 785b4652bb..18b84ac117 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -1,3 +1,4 @@ + /* * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @@ -9,6 +10,8 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.reactive.* +import org.reactivestreams.Publisher +import reactor.core.CoreSubscriber import reactor.core.publisher.* import kotlin.coroutines.* import kotlin.internal.LowPriorityInOverloadResolution @@ -41,8 +44,8 @@ public fun flux( @BuilderInference block: suspend ProducerScope.() -> Unit ): Flux { require(context[Job] === null) { "Flux context cannot contain job in it." + - "Its lifecycle should be managed via Disposable handle. Had $context" } - return Flux.from(publishInternal(GlobalScope, context, block)) + "Its lifecycle should be managed via Disposable handle. Had $context" } + return Flux.from(reactorPublish(GlobalScope, context, block)) } @Deprecated( @@ -55,4 +58,20 @@ public fun CoroutineScope.flux( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit ): Flux = - Flux.from(publishInternal(this, context, block)) + Flux.from(reactorPublish(this, context, block)) + +private fun reactorPublish( + scope: CoroutineScope, + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope.() -> Unit +): Publisher = Publisher { subscriber -> + // specification requires NPE on null subscriber + if (subscriber == null) throw NullPointerException("Subscriber cannot be null") + require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." } + val currentContext = subscriber.currentContext() + val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext() + val newContext = scope.newCoroutineContext(context + reactorContext) + val coroutine = PublisherCoroutine(newContext, subscriber) + subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions + coroutine.start(CoroutineStart.DEFAULT, coroutine, block) +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index a0f65af5c9..b218f6d0c5 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -53,7 +53,8 @@ private fun monoInternal( context: CoroutineContext, block: suspend CoroutineScope.() -> T? ): Mono = Mono.create { sink -> - val newContext = scope.newCoroutineContext(context) + val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext() + val newContext = scope.newCoroutineContext(context + reactorContext) val coroutine = MonoCoroutine(newContext, sink) sink.onDispose(coroutine) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) @@ -78,7 +79,7 @@ private class MonoCoroutine( handleCoroutineException(context, cause) } } - + override fun dispose() { disposed = true cancel() @@ -86,4 +87,3 @@ private class MonoCoroutine( override fun isDisposed(): Boolean = disposed } - diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt new file mode 100644 index 0000000000..408c1607ca --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -0,0 +1,50 @@ +package kotlinx.coroutines.reactor + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import reactor.util.context.Context +import kotlin.coroutines.* + +/** + * Marks coroutine context element that contains Reactor's [Context] elements in [context] for seamless integration + * between [CoroutineContext] and Reactor's [Context]. + * + * [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext]. + * + * Reactor builders: [mono], [flux] can extract the reactor context from their coroutine context and + * pass it on. Modifications of reactor context can be retrieved by `coroutineContext[ReactorContext]`. + * + * Example usage: + * + * Passing reactor context from coroutine builder to reactor entity: + * + * ``` + * launch(Context.of("key", "value").asCoroutineContext()) { + * mono { + * assertEquals(coroutineContext[ReactorContext]!!.context.get("key"), "value") + * }.subscribe() + * } + * ``` + * + * Accessing modified reactor context enriched from downstream via coroutine context: + * + * ``` + * launch { + * mono { + * assertEquals(coroutineContext[ReactorContext]!!.context.get("key"), "value") + * }.subscriberContext(Context.of("key", "value")) + * .subscribe() + * } + * ``` + */ +@ExperimentalCoroutinesApi +public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) { + companion object Key : CoroutineContext.Key +} + + +/** + * Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context + * and later retrieved via `coroutineContext[ReactorContext]`. + */ +@ExperimentalCoroutinesApi +public fun Context.asCoroutineContext(): CoroutineContext = ReactorContext(this) \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt new file mode 100644 index 0000000000..1fb4f0bb64 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt @@ -0,0 +1,32 @@ +package kotlinx.coroutines.reactor + +import kotlinx.coroutines.* +import kotlinx.coroutines.reactive.* +import org.junit.Test +import reactor.util.context.Context +import kotlin.test.assertEquals + +class ReactorContextTest { + @Test + fun testMonoHookedContext() = runBlocking { + val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) { + val ctx = coroutineContext[ReactorContext]?.context + buildString { + (1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) } + } + } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) + .subscriberContext { ctx -> ctx.put(6, "6") } + assertEquals(mono.awaitFirst(), "1234567") + } + + @Test + fun testFluxContext() = runBlocking { + val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) { + val ctx = coroutineContext[ReactorContext]!!.context + (1..7).forEach { send(ctx.getOrDefault(it, "noValue")) } + } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) + .subscriberContext { ctx -> ctx.put(6, "6") } + var i = 0 + flux.subscribe { str -> i++; assertEquals(str, i.toString()) } + } +} \ No newline at end of file