From 4c069fc8f365d59102e964912e64787a8e3158b5 Mon Sep 17 00:00:00 2001 From: sokolova Date: Wed, 24 Jul 2019 17:52:11 +0300 Subject: [PATCH] Reactor coroutine context propagation in more places * Propagation of the coroutine context of await calls into Mono/Flux builder * Publisher.asFlow propagates coroutine context from `collect` call to the Publisher * Flow.asFlux transform Fixes #284 --- .../kotlinx-coroutines-reactive.txt | 27 +++-- .../kotlinx-coroutines-reactor.txt | 9 ++ .../kotlinx-coroutines-reactive/src/Await.kt | 13 ++- .../src/ContextInjector.kt | 14 +++ .../src/FlowAsPublisher.kt | 109 ++++++++++++++++++ .../src/{flow => }/PublisherAsFlow.kt | 19 ++- .../src/flow/FlowAsPublisher.kt | 103 ----------------- .../test/{flow => }/IterableFlowTckTest.kt | 2 +- .../test/{flow => }/PublisherAsFlowTest.kt | 3 +- .../{flow => }/RangePublisherBufferedTest.kt | 2 +- .../test/{flow => }/RangePublisherTest.kt | 2 +- .../UnboundedIntegerIncrementPublisherTest.kt | 2 +- ...otlinx.coroutines.reactive.ContextInjector | 1 + .../src/FlowAsFlux.kt | 29 +++++ .../kotlinx-coroutines-reactor/src/Flux.kt | 2 +- .../src/ReactorContext.kt | 14 +++ .../src/ReactorContextInjector.kt | 26 +++++ .../test/BackpressureTest.kt | 1 - .../test/FlowAsFluxTest.kt | 27 +++++ .../test/ReactorContextTest.kt | 66 ++++++++++- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 9 +- .../test/BackpressureTest.kt | 1 - 22 files changed, 347 insertions(+), 134 deletions(-) create mode 100644 reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt create mode 100644 reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt rename reactive/kotlinx-coroutines-reactive/src/{flow => }/PublisherAsFlow.kt (85%) delete mode 100644 reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt rename reactive/kotlinx-coroutines-reactive/test/{flow => }/IterableFlowTckTest.kt (98%) rename reactive/kotlinx-coroutines-reactive/test/{flow => }/PublisherAsFlowTest.kt (98%) rename reactive/kotlinx-coroutines-reactive/test/{flow => }/RangePublisherBufferedTest.kt (95%) rename reactive/kotlinx-coroutines-reactive/test/{flow => }/RangePublisherTest.kt (97%) rename reactive/kotlinx-coroutines-reactive/test/{flow => }/UnboundedIntegerIncrementPublisherTest.kt (97%) create mode 100644 reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector create mode 100644 reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt create mode 100644 reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt create mode 100644 reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt index 643f64170d..1b35578255 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt @@ -14,11 +14,29 @@ public final class kotlinx/coroutines/reactive/ChannelKt { public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel; } +public abstract interface class kotlinx/coroutines/reactive/ContextInjector { + public abstract fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher; +} + public final class kotlinx/coroutines/reactive/ConvertKt { public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher; public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher; } +public final class kotlinx/coroutines/reactive/FlowKt { + public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow; + public static final fun asFlow (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow; + public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher; +} + +public final class kotlinx/coroutines/reactive/FlowSubscription : org/reactivestreams/Subscription { + public final field flow Lkotlinx/coroutines/flow/Flow; + public final field subscriber Lorg/reactivestreams/Subscriber; + public fun (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V + public fun cancel ()V + public fun request (J)V +} + public final class kotlinx/coroutines/reactive/PublishKt { public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; @@ -44,12 +62,3 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } -public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt { - public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher; -} - -public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt { - public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow; - public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow; -} - diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt index 46b35ed71f..9051a49d56 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt @@ -5,6 +5,10 @@ public final class kotlinx/coroutines/reactor/ConvertKt { public static final fun asMono (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Mono; } +public final class kotlinx/coroutines/reactor/FlowKt { + public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux; +} + public final class kotlinx/coroutines/reactor/FluxKt { public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux; public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux; @@ -28,6 +32,11 @@ public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key { } +public final class kotlinx/coroutines/reactor/ReactorContextInjector : kotlinx/coroutines/reactive/ContextInjector { + public fun ()V + public fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher; +} + public final class kotlinx/coroutines/reactor/ReactorContextKt { public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext; } diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index d12a6280eb..f7d08e7e2f 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription +import java.util.* import kotlin.coroutines.* /** @@ -81,6 +82,16 @@ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) // ------------------------ private ------------------------ +// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only. +// If `kotlinx-coroutines-reactor` module is not included, the list is empty. +private val contextInjectors: Array = + ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList().toTypedArray() + +private fun Publisher.injectCoroutineContext(coroutineContext: CoroutineContext) = + contextInjectors.fold(this) { pub, contextInjector -> + contextInjector.injectCoroutineContext(pub, coroutineContext) + } + private enum class Mode(val s: String) { FIRST("awaitFirst"), FIRST_OR_DEFAULT("awaitFirstOrDefault"), @@ -93,7 +104,7 @@ private suspend fun Publisher.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> - subscribe(object : Subscriber { + injectCoroutineContext(cont.context).subscribe(object : Subscriber { private lateinit var subscription: Subscription private var value: T? = null private var seenValue = false diff --git a/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt new file mode 100644 index 0000000000..77181ba2ba --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt @@ -0,0 +1,14 @@ +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.InternalCoroutinesApi +import org.reactivestreams.Publisher +import kotlin.coroutines.CoroutineContext + +/** @suppress */ +@InternalCoroutinesApi +public interface ContextInjector { + /** + * Injects the coroutine context into the context of the publisher. + */ + public fun injectCoroutineContext(publisher: Publisher, coroutineContext: CoroutineContext): Publisher +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt new file mode 100644 index 0000000000..429977a575 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt @@ -0,0 +1,109 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.reactivestreams.* +import java.util.concurrent.atomic.* +import kotlin.coroutines.* + +/** + * Transforms the given flow to a spec-compliant [Publisher]. + */ +@ExperimentalCoroutinesApi +public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) + +/** + * Adapter that transforms [Flow] into TCK-complaint [Publisher]. + * [cancel] invocation cancels the original flow. + */ +@Suppress("PublisherImplementation") +private class FlowAsPublisher(private val flow: Flow) : Publisher { + override fun subscribe(subscriber: Subscriber?) { + if (subscriber == null) throw NullPointerException() + subscriber.onSubscribe(FlowSubscription(flow, subscriber)) + } +} + +/** @suppress */ +@InternalCoroutinesApi +public class FlowSubscription( + @JvmField val flow: Flow, + @JvmField val subscriber: Subscriber +) : Subscription { + @Volatile + private var canceled: Boolean = false + private val requested = AtomicLong(0L) + private val producer: AtomicReference?> = AtomicReference() + + // This is actually optimizable + private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) { + try { + consumeFlow() + subscriber.onComplete() + } catch (e: Throwable) { + // Failed with real exception, not due to cancellation + if (!coroutineContext[Job]!!.isCancelled) { + subscriber.onError(e) + } + } + } + + private suspend fun consumeFlow() { + flow.collect { value -> + if (!coroutineContext.isActive) { + subscriber.onComplete() + coroutineContext.ensureActive() + } + + if (requested.get() == 0L) { + suspendCancellableCoroutine { + producer.set(it) + if (requested.get() != 0L) it.resumeSafely() + } + } + + requested.decrementAndGet() + subscriber.onNext(value) + } + } + + override fun cancel() { + canceled = true + job.cancel() + } + + override fun request(n: Long) { + if (n <= 0) { + return + } + + if (canceled) return + + job.start() + var snapshot: Long + var newValue: Long + do { + snapshot = requested.get() + newValue = snapshot + n + if (newValue <= 0L) newValue = Long.MAX_VALUE + } while (!requested.compareAndSet(snapshot, newValue)) + + val prev = producer.get() + if (prev == null || !producer.compareAndSet(prev, null)) return + prev.resumeSafely() + } + + private fun CancellableContinuation.resumeSafely() { + val token = tryResume(Unit) + if (token != null) { + completeResume(token) + } + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt similarity index 85% rename from reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt rename to reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt index 50338de605..a8a4b6873c 100644 --- a/reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/PublisherAsFlow.kt @@ -2,14 +2,17 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.reactive import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.internal.* -import kotlinx.coroutines.reactive.* import org.reactivestreams.* +import java.util.* import kotlin.coroutines.* /** @@ -21,13 +24,11 @@ import kotlin.coroutines.* * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements * are discarded. */ -@JvmName("from") @ExperimentalCoroutinesApi public fun Publisher.asFlow(): Flow = PublisherAsFlow(this, 1) @FlowPreview -@JvmName("from") @Deprecated( message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure", level = DeprecationLevel.ERROR, @@ -70,7 +71,7 @@ private class PublisherAsFlow( override suspend fun collect(collector: FlowCollector) { val subscriber = ReactiveSubscriber(capacity, requestSize) - publisher.subscribe(subscriber) + publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber) try { var consumed = 0L while (true) { @@ -127,3 +128,11 @@ private class ReactiveSubscriber( subscription.cancel() } } + +// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only. +// If `kotlinx-coroutines-reactor` module is not included, the list is empty. +private val contextInjectors: List = + ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList() + +private fun Publisher.injectCoroutineContext(coroutineContext: CoroutineContext) = + contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt b/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt deleted file mode 100644 index 05f2391e36..0000000000 --- a/reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.reactive.flow - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import org.reactivestreams.* -import java.util.concurrent.atomic.* -import kotlin.coroutines.* - -/** - * Transforms the given flow to a spec-compliant [Publisher]. - */ -@JvmName("from") -@ExperimentalCoroutinesApi -public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) - -/** - * Adapter that transforms [Flow] into TCK-complaint [Publisher]. - * [cancel] invocation cancels the original flow. - */ -@Suppress("PublisherImplementation") -private class FlowAsPublisher(private val flow: Flow) : Publisher { - - override fun subscribe(subscriber: Subscriber?) { - if (subscriber == null) throw NullPointerException() - subscriber.onSubscribe(FlowSubscription(flow, subscriber)) - } - - private class FlowSubscription(val flow: Flow, val subscriber: Subscriber) : Subscription { - @Volatile - internal var canceled: Boolean = false - private val requested = AtomicLong(0L) - private val producer: AtomicReference?> = AtomicReference() - - // This is actually optimizable - private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) { - try { - consumeFlow() - subscriber.onComplete() - } catch (e: Throwable) { - // Failed with real exception, not due to cancellation - if (!coroutineContext[Job]!!.isCancelled) { - subscriber.onError(e) - } - } - } - - private suspend fun consumeFlow() { - flow.collect { value -> - if (!coroutineContext.isActive) { - subscriber.onComplete() - coroutineContext.ensureActive() - } - - if (requested.get() == 0L) { - suspendCancellableCoroutine { - producer.set(it) - if (requested.get() != 0L) it.resumeSafely() - } - } - - requested.decrementAndGet() - subscriber.onNext(value) - } - } - - override fun cancel() { - canceled = true - job.cancel() - } - - override fun request(n: Long) { - if (n <= 0) { - return - } - - if (canceled) return - - job.start() - var snapshot: Long - var newValue: Long - do { - snapshot = requested.get() - newValue = snapshot + n - if (newValue <= 0L) newValue = Long.MAX_VALUE - } while (!requested.compareAndSet(snapshot, newValue)) - - val prev = producer.get() - if (prev == null || !producer.compareAndSet(prev, null)) return - prev.resumeSafely() - } - - private fun CancellableContinuation.resumeSafely() { - val token = tryResume(Unit) - if (token != null) { - completeResume(token) - } - } - } -} diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt similarity index 98% rename from reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt rename to reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt index 31c5a3c489..5dfd9d537d 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/IterableFlowTckTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IterableFlowTckTest.kt @@ -4,7 +4,7 @@ @file:Suppress("UNCHECKED_CAST") -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import kotlinx.coroutines.flow.* import org.junit.* diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt similarity index 98% rename from reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt rename to reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt index 3f33b33c8b..a37719de64 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt @@ -2,12 +2,11 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* -import kotlinx.coroutines.reactive.* import kotlin.test.* class PublisherAsFlowTest : TestBase() { diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt similarity index 95% rename from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt rename to reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt index 2ff96eb176..b710c59064 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherBufferedTest.kt @@ -2,7 +2,7 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import kotlinx.coroutines.flow.* import org.junit.* diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt similarity index 97% rename from reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt rename to reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt index 1b37ee9974..72d5de5e82 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/RangePublisherTest.kt @@ -2,7 +2,7 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import org.junit.* import org.reactivestreams.* diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt similarity index 97% rename from reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt rename to reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt index 9e611008c2..63d444c19e 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/UnboundedIntegerIncrementPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/UnboundedIntegerIncrementPublisherTest.kt @@ -2,7 +2,7 @@ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.coroutines.reactive.flow +package kotlinx.coroutines.reactive import org.junit.* import org.reactivestreams.example.unicast.AsyncIterablePublisher diff --git a/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector new file mode 100644 index 0000000000..0097ec3539 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/resources/META-INF/services/kotlinx.coroutines.reactive.ContextInjector @@ -0,0 +1 @@ +kotlinx.coroutines.reactor.ReactorContextInjector \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt new file mode 100644 index 0000000000..cb16c38763 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/src/FlowAsFlux.kt @@ -0,0 +1,29 @@ +@file:JvmName("FlowKt") + +package kotlinx.coroutines.reactor + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.reactive.FlowSubscription +import reactor.core.CoreSubscriber +import reactor.core.publisher.Flux + +/** + * Converts the given flow to a cold flux. + * The original flow is cancelled when the flux subscriber is disposed. + */ +@ExperimentalCoroutinesApi +public fun Flow.asFlux(): Flux = FlowAsFlux(this) + +private class FlowAsFlux(private val flow: Flow) : Flux() { + override fun subscribe(subscriber: CoreSubscriber?) { + if (subscriber == null) throw NullPointerException() + subscriber.onSubscribe( + FlowSubscription( + flow.flowOn(subscriber.currentContext().asCoroutineContext()), + subscriber + ) + ) + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 316146b578..18b84ac117 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -74,4 +74,4 @@ private fun reactorPublish( val coroutine = PublisherCoroutine(newContext, subscriber) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions coroutine.start(CoroutineStart.DEFAULT, coroutine, block) -} +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index 5a4ccd040e..61d75c17a7 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -30,6 +30,20 @@ import kotlin.coroutines.* * .subscribe() * } * ``` + * + * [CoroutineContext] of a suspendable function that awaits a value from [Mono] or [Flux] instance + * is propagated into [mono] and [flux] Reactor builders: + * + * ``` + * launch(Context.of("key", "value").asCoroutineContext()) { + * assertEquals(bar().awaitFirst(), "value") + * } + * + * fun bar(): Mono = mono { + * coroutineContext[ReactorContext]!!.context.get("key") + * } + * ``` +} */ @ExperimentalCoroutinesApi public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) { diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt new file mode 100644 index 0000000000..b7212b979a --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt @@ -0,0 +1,26 @@ +package kotlinx.coroutines.reactor + +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.reactive.ContextInjector +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.util.context.Context +import kotlin.coroutines.CoroutineContext + +/** @suppress */ +@InternalCoroutinesApi +class ReactorContextInjector : ContextInjector { + /** + * Injects all values from the [ReactorContext] entry of the given coroutine context + * into the downstream [Context] of Reactor's [Publisher] instances of [Mono] or [Flux]. + */ + override fun injectCoroutineContext(publisher: Publisher, coroutineContext: CoroutineContext): Publisher { + val reactorContext = coroutineContext[ReactorContext]?.context ?: return publisher + return when(publisher) { + is Mono -> publisher.subscriberContext(reactorContext) + is Flux -> publisher.subscriberContext(reactorContext) + else -> publisher + } + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt index 120cd72ba9..80feaeb865 100644 --- a/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/BackpressureTest.kt @@ -7,7 +7,6 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* -import kotlinx.coroutines.reactive.flow.* import org.junit.Test import reactor.core.publisher.* import kotlin.test.* diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt new file mode 100644 index 0000000000..2f8ce9ac42 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt @@ -0,0 +1,27 @@ +package kotlinx.coroutines.reactor + +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* +import kotlinx.coroutines.runBlocking +import org.junit.Test +import reactor.core.publisher.Mono +import reactor.util.context.Context +import kotlin.test.assertEquals + +class FlowAsFluxTest { + @Test + fun testFlowToFluxContextPropagation() = runBlocking { + val flux = flow { + (1..4).forEach { i -> emit(m(i).awaitFirst()) } + } .asFlux() + .subscriberContext(Context.of(1, "1")) + .subscriberContext(Context.of(2, "2", 3, "3", 4, "4")) + var i = 0 + flux.subscribe { str -> i++; println(str); assertEquals(str, i.toString()) } + } + + private fun m(i: Int): Mono = mono { + val ctx = coroutineContext[ReactorContext]?.context + ctx?.getOrDefault(i, "noValue") + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt index 1fb4f0bb64..9e91b4337e 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt @@ -3,8 +3,10 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* import kotlinx.coroutines.reactive.* import org.junit.Test +import reactor.core.publisher.* import reactor.util.context.Context import kotlin.test.assertEquals +import kotlinx.coroutines.flow.* class ReactorContextTest { @Test @@ -14,8 +16,8 @@ class ReactorContextTest { buildString { (1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) } } - } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) - .subscriberContext { ctx -> ctx.put(6, "6") } + } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) + .subscriberContext { ctx -> ctx.put(6, "6") } assertEquals(mono.awaitFirst(), "1234567") } @@ -29,4 +31,64 @@ class ReactorContextTest { var i = 0 flux.subscribe { str -> i++; assertEquals(str, i.toString()) } } + + @Test + fun testAwait() = runBlocking(Context.of(3, "3").asCoroutineContext()) { + val result = mono(Context.of(1, "1").asCoroutineContext()) { + val ctx = coroutineContext[ReactorContext]?.context + buildString { + (1..3).forEach { append(ctx?.getOrDefault(it, "noValue")) } + } + } .subscriberContext(Context.of(2, "2")) + .awaitFirst() + assertEquals(result, "123") + } + + @Test + fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) { + assertEquals(m().awaitFirst(), "7") + assertEquals(m().awaitFirstOrDefault("noValue"), "7") + assertEquals(m().awaitFirstOrNull(), "7") + assertEquals(m().awaitFirstOrElse { "noValue" }, "7") + assertEquals(m().awaitLast(), "7") + assertEquals(m().awaitSingle(), "7") + } + + @Test + fun testFluxAwaitContextPropagation() = runBlocking(Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()) { + assertEquals(f().awaitFirst(), "1") + assertEquals(f().awaitFirstOrDefault("noValue"), "1") + assertEquals(f().awaitFirstOrNull(), "1") + assertEquals(f().awaitFirstOrElse { "noValue" }, "1") + assertEquals(f().awaitLast(), "3") + var i = 0 + f().subscribe { str -> i++; assertEquals(str, i.toString()) } + } + + private fun m(): Mono = mono { + val ctx = coroutineContext[ReactorContext]?.context + ctx?.getOrDefault(7, "noValue") + } + + + private fun f(): Flux = flux { + val ctx = coroutineContext[ReactorContext]?.context + (1..3).forEach { send(ctx?.getOrDefault(it, "noValue")) } + } + + @Test + fun testFlowToFluxContextPropagation() = runBlocking(Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()) { + var i = 0 + bar().collect { str -> + i++; assertEquals(str, i.toString()) + } + assertEquals(i, 3) + } + + suspend fun bar(): Flow { + return flux { + val ctx = coroutineContext[ReactorContext]!!.context + (1..3).forEach { send(ctx.getOrDefault(it, "noValue")) } + }.asFlow() + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index d5678de921..4b12127189 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -8,8 +8,7 @@ import io.reactivex.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* -import kotlinx.coroutines.reactive.flow.* -import org.reactivestreams.* +import kotlinx.coroutines.reactive.* import kotlin.coroutines.* /** @@ -82,7 +81,7 @@ public fun ReceiveChannel.asObservable(context: CoroutineContext): /** * Converts the given flow to a cold observable. - * The original flow is cancelled if the observable subscriber was disposed. + * The original flow is cancelled when the observable subscriber is disposed. */ @JvmName("from") @ExperimentalCoroutinesApi @@ -106,8 +105,8 @@ public fun Flow.asObservable() : Observable = Observable.create { } /** - * Converts the given flow to a cold observable. - * The original flow is cancelled if the flowable subscriber was disposed. + * Converts the given flow to a cold flowable. + * The original flow is cancelled when the flowable subscriber is disposed. */ @JvmName("from") @ExperimentalCoroutinesApi diff --git a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt index 1904334144..ed0bc369c0 100644 --- a/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/BackpressureTest.kt @@ -8,7 +8,6 @@ import io.reactivex.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* -import kotlinx.coroutines.reactive.flow.* import org.junit.Test import kotlin.test.*