Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

launchConsumeEach terminal operator (name needed) #328

Closed
elizarov opened this issue Apr 13, 2018 · 18 comments
Closed

launchConsumeEach terminal operator (name needed) #328

elizarov opened this issue Apr 13, 2018 · 18 comments

Comments

@elizarov
Copy link
Contributor

elizarov commented Apr 13, 2018

ConflatedBroadcastChannel is a good fit for dataflow programming. It represents an asynchronously updatable value and its consuming operation "does the right thing" of delivering the most recent value and all the subsequent update. In the domain of dataflow programming the common operation is to process each value in a separate coroutine. For example, in Android:

val x = ConflatedBroadcastChannel() // a model for some value x

launch(UI) { 
    x.consumeEach { someView.text = it.toString } 
}

This launch(context) { consumeEach { ... } } pattern is very common, so some dedicated terminal operator seems to be in order for it. Possible names are:

launchConsumeEach is an obvious, but verbose choice, see https://github.com/dmytrodanylyk/coroutines-arch/blob/master/library/src/main/java/com/kotlin/arch/ChannelsExt.kt by @dmytrodanylyk

The challenge that we have is that all terminal operators currently defined on ReceiveChannel are suspending functions that work in the context of invoker. All non-suspending operator that receive context as parameter perform some kind of transform and return another representation. This immediately suggests that launchConsumeEach become such an intermediate operator and its result should be a Job. But can we find a better name for it? Some verb in style of the other operators we have?

@LouisCAD
Copy link
Contributor

@elizarov Your snippet should be x.consumeEach { … } as there's no receiver.

@LouisCAD
Copy link
Contributor

LouisCAD commented Apr 13, 2018

Why not someChannel.launchConsumer(UI, parent = job) { … }?

@elizarov
Copy link
Contributor Author

@LouisCAD Thanks for correction on my snipped.

launchConsumer does not make it obvious that I'm operating on each element in the block. We've been discussing if we could use onEach as a name (short is on topic), but Sequence.onEach is an intermediate operator that also sends each element downstream, which is not what we want to achieve here.

@LouisCAD
Copy link
Contributor

someChannel.consumeEachAsync(UI, parent = job) { … }? I'm not 100% fan because async returns a Deferred, not a Job, but I find it natural to read otherwise.

@fvasco
Copy link
Contributor

fvasco commented Apr 14, 2018

I see analogies to PR #274, specifically for map, combine and launchObserver.

Is this issue related to that PR?

@elizarov
Copy link
Contributor Author

All concepts are indeed related.

@fvasco
Copy link
Contributor

fvasco commented Apr 14, 2018

Hi @elizarov
I consider two discussion strictly related, your problem is to create a "asynchronously updatable value" and this looks covered to PR #274, if that PR does not solve your problem maybe we need to find a valid use case for #274.

The exposed problem is to create an observer for a value (a particular kind of actor), @jcornaz and I discussed here: #274 (comment) #274 (comment)

It is possible solve this issue creating an actor using a "channel" as parameter instead of "capacity".

@venkatperi
Copy link
Contributor

venkatperi commented Apr 14, 2018

How about drainWith to indicate a terminal operation?

fun <T> ReceiveChannel<T>.drainWith(block: suspend (T) -> Unit, ...) =
  launch(...) {
    consumeEach {
      block(it)
    }
  }

@elizarov
Copy link
Contributor Author

@fvasco You are right. In the #274 you propose launchObserver operator. It is, indeed, more convenient that the solution we have now.

To recap. Now we have to write:

launch(UI) { // sets context for the last consumeEach -- very unintuitive!
    source
        .filter { ... }
        .consumeEach { ... }
}

Your proposal:

source
    .filter { ... }
    .launchObserver(UI) { 
        consumeEach { ... }
    }

It is definitely better. But now we see that launchObserver and consumeEach are going to be often used together. We can do even better and combine then into launchConsumeEach:

source
    .filter { ... }
    .launchConsumeEach(UI) { ... }

@dmytrodanylyk
Copy link
Contributor

@elizarov

Some more ideas:

E.g.

channel.consumeEachWithCoroutine(CommonPool) {  }
channel.consumeEachIndexedWithCoroutine(CommonPool) {  }

channel.consumeEachWithContext(CommonPool) {  }
channel.consumeEachIndexedWithContext(CommonPool) {  }

channel.consumeEachByCoroutine(CommonPool) {  }
channel.consumeEachIndexedByCoroutine(CommonPool) {  }

channel.consumeEachWith(CommonPool) {  }
channel.consumeEachIndexedWith(CommonPool) {  }

channel.consumeEachBy(CommonPool) {  }
channel.consumeEachIndexedBy(CommonPool) {  }

@dmytrodanylyk
Copy link
Contributor

Here is another idea, how about introducing CoroutineChannelConsumer class (maybe with better name)? 🤔

This class will spawn coroutine for consumeEach and consumeEachIndexed.

val consumer = CoroutineChannelConsumer(channel, CommonPool)

// does launch(CommPool) channel.consume { } under the hood
consumer.consumeEach { ... }  // non-suspend fun

// does launch(CommPool) channel.consume { } under the hood
consumer.consumeEachIndexed { ... }   // non-suspend fun

Pros

  • function names can stay exactly the same

Cons

  • need to wrap channel with CoroutineChannelConsumer object

@LouisCAD
Copy link
Contributor

What about channel.doOnEach { … }?
It is similar to Android KTX naming for extensions like doOnLayout { … }.
However, it does not includes the word consume. I don't know if documenting the fact that it consumes the channel would be enough.

@LouisCAD
Copy link
Contributor

Here's what I'm using at the moment:

import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.coroutineContext

fun <E> ReceiveChannel<E>.doOnEach(
        context: CoroutineContext = DefaultDispatcher,
        parent: Job? = null,
        action: suspend (E) -> Unit
) = launch(context, parent = parent) { consumeEach { action(it) } }

suspend fun <E> ReceiveChannel<E>.doOnEach(action: suspend (E) -> Unit) = launch(coroutineContext) {
    consumeEach { action(it) }
}

@LouisCAD
Copy link
Contributor

I just had two new name ideas: channel.invokeOnEach { .. } and channel.invokeForEach { .. }.

Upside: Same naming as invokeOnCompletion.
Downside: Not so clear that it consumes the channel.

@LouisCAD
Copy link
Contributor

LouisCAD commented Sep 27, 2018

Here's my new version with structured concurrency support (non suspend version has been removed):

import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.coroutineContext

suspend inline fun <E> ReceiveChannel<E>.doOnEach(
        context: CoroutineContext = EmptyCoroutineContext,
        noinline action: suspend (E) -> Unit
) = CoroutineScope(coroutineContext).launch(context) { consumeEach { action(it) } }

Gist link: https://gist.github.com/LouisCAD/ec559f6d62f796e9287145c4797400f7/45eedb3c2c612f2ed1e1be18a9199ff6fbd14b70

@elizarov
Copy link
Contributor Author

We've been designing the similar operator for #254 cold stream and the idea we've decided to go with will be usable for channels, too If you have a channel it would look like this:

channel.asFlow().launchIn(scope) { 
    onEach { it -> action(it) }
    catch<SomeException> { it -> handleException(it) }
    finally { releaseSomeResources() }
}

The other way to use this API would be like this:

scope.launchFlow(channel.asFlow()) { 
    // same DSL here
}

@qwwdfsad
Copy link
Contributor

channel.asFlow().launchIn is a way to go, fixed in 1.3.0-M1

@elizarov
Copy link
Contributor Author

elizarov commented Aug 2, 2019

A correction: The released replacement for the proposed channel.launchConsumeEach(scope) { block } is channel.asFlow().onEach { block }.launchIn(scope)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants