Consider sharing a Flow through a ConnectableFlow #1086
Comments
Can you please add an actual use-case, as in "here is what the application is trying to do and this is what it wants to achieve", without tying this use-case to the actual solution in the form of |
Use case for ConnectableFlow: Cold Streams are often Unicast. When an observer/consumer starts observing, the source of the Stream is started again. The Hot Streams are often Multicast where observers/consumers can come and go without them restarting anything. Sometimes, it is desirable to have Cold Streams that are Multicast. The source of the stream may not always be active (it may be expensive to have the stream being active all the time or starting a new one each time), and starting the stream does not depend on whether any observers/consumers are actually observing. Starting and stopping the cold multicast stream needs to be managed explicitly. The proposed Examples of such cold multicast streams are BLE (Bluetooth Low Energy) characteristics that notify the observer of data changing on an external device, e.g. a BLE thermometer or any other continuous monitoring device. Starting a characteristic keeps a connection open between the observer and the BLE device and this can be somewhat expensive. It is best to manage this explicitly, e.g. have the user click a 'connect' and 'disconnect' button or to manage it implicitly by only starting the connection when observers on the UI are observing the device. Make a cold unicast Implicitly manage the connection of a |
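To make the cold/unicast behaviour described above concrete, here is a minimal sketch. `CountingSource` and `collectTwice` are hypothetical names standing in for an expensive source such as a BLE connection; they are not part of kotlinx.coroutines or of the proposal.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for an expensive source such as a BLE connection.
class CountingSource {
    var starts = 0
        private set

    // A cold flow: the builder block runs once per collector.
    val readings: Flow<Int> = flow {
        starts++        // "open the connection"
        emit(20)
        emit(21)
    }
}

// Collects the same cold flow twice and reports how often the source was started.
fun collectTwice(): Int = runBlocking {
    val source = CountingSource()
    source.readings.toList()    // first collector starts the source
    source.readings.toList()    // second collector starts it again
    source.starts
}
```

Each call to `toList()` runs the `flow { ... }` block again, which is exactly the per-collector restart that the use case wants to avoid.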
Can you, please, provide an example with an actual application scenario (as in "here is the actual application I'm writing and here is why I need it") where a cold stream needs to be multicast, but it cannot be always active (so you cannot just use always active |
@streetsofboston To me, that Bluetooth GATT notifications use case is not a cold stream at all, but a hot one with manual start and stop, so effectively just a channel (possibly broadcast), and two custom functions to start/subscribe and stop/unsubscribe. |
@LouisCAD You're correct. I also do believe the addition of I'm currently not working on any BLE app right now, but in the past I have worked on apps that used plain callbacks and the RxAndroidBLE library. There we made use of ConnectableObservables. But that was a while ago and I need to dig into the past a little to get a good use-case. :-) |
@streetsofboston Actually, I made a library for Bluetooth Low Energy with coroutines a while ago (and I keep it updated). Notifications support works with channels, although I've not needed notifications support myself. If you think that part of the API may be improved, feel free to open an issue there! |
All that is true, but wrapping GATT into a cold stream provides the benefit of automatic state management. A programmer can easily forget to call a manual start/stop method (especially stop). But with a flow, this is done automatically (start on Another use case: Developing a mobile app that uses GPS location at various points. Location is exposed as a flow stream. Whenever a part of the app needs access to Location, it starts |
This last example might be somewhat related to #1097, but using Edit: although it's only the analogue of the |
Let me summarize what I'm getting out of this thread so far. I see a bunch of use-cases here for an operator that automatically activates a flow on a first collector, shares the emitted events with all the other collectors, and cancels the flow instance as soon as the last collector is done. Easy, useful, no chance of resource leakage, no need to introduce any new types like A kind of manual activation/deactivation of the flow sounds like a use-case for a channel to me. You can already do Does this sound like a plan? |
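For readers coming to this thread later: the behaviour summarized above (activate on the first collector, share with the others, cancel after the last one) is close to what `shareIn` with `SharingStarted.WhileSubscribed()` does in later kotlinx.coroutines releases. The following is a sketch under that assumption, not the operator as it was being designed at the time; `upstreamStartsForTwoCollectors` is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns how many times the upstream block ran for two concurrent collectors.
fun upstreamStartsForTwoCollectors(): Int = runBlocking {
    var starts = 0
    val upstream = flow {
        starts++                    // counts activations of the cold source
        var i = 0
        while (true) {
            emit(i++)
            delay(10)
        }
    }

    // Separate scope for the sharing coroutine, on runBlocking's dispatcher.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = upstream.shareIn(sharingScope, SharingStarted.WhileSubscribed())

    // Two concurrent collectors share a single upstream activation.
    List(2) { async { shared.take(2).toList() } }.awaitAll()

    sharingScope.cancel()           // stop the sharing coroutine
    starts
}
```

The sharing scope is cancelled explicitly because it is not a child of `runBlocking`; in an application it would typically be tied to some longer-lived component.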
For my use case, this sounds perfect. |
Next to just Take for example the way Firebase Database provides lists of items. Firebase emits events like 'item added', 'item removed', 'item moved', etc. Clients can construct the entire list from these events.
databaseEvents
    .scan(emptyList()) { list, event ->
        list.apply(event)
    }
    .replay(1).refCount()
Replacing |
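The `scan` step of the Rx snippet above has a direct counterpart on `Flow`. The sketch below only covers that accumulation step; `ItemEvent`, `applyEvent` and `rebuildList` are hypothetical names, and Firebase's real event API is not modeled.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.runBlocking

// Hypothetical event type; Firebase's real API is not modeled here.
sealed class ItemEvent {
    data class Added(val item: String) : ItemEvent()
    data class Removed(val item: String) : ItemEvent()
}

// Applies one event to an immutable snapshot of the list.
fun List<String>.applyEvent(event: ItemEvent): List<String> = when (event) {
    is ItemEvent.Added -> this + event.item
    is ItemEvent.Removed -> this - event.item
}

// Rebuilds the list from a fixed sequence of events and returns the final state.
fun rebuildList(): List<String> = runBlocking {
    flowOf<ItemEvent>(
        ItemEvent.Added("a"),
        ItemEvent.Added("b"),
        ItemEvent.Removed("a")
    )
        .scan(emptyList<String>()) { list, event -> list.applyEvent(event) }
        .last()
}
```

`scan` emits the initial empty list followed by one snapshot per event, so a collector that joins late needs the most recent snapshot replayed to it, which is the role `replay(1)` plays in the Rx version.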
@elizarov I also tried to implement the The main issue with using Maybe I'm overlooking something and missing an implementation that works without the introduction of something new like a |
@streetsofboston Indeed. You cannot easily emulate post-factum activation of the flow via existing APIs. But what is the use-case for that? Why would you need to get a reference to the flow that is not "active" yet and then activate it later? |
A use-case is a (key-value) data store that is relatively expensive to start up (and shut down). There is a UI (e.g. Android Activities/Fragments) that examines and shows key-value pairs from this data store. Using a However, the user moving from Activity to Activity or from Fragment to Fragment should not dictate when the data store starts (and shuts down), since this can be unpredictable and may cause expensive restarts of the data store. Instead, a Service (or some other 'manager') can be used to explicitly (re)start and stop the data store, reducing the number of restarts by keeping the data store alive a little longer. The order in which the UI screens that need the key-value pairs appear and the start of the Service should be independent: The UI should be unaware of this Service and focus on just the key-value pairs |
Wouldn't simple |
'Dummy collect' shows a different intention than explicitly starting and stopping the data store, and has more potential to be removed by accident. |
Yes I see now that this would not look good in code. Even if some better function like Also now that you mentioned the activities, timeout on Use case for this is that Android's screens often go through a configuration change, which destroys the screen and creates a new one. It goes like this:
This wastes resources, because the resource behind the Flow stream is closed for about one millisecond and then reopened. Timeout (like RxJava's |
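The quick unsubscribe/resubscribe scenario above can be sketched with the `stopTimeoutMillis` parameter of `SharingStarted.WhileSubscribed`, which exists in later kotlinx.coroutines releases. This is an illustration under that assumption; `startsAcrossQuickResubscribe` and the 1000 ms timeout are arbitrary choices for the example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

// Returns how often the upstream was started when a collector leaves
// and another one arrives right away (as after a configuration change).
fun startsAcrossQuickResubscribe(): Int = runBlocking {
    var starts = 0
    val upstream = flow {
        starts++                    // "open the resource"
        var i = 0
        while (true) {
            emit(i++)
            delay(10)
        }
    }

    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = upstream.shareIn(
        sharingScope,
        // Keep the upstream alive for 1 s after the last collector leaves.
        SharingStarted.WhileSubscribed(stopTimeoutMillis = 1_000),
        replay = 1
    )

    shared.first()      // "old screen" collects, then goes away
    shared.first()      // "new screen" resubscribes well within the timeout

    sharingScope.cancel()
    starts
}
```

Because the second collector arrives before the timeout expires, the upstream is not closed and reopened in between.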
@matejdro RxJava has such operators. It has overloaded Having some code that acts as a manager to start and stop the stream exactly when it wants leaves it up to the exact needs of the use case. Maybe later we can add those overloaded |
Yes, I know about |
Sorry @matejdro. Me reading and typing an answer/reply on a phone is not the smartest thing to do :) |
Why does this issue still have |
Does this issue need any additional use-cases? |
I don't think so. I made a formal proposal in #1221 that leaves a few open questions. They certainly need an answer to make an implementation possible. |
What open-source projects would you recommend to look at that use/need this kind of connectable flow (it is OK if they are using Rx now)? |
Right now, I'm not aware of any open-source project that may need it. But I'm not aware of all open-source projects out there :) For our own private repos, for our clients, we're often using |
Here is my proposal for |
@elizarov Looks good. Looking forward into the future, where I can imagine that folks would have the need for |
I'm proposing that you can use |
The only missing feature is |
I see! Yep, adding a buffer-size param to An extension function |
The |
I know this issue has been all but superseded by #1261, but FYI RxJava is considering changing their Connectable API to make the state machine more explicit when reconnecting: ReactiveX/RxJava#5628 |
If I understand suspension correctly, there could be some trouble with connecting and collecting because you have to
val connectable = ...
launch { connectable.collect { println(it) } }
launch { connectable.collect { println(it) } }
launch { connectable.connect() }
With a |
@akarnokd I'm not sure I understand this question. Can you, please, clarify. |
If a
Collectors are launched concurrently. The first one activates the flow. The second might see all of it if subscribed fast enough, but could miss a few if out of luck. That is why in my opinion Here is an example: I would imagine some kind of The closest solution from Rx world I found is I use it quite a lot in many different circumstances and it would be great to have something like this for Flow. The key difference from share is that all subscriptions happen within the provided lambda, so the operator can safely connect to upstream after everybody is subscribed and no events are lost. |
The problem is that it is generally unclear (to me) when the collector(s) are all lined up to receive items. For example, the multicast operator equivalent to RxJava's In RxJava, consumers are by default non-blocking and synchronous, thus, we generally know all the consumers to the subject have lined up and ready to receive items the moment subscribe returns. |
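The "late collector misses items" concern from the last few comments can be reproduced deterministically with `MutableSharedFlow` from later kotlinx.coroutines releases. This sketch assumes that API; it uses `subscriptionCount` only to sequence the steps, and `lateCollectorMissesItems` is a hypothetical name.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns what an early and a late collector observed from the same hot flow.
fun lateCollectorMissesItems(): Pair<List<Int>, Int> = runBlocking {
    val shared = MutableSharedFlow<Int>()   // no replay: nothing is kept for late collectors

    val early = async { shared.take(2).toList() }
    shared.subscriptionCount.first { it == 1 }  // wait until the early collector is subscribed
    shared.emit(1)                              // only the early collector sees this value

    val late = async { shared.first() }
    shared.subscriptionCount.first { it == 2 }  // wait until the late collector is subscribed
    shared.emit(2)                              // both collectors see this value

    early.await() to late.await()
}
```

The early collector ends up with both values while the late one only sees the second. An operator that connects to the upstream only after every collector has subscribed avoids this loss.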
@pacher This is quite a valid concern. There are two different use-cases here:
|
Another example of a similar feature is the shiny new teeing collector from Java 12. @elizarov Exactly! I would formulate the difference as follows:
My example of data processing is just one use-case. As I mentioned
It is amazing how far you can get with it This is probably already another use-case territory, but as usual I just want you to keep it in mind while in the design and discussion phase. (maybe @akarnokd Totally agree. It is tricky and I don't see a simple solution either, otherwise would just code something for myself instead of bothering all of you. |
There is a design for |
Basic use-cases described herein are now taken into account, too, in the design of sharing operators as described in #2047, so I'm closing this issue. |
Enhancement: Add ConnectableFlow to the Flow API.
Each time an observer of a Flow starts collecting, the source of the Flow is executed, much like a call to subscribe of a Flowable in RxJava executes the Flowable's source.
This change is to defer the execution of the source of the Flow until a specific point in time, possibly after one or more observers started collecting the 'shared' Flow.
The use-case for deferring the execution of the source of a Flow is for (cold) Flows whose data-source is a resource that should not be started/created or stopped/destroyed by each and every call to collect and should be explicitly managed by a call to a function (connect, for example) instead. It differs from using broadcastIn by the fact that publish will return a Flow, not a BroadcastChannel.
E.g.
I propose creating these new classes and extension functions or something similar (they are modeled after RxJava's ConnectableObservable):
and
This is my first stab at an initial/draft/try-out implementation:
https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381
Update: I took autoConnect out: This is more for 'replay' and 'caching'. If needed, this should be addressed in a separate issue.
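To illustrate the idea of deferring the source until an explicit connect call, here is a minimal sketch built on `MutableSharedFlow` from later kotlinx.coroutines releases. It is not the API from this proposal or from the linked gist; `ConnectableSketch`, `collectorCount`, `connect` and `connectSketchDemo` are hypothetical names chosen for this example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical type, NOT the API from the proposal or the linked gist.
class ConnectableSketch<T>(private val source: Flow<T>) {
    private val shared = MutableSharedFlow<T>()

    // Collecting this flow never starts the source.
    val flow: Flow<T> = shared.asSharedFlow()

    // Number of collectors currently subscribed to [flow].
    val collectorCount: StateFlow<Int> get() = shared.subscriptionCount

    // Starts the source in [scope]; cancel the returned Job to disconnect.
    fun connect(scope: CoroutineScope): Job =
        scope.launch { source.collect { shared.emit(it) } }
}

fun connectSketchDemo(): List<List<Int>> = runBlocking {
    val connectable = ConnectableSketch(flowOf(1, 2, 3))
    val collectors = List(2) { async { connectable.flow.take(3).toList() } }
    connectable.collectorCount.first { it == 2 }    // wait for both collectors
    connectable.connect(this).join()                // only now run the source
    collectors.awaitAll()
}
```

In this sketch the caller decides when the source runs, independently of when collectors arrive, which is the separation the issue asks for. Values emitted while nobody is collecting are dropped, since no replay is configured.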