diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 484f385ca3..0dda7e51c8 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -437,6 +437,7 @@ public final class kotlinx/coroutines/NonCancellable : kotlin/coroutines/Abstrac public fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun plus (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job; public fun start ()Z + public fun toString ()Ljava/lang/String; } public final class kotlinx/coroutines/NonDisposableHandle : kotlinx/coroutines/ChildHandle, kotlinx/coroutines/DisposableHandle { diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt index 2afa3136cc..43aec89126 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactive.txt @@ -20,8 +20,11 @@ public final class kotlinx/coroutines/reactive/ConvertKt { } public final class kotlinx/coroutines/reactive/PublishKt { + public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; + public static synthetic fun publish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher; public static synthetic fun publish$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher; + public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher; } public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt { diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt index 8afd014863..6534bfbbce 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-reactor.txt @@ -6,12 +6,16 @@ public final class kotlinx/coroutines/reactor/ConvertKt { } public final class kotlinx/coroutines/reactor/FluxKt { + public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux; public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux; + public static synthetic fun flux$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Flux; public static synthetic fun flux$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Flux; } public final class kotlinx/coroutines/reactor/MonoKt { + public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono; public static final fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono; + public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono; public static synthetic fun mono$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono; } diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt index 67ef8a131c..54a9663a41 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-rx2.txt @@ -21,7 +21,9 @@ public final class kotlinx/coroutines/rx2/RxChannelKt { } public final class kotlinx/coroutines/rx2/RxCompletableKt { + public static final fun rxCompletable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable; public static final fun rxCompletable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable; + public static synthetic fun rxCompletable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable; public static synthetic fun rxCompletable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable; } @@ -35,17 +37,23 @@ public final class kotlinx/coroutines/rx2/RxConvertKt { } public final class kotlinx/coroutines/rx2/RxFlowableKt { + public static final fun rxFlowable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable; public static final fun rxFlowable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable; + public static synthetic fun rxFlowable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable; public static synthetic fun rxFlowable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable; } public final class kotlinx/coroutines/rx2/RxMaybeKt { + public static final fun rxMaybe (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe; public static final fun rxMaybe (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe; + public static synthetic fun rxMaybe$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe; public static synthetic fun rxMaybe$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe; } public final class kotlinx/coroutines/rx2/RxObservableKt { + public static final fun rxObservable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable; public static final fun rxObservable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable; + public static synthetic fun rxObservable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable; public static synthetic fun rxObservable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable; } @@ -54,7 +62,9 @@ public final class kotlinx/coroutines/rx2/RxSchedulerKt { } public final class kotlinx/coroutines/rx2/RxSingleKt { + public static final fun rxSingle (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single; public static final fun rxSingle (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single; + public static synthetic fun rxSingle$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single; public static synthetic fun rxSingle$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single; } diff --git a/kotlinx-coroutines-core/common/src/NonCancellable.kt b/kotlinx-coroutines-core/common/src/NonCancellable.kt index 3a4faeed8e..c48faea7f8 100644 --- a/kotlinx-coroutines-core/common/src/NonCancellable.kt +++ b/kotlinx-coroutines-core/common/src/NonCancellable.kt @@ -115,4 +115,9 @@ public object NonCancellable : AbstractCoroutineContextElement(Job), Job { */ @InternalCoroutinesApi override fun attachChild(child: ChildJob): ChildHandle = NonDisposableHandle + + /** @suppress */ + override fun toString(): String { + return "NonCancellable" + } } diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt index 0a10913187..073c7a558b 100644 --- a/kotlinx-coroutines-core/jvm/test/TestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.scheduling.* import org.junit.* import java.util.* import java.util.concurrent.atomic.* +import kotlin.coroutines.* import kotlin.test.* private val VERBOSE = systemProp("test.verbose", false) @@ -213,4 +214,6 @@ public actual open class TestBase actual constructor() { assertTrue(result.exceptionOrNull() is T, "Expected ${T::class}, but had $result") return result.exceptionOrNull()!! as T } + + protected suspend fun currentDispatcher() = coroutineContext[ContinuationInterceptor]!! } diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md index 1da42505ee..0eff27b1ea 100644 --- a/reactive/coroutines-guide-reactive.md +++ b/reactive/coroutines-guide-reactive.md @@ -617,7 +617,7 @@ fun Publisher.fusedFilterMap( context: CoroutineContext, // the context to execute this coroutine in predicate: (T) -> Boolean, // the filter predicate mapper: (T) -> R // the mapper function -) = GlobalScope.publish(context) { +) = publish(context) { collect { // collect the source stream if (predicate(it)) // filter part send(mapper(it)) // map part @@ -638,7 +638,7 @@ fun CoroutineScope.range(start: Int, count: Int) = publish { ```kotlin fun main() = runBlocking { range(1, 5) - .fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" }) + .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" }) .collect { println(it) } // print all the resulting strings } ``` @@ -673,7 +673,7 @@ import kotlin.coroutines.* --> ```kotlin -fun Publisher.takeUntil(context: CoroutineContext, other: Publisher) = GlobalScope.publish(context) { +fun Publisher.takeUntil(context: CoroutineContext, other: Publisher) = publish(context) { this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher val current = this other.openSubscription().consume { // explicitly open channel to Publisher @@ -711,7 +711,7 @@ The following code shows how `takeUntil` works: fun main() = runBlocking { val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms - slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // let's test it + slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it } ``` @@ -742,7 +742,7 @@ import kotlin.coroutines.* --> ```kotlin -fun Publisher>.merge(context: CoroutineContext) = GlobalScope.publish(context) { +fun Publisher>.merge(context: CoroutineContext) = publish(context) { collect { pub -> // for each publisher collected launch { // launch a child coroutine pub.collect { send(it) } // resend all element from this publisher @@ -783,7 +783,7 @@ The test code is to use `merge` on `testPub` and to display the results: ```kotlin fun main() = runBlocking { - testPub().merge(coroutineContext).collect { println(it) } // print the whole stream + testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream } ``` @@ -865,7 +865,7 @@ import kotlin.coroutines.CoroutineContext --> ```kotlin -fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish(context) { +fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish(context) { for (x in start until start + count) { delay(time) // wait before sending each number send(x) @@ -915,7 +915,7 @@ import kotlin.coroutines.CoroutineContext --> ```kotlin -fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish(context) { +fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish(context) { for (x in start until start + count) { delay(time) // wait before sending each number send(x) @@ -1067,12 +1067,12 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker [whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/while-select.html -[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html +[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html [org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html [org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html -[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-flowable.html +[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html diff --git a/reactive/kotlinx-coroutines-reactive/README.md b/reactive/kotlinx-coroutines-reactive/README.md index d7746e1d9b..69691e8e77 100644 --- a/reactive/kotlinx-coroutines-reactive/README.md +++ b/reactive/kotlinx-coroutines-reactive/README.md @@ -33,7 +33,7 @@ Conversion functions: [ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html -[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html +[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html [org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first.html [org.reactivestreams.Publisher.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-default.html [org.reactivestreams.Publisher.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-else.html diff --git a/reactive/kotlinx-coroutines-reactive/src/Convert.kt b/reactive/kotlinx-coroutines-reactive/src/Convert.kt index 2be24afec7..a7ae128ea5 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Convert.kt @@ -21,7 +21,7 @@ import kotlin.coroutines.* * @param context -- the coroutine context from which the resulting observable is going to be signalled */ @ObsoleteCoroutinesApi -public fun ReceiveChannel.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher = GlobalScope.publish(context) { +public fun ReceiveChannel.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher = publish(context) { for (t in this@asPublisher) send(t) } diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index f5ea01e586..843c94c8d6 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -1,7 +1,9 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + package kotlinx.coroutines.reactive import kotlinx.atomicfu.* @@ -11,6 +13,7 @@ import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import org.reactivestreams.* import kotlin.coroutines.* +import kotlin.internal.LowPriorityInOverloadResolution /** * Creates cold reactive [Publisher] that runs a given [block] in a coroutine. @@ -26,25 +29,44 @@ import kotlin.coroutines.* * | Normal completion or `close` without cause | `onComplete` * | Failure with exception or `close` with cause | `onError` * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. + * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. + * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. - * - * @param context context of the coroutine. - * @param block the coroutine code. */ @ExperimentalCoroutinesApi +public fun publish( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope.() -> Unit +): Publisher { + require(context[Job] === null) { "Publisher context cannot contain job in it." + + "Its lifecycle should be managed via subscription. Had $context" } + return publishInternal(GlobalScope, context, block) +} + +@Deprecated( + message = "CoroutineScope.publish is deprecated in favour of top-level publish", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("publish(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring +@LowPriorityInOverloadResolution public fun CoroutineScope.publish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit +): Publisher = publishInternal(this, context, block) + +/** @suppress For internal use from other reactive integration modules only */ +@InternalCoroutinesApi +public fun publishInternal( + scope: CoroutineScope, // support for legacy publish in scope + context: CoroutineContext, + block: suspend ProducerScope.() -> Unit ): Publisher = Publisher { subscriber -> // specification requires NPE on null subscriber if (subscriber == null) throw NullPointerException("Subscriber cannot be null") - val newContext = newCoroutineContext(context) + val newContext = scope.newCoroutineContext(context) val coroutine = PublisherCoroutine(newContext, subscriber) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions coroutine.start(CoroutineStart.DEFAULT, coroutine, block) diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index ca1834998e..aaeaa009b3 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -20,7 +20,7 @@ class IntegrationTest( ) : TestBase() { enum class Ctx { - MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context }, + MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) }, DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default }, UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined }; @@ -39,7 +39,7 @@ class IntegrationTest( @Test fun testEmpty(): Unit = runBlocking { - val pub = CoroutineScope(ctx(coroutineContext)).publish { + val pub = publish(ctx(coroutineContext)) { if (delay) delay(1) // does not send anything } @@ -77,7 +77,7 @@ class IntegrationTest( @Test fun testNumbers() = runBlocking { val n = 100 * stressTestMultiplier - val pub = CoroutineScope(ctx(coroutineContext)).publish { + val pub = publish(ctx(coroutineContext)) { for (i in 1..n) { send(i) if (delay) delay(1) @@ -99,8 +99,7 @@ class IntegrationTest( fun testCancelWithoutValue() = runTest { val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { publish { - yield() - expectUnreached() + hang {} }.awaitFirst() } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishParentCancelStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishParentCancelStressTest.kt deleted file mode 100644 index 593671235d..0000000000 --- a/reactive/kotlinx-coroutines-reactive/test/PublishParentCancelStressTest.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.reactive - -import kotlinx.coroutines.* -import org.junit.* -import org.junit.Test -import org.reactivestreams.* -import java.util.concurrent.* -import kotlin.test.* - -public class PublishParentCancelStressTest : TestBase() { - private val dispatcher = newFixedThreadPoolContext(3, "PublishParentCancelStressTest") - private val N_TIMES = 5000 * stressTestMultiplier - - @After - fun tearDown() { - dispatcher.close() - } - - @Test - fun testStress() = runTest { - var unhandled: Throwable? = null - val handler = CoroutineExceptionHandler { _, ex -> unhandled = ex } - repeat(N_TIMES) { - val barrier = CyclicBarrier(4) - // launch parent job for publisher - val parent = GlobalScope.async(dispatcher + handler) { - val publisher = publish { - // BARRIER #1 - child publisher crashes - barrier.await() - throw TestException() - } - var sub: Subscription? = null - publisher.subscribe(object : Subscriber { - override fun onComplete() { error("Cannot be reached") } - override fun onSubscribe(s: Subscription?) { sub = s } - override fun onNext(t: Unit?) { error("Cannot be reached" ) } - override fun onError(t: Throwable?) { - assertTrue(t is TestException) - } - }) - launch { - // BARRIER #3 -- cancel subscription - barrier.await() - sub!!.cancel() - } - // BARRIER #2 -- parent completes - barrier.await() - Unit - } - // BARRIE #4 - go 1-3 together - barrier.await() - // Make sure exception is not lost, but incorporated into parent - val result = kotlin.runCatching { parent.await() } - assertTrue(result.exceptionOrNull() is TestException) - // Make sure unhandled exception handler was not invoked - assertNull(unhandled) - } - } - - private class TestException : Exception() -} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index e022ff1529..4ffa0746ca 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -14,7 +14,7 @@ class PublishTest : TestBase() { @Test fun testBasicEmpty() = runTest { expect(1) - val publisher = publish { + val publisher = publish(currentDispatcher()) { expect(5) } expect(2) @@ -32,7 +32,7 @@ class PublishTest : TestBase() { @Test fun testBasicSingle() = runTest { expect(1) - val publisher = publish { + val publisher = publish(currentDispatcher()) { expect(5) send(42) expect(7) @@ -58,7 +58,7 @@ class PublishTest : TestBase() { @Test fun testBasicError() = runTest { expect(1) - val publisher = publish(NonCancellable) { + val publisher = publish(currentDispatcher()) { expect(5) throw RuntimeException("OK") } @@ -82,66 +82,20 @@ class PublishTest : TestBase() { } @Test - fun testCancelsParentOnFailure() = runTest( - expected = { it is RuntimeException && it.message == "OK" } - ) { - // has parent, so should cancel it on failure - publish { - throw RuntimeException("OK") - }.openSubscription() - } - - @Test - fun testHandleFailureAfterCancel() = runTest( - unhandled = listOf({ it -> it is RuntimeException && it.message == "FAILED" }) - ){ + fun testHandleFailureAfterCancel() = runTest { expect(1) - // Exception should be delivered to CoroutineExceptionHandler, because we create publisher - // with the NonCancellable parent - val publisher = publish(NonCancellable + Dispatchers.Unconfined) { - try { - expect(3) - delay(10000) - } finally { - expect(5) - throw RuntimeException("FAILED") // crash after cancel - } - } - var sub: Subscription? = null - publisher.subscribe(object : Subscriber { - override fun onComplete() { - expectUnreached() - } - - override fun onSubscribe(s: Subscription) { - expect(2) - sub = s - } - - override fun onNext(t: Unit?) { - expectUnreached() - } - override fun onError(t: Throwable?) { - expectUnreached() - } - }) - expect(4) - sub!!.cancel() - finish(6) - } - - @Test - fun testParentHandlesFailure() = runTest { - expect(1) - val deferred = CompletableDeferred() - val publisher = publish(deferred + Dispatchers.Unconfined) { + val eh = CoroutineExceptionHandler { _, t -> + assertTrue(t is RuntimeException) + expect(6) + } + val publisher = publish(Dispatchers.Unconfined + eh) { try { expect(3) delay(10000) } finally { expect(5) - throw TestException("FAILED") + throw RuntimeException("FAILED") // crash after cancel } } var sub: Subscription? = null @@ -165,58 +119,13 @@ class PublishTest : TestBase() { }) expect(4) sub!!.cancel() - - try { - deferred.await() - expectUnreached() - } catch (e: TestException) { - expect(6) - } - finish(7) } - @Test - fun testPublishFailureCancelsParent() = runTest( - expected = { it is TestException } - ) { - expect(1) - val publisher = publish { - expect(5) - throw TestException() - } - expect(2) - publisher.subscribe(object : Subscriber { - override fun onComplete() { - expectUnreached() - } - - override fun onSubscribe(s: Subscription) { - expect(3) - } - - override fun onNext(t: Unit?) { - expectUnreached() - } - - override fun onError(t: Throwable?) { - assertTrue(t is TestException) - expect(6) - } - }) - expect(4) - try { - yield() // to coroutine, will crash because it is a cancelled parent coroutine - } finally { - finish(7) - } - expectUnreached() - } - @Test fun testOnNextError() = runTest { expect(1) - val publisher = publish(NonCancellable) { + val publisher = publish(currentDispatcher()) { expect(4) try { send("OK") @@ -255,7 +164,7 @@ class PublishTest : TestBase() { @Test fun testFailingConsumer() = runTest { - val pub = publish { + val pub = publish(currentDispatcher()) { repeat(3) { expect(it + 1) // expect(1), expect(2) *should* be invoked send(it) @@ -269,4 +178,9 @@ class PublishTest : TestBase() { finish(3) } } + + @Test + fun testIllegalArgumentException() { + assertFailsWith { publish(Job()) { } } + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt index 503a0152ae..258632bcfe 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.reactive @@ -12,7 +12,7 @@ class PublisherBackpressureTest : TestBase() { @Test fun testCancelWhileBPSuspended() = runBlocking { expect(1) - val observable = publish { + val observable = publish(currentDispatcher()) { expect(5) send("A") // will not suspend, because an item was requested expect(7) diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt index cac2f550f7..e238d396a8 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherMultiTest.kt @@ -13,7 +13,7 @@ class PublisherMultiTest : TestBase() { @Test fun testConcurrentStress() = runBlocking { val n = 10_000 * stressTestMultiplier - val observable = GlobalScope.publish { + val observable = publish { // concurrent emitters (many coroutines) val jobs = List(n) { // launch diff --git a/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt index 2e55b8adc2..6816a986d3 100644 --- a/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt @@ -26,28 +26,16 @@ class ReactiveStreamTckTest { private val dispatcher: Dispatcher ) : PublisherVerification(TestEnvironment(500, 500)) { - private val scope = CoroutineScope(dispatcher.dispatcher + NonCancellable) - override fun createPublisher(elements: Long): Publisher = - scope.publish { + publish(dispatcher.dispatcher) { for (i in 1..elements) send(i) } override fun createFailedPublisher(): Publisher = - scope.publish { + publish(dispatcher.dispatcher) { throw TestException() } - @Test - public override fun required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() { - // This test fails on default dispatcher because it retains a reference to the last task - // in the structure of its GlobalQueue - // So we skip it with the default dispatcher. - // todo: remove it when CoroutinesScheduler is improved - if (dispatcher == Dispatcher.DEFAULT) return - super.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() - } - @Test public override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() { throw SkipException("Skipped") diff --git a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt index 74f5914d20..3f33b33c8b 100644 --- a/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt @@ -17,7 +17,7 @@ class PublisherAsFlowTest : TestBase() { var onCancelled = 0 var onError = 0 - val publisher = publish { + val publisher = publish(currentDispatcher()) { coroutineContext[Job]?.invokeOnCompletion { if (it is CancellationException) ++onCancelled } @@ -45,7 +45,7 @@ class PublisherAsFlowTest : TestBase() { @Test fun testBufferSize1() = runTest { - val publisher = publish { + val publisher = publish(currentDispatcher()) { expect(1) send(3) @@ -66,7 +66,7 @@ class PublisherAsFlowTest : TestBase() { @Test fun testBufferSize10() = runTest { - val publisher = publish { + val publisher = publish(currentDispatcher()) { expect(1) send(5) @@ -87,7 +87,7 @@ class PublisherAsFlowTest : TestBase() { @Test fun testConflated() = runTest { - val publisher = publish { + val publisher = publish(currentDispatcher()) { for (i in 1..5) send(i) } val list = publisher.asFlow().conflate().toList() @@ -96,7 +96,7 @@ class PublisherAsFlowTest : TestBase() { @Test fun testProduce() = runTest { - val flow = publish { repeat(10) { send(it) } }.asFlow() + val flow = publish(currentDispatcher()) { repeat(10) { send(it) } }.asFlow() check((0..9).toList(), flow.produceIn(this)) check((0..9).toList(), flow.buffer(2).produceIn(this)) check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) @@ -113,7 +113,7 @@ class PublisherAsFlowTest : TestBase() { fun testProduceCancellation() = runTest { expect(1) // publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled - val flow = publish { + val flow = publish(currentDispatcher()) { expect(3) repeat(10) { value -> when (value) { diff --git a/reactive/kotlinx-coroutines-reactor/README.md b/reactive/kotlinx-coroutines-reactor/README.md index 1a08834fce..153148844e 100644 --- a/reactive/kotlinx-coroutines-reactor/README.md +++ b/reactive/kotlinx-coroutines-reactor/README.md @@ -29,8 +29,8 @@ Conversion functions: -[mono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-coroutine-scope/mono.html -[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-coroutine-scope/flux.html +[mono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/mono.html +[flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/flux.html [kotlinx.coroutines.Job.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-job/as-mono.html [kotlinx.coroutines.Deferred.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-deferred/as-mono.html [kotlinx.coroutines.channels.ReceiveChannel.asFlux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.channels.-receive-channel/as-flux.html diff --git a/reactive/kotlinx-coroutines-reactor/src/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/Convert.kt index ea9f8823a2..cf6b65de92 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Convert.kt @@ -22,7 +22,7 @@ import kotlin.coroutines.* * @param context -- the coroutine context from which the resulting mono is going to be signalled */ @ExperimentalCoroutinesApi -public fun Job.asMono(context: CoroutineContext): Mono = GlobalScope.mono(context) { this@asMono.join() } +public fun Job.asMono(context: CoroutineContext): Mono = mono(context) { this@asMono.join() } /** * Converts this deferred value to the hot reactive mono that signals * [success][MonoSink.success] or [error][MonoSink.error]. @@ -36,7 +36,7 @@ public fun Job.asMono(context: CoroutineContext): Mono = GlobalScope.mono( * @param context -- the coroutine context from which the resulting mono is going to be signalled */ @ExperimentalCoroutinesApi -public fun Deferred.asMono(context: CoroutineContext): Mono = GlobalScope.mono(context) { this@asMono.await() } +public fun Deferred.asMono(context: CoroutineContext): Mono = mono(context) { this@asMono.await() } /** * Converts a stream of elements received from the channel to the hot reactive flux. @@ -50,7 +50,7 @@ public fun Deferred.asMono(context: CoroutineContext): Mono = GlobalS * @param context -- the coroutine context from which the resulting flux is going to be signalled */ @ObsoleteCoroutinesApi -public fun ReceiveChannel.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = GlobalScope.flux(context) { +public fun ReceiveChannel.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = flux(context) { for (t in this@asFlux) send(t) } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 3495501964..785b4652bb 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -1,7 +1,9 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + package kotlinx.coroutines.reactor import kotlinx.coroutines.* @@ -9,16 +11,15 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.reactive.* import reactor.core.publisher.* import kotlin.coroutines.* +import kotlin.internal.LowPriorityInOverloadResolution /** * Creates cold reactive [Flux] that runs a given [block] in a coroutine. * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. * Coroutine emits items with `send`. Unsubscribing cancels running coroutine. * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. + * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. * * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that * `onNext` is not invoked concurrently. @@ -29,12 +30,29 @@ import kotlin.coroutines.* * | Normal completion or `close` without cause | `onComplete` * | Failure with exception or `close` with cause | `onError` * + * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. */ @ExperimentalCoroutinesApi -fun CoroutineScope.flux( +public fun flux( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope.() -> Unit +): Flux { + require(context[Job] === null) { "Flux context cannot contain job in it." + + "Its lifecycle should be managed via Disposable handle. Had $context" } + return Flux.from(publishInternal(GlobalScope, context, block)) +} + +@Deprecated( + message = "CoroutineScope.flux is deprecated in favour of top-level flux", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("flux(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring +@LowPriorityInOverloadResolution +public fun CoroutineScope.flux( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit ): Flux = - Flux.from(publish(newCoroutineContext(context), block = block)) + Flux.from(publishInternal(this, context, block)) diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 7174fb6021..a0f65af5c9 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -1,13 +1,16 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + package kotlinx.coroutines.reactor import kotlinx.coroutines.* import reactor.core.* import reactor.core.publisher.* import kotlin.coroutines.* +import kotlin.internal.* /** * Creates cold [mono][Mono] that will run a given [block] in a coroutine. @@ -20,19 +23,37 @@ import kotlin.coroutines.* * | Returns a null | `success` * | Failure with exception or unsubscribe | `error` * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. + * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. * - * @param context context of the coroutine. - * @param block the coroutine code. + * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -fun CoroutineScope.mono( +public fun mono( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T? +): Mono { + require(context[Job] === null) { "Mono context cannot contain job in it." + + "Its lifecycle should be managed via Disposable handle. Had $context" } + return monoInternal(GlobalScope, context, block) +} + +@Deprecated( + message = "CoroutineScope.mono is deprecated in favour of top-level mono", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("mono(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +@LowPriorityInOverloadResolution +public fun CoroutineScope.mono( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T? +): Mono = monoInternal(this, context, block) + +private fun monoInternal( + scope: CoroutineScope, // support for legacy mono in scope + context: CoroutineContext, + block: suspend CoroutineScope.() -> T? ): Mono = Mono.create { sink -> - val newContext = newCoroutineContext(context) + val newContext = scope.newCoroutineContext(context) val coroutine = MonoCoroutine(newContext, sink) sink.onDispose(coroutine) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) diff --git a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt index 9bd55cdc72..10e05b7658 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ConvertTest.kt @@ -17,7 +17,7 @@ class ConvertTest : TestBase() { val job = launch { expect(3) } - val mono = job.asMono(coroutineContext) + val mono = job.asMono(coroutineContext.minusKey(Job)) mono.subscribe { expect(4) } @@ -29,11 +29,11 @@ class ConvertTest : TestBase() { @Test fun testJobToMonoFail() = runBlocking { expect(1) - val job = async(NonCancellable) { // don't kill parent on exception + val job = async(NonCancellable) { expect(3) throw RuntimeException("OK") } - val mono = job.asMono(coroutineContext + NonCancellable) + val mono = job.asMono(coroutineContext.minusKey(Job)) mono.subscribe( { fail("no item should be emitted") }, { expect(4) } @@ -110,10 +110,10 @@ class ConvertTest : TestBase() { throw TestException("K") } val flux = c.asFlux(Dispatchers.Unconfined) - val mono = GlobalScope.mono(Dispatchers.Unconfined) { + val mono = mono(Dispatchers.Unconfined) { var result = "" try { - flux.consumeEach { result += it } + flux.collect { result += it } } catch(e: Throwable) { check(e is TestException) result += e.message diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt index c4c0dbcbb6..ae23d3c23d 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt @@ -15,7 +15,7 @@ class FluxMultiTest : TestBase() { @Test fun testNumbers() { val n = 100 * stressTestMultiplier - val flux = GlobalScope.flux { + val flux = flux { repeat(n) { send(it) } } checkMonoValue(flux.collectList()) { list -> @@ -26,7 +26,7 @@ class FluxMultiTest : TestBase() { @Test fun testConcurrentStress() { val n = 10_000 * stressTestMultiplier - val flux = GlobalScope.flux { + val flux = flux { // concurrent emitters (many coroutines) val jobs = List(n) { // launch @@ -45,7 +45,7 @@ class FluxMultiTest : TestBase() { @Test fun testIteratorResendUnconfined() { val n = 10_000 * stressTestMultiplier - val flux = GlobalScope.flux(Dispatchers.Unconfined) { + val flux = flux(Dispatchers.Unconfined) { Flux.range(0, n).collect { send(it) } } checkMonoValue(flux.collectList()) { list -> @@ -56,7 +56,7 @@ class FluxMultiTest : TestBase() { @Test fun testIteratorResendPool() { val n = 10_000 * stressTestMultiplier - val flux = GlobalScope.flux { + val flux = flux { Flux.range(0, n).collect { send(it) } } checkMonoValue(flux.collectList()) { list -> @@ -66,11 +66,11 @@ class FluxMultiTest : TestBase() { @Test fun testSendAndCrash() { - val flux = GlobalScope.flux { + val flux = flux { send("O") throw IOException("K") } - val mono = GlobalScope.mono { + val mono = mono { var result = "" try { flux.consumeEach { result += it } diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt index 241cc6aaed..7d8d46984b 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt @@ -14,7 +14,7 @@ import java.time.Duration.* class FluxSingleTest { @Test fun testSingleNoWait() { - val flux = GlobalScope.flux { + val flux = flux { send("OK") } @@ -30,7 +30,7 @@ class FluxSingleTest { @Test fun testSingleEmitAndAwait() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.just("O").awaitSingle() + "K") } @@ -41,7 +41,7 @@ class FluxSingleTest { @Test fun testSingleWithDelay() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K") } @@ -52,7 +52,7 @@ class FluxSingleTest { @Test fun testSingleException() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.just("O", "K").awaitSingle() + "K") } @@ -63,7 +63,7 @@ class FluxSingleTest { @Test fun testAwaitFirst() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.just("O", "#").awaitFirst() + "K") } @@ -74,7 +74,7 @@ class FluxSingleTest { @Test fun testAwaitFirstOrDefault() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.empty().awaitFirstOrDefault("O") + "K") } @@ -85,7 +85,7 @@ class FluxSingleTest { @Test fun testAwaitFirstOrDefaultWithValues() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.just("O", "#").awaitFirstOrDefault("!") + "K") } @@ -96,7 +96,7 @@ class FluxSingleTest { @Test fun testAwaitFirstOrNull() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.empty().awaitFirstOrNull() ?: "OK") } @@ -107,7 +107,7 @@ class FluxSingleTest { @Test fun testAwaitFirstOrNullWithValues() { - val flux = GlobalScope.flux { + val flux = flux { send((Flux.just("O", "#").awaitFirstOrNull() ?: "!") + "K") } @@ -118,7 +118,7 @@ class FluxSingleTest { @Test fun testAwaitFirstOrElse() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.empty().awaitFirstOrElse { "O" } + "K") } @@ -129,7 +129,7 @@ class FluxSingleTest { @Test fun testAwaitFirstOrElseWithValues() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.just("O", "#").awaitFirstOrElse { "!" } + "K") } @@ -140,7 +140,7 @@ class FluxSingleTest { @Test fun testAwaitLast() { - val flux = GlobalScope.flux { + val flux = flux { send(Flux.just("#", "O").awaitLast() + "K") } @@ -151,7 +151,7 @@ class FluxSingleTest { @Test fun testExceptionFromObservable() { - val flux = GlobalScope.flux { + val flux = flux { try { send(Flux.error(RuntimeException("O")).awaitFirst()) } catch (e: RuntimeException) { @@ -166,7 +166,7 @@ class FluxSingleTest { @Test fun testExceptionFromCoroutine() { - val flux = GlobalScope.flux { + val flux = flux { error(Flux.just("O").awaitSingle() + "K") } @@ -178,7 +178,7 @@ class FluxSingleTest { @Test fun testFluxIteration() { - val flux = GlobalScope.flux { + val flux = flux { var result = "" Flux.just("O", "K").collect { result += it } send(result) @@ -191,7 +191,7 @@ class FluxSingleTest { @Test fun testFluxIterationFailure() { - val flux = GlobalScope.flux { + val flux = flux { try { Flux.error(RuntimeException("OK")).collect { fail("Should not be here") } send("Fail") diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt index a0368f84c7..ee26455ec8 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt @@ -15,7 +15,7 @@ class FluxTest : TestBase() { @Test fun testBasicSuccess() = runBlocking { expect(1) - val flux = flux { + val flux = flux(currentDispatcher()) { expect(4) send("OK") } @@ -32,7 +32,7 @@ class FluxTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val flux = flux(NonCancellable) { + val flux = flux(currentDispatcher()) { expect(4) throw RuntimeException("OK") } @@ -52,7 +52,7 @@ class FluxTest : TestBase() { @Test fun testBasicUnsubscribe() = runBlocking { expect(1) - val flux = flux { + val flux = flux(currentDispatcher()) { expect(4) yield() // back to main, will get cancelled expectUnreached() @@ -71,24 +71,11 @@ class FluxTest : TestBase() { finish(6) } - @Test - fun testCancelsParentOnFailure() = runTest( - expected = { it is RuntimeException && it.message == "OK" } - ) { - // has parent, so should cancel it on failure - flux { - throw RuntimeException("OK") - }.subscribe( - { expectUnreached() }, - { assert(it is RuntimeException) } - ) - } - @Test fun testNotifyOnceOnCancellation() = runTest { expect(1) val observable = - flux { + flux(currentDispatcher()) { expect(5) send("OK") try { @@ -124,7 +111,7 @@ class FluxTest : TestBase() { @Test fun testFailingConsumer() = runTest { - val pub = flux { + val pub = flux(currentDispatcher()) { repeat(3) { expect(it + 1) // expect(1), expect(2) *should* be invoked send(it) @@ -138,4 +125,9 @@ class FluxTest : TestBase() { finish(3) } } + + @Test + fun testIllegalArgumentException() { + assertFailsWith { flux(Job()) { } } + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt index 7c72edc484..2283d45afc 100644 --- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt @@ -22,14 +22,14 @@ class MonoTest : TestBase() { @Test fun testBasicSuccess() = runBlocking { expect(1) - val mono = mono { + val mono = mono(currentDispatcher()) { expect(4) "OK" } expect(2) mono.subscribe { value -> expect(5) - Assert.assertThat(value, IsEqual("OK")) + assertThat(value, IsEqual("OK")) } expect(3) yield() // to started coroutine @@ -39,7 +39,7 @@ class MonoTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val mono = mono(NonCancellable) { + val mono = mono(currentDispatcher()) { expect(4) throw RuntimeException("OK") } @@ -48,8 +48,8 @@ class MonoTest : TestBase() { expectUnreached() }, { error -> expect(5) - Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java)) - Assert.assertThat(error.message, IsEqual("OK")) + assertThat(error, IsInstanceOf(RuntimeException::class.java)) + assertThat(error.message, IsEqual("OK")) }) expect(3) yield() // to started coroutine @@ -59,7 +59,7 @@ class MonoTest : TestBase() { @Test fun testBasicEmpty() = runBlocking { expect(1) - val mono = mono { + val mono = mono(currentDispatcher()) { expect(4) null } @@ -75,7 +75,7 @@ class MonoTest : TestBase() { @Test fun testBasicUnsubscribe() = runBlocking { expect(1) - val mono = mono { + val mono = mono(currentDispatcher()) { expect(4) yield() // back to main, will get cancelled expectUnreached() @@ -97,7 +97,7 @@ class MonoTest : TestBase() { @Test fun testMonoNoWait() { - val mono = GlobalScope.mono { + val mono = mono { "OK" } @@ -113,7 +113,7 @@ class MonoTest : TestBase() { @Test fun testMonoEmitAndAwait() { - val mono = GlobalScope.mono { + val mono = mono { Mono.just("O").awaitSingle() + "K" } @@ -124,7 +124,7 @@ class MonoTest : TestBase() { @Test fun testMonoWithDelay() { - val mono = GlobalScope.mono { + val mono = mono { Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K" } @@ -135,7 +135,7 @@ class MonoTest : TestBase() { @Test fun testMonoException() { - val mono = GlobalScope.mono { + val mono = mono { Flux.just("O", "K").awaitSingle() + "K" } @@ -146,7 +146,7 @@ class MonoTest : TestBase() { @Test fun testAwaitFirst() { - val mono = GlobalScope.mono { + val mono = mono { Flux.just("O", "#").awaitFirst() + "K" } @@ -157,7 +157,7 @@ class MonoTest : TestBase() { @Test fun testAwaitLast() { - val mono = GlobalScope.mono { + val mono = mono { Flux.just("#", "O").awaitLast() + "K" } @@ -168,7 +168,7 @@ class MonoTest : TestBase() { @Test fun testExceptionFromFlux() { - val mono = GlobalScope.mono { + val mono = mono { try { Flux.error(RuntimeException("O")).awaitFirst() } catch (e: RuntimeException) { @@ -183,7 +183,7 @@ class MonoTest : TestBase() { @Test fun testExceptionFromCoroutine() { - val mono = GlobalScope.mono { + val mono = mono { throw IllegalStateException(Flux.just("O").awaitSingle() + "K") } @@ -193,22 +193,9 @@ class MonoTest : TestBase() { } } - @Test - fun testCancelsParentOnFailure() = runTest( - expected = { it is RuntimeException && it.message == "OK" } - ) { - // has parent, so should cancel it on failure - mono { - throw RuntimeException("OK") - }.subscribe( - { expectUnreached() }, - { assert(it is RuntimeException) } - ) - } - @Test fun testSuppressedException() = runTest { - val mono = mono(NonCancellable) { + val mono = mono(currentDispatcher()) { launch(start = CoroutineStart.ATOMIC) { throw TestException() // child coroutine fails } @@ -227,12 +214,14 @@ class MonoTest : TestBase() { } @Test - fun testUnhandledException() = runTest( - unhandled = listOf { it -> it is TestException } - ) { + fun testUnhandledException() = runTest { expect(1) var subscription: Subscription? = null - val mono = mono(NonCancellable) { + val mono = mono(currentDispatcher() + CoroutineExceptionHandler { _, t -> + assertTrue(t is TestException) + expect(5) + + }) { expect(4) subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled try { @@ -252,6 +241,11 @@ class MonoTest : TestBase() { }) expect(3) yield() // run coroutine - finish(5) + finish(6) + } + + @Test + fun testIllegalArgumentException() { + assertFailsWith { mono(Job()) { } } } } diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md index f1079b6fdc..fbdf1b35af 100644 --- a/reactive/kotlinx-coroutines-rx2/README.md +++ b/reactive/kotlinx-coroutines-rx2/README.md @@ -51,11 +51,11 @@ Conversion functions: [ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html -[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-completable.html -[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-maybe.html -[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-single.html -[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-observable.html -[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-flowable.html +[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-completable.html +[rxMaybe]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-maybe.html +[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-single.html +[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-observable.html +[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html [io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html [io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html [io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index f44ecf7e0c..61046f280b 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -1,12 +1,15 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlin.coroutines.* +import kotlin.internal.* /** * Creates cold [Completable] that runs a given [block] in a coroutine. @@ -18,19 +21,36 @@ import kotlin.coroutines.* * | Completes successfully | `onCompleted` * | Failure with exception or unsubscribe | `onError` * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. + * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. - * - * @param context context of the coroutine. - * @param block the coroutine code. + * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ +public fun rxCompletable( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> Unit +): Completable { + require(context[Job] === null) { "Completable context cannot contain job in it." + + "Its lifecycle should be managed via Disposable handle. Had $context" } + return rxCompletableInternal(GlobalScope, context, block) +} + +@Deprecated( + message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("rxCompletable(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +@LowPriorityInOverloadResolution public fun CoroutineScope.rxCompletable( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> Unit +): Completable = rxCompletableInternal(this, context, block) + +private fun rxCompletableInternal( + scope: CoroutineScope, // support for legacy rxCompletable in scope + context: CoroutineContext, + block: suspend CoroutineScope.() -> Unit ): Completable = Completable.create { subscriber -> - val newContext = newCoroutineContext(context) + val newContext = scope.newCoroutineContext(context) val coroutine = RxCompletableCoroutine(newContext, subscriber) subscriber.setCancellable(RxCancellable(coroutine)) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index dbf29f1ebb..d5678de921 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -25,7 +25,7 @@ import kotlin.coroutines.* * @param context -- the coroutine context from which the resulting completable is going to be signalled */ @ExperimentalCoroutinesApi -public fun Job.asCompletable(context: CoroutineContext): Completable = GlobalScope.rxCompletable(context) { +public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) { this@asCompletable.join() } @@ -42,7 +42,7 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = GlobalSco * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ @ExperimentalCoroutinesApi -public fun Deferred.asMaybe(context: CoroutineContext): Maybe = GlobalScope.rxMaybe(context) { +public fun Deferred.asMaybe(context: CoroutineContext): Maybe = rxMaybe(context) { this@asMaybe.await() } @@ -59,7 +59,7 @@ public fun Deferred.asMaybe(context: CoroutineContext): Maybe = Globa * @param context -- the coroutine context from which the resulting single is going to be signalled */ @ExperimentalCoroutinesApi -public fun Deferred.asSingle(context: CoroutineContext): Single = GlobalScope.rxSingle(context) { +public fun Deferred.asSingle(context: CoroutineContext): Single = rxSingle(context) { this@asSingle.await() } @@ -75,7 +75,7 @@ public fun Deferred.asSingle(context: CoroutineContext): Single * @param context -- the coroutine context from which the resulting observable is going to be signalled */ @ObsoleteCoroutinesApi -public fun ReceiveChannel.asObservable(context: CoroutineContext): Observable = GlobalScope.rxObservable(context) { +public fun ReceiveChannel.asObservable(context: CoroutineContext): Observable = rxObservable(context) { for (t in this@asObservable) send(t) } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt index 93d6079aab..beee40eedb 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt @@ -1,7 +1,9 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + package kotlinx.coroutines.rx2 import io.reactivex.* @@ -9,6 +11,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.reactive.* import kotlin.coroutines.* +import kotlin.internal.* /** * Creates cold [flowable][Flowable] that will run a given [block] in a coroutine. @@ -24,19 +27,29 @@ import kotlin.coroutines.* * | Normal completion or `close` without cause | `onComplete` * | Failure with exception or `close` with cause | `onError` * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. + * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. + * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect - * to cancellation and error handling may change in the future. - * - * @param context context of the coroutine. - * @param block the coroutine code. */ @ExperimentalCoroutinesApi +public fun rxFlowable( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope.() -> Unit +): Flowable { + require(context[Job] === null) { "Flowable context cannot contain job in it." + + "Its lifecycle should be managed via Disposable handle. Had $context" } + return Flowable.fromPublisher(publishInternal(GlobalScope, context, block)) +} + +@Deprecated( + message = "CoroutineScope.rxFlowable is deprecated in favour of top-level rxFlowable", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("rxFlowable(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +@LowPriorityInOverloadResolution public fun CoroutineScope.rxFlowable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit -): Flowable = Flowable.fromPublisher(publish(newCoroutineContext(context), block = block)) +): Flowable = Flowable.fromPublisher(publishInternal(this, context, block)) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index 3e3f13b43f..e93ae6b2d4 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -1,12 +1,15 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlin.coroutines.* +import kotlin.internal.* /** * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine. @@ -19,19 +22,36 @@ import kotlin.coroutines.* * | Returns a null | `onComplete` * | Failure with exception or unsubscribe | `onError` * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. + * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. - * - * @param context context of the coroutine. - * @param block the coroutine code. + * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ +public fun rxMaybe( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T? +): Maybe { + require(context[Job] === null) { "Maybe context cannot contain job in it." + + "Its lifecycle should be managed via Disposable handle. Had $context" } + return rxMaybeInternal(GlobalScope, context, block) +} + +@Deprecated( + message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("rxMaybe(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +@LowPriorityInOverloadResolution public fun CoroutineScope.rxMaybe( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T? +): Maybe = rxMaybeInternal(this, context, block) + +private fun rxMaybeInternal( + scope: CoroutineScope, // support for legacy rxMaybe in scope + context: CoroutineContext, + block: suspend CoroutineScope.() -> T? ): Maybe = Maybe.create { subscriber -> - val newContext = newCoroutineContext(context) + val newContext = scope.newCoroutineContext(context) val coroutine = RxMaybeCoroutine(newContext, subscriber) subscriber.setCancellable(RxCancellable(coroutine)) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index f78f5eaf68..35176f1484 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -1,7 +1,9 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + package kotlinx.coroutines.rx2 import io.reactivex.* @@ -11,6 +13,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* +import kotlin.internal.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -26,23 +29,37 @@ import kotlin.coroutines.* * | Normal completion or `close` without cause | `onComplete` * | Failure with exception or `close` with cause | `onError` * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. + * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. - * - * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect - * to cancellation and error handling may change in the future. - * - * @param context context of the coroutine. - * @param block the coroutine code. + * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ @ExperimentalCoroutinesApi +public fun rxObservable( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope.() -> Unit +): Observable { + require(context[Job] === null) { "Observable context cannot contain job in it." + + "Its lifecycle should be managed via Disposable handle. Had $context" } + return rxObservableInternal(GlobalScope, context, block) +} + +@Deprecated( + message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("rxObservable(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +@LowPriorityInOverloadResolution public fun CoroutineScope.rxObservable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit +): Observable = rxObservableInternal(this, context, block) + +private fun rxObservableInternal( + scope: CoroutineScope, // support for legacy rxObservable in scope + context: CoroutineContext, + block: suspend ProducerScope.() -> Unit ): Observable = Observable.create { subscriber -> - val newContext = newCoroutineContext(context) + val newContext = scope.newCoroutineContext(context) val coroutine = RxObservableCoroutine(newContext, subscriber) subscriber.setCancellable(RxCancellable(coroutine)) // do it first (before starting coroutine), to await unnecessary suspensions coroutine.start(CoroutineStart.DEFAULT, coroutine, block) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index 53992d4eb4..e382bbe39d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -1,12 +1,15 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlin.coroutines.* +import kotlin.internal.* /** * Creates cold [single][Single] that will run a given [block] in a coroutine. @@ -18,19 +21,36 @@ import kotlin.coroutines.* * | Returns a value | `onSuccess` * | Failure with exception or unsubscribe | `onError` * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. + * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. - * - * @param context context of the coroutine. - * @param block the coroutine code. + * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ +public fun rxSingle( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T +): Single { + require(context[Job] === null) { "Single context cannot contain job in it." + + "Its lifecycle should be managed via Disposable handle. Had $context" } + return rxSingleInternal(GlobalScope, context, block) +} + +@Deprecated( + message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("rxSingle(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +@LowPriorityInOverloadResolution public fun CoroutineScope.rxSingle( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T +): Single = rxSingleInternal(this, context, block) + +private fun rxSingleInternal( + scope: CoroutineScope, // support for legacy rxSingle in scope + context: CoroutineContext, + block: suspend CoroutineScope.() -> T ): Single = Single.create { subscriber -> - val newContext = newCoroutineContext(context) + val newContext = scope.newCoroutineContext(context) val coroutine = RxSingleCoroutine(newContext, subscriber) subscriber.setCancellable(RxCancellable(coroutine)) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index a11807c83c..fd159641bb 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -15,7 +15,7 @@ class CompletableTest : TestBase() { @Test fun testBasicSuccess() = runBlocking { expect(1) - val completable = rxCompletable { + val completable = rxCompletable(currentDispatcher()) { expect(4) } expect(2) @@ -30,7 +30,7 @@ class CompletableTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val completable = rxCompletable(NonCancellable) { + val completable = rxCompletable(currentDispatcher()) { expect(4) throw RuntimeException("OK") } @@ -50,7 +50,7 @@ class CompletableTest : TestBase() { @Test fun testBasicUnsubscribe() = runBlocking { expect(1) - val completable = rxCompletable { + val completable = rxCompletable(currentDispatcher()) { expect(4) yield() // back to main, will get cancelled expectUnreached() @@ -73,7 +73,7 @@ class CompletableTest : TestBase() { @Test fun testAwaitSuccess() = runBlocking { expect(1) - val completable = rxCompletable { + val completable = rxCompletable(currentDispatcher()) { expect(3) } expect(2) @@ -84,7 +84,7 @@ class CompletableTest : TestBase() { @Test fun testAwaitFailure() = runBlocking { expect(1) - val completable = rxCompletable(NonCancellable) { + val completable = rxCompletable(currentDispatcher()) { expect(3) throw RuntimeException("OK") } @@ -98,22 +98,9 @@ class CompletableTest : TestBase() { } } - @Test - fun testCancelsParentOnFailure() = runTest( - expected = { it is RuntimeException && it.message == "OK" } - ) { - // has parent, so should cancel it on failure - rxCompletable { - throw RuntimeException("OK") - }.subscribe( - { expectUnreached() }, - { assert(it is RuntimeException) } - ) - } - @Test fun testSuppressedException() = runTest { - val completable = rxCompletable(NonCancellable) { + val completable = rxCompletable(currentDispatcher()) { launch(start = CoroutineStart.ATOMIC) { throw TestException() // child coroutine fails } @@ -132,12 +119,14 @@ class CompletableTest : TestBase() { } @Test - fun testUnhandledException() = runTest( - unhandled = listOf { it -> it is TestException } - ) { + fun testUnhandledException() = runTest() { expect(1) var disposable: Disposable? = null - val completable = rxCompletable(NonCancellable) { + val eh = CoroutineExceptionHandler { _, t -> + assertTrue(t is TestException) + expect(5) + } + val completable = rxCompletable(currentDispatcher() + eh) { expect(4) disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled try { @@ -156,6 +145,6 @@ class CompletableTest : TestBase() { }) expect(3) yield() // run coroutine - finish(5) + finish(6) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt index 475ee57a7c..ba14b89ca8 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ConvertTest.kt @@ -16,7 +16,7 @@ class ConvertTest : TestBase() { val job = launch { expect(3) } - val completable = job.asCompletable(coroutineContext) + val completable = job.asCompletable(coroutineContext.minusKey(Job)) completable.subscribe { expect(4) } @@ -32,7 +32,7 @@ class ConvertTest : TestBase() { expect(3) throw RuntimeException("OK") } - val completable = job.asCompletable(coroutineContext) + val completable = job.asCompletable(coroutineContext.minusKey(Job)) completable.subscribe { expect(4) } @@ -140,7 +140,7 @@ class ConvertTest : TestBase() { throw TestException("K") } val observable = c.asObservable(Dispatchers.Unconfined) - val single = GlobalScope.rxSingle(Dispatchers.Unconfined) { + val single = rxSingle(Dispatchers.Unconfined) { var result = "" try { observable.consumeEach { result += it } diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt index 543de0930c..aebf9993cc 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt @@ -15,7 +15,7 @@ class FlowableTest : TestBase() { @Test fun testBasicSuccess() = runBlocking { expect(1) - val observable = rxFlowable { + val observable = rxFlowable(currentDispatcher()) { expect(4) send("OK") } @@ -32,7 +32,7 @@ class FlowableTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val observable = rxFlowable(NonCancellable) { + val observable = rxFlowable(currentDispatcher()) { expect(4) throw RuntimeException("OK") } @@ -52,7 +52,7 @@ class FlowableTest : TestBase() { @Test fun testBasicUnsubscribe() = runBlocking { expect(1) - val observable = rxFlowable { + val observable = rxFlowable(currentDispatcher()) { expect(4) yield() // back to main, will get cancelled expectUnreached() @@ -71,24 +71,11 @@ class FlowableTest : TestBase() { finish(6) } - @Test - fun testCancelsParentOnFailure() = runTest( - expected = { it is RuntimeException && it.message == "OK" } - ) { - // has parent, so should cancel it on failure - rxFlowable { - throw RuntimeException("OK") - }.subscribe( - { expectUnreached() }, - { assert(it is RuntimeException) } - ) - } - @Test fun testNotifyOnceOnCancellation() = runTest { expect(1) val observable = - rxFlowable { + rxFlowable(currentDispatcher()) { expect(5) send("OK") try { @@ -124,7 +111,7 @@ class FlowableTest : TestBase() { @Test fun testFailingConsumer() = runTest { - val pub = rxFlowable { + val pub = rxFlowable(currentDispatcher()) { repeat(3) { expect(it + 1) // expect(1), expect(2) *should* be invoked send(it) diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt index 9b55e58c83..ca7c0ca5ce 100644 --- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt @@ -20,7 +20,7 @@ class IntegrationTest( ) : TestBase() { enum class Ctx { - MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context }, + MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) }, DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default }, UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined }; @@ -58,7 +58,7 @@ class IntegrationTest( @Test fun testSingle() = runBlocking { - val observable = CoroutineScope(ctx(coroutineContext)).rxObservable { + val observable = rxObservable(ctx(coroutineContext)) { if (delay) delay(1) send("OK") } @@ -101,8 +101,7 @@ class IntegrationTest( fun testCancelWithoutValue() = runTest { val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) { rxObservable { - yield() - expectUnreached() + hang { } }.awaitFirst() } diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index e97b1f01bb..5a9bac2fdb 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -24,7 +24,7 @@ class MaybeTest : TestBase() { @Test fun testBasicSuccess() = runBlocking { expect(1) - val maybe = rxMaybe { + val maybe = rxMaybe(currentDispatcher()) { expect(4) "OK" } @@ -41,7 +41,7 @@ class MaybeTest : TestBase() { @Test fun testBasicEmpty() = runBlocking { expect(1) - val maybe = rxMaybe { + val maybe = rxMaybe(currentDispatcher()) { expect(4) null } @@ -57,7 +57,7 @@ class MaybeTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val maybe = rxMaybe(NonCancellable) { + val maybe = rxMaybe(currentDispatcher()) { expect(4) throw RuntimeException("OK") } @@ -78,7 +78,7 @@ class MaybeTest : TestBase() { @Test fun testBasicUnsubscribe() = runBlocking { expect(1) - val maybe = rxMaybe { + val maybe = rxMaybe(currentDispatcher()) { expect(4) yield() // back to main, will get cancelled expectUnreached() @@ -100,7 +100,7 @@ class MaybeTest : TestBase() { @Test fun testMaybeNoWait() { - val maybe = GlobalScope.rxMaybe { + val maybe = rxMaybe { "OK" } @@ -121,7 +121,7 @@ class MaybeTest : TestBase() { @Test fun testMaybeEmitAndAwait() { - val maybe = GlobalScope.rxMaybe { + val maybe = rxMaybe { Maybe.just("O").await() + "K" } @@ -132,7 +132,7 @@ class MaybeTest : TestBase() { @Test fun testMaybeWithDelay() { - val maybe = GlobalScope.rxMaybe { + val maybe = rxMaybe { Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K" } @@ -143,7 +143,7 @@ class MaybeTest : TestBase() { @Test fun testMaybeException() { - val maybe = GlobalScope.rxMaybe { + val maybe = rxMaybe { Observable.just("O", "K").awaitSingle() + "K" } @@ -154,7 +154,7 @@ class MaybeTest : TestBase() { @Test fun testAwaitFirst() { - val maybe = GlobalScope.rxMaybe { + val maybe = rxMaybe { Observable.just("O", "#").awaitFirst() + "K" } @@ -165,7 +165,7 @@ class MaybeTest : TestBase() { @Test fun testAwaitLast() { - val maybe = GlobalScope.rxMaybe { + val maybe = rxMaybe { Observable.just("#", "O").awaitLast() + "K" } @@ -176,7 +176,7 @@ class MaybeTest : TestBase() { @Test fun testExceptionFromObservable() { - val maybe = GlobalScope.rxMaybe { + val maybe = rxMaybe { try { Observable.error(RuntimeException("O")).awaitFirst() } catch (e: RuntimeException) { @@ -191,7 +191,7 @@ class MaybeTest : TestBase() { @Test fun testExceptionFromCoroutine() { - val maybe = GlobalScope.rxMaybe { + val maybe = rxMaybe { throw IllegalStateException(Observable.just("O").awaitSingle() + "K") } @@ -201,23 +201,10 @@ class MaybeTest : TestBase() { } } - @Test - fun testCancelsParentOnFailure() = runTest( - expected = { it is RuntimeException && it.message == "OK" } - ) { - // has parent, so should cancel it on failure - rxMaybe { - throw RuntimeException("OK") - }.subscribe( - { expectUnreached() }, - { assert(it is RuntimeException) } - ) - } - @Test fun testCancelledConsumer() = runTest { expect(1) - val maybe = rxMaybe { + val maybe = rxMaybe(currentDispatcher()) { expect(4) try { delay(Long.MAX_VALUE) @@ -242,7 +229,7 @@ class MaybeTest : TestBase() { @Test fun testSuppressedException() = runTest { - val maybe = rxMaybe(NonCancellable) { + val maybe = rxMaybe(currentDispatcher()) { launch(start = CoroutineStart.ATOMIC) { throw TestException() // child coroutine fails } @@ -261,12 +248,14 @@ class MaybeTest : TestBase() { } @Test - fun testUnhandledException() = runTest( - unhandled = listOf { it -> it is TestException } - ) { + fun testUnhandledException() = runTest { expect(1) var disposable: Disposable? = null - val maybe = rxMaybe(NonCancellable) { + val eh = CoroutineExceptionHandler { _, t -> + assertTrue(t is TestException) + expect(5) + } + val maybe = rxMaybe(currentDispatcher() + eh) { expect(4) disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled try { @@ -286,6 +275,6 @@ class MaybeTest : TestBase() { }) expect(3) yield() // run coroutine - finish(5) + finish(6) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt index 9208195fef..75f79de5ca 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt @@ -76,7 +76,7 @@ class ObservableMultiTest : TestBase() { send("O") throw IOException("K") } - val single = GlobalScope.rxSingle { + val single = rxSingle { var result = "" try { observable.consumeEach { result += it } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt index 8dc7120c0f..c71ef566ba 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt @@ -14,7 +14,7 @@ class ObservableTest : TestBase() { @Test fun testBasicSuccess() = runBlocking { expect(1) - val observable = rxObservable { + val observable = rxObservable(currentDispatcher()) { expect(4) send("OK") } @@ -31,7 +31,7 @@ class ObservableTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val observable = rxObservable(NonCancellable) { + val observable = rxObservable(currentDispatcher()) { expect(4) throw RuntimeException("OK") } @@ -51,7 +51,7 @@ class ObservableTest : TestBase() { @Test fun testBasicUnsubscribe() = runBlocking { expect(1) - val observable = rxObservable { + val observable = rxObservable(currentDispatcher()) { expect(4) yield() // back to main, will get cancelled expectUnreached() @@ -70,24 +70,11 @@ class ObservableTest : TestBase() { finish(6) } - @Test - fun testCancelsParentOnFailure() = runTest( - expected = { it is RuntimeException && it.message == "OK" } - ) { - // has parent, so should cancel it on failure - rxObservable { - throw RuntimeException("OK") - }.subscribe( - { expectUnreached() }, - { assert(it is RuntimeException) } - ) - } - @Test fun testNotifyOnceOnCancellation() = runTest { expect(1) val observable = - rxObservable { + rxObservable(currentDispatcher()) { expect(5) send("OK") try { @@ -124,7 +111,7 @@ class ObservableTest : TestBase() { @Test fun testFailingConsumer() = runTest { expect(1) - val pub = rxObservable { + val pub = rxObservable(currentDispatcher()) { expect(2) send("OK") try { diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index 2ae9570265..8b786164b4 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -21,14 +21,14 @@ class SingleTest : TestBase() { @Test fun testBasicSuccess() = runBlocking { expect(1) - val single = rxSingle { + val single = rxSingle(currentDispatcher()) { expect(4) "OK" } expect(2) single.subscribe { value -> expect(5) - Assert.assertThat(value, IsEqual("OK")) + assertThat(value, IsEqual("OK")) } expect(3) yield() // to started coroutine @@ -38,7 +38,7 @@ class SingleTest : TestBase() { @Test fun testBasicFailure() = runBlocking { expect(1) - val single = rxSingle(NonCancellable) { + val single = rxSingle(currentDispatcher()) { expect(4) throw RuntimeException("OK") } @@ -47,8 +47,8 @@ class SingleTest : TestBase() { expectUnreached() }, { error -> expect(5) - Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java)) - Assert.assertThat(error.message, IsEqual("OK")) + assertThat(error, IsInstanceOf(RuntimeException::class.java)) + assertThat(error.message, IsEqual("OK")) }) expect(3) yield() // to started coroutine @@ -59,7 +59,7 @@ class SingleTest : TestBase() { @Test fun testBasicUnsubscribe() = runBlocking { expect(1) - val single = rxSingle { + val single = rxSingle(currentDispatcher()) { expect(4) yield() // back to main, will get cancelled expectUnreached() @@ -82,7 +82,7 @@ class SingleTest : TestBase() { @Test fun testSingleNoWait() { - val single = GlobalScope.rxSingle { + val single = rxSingle { "OK" } @@ -98,7 +98,7 @@ class SingleTest : TestBase() { @Test fun testSingleEmitAndAwait() { - val single = GlobalScope.rxSingle { + val single = rxSingle { Single.just("O").await() + "K" } @@ -109,7 +109,7 @@ class SingleTest : TestBase() { @Test fun testSingleWithDelay() { - val single = GlobalScope.rxSingle { + val single = rxSingle { Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K" } @@ -120,7 +120,7 @@ class SingleTest : TestBase() { @Test fun testSingleException() { - val single = GlobalScope.rxSingle { + val single = rxSingle { Observable.just("O", "K").awaitSingle() + "K" } @@ -131,7 +131,7 @@ class SingleTest : TestBase() { @Test fun testAwaitFirst() { - val single = GlobalScope.rxSingle { + val single = rxSingle { Observable.just("O", "#").awaitFirst() + "K" } @@ -142,7 +142,7 @@ class SingleTest : TestBase() { @Test fun testAwaitLast() { - val single = GlobalScope.rxSingle { + val single = rxSingle { Observable.just("#", "O").awaitLast() + "K" } @@ -153,7 +153,7 @@ class SingleTest : TestBase() { @Test fun testExceptionFromObservable() { - val single = GlobalScope.rxSingle { + val single = rxSingle { try { Observable.error(RuntimeException("O")).awaitFirst() } catch (e: RuntimeException) { @@ -168,7 +168,7 @@ class SingleTest : TestBase() { @Test fun testExceptionFromCoroutine() { - val single = GlobalScope.rxSingle { + val single = rxSingle { throw IllegalStateException(Observable.just("O").awaitSingle() + "K") } @@ -178,22 +178,9 @@ class SingleTest : TestBase() { } } - @Test - fun testCancelsParentOnFailure() = runTest( - expected = { it is RuntimeException && it.message == "OK" } - ) { - // has parent, so should cancel it on failure - rxSingle { - throw RuntimeException("OK") - }.subscribe( - { expectUnreached() }, - { assert(it is RuntimeException) } - ) - } - @Test fun testSuppressedException() = runTest { - val single = rxSingle(NonCancellable) { + val single = rxSingle(currentDispatcher()) { launch(start = CoroutineStart.ATOMIC) { throw TestException() // child coroutine fails } @@ -212,12 +199,14 @@ class SingleTest : TestBase() { } @Test - fun testUnhandledException() = runTest( - unhandled = listOf { it -> it is TestException } - ) { + fun testUnhandledException() = runTest { expect(1) var disposable: Disposable? = null - val single = rxSingle(NonCancellable) { + val eh = CoroutineExceptionHandler { _, t -> + assertTrue(t is TestException) + expect(5) + } + val single = rxSingle(currentDispatcher() + eh) { expect(4) disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled try { @@ -236,6 +225,6 @@ class SingleTest : TestBase() { }) expect(3) yield() // run coroutine - finish(5) + finish(6) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt index 990d90af9d..b87849a562 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt @@ -10,7 +10,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.reactive.* import kotlin.coroutines.CoroutineContext -fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish(context) { +fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish(context) { for (x in start until start + count) { delay(time) // wait before sending each number send(x) diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt index edaba3d0b1..1a214ce308 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.reactive.* import io.reactivex.schedulers.Schedulers import kotlin.coroutines.CoroutineContext -fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish(context) { +fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish(context) { for (x in start until start + count) { delay(time) // wait before sending each number send(x) diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt index 6fd997f0c1..5f07ba4972 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt @@ -14,7 +14,7 @@ fun Publisher.fusedFilterMap( context: CoroutineContext, // the context to execute this coroutine in predicate: (T) -> Boolean, // the filter predicate mapper: (T) -> R // the mapper function -) = GlobalScope.publish(context) { +) = publish(context) { collect { // collect the source stream if (predicate(it)) // filter part send(mapper(it)) // map part @@ -27,6 +27,6 @@ fun CoroutineScope.range(start: Int, count: Int) = publish { fun main() = runBlocking { range(1, 5) - .fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" }) + .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" }) .collect { println(it) } // print all the resulting strings } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt index b3041935cd..818a792bac 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt @@ -12,7 +12,7 @@ import kotlinx.coroutines.selects.* import org.reactivestreams.* import kotlin.coroutines.* -fun Publisher.takeUntil(context: CoroutineContext, other: Publisher) = GlobalScope.publish(context) { +fun Publisher.takeUntil(context: CoroutineContext, other: Publisher) = publish(context) { this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher val current = this other.openSubscription().consume { // explicitly open channel to Publisher @@ -35,5 +35,5 @@ fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publi fun main() = runBlocking { val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms - slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // let's test it + slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it } diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt index c57e78fb77..12d9c1f647 100644 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt +++ b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt @@ -10,7 +10,7 @@ import kotlinx.coroutines.reactive.* import org.reactivestreams.* import kotlin.coroutines.* -fun Publisher>.merge(context: CoroutineContext) = GlobalScope.publish(context) { +fun Publisher>.merge(context: CoroutineContext) = publish(context) { collect { pub -> // for each publisher collected launch { // launch a child coroutine pub.collect { send(it) } // resend all element from this publisher @@ -33,5 +33,5 @@ fun CoroutineScope.testPub() = publish> { } fun main() = runBlocking { - testPub().merge(coroutineContext).collect { println(it) } // print the whole stream + testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream }