Merge Flow operator is missing #1491

Closed
ZakTaccardi opened this issue Aug 30, 2019 · 7 comments
@ZakTaccardi

ZakTaccardi commented Aug 30, 2019

The merge() operator is missing

fun <T> merge(vararg flows: Flow<T>): Flow<T> = TODO()

Use Case

Representing clicks on a keypad:

val key1: View
val key2: View
val numberClicks: Flow<Int> = merge(
  key1.clicks().map { 1 },
  key2.clicks().map { 2 }
)

Note: I think fun <T> Flow<T>.merge(otherFlow: Flow<T>): Flow<T> is an anti-pattern because it creates hard-to-read flows, so I am intentionally not requesting it 😛

@ZakTaccardi
Author

ZakTaccardi commented Aug 31, 2019

Wrote a quick and dirty custom implementation. Is it missing anything? Felt too easy...

fun merge(vararg flows: Flow<Int>): Flow<Int> = channelFlow {
    val channel = this.channel
    flows.forEach { flow ->
        launch {
            flow
                .onEach {
                    channel.send(it)
                }
                .collect()
        }
    }
}

@BoxResin

BoxResin commented Aug 31, 2019

You can implement it with flattenMerge.

fun <T> merge(vararg flows: Flow<T>): Flow<T> = flowOf(*flows).flattenMerge()

Usage

fun main(): Unit = runBlocking {
    val one: Flow<String> = flow {
        delay(100)
        emit("one 1")
        delay(100)
        emit("one 2")
        delay(100)
        emit("one 3")
    }

    val two: Flow<String> = flow {
        delay(50)
        emit("two 1")
        delay(150)
        emit("two 2")
    }

    val three: Flow<String> = flow {
        emit("three 1")
        delay(60)
        emit("three 2")
        delay(120)
        emit("three 3")
    }

    merge(one, two, three).collect { println(it) }
}

Result

three 1
two 1
three 2
one 1
three 3
one 2
two 2
one 3

flattenMerge is handy, but note that it is still in preview.
The documentation link: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html

@elizarov
Contributor

elizarov commented Sep 3, 2019

flattenMerge is in preview because it has an optional concurrency parameter, and we have not yet decided what exactly to do with it and with other "concurrent" operators (see #1147). In particular, this means that if you are planning to use the above merge implementation with more than 16 flows, you should rewrite it like this:

fun <T> merge(vararg flows: Flow<T>): Flow<T> =
    flowOf(*flows).flattenMerge(concurrency = flows.size)

@ZakTaccardi
Author

ZakTaccardi commented Sep 13, 2019

if you are planning to use the above merge implementation with more than 16 flows

I wrote tests for over 17 simultaneously merged flows, which worked fine. Not sure if use of runBlockingTest affects this

@elizarov
Contributor

if you are planning to use the above merge implementation with more than 16 flows

I wrote tests for over 17 simultaneously merged flows, which worked fine. Not sure if use of runBlockingTest affects this

They "do work", but with the default concurrency value of 16, the last flow will start being collected only after one of the first 16 completes.
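
To make the limit concrete, here is a minimal sketch that is not from the thread. It assumes kotlinx.coroutines is on the classpath; the helper names seventeenFlows and mergedOrder and the 100 ms delay are made up for illustration. Sixteen flows wait before emitting and a seventeenth emits as soon as it is collected, so the position of 16 in the result shows whether the last flow had to wait for a free slot.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.*

// Hypothetical demo helper: flows 0..15 each wait 100 ms before emitting,
// while flow 16 emits as soon as it is collected.
fun seventeenFlows(): List<Flow<Int>> = List(17) { index ->
    flow {
        if (index < 16) delay(100)
        emit(index)
    }
}

// Hypothetical demo helper: merges the flows and returns the values in arrival order.
// Passing null uses flattenMerge's default concurrency.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun mergedOrder(concurrency: Int? = null): List<Int> = runBlocking {
    val source = seventeenFlows().asFlow()
    val merged = if (concurrency == null) source.flattenMerge() else source.flattenMerge(concurrency)
    merged.toList()
}

fun main() {
    // Default limit: flow 16 is collected only after one of the others completes,
    // so 16 cannot be the first value.
    println(mergedOrder().first() == 16)

    // Limit raised to the number of flows: flow 16 starts immediately and arrives first.
    println(mergedOrder(concurrency = 17).first() == 16)
}
```

Both calls deliver all 17 values; only the arrival order differs, which is why a test that checks just the set of results would pass either way.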

elizarov added the flow label Sep 18, 2019
@elizarov
Contributor

The "gotcha" with concurrency limit is a strong motivator for a separate merge operator.
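
For readers arriving later: recent kotlinx.coroutines releases include a top-level merge function in kotlinx.coroutines.flow. The sketch below assumes such a version is on the classpath and recreates the keypad use case from the first comment. The functions key1Clicks and key2Clicks are hypothetical stand-ins for key1.clicks() and key2.clicks(), with made-up timings.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for the click streams of two keys.
fun key1Clicks(): Flow<Unit> = flow { delay(100); emit(Unit) }
fun key2Clicks(): Flow<Unit> = flow { delay(50); emit(Unit) }

// Assumes a kotlinx.coroutines version that provides merge(vararg flows).
@OptIn(ExperimentalCoroutinesApi::class)
fun numberClicks(): Flow<Int> = merge(
    key1Clicks().map { 1 },
    key2Clicks().map { 2 }
)

fun main(): Unit = runBlocking {
    // Key 2 is "clicked" after 50 ms and key 1 after 100 ms, so 2 is printed before 1.
    numberClicks().collect { println(it) }
}
```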

@ZakTaccardi
Author

They "do work", but with the default concurrency value of 16, the last flow will start being collected only after one of the first 16 completes.

@elizarov were you referring to @BoxResin's implementation? And mine should be fine because I launch a new coroutine for each flow?
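
One way to check this empirically is a small experiment. It is a sketch under stated assumptions, not an answer from the maintainers. The function mergeAll is a generic variant of the channelFlow-based implementation above, renamed (hypothetically) to avoid clashing with any library function, and firstOfSeventeen is a made-up helper that merges sixteen delayed flows with one immediate flow.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.*

// Generic variant of the channelFlow-based merge; the name mergeAll is hypothetical.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> mergeAll(vararg flows: Flow<T>): Flow<T> = channelFlow {
    flows.forEach { source ->
        // One coroutine per source flow, so no flow waits for another to complete.
        launch { source.collect { send(it) } }
    }
}

// Hypothetical demo helper: flows 0..15 wait 100 ms, flow 16 emits immediately.
// Returns the first value that arrives from the merged flow.
fun firstOfSeventeen(): Int = runBlocking {
    val flows = Array(17) { index ->
        flow {
            if (index < 16) delay(100)
            emit(index)
        }
    }
    mergeAll(*flows).first()
}

fun main() {
    // The 17th flow is collected right away, so its value arrives first.
    println(firstOfSeventeen() == 16)
}
```

Because every source gets its own coroutine, this version has no built-in limit on how many flows are collected at once.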

qwwdfsad added a commit that referenced this issue Dec 11, 2019
qwwdfsad added a commit that referenced this issue Dec 11, 2019