Flow.share operator #1261
This looks good as an implementation of #1086. One thing missing is the sharing timeout, whose use case is described in #1086 (comment) |
One question about 'starting' a shared/published flow through a dummy collector: in #1086, starting a shared Flow could only happen on a Flow that was already shared, because connecting was only available on a ConnectableFlow. Say I want to create an extension function out of the dummy-collector trick: I can't be sure that the receiver of that extension function is actually a shared/published Flow, and non-shared/non-published Flows won't be able to get 'connect'ed. |
What happens to the cached "history" when moving from one to zero collectors (cancelling the upstream collection), then back to one? Is the history preserved across upstream collections, or is it lost? |
@zach-klippenstein That is why Rx does not preserve the history across connections by default. With Rx's replay plus refCount, the replay cache lives and dies with the upstream connection: when the last subscriber goes away the connection is torn down, and a new connection starts with a fresh cache. |
To me, the name "share" implies only multicasting, so zero would be a reasonable default. The caching/replaying behavior is an additional feature that is often used along with multicasting, but not necessarily implied by the name. |
@streetsofboston That's how Rx works, yes, but I was asking for clarification about this operator because I didn't see it specified in the issue description. The behavior of saving the cache across "connections" would be similar to Jake Wharton's RxReplayingShare. |
I would argue that this should be an option, since preserving data across a cancellation could be useful (for example, to use this flow as a memory cache when data takes a while to load after subscribing) or not useful (for example, when data from the flow changes frequently, rendering old values obsolete), depending on the context.
I think that there shouldn't be any default value; the caller should choose explicitly. |
@zach-klippenstein That's how I modeled my prototype implementation of the ConnectableFlow as well. Roman wants to not expose yet another type (ConnectableFlow) from the library and only add one more function called share. And maybe the argument in favor of exposing a new ConnectableFlow type is not strong enough. In the end it's up to the maintainers of kotlinx.coroutines to weigh these factors and make a decision :-) However, I don't think overloading the proposed share operator with caching/replay behavior is a good idea. Maybe a separate, but related, issue should be opened for a cache operator. |
@matejdro With the prototype, both behaviors can be expressed depending on how the operators are combined: one composition gives you a cache that survives re-activations of the flow, the other a cache that does not survive re-activations of the flow. |
Thanks for the timeout reminder. I'll see how it can be incorporated here in some nicer way than just adding an additional "timeout" parameter to this operator. |
We actually plan this kind of operator; its tentative name is still being discussed. |
I don't see how to decouple the share and cache use-cases. |
But the scope remains active while the job is working. |
@fvasco |
I actually envision a single implementation where "replay size" (of zero or more) is just a configuration parameter. |
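A sketch of what that single configurable operator could look like (a hypothetical signature for illustration, not a committed API):

    import kotlinx.coroutines.flow.Flow

    // Hypothetical: multicast the upstream, replaying the last `replay` values
    // to each late collector (replay = 0 means no history at all).
    fun <T> Flow<T>.share(replay: Int = 0): Flow<T> =
        TODO("multicast upstream, buffering the last `replay` emissions")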
That proposal looks good. It leaves me wondering about error handling, though. |
If you need to handle errors for all collectors in a shared flow (that is, handle emitter's errors), then you have to put error-handling operators before you call share(). |
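To make that concrete, a sketch using the shareIn operator that eventually shipped in kotlinx.coroutines 1.4 (hindsight; at the time of this comment only a plain share() was proposed). The catch sits upstream of the sharing, so the emitter's failure is handled once for all collectors:

    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.flow.*

    fun events(scope: CoroutineScope, upstream: Flow<Int>): Flow<Int> =
        upstream
            .catch { e -> println("emitter failed: $e") } // handled once, before sharing
            .shareIn(scope, SharingStarted.WhileSubscribed())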
My concern is that the launch trick has non-trivial consequences. |
You can design it to just have one "share" method with a replay-size parameter, but my point was that decoupling the share and cache use cases is possible. I'd argue for decoupling, but you can argue for one "share" method as well. |
@streetsofboston This is slightly different. AutoConnect preserves the upstream connection as well. RxReplayingShare will still disconnect upstream when the ref count hits zero (like a plain ref-counted share); it only keeps the last-emitted value around for future subscribers. In general I am wary of operators that keep the upstream connection alive when there are no subscribers. |
@matejdro The function of |
In @streetsofboston's proposal, share and cache are decoupled operators. You could also make them compose directly:

    flow.share() // returns a multicasted flow
        .cache(1) // configures the multicasted flow to cache 1 emission

I don't mind using a single operator for both, as long as the operator name communicates that it does both multicasting and caching (e.g. a name that mentions both sharing and replaying). |
Having a |
How will the updated shareIn(scope) signature behave? |
Would it close the flow if all of the consumers are cancelled? In terms of the above example that would mean canceling the long-running operation in the flow if both consumers, Screen A and Screen B, are closed. |
@voddan Yes, that's the whole point of having a share operator. |
Here is my take on the share operator: https://gist.github.com/matejdro/a9c838bf0066595fb52b4b8816f49252 It supports the conflating and timeouts described in #1086 (comment). From what I can see, the implementation is a bit simpler than the ones linked above (it just uses an actor that loops through events), but it is not very fast - I would not use it to emit thousands of events per second. However, in my projects I almost never need that much throughput, so it fits my use cases. |
Hi @matejdro, I have seen your gist, and it looks good. What do you think of using an extension function at this line? https://gist.github.com/matejdro/a9c838bf0066595fb52b4b8816f49252#file-multicastflowtest-kt-L139 |
Yes, that would probably be a good idea. |
I've also come across a use case for this. Right now, the only way to do it is to imperatively collect the flow prematurely. |
I just noticed the update above, that this will have to be a shareIn(scope) operator.
I think this unnecessarily complicates the use of this feature. A flow is already scoped by its downstream (it starts collecting when the first collector starts and stops when all collectors get cancelled). Introducing another scope would significantly complicate implementations where such a share operator is exposed by stateless objects. |
It definitely makes the stream more complicated to reason about, but I don't think it does so unnecessarily. Multicasting inherently involves a scope that is separate from that of any of the downstream collectors, because no downstream collector is guaranteed to be active for the lifetime of the upstream collection. That scope could be implicit, as in the case of a reference-counting approach, but sometimes you need it to be explicit: if the owner of the multicasting operation has its own scope (e.g. it is a service-type object that gets shut down when a user logs out of an app), it may want to explicitly manage the lifetime of that upstream collection and cancel it when the service object itself is shut down. |
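For illustration, here is how that service-object case looks with the shareIn operator that eventually shipped (hindsight; the exact signature was still under discussion in this thread, and LocationService here is an invented example):

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    class LocationService(private val serviceScope: CoroutineScope) {
        private val rawLocations: Flow<String> = flow {
            var i = 0
            while (true) { delay(1_000); emit("fix #${i++}") }
        }

        // The upstream collection runs in the service's own scope: it is active
        // while there are collectors, and torn down for good when serviceScope
        // is cancelled (e.g. on logout).
        val locations: Flow<String> = rawLocations.shareIn(
            serviceScope,
            SharingStarted.WhileSubscribed(),
            replay = 1,
        )
    }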
I see the purpose here. So we would need support for both explicit and implicit scopes? |
@zach-klippenstein Issue #1086 discusses such a class for explicitly managing the scope: a ConnectableFlow whose connect call ties the upstream collection to a caller-supplied scope, instead of the implicit scope of a ref-counting share. |
Scoping by downstream is not enough. We need the real scope to provide the context for pulling upstream data. We cannot just take the context of the first downstream subscriber: it can get destroyed, while other subscribers should not suffer. In a case where you don't really care, you can always use a global scope. |
📣 Asking here a question to the community interested in this issue. There are tons of use-cases listed here and in #1086, but they all revolve around working with never-ending streams of incoming events or state updates. However, a flow can also complete. Now, in Rx there is a cache operator for that case: does anyone have real-life use-cases for sharing a flow that completes? |
For me, having a share/replay operator would allow implementing some sort of if/else logic with flows, i.e. distributing elements into multiple flows based on criteria. For example, I need the first 100 items to be processed one way and the remaining items another way. With share/replay, both downstream flows can be derived from a single shared upstream.
In this case, I don't want to execute the input Flow multiple times (which could be a file consisting of a couple of GB of data on a network drive etc.). |
For reference: this is the implementation I ended up with. (I wasn't able to get rid of the consumerCount parameter, because in my case I collect the first flow before starting the second one.) |
I work on a fairly large mobile app, and we use |
@matthiaswelz Thanks for your use-case. Note that in your use-case you are not actually "widely sharing" the flow. You don't publish the shared flow for later consumption by an unknown number of future collectors. On the contrary, here you know in advance that there are going to be two downstream flows that are going to process your upstream flow. In this case, the Rx-like sharing machinery looks like a poor fit. I think that your use-case of "splitting the flow into a few other flows" should be covered by a separate operator, with a tentative name still to be picked; see the sketch below for the rough shape. |
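A rough sketch of what such a splitting helper could look like (the name splitInto and the whole shape are illustrative, not a proposed API):

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    import kotlinx.coroutines.flow.*

    // Collect the upstream exactly once and fan each element out to a fixed
    // set of consumers, each of which sees its own Flow.
    suspend fun <T> Flow<T>.splitInto(
        vararg consumers: suspend (Flow<T>) -> Unit,
    ): Unit = coroutineScope {
        val channels = List(consumers.size) { Channel<T>(Channel.BUFFERED) }
        channels.forEachIndexed { i, channel ->
            launch { consumers[i](channel.consumeAsFlow()) }
        }
        collect { value ->
            // A consumer that finished early has cancelled its channel; skip it.
            channels.forEach { runCatching { it.send(value) } }
        }
        channels.forEach { it.close() }
    }

Usage would be along the lines of input.splitInto({ it.take(100).collect(::processFirst) }, { it.collect(::processRest) }), collecting the expensive upstream only once.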
@zach-klippenstein But do you ever call |
There is a design for the sharing operators now. |
This issue is superseded by the worked-out design of sharing operators. See #2047 |
While it's not released, a good enough workaround is this simple extension:
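The snippet itself did not survive here; a sketch of a typical workaround of that era, multicasting through a BroadcastChannel (an API later deprecated in favor of shareIn; assumption: the original extension had a similar shape):

    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.flow.*

    // Launches one upstream collection in `scope` and shares it with every
    // collector of the returned flow.
    fun <T> Flow<T>.shared(scope: CoroutineScope): Flow<T> =
        broadcastIn(scope).asFlow()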
|
The share() operator operates on Flow<T> and returns Flow<T>. It shall have the following semantics. The resulting flow is cold, but when one collector starts collecting from it, it starts to collect from the upstream flow, activating the emitter upstream. The trick of the share operator is that when additional collectors appear in the downstream, they all "share" the same upstream emitter.

For example, consider the flow:
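(The original snippet was not preserved; the following is a reconstruction consistent with the "Emit 0 / A: got 0 / ..." transcript below.)

    val flow = flow {
        var i = 0
        while (true) {
            delay(1000)
            println("Emit $i")
            emit(i++)
        }
    }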
If you launch two collectors:
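(Again a reconstruction matching the transcript:)

    launch { flow.collect { println("A: got $it") } }
    launch { flow.collect { println("B: got $it") } }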
Then you shall see "Emit 0 / A: got 0 / Emit 0 / B: got 0 / Emit 1 / A: got 1 / Emit 1 / B: got 1 / ...".
However, if you change the flow to val flow = flow { /* same */ }.share(), then you shall see "Emit 0 / A: got 0 / B: got 0 / Emit 1 / A: got 1 / B: got 1 / ...", that is, one emission gets delivered to both collectors.

Now if you need to artificially "start" the shared flow simply to keep it active, then you can always launch a dummy collector: launch { flow.collect {} } that works as a "reference" which is active until you cancel the resulting job.

TBD: The share operator might need some configuration with respect to how much "history" to keep in memory for "late collectors". So far it seems that one non-negative integer is enough (with zero, new collectors don't get any history; with one, only the most recent value; with more, the specified number of recent values). What is unclear is what the default value shall be (if any).
UPDATE: It will have to be, actually, a shareIn(scope) operator. Otherwise, the scope it works in will be bound to the first collector, and when that collector is cancelled it will not be able to maintain emissions to the other collectors.