Skip to content

Problems with flatMap function on ReceiveChannel<T> #180

Closed
@konrad-kaminski

Description

@konrad-kaminski

The ReceiveChannel<E>.flatMap function currently works like this:

  1. take the 1st ReceiveChannel<R> from transform function and pipe all values from it to the flatMap result channel,
  2. take the 2nd ReceiveChannel<R> from transform function and pipe all values from it to the flatMap result channel,
    3 ...
    n ...

This way the following test:

val channel = produce {
    send("1000")
    send("1")
}

val list = channel.flatMap { value ->
    produce {
        delay(value.toLong())
        send(value)
    }
}.toList()

assertEquals(listOf("1", "1000"), list)

fails. That is if a value from the 2nd ReceiveChannel<R> comes earlier than the value from the 1st ReceiveChannel<R> it still is processed after the value from the 1st ReceiveChannel<R>.

I think it should work on a first-come-first-go basis.
A preliminary version of my version of flatMap is sth like this:

private fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Unconfined,
                                          transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
    produce(context) {
        val transformed = mutableListOf<ReceiveChannel<R>>()
        val deferredList = mutableListOf<Deferred<ReceiveChannel<R>>>()
        var finishedSource = false

        while (isActive &&
                (!finishedSource || transformed.isNotEmpty() || deferredList.isNotEmpty())) {
            selectUnbiased<Unit> {
                this@flatMap.onReceiveOrNull { value ->
                    when {
                        value != null -> deferredList += async(context) { transform(value) }
                        else          -> finishedSource = true
                    }
                }

                transformed.forEach { output ->
                    output.onReceiveOrNull { value ->
                        when {
                            value != null -> this@produce.send(value!!)
                            else          -> transformed -= output
                        }
                    }
                }

                deferredList.forEach { deferred ->
                    deferred.onAwait { output ->
                        deferredList -= deferred
                        transformed += output
                    }
                }
            }
        }
    }

It might not work correctly all the times (e.g. I haven't done any analysis on cancelling behaviour), but it should give you an idea of how I imagine it.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions