Skip to content

Commit

Permalink
Deprecate consumeEach on Publisher, ObservableSource and MaybeSource,…
Browse files Browse the repository at this point in the history
… introduce collect extension instead to be aligned with Flow

First wave of #1080
  • Loading branch information
qwwdfsad committed Apr 24, 2019
1 parent fcfabee commit deda827
Show file tree
Hide file tree
Showing 31 changed files with 91 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public final class kotlinx/coroutines/reactive/AwaitKt {
}

public final class kotlinx/coroutines/reactive/ChannelKt {
public static final fun collect (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun consumeEach (Lorg/reactivestreams/Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun openSubscription (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public final class kotlinx/coroutines/rx2/RxAwaitKt {
}

public final class kotlinx/coroutines/rx2/RxChannelKt {
public static final fun collect (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun consumeEach (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun consumeEach (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun openSubscription (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down
43 changes: 21 additions & 22 deletions reactive/coroutines-guide-reactive.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ fun main() = runBlocking<Unit> {
}
// print elements from the source
println("Elements:")
source.consumeEach { // consume elements from it
source.collect { // collect elements from it
println(it)
}
// print elements from the source AGAIN
println("Again:")
source.consumeEach { // consume elements from it
source.collect { // collect elements from it
println(it)
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ elements is produced. It becomes the actual stream of elements on _subscription_
a different stream of elements, depending on how the corresponding implementation of `Publisher` works.

The [publish] coroutine builder, that is used in the above example, launches a fresh coroutine on each subscription.
Every [Publisher.consumeEach][org.reactivestreams.Publisher.consumeEach] invocation creates a fresh subscription.
Every [Publisher.collect][org.reactivestreams.Publisher.collect] invocation creates a fresh subscription.
We have two of them in this code and that is why we see "Begin" printed twice.

In Rx lingo this is called a _cold_ publisher. Many standard Rx operators produce cold streams, too. We can iterate
Expand All @@ -217,9 +217,9 @@ method with it.

### Subscription and cancellation

An example in the previous section uses `source.consumeEach { ... }` snippet to open a subscription
An example in the previous section uses `source.collect { ... }` snippet to open a subscription
and receive all the elements from it. If we need more control on how what to do with
the elements that are being received from the channel, we can use [Publisher.openSubscription][org.reactivestreams.Publisher.openSubscription]
the elements that are being received from the channel, we can use [Publisher.collect][org.reactivestreams.Publisher.collect]
as shown in the following example:

<!--- INCLUDE
Expand Down Expand Up @@ -269,7 +269,7 @@ listener prints "Finally" to confirm that the subscription is actually being clo
is never printed because we did not consume all of the elements.

We do not need to use an explicit `cancel` either if iteration is performed over all the items that are emitted
by the publisher, because it is being cancelled automatically by `consumeEach`:
by the publisher, because it is being cancelled automatically by `collect`:

<!--- INCLUDE
import io.reactivex.*
Expand All @@ -285,7 +285,7 @@ fun main() = runBlocking<Unit> {
.doOnComplete { println("OnComplete") } // ...
.doFinally { println("Finally") } // ... into what's going on
// iterate over the source fully
source.consumeEach { println(it) }
source.collect { println(it) }
}
```

Expand All @@ -308,7 +308,7 @@ Finally

Notice, how "OnComplete" and "Finally" are printed before the last element "5". It happens because our `main` function in this
example is a coroutine that we start with [runBlocking] coroutine builder.
Our main coroutine receives on the channel using `source.consumeEach { ... }` expression.
Our main coroutine receives on the flowable using `source.collect { ... }` expression.
The main coroutine is _suspended_ while it waits for the source to emit an item.
When the last item is emitted by `Flowable.range(1, 5)` it
_resumes_ the main coroutine, which gets dispatched onto the main thread to print this
Expand Down Expand Up @@ -422,7 +422,7 @@ You can subscribe to subjects from a coroutine just as with any other reactive s
<!--- INCLUDE
import io.reactivex.subjects.BehaviorSubject
import kotlinx.coroutines.*
import kotlinx.coroutines.rx2.consumeEach
import kotlinx.coroutines.rx2.collect
-->

```kotlin
Expand All @@ -432,7 +432,7 @@ fun main() = runBlocking<Unit> {
subject.onNext("two")
// now launch a coroutine to print everything
GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context
subject.consumeEach { println(it) }
subject.collect { println(it) }
}
subject.onNext("three")
subject.onNext("four")
Expand Down Expand Up @@ -476,7 +476,7 @@ fun main() = runBlocking<Unit> {
subject.onNext("two")
// now launch a coroutine to print the most recent update
launch { // use the context of the main thread for a coroutine
subject.consumeEach { println(it) }
subject.collect { println(it) }
}
subject.onNext("three")
subject.onNext("four")
Expand Down Expand Up @@ -584,7 +584,7 @@ It is straightforward to use from a coroutine:
```kotlin
fun main() = runBlocking<Unit> {
// Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default
range(Dispatchers.Default, 1, 5).consumeEach { println(it) }
range(Dispatchers.Default, 1, 5).collect { println(it) }
}
```

Expand Down Expand Up @@ -623,7 +623,7 @@ fun <T, R> Publisher<T>.fusedFilterMap(
predicate: (T) -> Boolean, // the filter predicate
mapper: (T) -> R // the mapper function
) = GlobalScope.publish<R>(context) {
consumeEach { // consume the source stream
collect { // consume the source stream
if (predicate(it)) // filter part
send(mapper(it)) // map part
}
Expand All @@ -644,7 +644,7 @@ fun CoroutineScope.range(start: Int, count: Int) = publish<Int> {
fun main() = runBlocking<Unit> {
range(1, 5)
.fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" })
.consumeEach { println(it) } // print all the resulting strings
.collect { println(it) } // print all the resulting strings
}
```

Expand Down Expand Up @@ -717,7 +717,7 @@ The following code shows how `takeUntil` works:
fun main() = runBlocking<Unit> {
val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval
val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms
slowNums.takeUntil(coroutineContext, stop).consumeEach { println(it) } // let's test it
slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // let's test it
}
```

Expand Down Expand Up @@ -749,9 +749,9 @@ import kotlin.coroutines.*

```kotlin
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = GlobalScope.publish<T>(context) {
consumeEach { pub -> // for each publisher received on the source channel
collect { pub -> // for each publisher collected
launch { // launch a child coroutine
pub.consumeEach { send(it) } // resend all element from this publisher
pub.collect { send(it) } // resend all element from this publisher
}
}
}
Expand Down Expand Up @@ -792,7 +792,7 @@ The test code is to use `merge` on `testPub` and to display the results:

```kotlin
fun main() = runBlocking<Unit> {
testPub().merge(coroutineContext).consumeEach { println(it) } // print the whole stream
testPub().merge(coroutineContext).collect { println(it) } // print the whole stream
}
```

Expand Down Expand Up @@ -975,7 +975,7 @@ fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int

fun main() = runBlocking<Unit> {
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.consumeEach { println("$it on thread ${Thread.currentThread().name}") }
.collect { println("$it on thread ${Thread.currentThread().name}") }
}
```

Expand Down Expand Up @@ -1021,7 +1021,7 @@ fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool)
rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
.consumeEach { println("$it on thread ${Thread.currentThread().name}") }
.collect { println("$it on thread ${Thread.currentThread().name}") }
}
job.join() // wait for our coroutine to complete
}
Expand Down Expand Up @@ -1077,8 +1077,7 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
<!--- MODULE kotlinx-coroutines-reactive -->
<!--- INDEX kotlinx.coroutines.reactive -->
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html
[org.reactivestreams.Publisher.consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/consume-each.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html
[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-flowable.html
Expand Down
9 changes: 8 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 0): ReceiveChannel<T
return channel
}

// Will be promoted to error in 1.3.0, removed in 1.4.0
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
openSubscription().consumeEach(action)

/**
* Subscribes to this [Publisher] and performs the specified action for each received element.
* Cancels subscription if any exception happens during collect.
*/
public suspend inline fun <T> Publisher<T>.consumeEach(action: (T) -> Unit) =
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) =
openSubscription().consumeEach(action)

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
Expand Down
6 changes: 3 additions & 3 deletions reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class IntegrationTest(
assertNSE { pub.awaitLast() }
assertNSE { pub.awaitSingle() }
var cnt = 0
pub.consumeEach { cnt++ }
pub.collect { cnt++ }
assertThat(cnt, IsEqual(0))
}

Expand All @@ -67,7 +67,7 @@ class IntegrationTest(
assertThat(pub.awaitLast(), IsEqual("OK"))
assertThat(pub.awaitSingle(), IsEqual("OK"))
var cnt = 0
pub.consumeEach {
pub.collect {
assertThat(it, IsEqual("OK"))
cnt++
}
Expand Down Expand Up @@ -125,7 +125,7 @@ class IntegrationTest(

private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
var last = 0
pub.consumeEach {
pub.collect {
assertThat(it, IsEqual(++last))
}
assertThat(last, IsEqual(n))
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactive/test/PublishTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class PublishTest : TestBase() {
}
}
try {
pub.consumeEach {
pub.collect {
throw TestException()
}
} catch (e: TestException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class PublisherCompletionStressTest : TestBase() {
runBlocking {
withTimeout(5000) {
var received = 0
range(Dispatchers.Default, 1, count).consumeEach { x ->
range(Dispatchers.Default, 1, count).collect { x ->
received++
if (x != received) error("$x != $received")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class PublisherMultiTest : TestBase() {
jobs.forEach { it.join() }
}
val resultSet = mutableSetOf<Int>()
observable.consumeEach {
observable.collect {
assertTrue(resultSet.add(it))
}
assertThat(resultSet.size, IsEqual(n))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class FluxCompletionStressTest : TestBase() {
runBlocking {
withTimeout(5000) {
var received = 0
range(Dispatchers.Default, 1, count).consumeEach { x ->
range(Dispatchers.Default, 1, count).collect { x ->
received++
if (x != received) error("$x != $received")
}
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class FluxMultiTest : TestBase() {
fun testIteratorResendUnconfined() {
val n = 10_000 * stressTestMultiplier
val flux = GlobalScope.flux(Dispatchers.Unconfined) {
Flux.range(0, n).consumeEach { send(it) }
Flux.range(0, n).collect { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
assertEquals((0 until n).toList(), list)
Expand All @@ -57,7 +57,7 @@ class FluxMultiTest : TestBase() {
fun testIteratorResendPool() {
val n = 10_000 * stressTestMultiplier
val flux = GlobalScope.flux {
Flux.range(0, n).consumeEach { send(it) }
Flux.range(0, n).collect { send(it) }
}
checkMonoValue(flux.collectList()) { list ->
assertEquals((0 until n).toList(), list)
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class FluxSingleTest {
fun testFluxIteration() {
val flux = GlobalScope.flux {
var result = ""
Flux.just("O", "K").consumeEach { result += it }
Flux.just("O", "K").collect { result += it }
send(result)
}

Expand All @@ -193,7 +193,7 @@ class FluxSingleTest {
fun testFluxIterationFailure() {
val flux = GlobalScope.flux {
try {
Flux.error<String>(RuntimeException("OK")).consumeEach { fail("Should not be here") }
Flux.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
send("Fail")
} catch (e: RuntimeException) {
send(e.message!!)
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class FluxTest : TestBase() {
expect(2)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(3)
observable.consumeEach {
observable.collect {
expect(8)
assertEquals("OK", it)
}
Expand All @@ -131,7 +131,7 @@ class FluxTest : TestBase() {
}
}
try {
pub.consumeEach {
pub.collect {
throw TestException()
}
} catch (e: TestException) {
Expand Down
18 changes: 16 additions & 2 deletions reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,30 @@ public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
return channel
}

// Will be promoted to error in 1.3.0, removed in 1.4.0
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) =
openSubscription().consumeEach(action)

// Will be promoted to error in 1.3.0, removed in 1.4.0
@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)"))
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) =
openSubscription().consumeEach(action)

/**
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
* Cancels subscription if any exception happens during collect.
*/
public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit) =
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit) =
openSubscription().consumeEach(action)

/**
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
* Cancels subscription if any exception happens during collect.
*/
public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) =
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit) =
openSubscription().consumeEach(action)

@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class FlowAsObservableTest : TestBase() {
expect(1)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
observable.consumeEach {
observable.collect {
expect(5)
assertEquals("OK", it)
}
Expand All @@ -106,7 +106,7 @@ class FlowAsObservableTest : TestBase() {
}.asObservable()

try {
observable.consumeEach {
observable.collect {
expect(3)
throw TestException()
}
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-rx2/test/FlowableTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class FlowableTest : TestBase() {
expect(2)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
expect(3)
observable.consumeEach{
observable.collect {
expect(8)
assertEquals("OK", it)
}
Expand All @@ -131,7 +131,7 @@ class FlowableTest : TestBase() {
}
}
try {
pub.consumeEach {
pub.collect {
throw TestException()
}
} catch (e: TestException) {
Expand Down
Loading

0 comments on commit deda827

Please sign in to comment.