I thought about how to share a cold stream with multiple consumers without turning it into a hot stream, and inspiration from the new coroutine-based liveData builder helped me find a potential solution.
Proposal:
The extension function
Add an extension function named asConflatedBroadcastChannel for Flow.
This function doesn't activate the Flow, but returns a ConflatedBroadcastReceiveChannel that will activate the flow as long as it has at least one active subscriber.
It doesn't return a BroadcastChannel, nor a ConflatedBroadcastChannel, because those implement SendChannel, and here we don't want to expose sending, as it is managed internally when a value is collected from the source Flow.
Implied changes in the broadcast channels type hierarchy
To complement ConflatedBroadcastReceiveChannel, a new BroadcastReceiveChannel interface is also introduced. As you can guess, the former extends the latter.
The existing BroadcastChannel interface loses its openSubscription function to its new super-interface BroadcastReceiveChannel.
The ConflatedBroadcastChannel now implements ConflatedBroadcastReceiveChannel in addition to BroadcastChannel.
All the extensions and symbols that have BroadcastChannel in their signature solely for its openSubscription function are updated to use BroadcastReceiveChannel instead.
That includes the asFlow() extension function, which now allows sharing an activated flow with different collectors through an intermediate channel.
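A rough sketch of the hierarchy described above, assuming the smallest possible shape for each interface. The two receive-only interfaces are the proposed ones. ProposedBroadcastChannel and ProposedConflatedBroadcastChannel are hypothetical stand-in names for the existing library types (which have more members, and ConflatedBroadcastChannel is a class there), used only so the snippet doesn't clash with kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel

// Proposed: the receive-only side of a broadcast channel.
// It exposes subscriptions but no way to send.
interface BroadcastReceiveChannel<E> {
    fun openSubscription(): ReceiveChannel<E>
}

// Proposed: same contract, plus the promise that a new subscriber
// first receives the most recently emitted value.
interface ConflatedBroadcastReceiveChannel<E> : BroadcastReceiveChannel<E>

// Stand-in for the existing BroadcastChannel: openSubscription now
// comes from the new super-interface instead of being declared here.
interface ProposedBroadcastChannel<E> : SendChannel<E>, BroadcastReceiveChannel<E>

// Stand-in for the existing ConflatedBroadcastChannel: it additionally
// implements the conflated receive-only interface.
interface ProposedConflatedBroadcastChannel<E> :
    ProposedBroadcastChannel<E>, ConflatedBroadcastReceiveChannel<E>
```

With this shape, code that only needs openSubscription can accept a BroadcastReceiveChannel and never see the sending side.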
Why distinguish between conflated and not necessarily conflated
The distinction between ConflatedBroadcastReceiveChannel and BroadcastReceiveChannel exists for the predictability of the behavior for consumers/collectors, which I believe is important as a new flow collector will not get the first value, but the last one emitted. I also believe this is the most common and simplest use case.
There's still a risk the ConflatedBroadcastReceiveChannel interface would be misused by a bad implementor, but I don't think that's much different from List.
Activated timeout
Similarly to the coroutine-based liveData builder, we could pass a timeout parameter (in milliseconds) to the asConflatedBroadcastChannel extension function, to instruct it to keep the flow activated and updating the conflated value for the given time after the last subscription is cancelled, in case a new subscriber comes before the end of this delay. There are lots of use cases for this in UI code (including Android, of course).
Scope for the source Flow
Still, we need to pass a CoroutineScope since we can't get the scope of the first consumer/collector and make it outlive that first subscription for other subscribers. I guess attempting to open a subscription after that scope is no longer active should result in an IllegalStateException.
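Putting the scope and timeout together, the proposed extension could have a signature along these lines. This is only a signature sketch: the parameter names scope and timeoutMillis and the default of 0 are assumptions, the two interfaces are minimal re-declarations of the proposed ones, and the body is deliberately left unimplemented.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow

// Minimal re-declaration of the proposed receive-only interfaces.
interface BroadcastReceiveChannel<E> {
    fun openSubscription(): ReceiveChannel<E>
}

interface ConflatedBroadcastReceiveChannel<E> : BroadcastReceiveChannel<E>

// Hypothetical signature. `scope` would own the collection of the source
// flow; `timeoutMillis` would be how long the flow stays activated after
// the last subscription is cancelled.
fun <T> Flow<T>.asConflatedBroadcastChannel(
    scope: CoroutineScope,
    timeoutMillis: Long = 0L
): ConflatedBroadcastReceiveChannel<T> =
    TODO("Not implemented: this only illustrates the proposed signature")
```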
Remaining questions
That still leaves us with a few unanswered questions though:
Exception/throwable handling: I'd expect them to reach all the subscribers, but having a channel as the middleman makes that impossible.
Should we discourage having the source flow throw, and crash in some way if this is violated?
Or maybe find an alternative to the middle conflated broadcast channel, involving only flows in the public API surface?
I'm happy to read your thoughts about this or another better way to tackle the same problem.
I'm not sure we need CBC (ConflatedBroadcastChannel) for this. How about a flow.share() operator that returns another flow, such that multiple collectors of the resulting flow all share a single upstream producer instance? Isn't that enough?
Related to #1086
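The thread doesn't define share(), so the following is not that operator. As a rough illustration of the behaviour being asked for, here is a sketch using shareIn with SharingStarted.WhileSubscribed, which exist in later kotlinx.coroutines releases. The flow contents, the 1-second stop timeout, and replay = 1 are assumptions chosen to mirror the conflated, timeout-based proposal above.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    // Counts how many times the upstream flow body is started.
    val activations = AtomicInteger(0)

    // A cold flow: nothing runs until something collects it.
    val source = flow {
        activations.incrementAndGet()
        var i = 0
        while (true) {
            emit(i++)
            delay(100)
        }
    }

    // Separate scope that owns the single upstream collection.
    val sharingScope = CoroutineScope(Dispatchers.Default)

    // Upstream is collected only while there are subscribers, and is kept
    // alive for 1 second after the last one leaves. replay = 1 hands the
    // latest value to a new subscriber.
    val shared = source.shareIn(
        scope = sharingScope,
        started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 1_000),
        replay = 1
    )

    // Two independent collectors of the same shared flow.
    val a = launch { shared.take(3).collect { println("A got $it") } }
    val b = launch { shared.take(3).collect { println("B got $it") } }
    joinAll(a, b)

    println("upstream activations: ${activations.get()}")
    sharingScope.cancel()
}
```

Only flows appear in the public surface here, and the sharing scope is passed explicitly, which matches the point above that the upstream cannot simply borrow the first collector's scope.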