-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Problems with flatMap function on ReceiveChannel<T> #180
Comments
Thanks. Indeed, a sequential
|
What would you think about this kind implementations : // flatten. I name it 'merge' for consistency with 'mergeMap'
fun <T> ReceiveChannel<ReceiveChannel<T>>.merge(context: CoroutineContext = DefaultDispatcher): ReceiveChannel<T> = produce(context) {
consumeEach { channel ->
launch { channel.consumeEach { send(it) } }
}
}
// flatMap. I name it 'mergeMap' to avoid confusion with the current 'flatMap' implementation which is more a 'concatMap'
fun <T, R> ReceiveChannel<T>.mergeMap(
context: CoroutineContext = Unconfined,
transform: suspend (T) -> ReceiveChannel<R>
): ReceiveChannel<R> = map(context, transform).merge() And to merge two or more channels : fun <T> Iterable<ReceiveChannel<T>>.merge(context: CoroutineContext = DefaultDispatcher): ReceiveChannel<T> = produce(context) {
forEach { channel ->
launch { channel.consumeEach { send(it) } }
}
}
fun <T> merge(vararg channels: ReceiveChannel<T>): ReceiveChannel<T> = channels.asIterable().merge() |
The two
|
These kind of parallel-mapping operators are also discussed in #172. The alternative design that is being discussed there is to have an optional |
It would be nice to take |
Closing as obsolete, |
The
ReceiveChannel<E>.flatMap
function currently works like this:ReceiveChannel<R>
fromtransform
function and pipe all values from it to theflatMap
result channel,ReceiveChannel<R>
fromtransform
function and pipe all values from it to theflatMap
result channel,3 ...
n ...
This way the following test:
fails. That is if a value from the 2nd
ReceiveChannel<R>
comes earlier than the value from the 1stReceiveChannel<R>
it still is processed after the value from the 1stReceiveChannel<R>
.I think it should work on a first-come-first-go basis.
A preliminary version of my version of
flatMap
is sth like this:It might not work correctly all the times (e.g. I haven't done any analysis on cancelling behaviour), but it should give you an idea of how I imagine it.
The text was updated successfully, but these errors were encountered: