Merge Flow operator is missing #1491
Comments
Wrote a quick and dirty custom implementation. Is it missing anything? Felt too easy...

    fun merge(vararg flows: Flow<Int>): Flow<Int> = channelFlow {
        val channel = this.channel
        flows.forEach { flow ->
            launch {
                flow
                    .onEach {
                        channel.send(it)
                    }
                    .collect()
            }
        }
    }

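The snippet above fixes the element type to Int. A generic variant might look like the sketch below, assuming a recent kotlinx.coroutines version. The name mergeAll is hypothetical, picked only so it does not clash with any library function called merge. It also calls send directly on the ProducerScope instead of capturing channel, which should behave the same way.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper name; not part of the library.
fun <T> mergeAll(vararg flows: Flow<T>): Flow<T> = channelFlow {
    for (flow in flows) {
        // One child coroutine per source flow. channelFlow completes
        // only after all of its child coroutines have finished.
        launch {
            flow.collect { send(it) }
        }
    }
}

fun main() = runBlocking {
    val merged = mergeAll(flowOf(1, 2), flowOf(3), flowOf(4, 5)).toList()
    // Interleaving between sources is not something to rely on,
    // so the values are sorted before printing.
    println(merged.sorted()) // prints [1, 2, 3, 4, 5]
}
```

Since every source gets its own coroutine, there is no concurrency cap here, and a failure in any source cancels the whole merged flow.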
You can implement it by

    fun <T> merge(vararg flows: Flow<T>): Flow<T> = flowOf(*flows).flattenMerge()

Usage

    fun main(): Unit = runBlocking {
        val one: Flow<String> = flow {
            delay(100)
            emit("one 1")
            delay(100)
            emit("one 2")
            delay(100)
            emit("one 3")
        }
        val two: Flow<String> = flow {
            delay(50)
            emit("two 1")
            delay(150)
            emit("two 2")
        }
        val three: Flow<String> = flow {
            emit("three 1")
            delay(60)
            emit("three 2")
            delay(120)
            emit("three 3")
        }
        merge(one, two, three).collect { println(it) }
    }

Result [...]

    // flows is an Array here, so its element count is size, not length
    fun <T> merge(vararg flows: Flow<T>): Flow<T> =
        flowOf(*flows).flattenMerge(concurrency = flows.size)

I wrote tests for over 17 simultaneously merged flows, which worked fine. Not sure if use of [...]

They "do work", but with the default [...]

The "gotcha" with concurrency limit is a strong motivator for a separate [...]
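
To make the concurrency limit concrete, here is a sketch of where 17 merged flows can go wrong. It assumes flattenMerge falls back to the library's DEFAULT_CONCURRENCY when no value is passed (16 unless overridden through a system property), and that the sources never complete, as UI event streams typically do not. The helper names neverEnding and collectedWithin are hypothetical.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical source: emits its index once, then stays open until cancelled.
fun neverEnding(i: Int): Flow<Int> = flow {
    emit(i)
    awaitCancellation()
}

// Hypothetical helper: collects whatever arrives before the timeout fires.
suspend fun collectedWithin(timeoutMs: Long, merged: Flow<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    withTimeoutOrNull(timeoutMs) { merged.collect { seen += it } }
    return seen
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun main() = runBlocking {
    val flows = List(17) { neverEnding(it) }

    // Default limit: the 17th flow waits for a free slot that never opens up.
    val withDefault = collectedWithin(500, flows.asFlow().flattenMerge())

    // Explicit limit equal to the number of sources: every flow is collected.
    val withExplicit =
        collectedWithin(500, flows.asFlow().flattenMerge(concurrency = flows.size))

    println(withDefault.size)  // 16 with the default limit
    println(withExplicit.size) // 17
}
```

With sources that do complete, the default limit only delays the extra flows, which would explain tests with more than 16 finite flows still passing.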
The merge() operator is missing.

Use Case
Representing clicks on a keypad:
Note: I think fun Flow<T>.merge(otherFlow: Flow<T>): Flow<T> is an anti-pattern because it creates hard-to-read flows, so I am intentionally not requesting it 😛