
Problems with flatMap function on ReceiveChannel<T> #180

Closed
konrad-kaminski opened this issue Dec 4, 2017 · 6 comments
@konrad-kaminski (Contributor)

The ReceiveChannel<E>.flatMap function currently works like this:

  1. take the 1st ReceiveChannel<R> returned by the transform function and pipe all of its values to the flatMap result channel,
  2. take the 2nd ReceiveChannel<R> and pipe all of its values to the result channel,
  3. and so on for each subsequent channel.

This way the following test:

val channel = produce {
    send("1000")
    send("1")
}

val list = channel.flatMap { value ->
    produce {
        delay(value.toLong())
        send(value)
    }
}.toList()

assertEquals(listOf("1", "1000"), list)

fails. That is, even if a value from the 2nd ReceiveChannel<R> arrives earlier than a value from the 1st ReceiveChannel<R>, it is still emitted after all values from the 1st.

I think it should work on a first-come-first-served basis.
A preliminary version of my flatMap looks something like this:

private fun <E, R> ReceiveChannel<E>.flatMap(
    context: CoroutineContext = Unconfined,
    transform: suspend (E) -> ReceiveChannel<R>
): ReceiveChannel<R> =
    produce(context) {
        val transformed = mutableListOf<ReceiveChannel<R>>()            // inner channels currently being drained
        val deferredList = mutableListOf<Deferred<ReceiveChannel<R>>>() // transform calls still in flight
        var finishedSource = false

        while (isActive &&
                (!finishedSource || transformed.isNotEmpty() || deferredList.isNotEmpty())) {
            selectUnbiased<Unit> {
                this@flatMap.onReceiveOrNull { value ->
                    when {
                        value != null -> deferredList += async(context) { transform(value) }
                        else          -> finishedSource = true
                    }
                }

                transformed.forEach { output ->
                    output.onReceiveOrNull { value ->
                        when {
                            value != null -> this@produce.send(value!!)
                            else          -> transformed -= output
                        }
                    }
                }

                deferredList.forEach { deferred ->
                    deferred.onAwait { output ->
                        deferredList -= deferred
                        transformed += output
                    }
                }
            }
        }
    }

It might not work correctly in all cases (e.g. I haven't done any analysis of the cancellation behaviour), but it should give you an idea of what I have in mind.

@elizarov (Contributor) commented Dec 5, 2017

Thanks. Indeed, a sequential flatMap is not that useful for asynchronous channels. My suggestion to solve this problem is this:

  1. Reserve flatMap name for synchronous cases, e.g. when transform returns either a Collection or a Sequence (implement the corresponding versions)
  2. Deprecate flatMap with ReceiveChannel transforms to avoid any confusion, and consider reintroducing it under the name concatMap. For the sake of consistency, a concat operation on two channels should be introduced, too.
  3. Use different names for the concurrent cases. We can reuse Rx-inspired names for the concurrent versions (I don't see a big benefit in reinventing new names here):
  • mergeMap, which would process all items on a first-come-first-served basis. The corresponding merge operator for two channels would also help.
  • switchMap, which would take items from the current channel only until the next channel is received (then switch to it), etc. The corresponding switch operator for two channels might also be helpful.
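No implementation of switchMap appears in this thread; as a rough illustration of the semantics described above, here is a hedged sketch written against the current kotlinx.coroutines channels API (the name and signature are assumptions, not an adopted design):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical sketch of switchMap: only the most recent inner channel is
// consumed; receiving a new source value cancels the previous inner channel.
fun <T, R> CoroutineScope.switchMap(
    source: ReceiveChannel<T>,
    transform: suspend (T) -> ReceiveChannel<R>
): ReceiveChannel<R> = produce {
    var forwarder: Job? = null
    var inner: ReceiveChannel<R>? = null
    for (value in source) {
        forwarder?.cancelAndJoin() // stop forwarding from the previous channel
        inner?.cancel()            // and cancel the abandoned channel itself
        val next = transform(value)
        inner = next
        forwarder = launch { for (item in next) send(item) }
    }
    forwarder?.join()              // let the last inner channel run to completion
}
```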

@jcornaz (Contributor) commented Dec 5, 2017

What would you think about this kind of implementation?

// flatten. I name it 'merge' for consistency with 'mergeMap'
fun <T> ReceiveChannel<ReceiveChannel<T>>.merge(context: CoroutineContext = DefaultDispatcher): ReceiveChannel<T> = produce(context) {
  consumeEach { channel ->
    launch { channel.consumeEach { send(it) } }
  }
}

// flatMap. I name it 'mergeMap' to avoid confusion with the current 'flatMap' implementation which is more a 'concatMap'
fun <T, R> ReceiveChannel<T>.mergeMap(
    context: CoroutineContext = Unconfined,
    transform: suspend (T) -> ReceiveChannel<R>
): ReceiveChannel<R> = map(context, transform).merge()

And to merge two or more channels:

fun <T> Iterable<ReceiveChannel<T>>.merge(context: CoroutineContext = DefaultDispatcher): ReceiveChannel<T> = produce(context) {
  forEach { channel ->
    launch { channel.consumeEach { send(it) } }
  }
}

fun <T> merge(vararg channels: ReceiveChannel<T>): ReceiveChannel<T> = channels.asIterable().merge()
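With merge semantics like this, values come out on a first-come-first-served basis, so the failing test from the opening comment would pass. A self-contained restatement of the sketch against the current kotlinx.coroutines API (where produce is an extension on CoroutineScope; the signatures here are assumptions):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Restatement of the sketch above against the current API; the names follow
// the proposal, the exact signatures are invented for illustration.
fun <T> CoroutineScope.merge(source: ReceiveChannel<ReceiveChannel<T>>): ReceiveChannel<T> =
    produce {
        for (channel in source) {
            // Forward each inner channel concurrently; the producer coroutine only
            // completes (and closes the result channel) once all children are done.
            launch { for (item in channel) send(item) }
        }
    }

fun <T, R> CoroutineScope.mergeMap(
    source: ReceiveChannel<T>,
    transform: suspend (T) -> ReceiveChannel<R>
): ReceiveChannel<R> = merge(produce { for (value in source) send(transform(value)) })
```

Applied to the original test, the fast inner channel wins, and the result is ["1", "1000"].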

@fvasco (Contributor) commented Dec 5, 2017

The two merge extension methods can only run without issue on DefaultDispatcher.
Moreover, we should consider receiving elements until the inner ReceiveChannel is empty before launching, to avoid or delay the launch allocation:

fun <T> ReceiveChannel<ReceiveChannel<T>>.merge(): ReceiveChannel<T> = produce {
    consumeEach { channel ->
        var item = channel.poll()
        while (item != null) {
            try {
                send(item)
            } catch (e: Exception) {
                channel.cancel()
                throw e
            }
            item = channel.poll()
        }

        if (!channel.isClosedForReceive)
            launch(coroutineContext) { channel.consumeEach { send(it) } }
    }
}

@elizarov (Contributor)

This kind of parallel-mapping operator is also discussed in #172. The alternative design being discussed there is to add an optional parallelism parameter to all the regular operators (like flatMap in our case) instead of defining a new set of "parallel" operators.
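As a hedged sketch of that alternative (the name and signature are invented here, not the design that was adopted), a single operator with a parallelism knob could look like this:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.sync.Semaphore

// Hypothetical sketch: one flatMap-style operator whose behaviour is tuned by
// a parallelism parameter rather than separate concatMap/mergeMap operators.
// parallelism = 1 reproduces the sequential (concat) behaviour; a large value
// approaches the first-come-first-served merge behaviour.
fun <T, R> CoroutineScope.flatMapWithParallelism(
    source: ReceiveChannel<T>,
    parallelism: Int,
    transform: suspend (T) -> ReceiveChannel<R>
): ReceiveChannel<R> = produce {
    val permits = Semaphore(parallelism) // bounds the number of live inner channels
    for (value in source) {
        permits.acquire()                // wait while too many inner channels are active
        launch {
            try {
                for (item in transform(value)) send(item)
            } finally {
                permits.release()
            }
        }
    }
}
```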

@dobriy-eeh

It would be nice to take the parallelism parameter from the CoroutineContext, with the number of worker threads as the default.
High-level business logic doesn't know the worker-pool settings; it just wants to say: run this work in parallel, as fast as you can.
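As a sketch of how that could look (Parallelism is an invented context element for illustration, not part of kotlinx.coroutines):

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

// Hypothetical context element carrying the desired parallelism.
class Parallelism(val value: Int) : AbstractCoroutineContextElement(Parallelism) {
    companion object Key : CoroutineContext.Key<Parallelism>
}

// An operator could read it from the caller's context, falling back to the
// number of available processors when nothing was specified.
suspend fun currentParallelism(): Int =
    coroutineContext[Parallelism]?.value ?: Runtime.getRuntime().availableProcessors()
```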

@qwwdfsad (Contributor)

Closing as obsolete; flatMap on channels is deprecated.
