Closed
Description
The ReceiveChannel<E>.flatMap
function currently works like this:
- take the 1st
ReceiveChannel<R>
fromtransform
function and pipe all values from it to theflatMap
result channel, - take the 2nd
ReceiveChannel<R>
fromtransform
function and pipe all values from it to theflatMap
result channel,
3 ...
n ...
This way the following test:
val channel = produce {
send("1000")
send("1")
}
val list = channel.flatMap { value ->
produce {
delay(value.toLong())
send(value)
}
}.toList()
assertEquals(listOf("1", "1000"), list)
fails. That is if a value from the 2nd ReceiveChannel<R>
comes earlier than the value from the 1st ReceiveChannel<R>
it still is processed after the value from the 1st ReceiveChannel<R>
.
I think it should work on a first-come-first-go basis.
A preliminary version of my version of flatMap
is sth like this:
private fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Unconfined,
transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
produce(context) {
val transformed = mutableListOf<ReceiveChannel<R>>()
val deferredList = mutableListOf<Deferred<ReceiveChannel<R>>>()
var finishedSource = false
while (isActive &&
(!finishedSource || transformed.isNotEmpty() || deferredList.isNotEmpty())) {
selectUnbiased<Unit> {
this@flatMap.onReceiveOrNull { value ->
when {
value != null -> deferredList += async(context) { transform(value) }
else -> finishedSource = true
}
}
transformed.forEach { output ->
output.onReceiveOrNull { value ->
when {
value != null -> this@produce.send(value!!)
else -> transformed -= output
}
}
}
deferredList.forEach { deferred ->
deferred.onAwait { output ->
deferredList -= deferred
transformed += output
}
}
}
}
}
It might not work correctly all the times (e.g. I haven't done any analysis on cancelling behaviour), but it should give you an idea of how I imagine it.