Skip to content

Commit

Permalink
Reactive scopeless (#1341)
Browse files Browse the repository at this point in the history
Make all reactive builders top-level functions instead of extensions on CoroutineScope and prohibit jobs in their context

Downsides of having lifecycle-managed scoped builders:
  * The lifecycle of semantically cold entity is managed externally by the hot-one.
  * Independent failures in independent triggered computations affect each other
  * Two cancellation sources should be managed, coroutine-related Job parent and disposable/subscription
  • Loading branch information
qwwdfsad authored Jul 16, 2019
1 parent ace5899 commit d100a3f
Show file tree
Hide file tree
Showing 46 changed files with 447 additions and 508 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ public final class kotlinx/coroutines/NonCancellable : kotlin/coroutines/Abstrac
public fun join (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun plus (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;
public fun start ()Z
public fun toString ()Ljava/lang/String;
}

public final class kotlinx/coroutines/NonDisposableHandle : kotlinx/coroutines/ChildHandle, kotlinx/coroutines/DisposableHandle {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ public final class kotlinx/coroutines/reactive/ConvertKt {
}

public final class kotlinx/coroutines/reactive/PublishKt {
public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
public static synthetic fun publish$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
public static synthetic fun publish$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ public final class kotlinx/coroutines/reactor/ConvertKt {
}

public final class kotlinx/coroutines/reactor/FluxKt {
public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
public static synthetic fun flux$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Flux;
public static synthetic fun flux$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Flux;
}

public final class kotlinx/coroutines/reactor/MonoKt {
public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
public static final fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono;
public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
public static synthetic fun mono$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public final class kotlinx/coroutines/rx2/RxChannelKt {
}

public final class kotlinx/coroutines/rx2/RxCompletableKt {
public static final fun rxCompletable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable;
public static final fun rxCompletable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable;
public static synthetic fun rxCompletable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable;
public static synthetic fun rxCompletable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable;
}

Expand All @@ -35,17 +37,23 @@ public final class kotlinx/coroutines/rx2/RxConvertKt {
}

public final class kotlinx/coroutines/rx2/RxFlowableKt {
public static final fun rxFlowable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable;
public static final fun rxFlowable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable;
public static synthetic fun rxFlowable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static synthetic fun rxFlowable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable;
}

public final class kotlinx/coroutines/rx2/RxMaybeKt {
public static final fun rxMaybe (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe;
public static final fun rxMaybe (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe;
public static synthetic fun rxMaybe$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe;
public static synthetic fun rxMaybe$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe;
}

public final class kotlinx/coroutines/rx2/RxObservableKt {
public static final fun rxObservable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable;
public static final fun rxObservable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable;
public static synthetic fun rxObservable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable;
public static synthetic fun rxObservable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable;
}

Expand All @@ -54,7 +62,9 @@ public final class kotlinx/coroutines/rx2/RxSchedulerKt {
}

public final class kotlinx/coroutines/rx2/RxSingleKt {
public static final fun rxSingle (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single;
public static final fun rxSingle (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single;
public static synthetic fun rxSingle$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single;
public static synthetic fun rxSingle$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single;
}

Expand Down
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/common/src/NonCancellable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,9 @@ public object NonCancellable : AbstractCoroutineContextElement(Job), Job {
*/
@InternalCoroutinesApi
override fun attachChild(child: ChildJob): ChildHandle = NonDisposableHandle

/** @suppress */
override fun toString(): String {
return "NonCancellable"
}
}
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/jvm/test/TestBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.scheduling.*
import org.junit.*
import java.util.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
import kotlin.test.*

private val VERBOSE = systemProp("test.verbose", false)
Expand Down Expand Up @@ -213,4 +214,6 @@ public actual open class TestBase actual constructor() {
assertTrue(result.exceptionOrNull() is T, "Expected ${T::class}, but had $result")
return result.exceptionOrNull()!! as T
}

protected suspend fun currentDispatcher() = coroutineContext[ContinuationInterceptor]!!
}
20 changes: 10 additions & 10 deletions reactive/coroutines-guide-reactive.md
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ fun <T, R> Publisher<T>.fusedFilterMap(
context: CoroutineContext, // the context to execute this coroutine in
predicate: (T) -> Boolean, // the filter predicate
mapper: (T) -> R // the mapper function
) = GlobalScope.publish<R>(context) {
) = publish<R>(context) {
collect { // collect the source stream
if (predicate(it)) // filter part
send(mapper(it)) // map part
Expand All @@ -638,7 +638,7 @@ fun CoroutineScope.range(start: Int, count: Int) = publish<Int> {
```kotlin
fun main() = runBlocking<Unit> {
range(1, 5)
.fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" })
.fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" })
.collect { println(it) } // print all the resulting strings
}
```
Expand Down Expand Up @@ -673,7 +673,7 @@ import kotlin.coroutines.*
-->

```kotlin
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = GlobalScope.publish<T>(context) {
fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = publish<T>(context) {
this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher<T>
val current = this
other.openSubscription().consume { // explicitly open channel to Publisher<U>
Expand Down Expand Up @@ -711,7 +711,7 @@ The following code shows how `takeUntil` works:
fun main() = runBlocking<Unit> {
val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval
val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms
slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // let's test it
slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it
}
```

Expand Down Expand Up @@ -742,7 +742,7 @@ import kotlin.coroutines.*
-->

```kotlin
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = GlobalScope.publish<T>(context) {
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = publish<T>(context) {
collect { pub -> // for each publisher collected
launch { // launch a child coroutine
pub.collect { send(it) } // resend all element from this publisher
Expand Down Expand Up @@ -783,7 +783,7 @@ The test code is to use `merge` on `testPub` and to display the results:

```kotlin
fun main() = runBlocking<Unit> {
testPub().merge(coroutineContext).collect { println(it) } // print the whole stream
testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream
}
```

Expand Down Expand Up @@ -865,7 +865,7 @@ import kotlin.coroutines.CoroutineContext
-->

```kotlin
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
Expand Down Expand Up @@ -915,7 +915,7 @@ import kotlin.coroutines.CoroutineContext
-->

```kotlin
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish<Int>(context) {
for (x in start until start + count) {
delay(time) // wait before sending each number
send(x)
Expand Down Expand Up @@ -1067,12 +1067,12 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/while-select.html
<!--- MODULE kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.reactive -->
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-flowable.html
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
<!--- END -->


2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactive/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Conversion functions:
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
<!--- MODULE kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.reactive -->
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html
[org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first.html
[org.reactivestreams.Publisher.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-default.html
[org.reactivestreams.Publisher.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-else.html
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactive/src/Convert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import kotlin.coroutines.*
* @param context -- the coroutine context from which the resulting observable is going to be signalled
*/
@ObsoleteCoroutinesApi
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = GlobalScope.publish(context) {
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = publish(context) {
for (t in this@asPublisher)
send(t)
}
38 changes: 30 additions & 8 deletions reactive/kotlinx-coroutines-reactive/src/Publish.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")

package kotlinx.coroutines.reactive

import kotlinx.atomicfu.*
Expand All @@ -11,6 +13,7 @@ import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.reactivestreams.*
import kotlin.coroutines.*
import kotlin.internal.LowPriorityInOverloadResolution

/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
Expand All @@ -26,25 +29,44 @@ import kotlin.coroutines.*
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*
* Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
* with corresponding [coroutineContext] element.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*
* @param context context of the coroutine.
* @param block the coroutine code.
*/
@ExperimentalCoroutinesApi
public fun <T> publish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> {
require(context[Job] === null) { "Publisher context cannot contain job in it." +
"Its lifecycle should be managed via subscription. Had $context" }
return publishInternal(GlobalScope, context, block)
}

@Deprecated(
message = "CoroutineScope.publish is deprecated in favour of top-level publish",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("publish(context, block)")
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
@LowPriorityInOverloadResolution
public fun <T> CoroutineScope.publish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = publishInternal(this, context, block)

/** @suppress For internal use from other reactive integration modules only */
@InternalCoroutinesApi
public fun <T> publishInternal(
scope: CoroutineScope, // support for legacy publish in scope
context: CoroutineContext,
block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
// specification requires NPE on null subscriber
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
val newContext = newCoroutineContext(context)
val newContext = scope.newCoroutineContext(context)
val coroutine = PublisherCoroutine(newContext, subscriber)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
Expand Down
9 changes: 4 additions & 5 deletions reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class IntegrationTest(
) : TestBase() {

enum class Ctx {
MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context },
MAIN { override fun invoke(context: CoroutineContext): CoroutineContext = context.minusKey(Job) },
DEFAULT { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Default },
UNCONFINED { override fun invoke(context: CoroutineContext): CoroutineContext = Dispatchers.Unconfined };

Expand All @@ -39,7 +39,7 @@ class IntegrationTest(

@Test
fun testEmpty(): Unit = runBlocking {
val pub = CoroutineScope(ctx(coroutineContext)).publish<String> {
val pub = publish<String>(ctx(coroutineContext)) {
if (delay) delay(1)
// does not send anything
}
Expand Down Expand Up @@ -77,7 +77,7 @@ class IntegrationTest(
@Test
fun testNumbers() = runBlocking<Unit> {
val n = 100 * stressTestMultiplier
val pub = CoroutineScope(ctx(coroutineContext)).publish {
val pub = publish(ctx(coroutineContext)) {
for (i in 1..n) {
send(i)
if (delay) delay(1)
Expand All @@ -99,8 +99,7 @@ class IntegrationTest(
fun testCancelWithoutValue() = runTest {
val job = launch(Job(), start = CoroutineStart.UNDISPATCHED) {
publish<String> {
yield()
expectUnreached()
hang {}
}.awaitFirst()
}

Expand Down
Loading

0 comments on commit d100a3f

Please sign in to comment.