Description
Related to #1086
I thought about how to share a cold stream to multiple consumers without turning it into a hot stream, and inspiration from the new liveData
coroutine based builder helped me find a potential solution.
Proposal:
The extension function
Add an extension function named asConflatedBroadcastChannel
for Flow
.
This function doesn't activate the Flow
, but returns a ConflatedBroadcastReceiveChannel
that will activate the flow as long as it has at least one active subscriber.
It doesn't returns a BroadcastChannel
, nor a BroadcastReceiveChannel
because those implement SendChannel
, and here, we don't want to expose sending as it is managed internally when a value is collected from the source Flow
.
Implied changes in the broadcast channels type hierarchy
To complement ConflatedBroadcastReceiveChannel
, a new BroadcastReceiveChannel
interface is also introduced. As you can guess, the former extends the latter.
The existing BroadcastChannel
interface loses its openSubscription
function to its new super-interface BroadcastReceiveChannel
.
The ConflatedBroadcastChannel
now implements ConflatedBroadcastReceiveChannel
in addition to BroadcastChannel
.
All the extensions and symbols that have BroadcastChannel
in their signature solely for its openSubscription
function are updated to replace their usage by BroadcastReceiveChannel
instead.
That includes the asFlow()
extension function, which now allows to share an activated flow to different collectors through a middle channel.
Why distinction between conflated and not necessarily conflated
The distinction between ConflatedBroadcastReceiveChannel
and BroadcastReceiveChannel
exists for the predictability of the behavior for consumers/collectors, which I believe is important as a new flow collector will not get the first value, but the last one emitted. I also believe this is the most common and simplest use case.
There's still a risk the ConflatedBroadcastReceiveChannel
function would be misued by a bad implementor, but I don't think it's much different from List
.
Activated timeout
Similarly to the coroutines based liveData
builder, we could pass a timeout millis parameter to the asConflatedBroadcastChannel
extension function, to instruct it to keep the flow activated and updating the conflated value for the given time after the last subsciption is cancelled, in case a new subscriber comes before the end of this delay. Lot of use cases in UI code here (including Android, of course).
Scope for the source Flow
Still, we need to pass a CoroutineScope
since we can't get the scope of the first consumer/collecter and make it outlive that first subscription for other subscribers. I guess attempting to open a subscription after that scope is no longer active should result in a IllegalStateException
.
Remaining questions
That still leaves us with a few unanswered questions though:
Exception/throwable handling, I'd expect them to reach all the subscribers, but having a middleman be a channel makes it impossible.
Should we discourage having the source flow throw, and crash in some way if this is violated?
Or maybe find an alternative to the middle conflated broadcast channel, involving only flows in the public API surface?
I'm happy to read your thoughts about this or another better way to tackle the same problem.