-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
launchConsumeEach terminal operator (name needed) #328
Comments
@elizarov Your snippet should be |
Why not |
@LouisCAD Thanks for correction on my snipped.
|
|
I see analogies to PR #274, specifically for Is this issue related to that PR? |
All concepts are indeed related. |
Hi @elizarov The exposed problem is to create an observer for a value (a particular kind of actor), @jcornaz and I discussed here: #274 (comment) #274 (comment) It is possible solve this issue creating an actor using a "channel" as parameter instead of "capacity". |
How about fun <T> ReceiveChannel<T>.drainWith(block: suspend (T) -> Unit, ...) =
launch(...) {
consumeEach {
block(it)
}
} |
@fvasco You are right. In the #274 you propose To recap. Now we have to write:
Your proposal:
It is definitely better. But now we see that
|
Some more ideas: E.g.
|
Here is another idea, how about introducing This class will spawn coroutine for val consumer = CoroutineChannelConsumer(channel, CommonPool)
// does launch(CommPool) channel.consume { } under the hood
consumer.consumeEach { ... } // non-suspend fun
// does launch(CommPool) channel.consume { } under the hood
consumer.consumeEachIndexed { ... } // non-suspend fun Pros
Cons
|
What about |
Here's what I'm using at the moment: import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.coroutineContext
fun <E> ReceiveChannel<E>.doOnEach(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
action: suspend (E) -> Unit
) = launch(context, parent = parent) { consumeEach { action(it) } }
suspend fun <E> ReceiveChannel<E>.doOnEach(action: suspend (E) -> Unit) = launch(coroutineContext) {
consumeEach { action(it) }
} |
I just had two new name ideas: Upside: Same naming as |
Here's my new version with structured concurrency support (non suspend version has been removed): import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.coroutineContext
suspend inline fun <E> ReceiveChannel<E>.doOnEach(
context: CoroutineContext = EmptyCoroutineContext,
noinline action: suspend (E) -> Unit
) = CoroutineScope(coroutineContext).launch(context) { consumeEach { action(it) } } |
We've been designing the similar operator for #254 cold stream and the idea we've decided to go with will be usable for channels, too If you have a
The other way to use this API would be like this:
|
|
A correction: The released replacement for the proposed |
ConflatedBroadcastChannel
is a good fit for dataflow programming. It represents an asynchronously updatable value and its consuming operation "does the right thing" of delivering the most recent value and all the subsequent update. In the domain of dataflow programming the common operation is to process each value in a separate coroutine. For example, in Android:This
launch(context) { consumeEach { ... } }
pattern is very common, so some dedicated terminal operator seems to be in order for it. Possible names are:launchConsumeEach
is an obvious, but verbose choice, see https://github.com/dmytrodanylyk/coroutines-arch/blob/master/library/src/main/java/com/kotlin/arch/ChannelsExt.kt by @dmytrodanylykThe challenge that we have is that all terminal operators currently defined on
ReceiveChannel
are suspending functions that work in the context of invoker. All non-suspending operator that receivecontext
as parameter perform some kind of transform and return another representation. This immediately suggests thatlaunchConsumeEach
become such an intermediate operator and its result should be aJob
. But can we find a better name for it? Some verb in style of the other operators we have?The text was updated successfully, but these errors were encountered: