Parallel flow processing #1147
We do plan a set of separate functions for that, and to close #172 with them. However, the key challenge here is to introduce a composable set of concurrency operators, as opposed to specific one-off operators. For example:

```kotlin
flow.concurrent(n) {
    // inside here we can have a sequence of flow operations,
    // each doing its own thing concurrently
    // this: Flow<T> here, similarly to the `flowWith` operator
    filter { ... }.map { ... }.filter { ... } // -> Flow<R>
} // -> Flow<Flow<R>>
```

Alternatively you can call:

```kotlin
// a different way to write the same code as above
flow.concurrent(n).map {
    // it: Flow<T> here
    it.filter { ... }.map { ... }.filter { ... } // -> Flow<R>
} // -> Flow<Flow<R>>
```

Note that the result is going to be a `Flow<Flow<R>>`. For a map-reduce use-case:

```kotlin
flow.concurrent(n) { // this: Flow<Input>
    map(transform)       // -> Flow<Mapped>
        .reduce(reducer) // -> R
} // -> Flow<R>
    .reduce(reducer) // we need to reduce again to get -> R
```

So that part of the design for concurrent terminal operators is still TBD, with map-reduce being the primary use-case. We have a similar design problem with a terminal concurrent collect:
|
I see the idea, but parallel is not necessarily concurrent. On the contrary, in my example, there is no concurrency. The key feature needed for parallel processing is the ability to start multiple computations ahead of time. Maybe we can modify your structure by replacing the inner Flow with a lazy one. As for the initial example, I work a lot with parallel data processing and map/reduce workflows, and basically the only operation that could be parallelized is map. |
@altavir Actually, it is the converse with respect to the traditional distinction between concurrency and parallelism (it is quite a recent distinction, < 20 years old, but quite established by now). Concurrent is not necessarily parallel. However, even if you take a concurrent operator, it does not necessarily mean that it is going to actually run different calls concurrently. On the JVM you can run your version |
I won't argue with you about the terminology, especially remembering that a significant fraction of my knowledge of it comes from your articles. Still, it does not change the fact that all we need for parallel processing is a way to start multiple computations ahead of time. I am trying to implement it via transforming a |
The problem was in the conflict of lazy deferred with structured concurrency. I've fixed it by replacing the deferred with my own imitation. The working code is available here. In theory, it should be possible to make a concurrent |
Let's hold off a discussion on implementation strategy a bit. There are lots of ways to implement it. What are you trying to achieve in the end? You've started this thread with a proposal for |
The particular case is a streaming mathematical processor with the ability to parallelize computations. The typical application of such a mechanism is the analysis of time series, when you have a very long block of numbers and you want to continuously run it through some chain of mathematical operations (like a moving window, Fourier transform, etc.) and drop the result somewhere, without loading the whole block into memory. Some of those operations could be parallelized. For example, one could split the incoming data into fixed chunks and then apply an FFT to each chunk in parallel. Basically it is the same old map-reduce technology, and very similar to Java Stream processing (I would use Java Streams, but I want to implement it on MPP). Flow seems to be a good replacement for my initial construct, since the next Flow always consumes the previous one, and the Flow does not start generating until the final consumer is called. The internal sequential processing also probably allows avoiding context-switching performance losses. |
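The chunk-and-transform pattern described above can be sketched as follows. This is a hypothetical illustration: `chunked` is not a kotlinx.coroutines operator (it is defined here for the example), and `sum()` stands in for a real per-chunk transform such as an FFT.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: split a long stream into fixed-size chunks without
// ever holding the whole series in memory.
fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> = flow {
    val buffer = ArrayList<T>(size)
    collect { value ->
        buffer += value
        if (buffer.size == size) {
            emit(ArrayList(buffer)) // emit a copy of the full chunk
            buffer.clear()
        }
    }
    if (buffer.isNotEmpty()) emit(ArrayList(buffer)) // trailing partial chunk
}

fun main() = runBlocking {
    val sums = (1..10).asFlow()
        .chunked(4)
        .map { chunk -> chunk.sum() } // stand-in for an FFT on each chunk
        .toList()
    check(sums == listOf(10, 26, 19)) // chunks [1..4], [5..8], [9,10]
    println(sums)
}
```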
Actually,
|
I think I'm debating the same need over on https://discuss.kotlinlang.org/t/must-go-fast-sequences-flows-bounded-channels/12494 - but with less knowledge of the various options than this thread, so I'm going to sit quietly and watch here instead. :) Why I think my need is the same use case:
And I have too many options and not enough knowledge to tell whether one of them is the "use this until we make the Kotlin canonical lib" option.
|
@qwwdfsad Seems to be working. I looked into the source, and it does the trick with the two channels I was intending to use. I've changed the signature a bit. I think it mostly solves my case. A few additional comments:
|
On the Spring side, the use case is pretty common: you want to chain 2 remote webservices, where the first returns a list of IDs and the second lets you get the details associated with each ID. For that you are typically using
So developers will naturally use a sequential operator. I agree with @gildor here. I would personally be in favor of supporting parallel processing by default in |
I'm all for adding concurrent APIs, too, but in an explicit way. There's a lot to design, to think about, though. I've already presented one way to provide explicit concurrent API in this thread. The other would be something like this:
This is both explicit about concurrency and about what happens with all those concurrently mapped things -- they are all merged at the end (the other possible termination is a reduce). Actually, I have one really radical idea, and this thread is a good place to share it. I'm thinking that we should deprecate
At the same time |
I like the simplification that leads to more consistency with only |
I like the idea (the radical one) in general. The question is how the intermediate flow looks from the type system point of view. I do not like |
@sdeleuze You need to convert your flow to "promises" at some point and then another point to actualize those promises. In between those points you have concurrency. So you always have "in concurrency" and "out of concurrency" gates. The question is how you will describe what happens between them and will you allow to open one gate, pass the flow somewhere and close it there. |
W.r.t. types, I think it is somewhat conceptually similar to how |
A "concurrent block" can avoid defining a dedicated type, i.e.:
or something similar. Moreover, it provides a non-concurrent sandbox in which to use non-pure functions. |
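A minimal sketch of what such a "concurrent block" could look like, assuming kotlinx.coroutines' `channelFlow`, `produceIn`, and `receiveAsFlow`. The `concurrent` name and semantics here are illustrative only, not a proposed final API: each of `concurrency` workers pulls elements from the shared upstream and runs the whole (internally sequential) pipeline given in `block`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical "concurrent block" operator: fan the upstream out to N workers,
// each running the user-supplied sequential pipeline, and merge their outputs.
@OptIn(FlowPreview::class)
fun <T, R> Flow<T>.concurrent(
    concurrency: Int,
    block: (Flow<T>) -> Flow<R>
): Flow<R> = channelFlow {
    val input = this@concurrent.produceIn(this) // one shared channel for fan-out
    repeat(concurrency) {
        launch {
            // receiveAsFlow() supports multiple collectors sharing one channel
            block(input.receiveAsFlow()).collect { send(it) }
        }
    }
}

fun main() = runBlocking {
    val out = (1..6).asFlow()
        .concurrent(concurrency = 2) { f ->
            f.filter { it % 2 == 0 }.map { it * 10 }
        }
        .toList()
    check(out.sorted() == listOf(20, 40, 60)) // output order is not guaranteed
    println(out.sorted())
}
```

Note the trade-off this illustrates: output order is arbitrary, which is exactly the design tension discussed later in this thread.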
I understand why you are tempted by this design, but it would be pretty heavy to use, and I am quite puzzled by the need to use 3 operators to do something as simple as a concurrent map. I see many more use cases for operations with latency, like network calls, than for sequential ones. The original design was beautiful because it was simple; this looks less clean to me. Why not keep a simple |
There are several design rules to be taken into account before we make a decision:
Having spelled this out, having an optional |
My thought exactly. This way it could even inherit |
I do not think it will be a problem. The order, of course, is important. |
@altavir Indeed, the buffer size parameter is useless when you are emitting a flow with a single element; I wanted to show the general idea.
Now you can use it in the following way:
Buffer size controls the backpressure of the consumer, and concurrency controls the number of in-flight batches. Note that even if we completely remove |
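A small illustration of the two knobs described above, with hypothetical numbers: `concurrency` in `flatMapMerge` bounds how many inner flows are in flight at once, while `buffer` decouples a slow consumer from the producers.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(FlowPreview::class)
fun main() = runBlocking {
    val results = (1..10).asFlow()
        .flatMapMerge(concurrency = 3) { batch ->   // at most 3 batches in flight
            flow {
                delay(10)       // stand-in for processing one batch
                emit(batch * 2)
            }
        }
        .buffer(capacity = 5)   // up to 5 results may wait for the consumer
        .toList()
    check(results.sorted() == (1..10).map { it * 2 })
    println(results.sorted())
}
```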
Could you please elaborate a bit? What we (may) expect a user to write:
What one may want to write in RxJava
(1) and (3) are basically the same; option (2) depends on the operator's semantics. If I understood you correctly, you want (2) to be the default option in this scenario.
The follow-up question is how can we improve |
@elizarov Please find below my feedback
Kotlin coroutines are designed on that principle, and I agree that's a good idea to avoid too much surprise here, so my initial proposal to make
Be sure I try to provide feedback with the Kotlin mindset in mind. Forcing developers to use 2 additional operators that will most of the time just surround a single one, just to enable concurrency, is IMO neither elegant nor pragmatic. Of course anybody can define custom operators, but here I think we are talking about one of the most common use cases.
Based on my experience of working on server-side reactive stuff for a few years, I strongly disagree with that statement about concurrency when applied to
The need for concurrency does not come from data set size, but from latency. I am going to try to elaborate more, as asked by @qwwdfsad. A company is developing micro-services (with Spring or Ktor) that need to request external slow REST webservices that are sadly not cacheable. After bad feedback saying that the application is slow, the developer updates the code and finds that she/he has to wrap the suspending function artificially. Let's see what the options are to improve this for coroutines 1.3.
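The IDs-then-details scenario above can be sketched with today's `flatMapMerge`. Everything here is illustrative: `fetchDetails` stands in for the slow remote call, and the concurrency limit of 4 is arbitrary.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Stand-in for the slow, non-cacheable remote webservice call
suspend fun fetchDetails(id: Int): String {
    delay(100) // simulated network latency
    return "details-$id"
}

@OptIn(FlowPreview::class)
fun main() = runBlocking {
    val details = (1..8).asFlow()
        .flatMapMerge(concurrency = 4) { id -> flow { emit(fetchDetails(id)) } }
        .toList()
    check(details.size == 8) // ~2 rounds of 4 concurrent calls, not 8 sequential ones
    println(details.sorted())
}
```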
|
I wish to point out a small issue regarding the use of these operators: tuning these parameters is not easy, and adding more operators to the chain becomes tricky. See the other solution:

This looks better, but we have to consider implementing all operators for the new type, not only a limited set:

```kotlin
ids()
    .concurrent(concurrency = 4).filter { checkId(it) }.merge()
    .someOperator()
    .concurrent(concurrency = 4).map { awaitDetails(it) }.merge()
```

The same issue again, unless every operator is also implemented for the concurrent type. Lastly, the sandbox proposal:

```kotlin
ids()
    .concurrent(concurrency = 4) { flow ->
        flow
            .filter { checkId(it) }
            .someOperator()
            .map { awaitDetails(it) }
    }
```
|
Good point, I agree that tuning these parameters is not easy. On one side, specifying this concurrency parameter (with proper documentation) makes things more explicit, so users will have a better idea of how many concurrent requests there could be. On the other side, coroutines are cheap, so maybe we don't really care about the exact value in most cases, since choosing the proper value adds a potentially annoying cognitive load. Maybe users should just choose the concurrency mode they want (using a dedicated operator, or a boolean or enum optional parameter)... |
What about using
|
Let us explore this a bit. Assume that I have a flow of records and I want to store all of them to a DB:
But that does the storing to the DB sequentially. What if
That's a really clear way to express my intent to concurrently store those records to the DB! But now, if a result of
So what are the possible solutions to this conundrum in the context of our exploration of
But that means that we'll need operators to convert back to the sequential mode, and it cannot be just one operator. That, in turn, means that if I want to do a "concurrent map" and get a regular sequential flow as a result, then I have to write something longer-looking:
|
You are looking at this through the prism of your narrow use-case, since there are clearly many use-cases for which the order is not important (see, for example, the example of storing to a DB above). First of all, let me note that none of the implementations of |
What happens here is that it creates a lot of inner flows, and they start to be collected concurrently. Now, the first one to emit gets merged first. You can play with an example I wrote here to convince yourself that it does indeed reorder elements. Try rewriting it so that it does preserve the order, yet remains concurrent. Now, an order-preserving "concurrentMap" is indeed a use-case. One of many. This use-case is an intricate one, though, because it is hard to design in a composable way. I can envision an order-preserving "concurrentMap" implementation as a separate operator, but it does not easily lend itself to decomposition into simpler operators to combine with "concurrentFilter" and others. It is one of the puzzles we have in the design of concurrent operators for flows. |
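For reference, here is a sketch of such a `flatMapMerge`-based "concurrentMap" together with a small demo. As explained above, the first inner flow to emit gets merged first, so the results likely come out in completion order rather than input order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Sketch of a flatMapMerge-based "concurrentMap"; it does NOT preserve input order.
@OptIn(FlowPreview::class)
fun <T, R> Flow<T>.concurrentMap(concurrency: Int, transform: suspend (T) -> R): Flow<R> =
    flatMapMerge(concurrency) { value -> flow { emit(transform(value)) } }

fun main() = runBlocking {
    // Input order 300, 100, 200; with all three running concurrently, the
    // shortest delay finishes first, so the likely output order is 100, 200, 300.
    val result = flowOf(300L, 100L, 200L)
        .concurrentMap(concurrency = 3) { delay(it); it }
        .toList()
    check(result == listOf(100L, 200L, 300L))
    println(result)
}
```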
Just my two cents. In the very likely event that we get a
|
Here's an implementation of |
Here's my take on the implementation inspired by @Dominaezzz's and |
I have a design that specifies a separate type ParallelFlow and thus its own sub-DSL. Unfortunately, I've run into a couple of conceptual problems with using |
@akarnokd That is one of the designs we have on the table for this issue. No decision yet. The advantage of this "separate type" design is that it is a DSL that can be tailored to concurrent processing, but the disadvantage is that it is not quite orthogonal in nature. It requires duplication of many operators. All Note, that if we follow with this design it will be called Our working theory, if we follow with the "separate type" design, is that we can provide only a few reduction operators out-of-the-box with Moreover, in this case, existing |
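For illustration, a minimal sketch of what a "separate type" design could look like. All names here (`ConcurrentFlow`, `concurrent`, `merge`) are hypothetical, and this is not the actual kotlinx.coroutines API; internally it uses the deferred-per-element trick, so this particular sketch happens to preserve order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical separate type: a flow whose stages run concurrently, with its
// own mini-DSL that must be merged back into a regular Flow explicitly.
class ConcurrentFlow<T>(internal val source: Flow<T>, internal val concurrency: Int)

fun <T> Flow<T>.concurrent(concurrency: Int = 16): ConcurrentFlow<T> =
    ConcurrentFlow(this, concurrency)

// A dedicated map for the concurrent type: starts up to `concurrency`
// transforms ahead of time, awaiting results in input order.
fun <T, R> ConcurrentFlow<T>.map(transform: suspend (T) -> R): ConcurrentFlow<R> =
    ConcurrentFlow(
        channelFlow {
            source.collect { value -> send(async { transform(value) }) }
        }
            .buffer(concurrency)      // bound the number of in-flight transforms
            .map { it.await() },
        concurrency
    )

fun <T> ConcurrentFlow<T>.merge(): Flow<T> = source

fun main() = runBlocking {
    val result = (1..5).asFlow()
        .concurrent(concurrency = 2)
        .map { delay(10); it * it }
        .merge()
        .toList()
    check(result == listOf(1, 4, 9, 16, 25)) // order preserved in this sketch
    println(result)
}
```

This also shows the duplication cost mentioned above: every operator (`filter`, `flatMap`, ...) would need a second definition on the new type.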
Are there plans to support partition keys? Aka consistent hashing. |
So, is this issue a thing, or are we more or less looking at flatMapMerge/flatMapConcat for the foreseeable future? I like the idea of chaining parallel flows. I actually DO think we should have, if not the full glory of parallel streams, at least some of it. I hate having to bail out to streams to get the functionality, and it's awkward when trying to work within a coroutine context. |
I'd put in a vote for the separate-type flow despite the downsides of extra API volume. It is, I think, a lot safer and ends up being cleaner to reason about, despite some repetition. We're really not talking about massive volumes of code, if I'm estimating this right -- sure, more than reusing fully, but it's not that big a delta, all things considered. |
@elizarov In the spirit of #2065, how about

```kotlin
fun <T, R> Flow<T>.concurrentTransform(
    transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
```

It is lower-level but generic enough to give us full flexibility and be usable until |
@pacher I really like that idea (So much, that I'm going to try and implement it). The only issue I see is how to handle back pressure. |
@Dominaezzz I don't know. I did not think it through that much; it was just an idea, maybe not even a good one. As per the topic of the issue, here is my take on an order-preserving "concurrentMap":

```kotlin
fun <T, R> Flow<T>.concurrentMap(scope: CoroutineScope, concurrencyLevel: Int, transform: suspend (T) -> R): Flow<R> = this
    .map { scope.async { transform(it) } }
    .buffer(concurrencyLevel)
    .map { it.await() }
```

Can anybody see problems with this approach? It looks good to me because it is just a concise combination of a few basic flow operators, and it preserves order, compared to @qwwdfsad's flatMapMerge snippet. |
@pacher That's a good take on order-preserving concurrent-map. There are two things that I see. For one, the way
The other one is that |
@cgruber We plan it to be the next "big thing" after flow sharing. |
@pacher Once I guessed at the right parameters, it was very easy to make use of. From this end-user who doesn't know the depths of performance/contexts/coroutines/dispatchers, a +1 vote for something like this. |
Is there any update on this design item? This feature would make ETL applications so much easier and very performant. Thank you! |
@pacher @elizarov Here's my attempt at an order-preserving concurrent-map operator after seeing your posts.

```kotlin
inline fun <T, R> Flow<T>.mapInOrder(concurrencyLevel: Int, crossinline map: suspend (T) -> R): Flow<R> =
    Semaphore(permits = concurrencyLevel).let { semaphore ->
        channelFlow {
            collect {
                semaphore.acquire()
                send(async { map(it) })
            }
        }
            .map { it.await() }
            .onEach { semaphore.release() }
    }
```

Is this approach better? Can it be further improved? |
We've been thinking about this at Google recently, as we definitely have use cases that require preservation of order and concurrent evaluation of RPCs. (Consider search results: order is meaningful, and returning results as fast as possible is a high priority.) There's also some subtlety in exception handling, e.g. in @giulioscattolin's example above. master...lowasser:map-concurrent is a prototype implementation that tries to address all those issues, as well as both the concurrencyLevel and buffer knobs. |
As this is not the first hit on Google, but the first relevant hit for flows map-reduce: can this be fast-forwarded and hopefully brought up to the state of 1.5.x, please? |
I found myself needing concurrent mapping, and seeing how the concurrent flow design has not been decided on yet, I thought I'd try to write up a sketch implementation of it myself. Here's the gist.

Usage example:

```kotlin
flow {
    for (i in 0 until 100) {
        emit(i)
        delay(10)
    }
}.concurrent().map { value ->
    delay((0 until 100).random().toLong())
    value + 1
}.filter { value ->
    delay(1)
    value % 2 == 0
}.flatMap { value ->
    flow {
        delay(5)
        emit(value)
        delay(200)
        emit(value + 1)
    }
}.merge(preserveOrder = true).collect { value ->
    println(value)
}
```

Here,

```kotlin
/**
 * Turn this flow into a [ConcurrentFlow].
 *
 * The following operations like [map], [filter], [flatMap], [reduce], [collect],
 * will be run concurrently rather than sequentially.
 */
fun <T> Flow<T>.concurrent(concurrencyLevel: Int = 64): ConcurrentFlow<T>
```

And

```kotlin
/**
 * Merge a concurrent flow back into a regular, sequential flow.
 *
 * If [preserveOrder] is true, items are emitted in the same order they were
 * present in the initial flow; otherwise, in an arbitrary order. Note that
 * preserving order requires additional buffering and means the collector has
 * to wait for prior items to become available instead of processing new items
 * as they appear, which slows things down.
 */
fun <T> ConcurrentFlow<T>.merge(preserveOrder: Boolean): Flow<T>
```

It's also possible to directly concurrently collect a flow:

```kotlin
channelFlow {
    collect { value ->
        send(value)
    }
}
```
|
There is actually a trivial solution to this, which our app uses (and we wrote a small library for). The "philosophy" is to be able to provide transforms "that behave like the normal transform, but faster". Basically, it should never change the order of output. You can use a channelFlow:

```kotlin
fun <T, R> Flow<T>.mapAsync(transform: suspend (value: T) -> R): Flow<R> =
    channelFlow {
        collect { e -> send(async { transform(e) }) }
    }
        .map { it.await() }
```

And... that's it. This actually gives you (roughly) what you need. If you want to bound the concurrency:

```kotlin
fun <T, R> Flow<T>.mapAsync(concurrency: Int, transform: suspend (value: T) -> R): Flow<R> {
    require(concurrency >= 1)
    if (concurrency == 1) return map(transform)
    return channelFlow {
        collect { e -> send(async { transform(e) }) }
    }
        .buffer(concurrency - 2)
        .map { it.await() }
}
```

What's nice is that (almost) every other flow transform can be expressed in terms of mapAsync:

```kotlin
fun <T> Flow<T>.filterAsync(predicate: suspend (value: T) -> Boolean): Flow<T> =
    mapAsync { it to predicate(it) }.transform { if (it.second) emit(it.first) }
```

Things get trickier if you want to implement things like transformAsync:

```kotlin
fun <T, R> Flow<T>.transformAsync(transform: suspend FlowCollector<R>.(T) -> Unit): Flow<R> =
    channelFlow<Channel<R>> {
        collect { e ->
            val channel = Channel<R>(Channel.BUFFERED) // This can be customized
            launch {
                flow { transform(e) }.collect { channel.send(it) }
                channel.close()
            }
            send(channel)
        }
    }
        .transform { emitAll(it) }
```

This works quite well for our app, and makes it super easy to process things concurrently in a "real-world server app". However, there are 3 important caveats to take into account.
```kotlin
fun <T, R> Flow<T>.mapAsync(concurrency: Int, buffer: Int = concurrency, transform: suspend (value: T) -> R): Flow<R> {
    require(concurrency >= 1)
    require(buffer >= 0)
    return channelFlow {
        val semaphore = Semaphore(concurrency)
        collect { e ->
            semaphore.acquire()
            send(async { transform(e).also { semaphore.release() } })
        }
    }
        .buffer(buffer)
        .map { it.await() }
}
```

This is NOT something we actually use in our application. So caveat emptor: there might be edge cases I did not think about, and it's not production tested.

```kotlin
myFlow
    .mapAsync { operation1(it) }
    .mapAsync { operation2(it) }
```

Generally speaking, this has not caused issues for us, but we are "aware" of it and try to merge as many of our operations as possible into a single async operation. If you do want such behavior, then bugaevec's answer seems to be what you are looking for. So, I hope this helps. I do not believe this would be a good fit for the standard library, but it's something that's pretty easy to implement and maintain yourself. It works 100% of the time. The caveats are mostly about things not being quite as concurrent as you might think they would be. But for "everyday operations", we use this everywhere. In particular, any time we have a |
Oh, I forgot. Similarly, you can implement short-circuiting async any/all:

```kotlin
suspend fun <T> Flow<T>.allAsync(predicate: suspend (T) -> Boolean): Boolean =
    channelFlow { collect { e -> launch { send(predicate(e)) } } }.firstOrNull { !it } ?: true

suspend fun <T> Flow<T>.anyAsync(predicate: suspend (T) -> Boolean): Boolean =
    channelFlow { collect { e -> launch { send(predicate(e)) } } }.firstOrNull { it } ?: false
```

Which can be quite useful for "only proceed if all/any of these (suspend) conditions are true". |
It makes sense that Flow and all map operations are sequential by default, but it also makes sense to allow some heavy operations to be performed in parallel on an explicitly provided dispatcher.

I've implemented the feature based on the existing flowOn implementation. It seems to be working fine, but I can't use internal kotlinx.coroutines methods. I think that this use-case requires a separate function in the library.