Proposal: asConflatedBroadcastChannel extension for Flow #1221

Closed
@LouisCAD

Related to #1086

I thought about how to share a cold stream with multiple consumers without turning it into a hot stream, and the new coroutine-based liveData builder inspired a potential solution.

Proposal:

The extension function

Add an extension function named asConflatedBroadcastChannel for Flow.
This function doesn't activate the Flow itself; it returns a ConflatedBroadcastReceiveChannel that keeps the flow active for as long as it has at least one active subscriber.

It doesn't return a BroadcastChannel or a ConflatedBroadcastChannel because those implement SendChannel, and here we don't want to expose sending: it is managed internally whenever a value is collected from the source Flow.
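
To make the intended behavior concrete, here is a minimal sketch of the "active only while subscribed, replay the latest value" semantics. It is a simplified, callback-based model in plain Kotlin, not the proposed coroutine API: the class name RefCountedConflatedSource, the activate parameter, and the choice to forget the latest value on deactivation are all assumptions made for illustration.

```kotlin
// Hypothetical model: the upstream is started by the first subscriber,
// stopped when the last one leaves, and new subscribers first receive
// the most recent value (conflation). Not thread-safe.
class RefCountedConflatedSource<T>(
    // Starts the upstream with an emit callback; returns a function that stops it.
    private val activate: ((T) -> Unit) -> (() -> Unit)
) {
    private val subscribers = mutableListOf<(T) -> Unit>()
    private var stop: (() -> Unit)? = null
    private var hasValue = false
    private var latest: T? = null

    /** Subscribes and returns a function that cancels this subscription. */
    @Suppress("UNCHECKED_CAST")
    fun subscribe(onValue: (T) -> Unit): () -> Unit {
        // A late subscriber gets the last emitted value, not the first one.
        if (hasValue) onValue(latest as T)
        subscribers += onValue
        // The first subscriber activates the upstream.
        if (stop == null) stop = activate { value -> publish(value) }
        var cancelled = false
        return {
            if (!cancelled) {
                cancelled = true
                unsubscribe(onValue)
            }
        }
    }

    private fun publish(value: T) {
        latest = value
        hasValue = true
        // Iterate over a copy so a subscriber may cancel from its callback.
        subscribers.toList().forEach { it(value) }
    }

    private fun unsubscribe(onValue: (T) -> Unit) {
        subscribers.remove(onValue)
        if (subscribers.isEmpty()) {
            // The last subscriber left: deactivate the upstream.
            stop?.invoke()
            stop = null
            // Assumption: the stale value is dropped, since a later
            // subscriber restarts the cold source from scratch.
            hasValue = false
            latest = null
        }
    }
}
```

Note that only subscribe is public: nothing outside the class can push values, which mirrors the reason for not exposing SendChannel.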

Implied changes in the broadcast channels type hierarchy

To complement ConflatedBroadcastReceiveChannel, a new BroadcastReceiveChannel interface is also introduced. As you can guess, the former extends the latter.

The existing BroadcastChannel interface loses its openSubscription function to its new super-interface BroadcastReceiveChannel.

The ConflatedBroadcastChannel now implements ConflatedBroadcastReceiveChannel in addition to BroadcastChannel.

All the extensions and symbols that have BroadcastChannel in their signature solely for its openSubscription function are updated to replace their usage by BroadcastReceiveChannel instead.
That includes the asFlow() extension function, which now allows sharing an activated flow with different collectors through an intermediate channel.
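
The hierarchy changes described above could be sketched as follows. SendChannel and ReceiveChannel are empty stand-ins for the real kotlinx.coroutines interfaces so that the snippet compiles on its own, and the members are reduced to openSubscription; treat it as an illustration of the proposed shape, not as the library's actual declarations.

```kotlin
// Empty stand-ins for the kotlinx.coroutines types (assumption: members omitted).
interface SendChannel<in E>
interface ReceiveChannel<out E>

// Proposed: a subscription-only view that does not expose sending.
interface BroadcastReceiveChannel<out E> {
    fun openSubscription(): ReceiveChannel<E>
}

// Proposed: same API, but signals that a new subscriber
// first receives the latest emitted value.
interface ConflatedBroadcastReceiveChannel<out E> : BroadcastReceiveChannel<E>

// Existing interface: openSubscription moves up to the new super-interface.
interface BroadcastChannel<E> : SendChannel<E>, BroadcastReceiveChannel<E>

// Existing class: additionally implements the conflated receive-only interface.
abstract class ConflatedBroadcastChannel<E> :
    BroadcastChannel<E>, ConflatedBroadcastReceiveChannel<E>
```

With this shape, a ConflatedBroadcastChannel can be handed out typed as ConflatedBroadcastReceiveChannel, so consumers can subscribe without being offered the sending side through that type.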

Why distinction between conflated and not necessarily conflated

The distinction between ConflatedBroadcastReceiveChannel and BroadcastReceiveChannel exists for the predictability of the behavior for consumers/collectors, which I believe is important as a new flow collector will not get the first value, but the last one emitted. I also believe this is the most common and simplest use case.

There's still a risk that the ConflatedBroadcastReceiveChannel interface would be misused by a bad implementor, but I don't think that's much different from List, whose contract can also be broken by a careless implementation.

Activated timeout

Similarly to the coroutine-based liveData builder, we could pass a timeout parameter in milliseconds to the asConflatedBroadcastChannel extension function, instructing it to keep the flow activated and the conflated value updated for the given time after the last subscription is cancelled, in case a new subscriber arrives before the delay elapses. There are lots of use cases for this in UI code (including Android, of course).

Scope for the source Flow

Still, we need to pass a CoroutineScope, since we can't take the scope of the first consumer/collector and make it outlive that first subscription for the other subscribers. I guess attempting to open a subscription after that scope is no longer active should result in an IllegalStateException.

Remaining questions

That still leaves us with a few unanswered questions though:

Exception/throwable handling: I'd expect them to reach all the subscribers, but having a channel as the middleman makes that impossible.
Should we discourage having the source flow throw, and crash in some way if this is violated?
Or maybe find an alternative to the middle conflated broadcast channel, involving only flows in the public API surface?

I'm happy to read your thoughts about this or another better way to tackle the same problem.
