Consider sharing a Flow through a ConnectableFlow #1086

Closed
@streetsofboston

Enhancement: Add ConnectableFlow to the Flow API.

Each time an observer starts collecting a Flow, the source of the Flow is executed again, much like each call to subscribe on a Flowable in RxJava executes the Flowable's source.

The proposed change defers the execution of the Flow's source until a specific point in time, possibly after one or more observers have started collecting the 'shared' Flow.
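To make the per-collector re-execution concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The function name `countSourceRuns` and the counter are illustrative only and not part of the proposal.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper (hypothetical name): counts how often the source block of a cold Flow runs.
fun countSourceRuns(): Int {
    var sourceRuns = 0
    val cold = flow<Int> {
        sourceRuns++ // executed once per collector, because the Flow is cold
        emit(1)
        emit(2)
    }
    runBlocking {
        cold.collect { } // first collector: runs the source block
        cold.collect { } // second collector: runs the source block again
    }
    return sourceRuns
}
```

Calling `countSourceRuns()` returns 2: each `collect` call executes the source block from the start, which is the behaviour a shared Flow is meant to avoid for expensive resources.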

The use case for deferring the execution of the source is (cold) Flows whose data source is a resource that should not be started/created or stopped/destroyed by each and every call to collect, and should instead be managed explicitly by a call to a function (connect, for example). This differs from using broadcastIn in that publish returns a Flow, not a BroadcastChannel.

E.g.

val dataFlow = flowViaChannel<MyData> { channel ->
    val resource = getResource(channel)
    channel.invokeOnClose {
        resource.close()
    }
    resource.startReceivingData()
}

val sharedFlow = dataFlow.publish()
...
...

val observer1 = launch {
    sharedFlow.collect { ... }
}
...
...
val observer2 = launch {
    sharedFlow.collect { ... }
}
...
// Start the flow right now.
val connection = sharedFlow.connect(scope)
...
...
...
// Cancel the flow here.
// Note that when 'scope' is cancelled, this 'connection' would be cancelled as well.
connection.close()
...

I propose creating these new classes and extension functions, or something similar (they are modeled after RxJava's ConnectableObservable):

/**
 * A [Flow] of type [T] that only starts emitting values after its [connect] method is called.
 * 
 * If this flow's [Connection] is still connected, the current [Connection] will be returned when 
 * [connect] is called and the flow will not be restarted. 
 *
 * When its [collect] method is called, this flow will not immediately start collecting. The
 * emission and actual collection of values start only after [connect] is called.
 */
interface ConnectableFlow<out T> : Flow<T> {
    /**
     * Connects this shared [Flow] to start emitting values.
     *
     * @param scope The [CoroutineScope] in which the emissions will take place.
     * @return The [Connection] that can be closed to stop this shared [Flow].
     */
    fun connect(scope: CoroutineScope): Connection
}

and

/**
 * A connection returned by a call to [ConnectableFlow.connect].
 */
interface Connection {
    /**
     * Returns true if this connection is connected and active.
     */
    suspend fun isConnected(): Boolean

    /**
     * Closes this connection in an orderly fashion.
     */
    suspend fun close()
}
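The connect-after-subscribe behaviour of the two interfaces above can be prototyped roughly as follows. This is only a sketch under stated assumptions, not the proposed implementation: it relies on `MutableSharedFlow`, which is available in kotlinx.coroutines 1.4 and later (after this issue was filed), it returns a plain `Job` in place of the proposed `Connection`, and the names `ConnectableSketch`, `awaitCollectors`, and `runConnectSketch` are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the proposed ConnectableFlow; not a library class.
class ConnectableSketch<T>(private val upstream: Flow<T>) {
    // No replay: collectors that subscribe after connect() miss earlier values.
    private val shared = MutableSharedFlow<T>()

    val flow: Flow<T> get() = shared

    // Test helper (assumption): suspends until at least `count` collectors are subscribed.
    suspend fun awaitCollectors(count: Int) {
        shared.subscriptionCount.first { it >= count }
    }

    // Starts the upstream once; cancelling the returned Job plays the role of Connection.close().
    fun connect(scope: CoroutineScope): Job = scope.launch {
        upstream.collect { shared.emit(it) }
    }
}

fun runConnectSketch(): Triple<List<Int>, List<Int>, Int> = runBlocking {
    var sourceRuns = 0
    val connectable = ConnectableSketch(flow<Int> {
        sourceRuns++
        emit(1); emit(2); emit(3)
    })
    // Both observers start collecting before the source is started.
    val first = async { connectable.flow.take(3).toList() }
    val second = async { connectable.flow.take(3).toList() }
    connectable.awaitCollectors(2)
    val connection = connectable.connect(this) // start the source now
    val result = Triple(first.await(), second.await(), sourceRuns)
    connection.join()
    result
}
```

In this sketch both observers receive 1, 2, 3 while the source block runs only once. The sketch does not cover `isConnected`, reconnecting, or returning the current connection on repeated `connect` calls.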

/**
 * Publishes and shares an upstream [Flow] of type [T] and returns a [ConnectableFlow] of type [T].
 *
 * The upstream [Flow] begins emissions only after [ConnectableFlow.connect] has been called.
 *
 * @return The [ConnectableFlow] that represents the shared [Flow] of this receiver.
 */
fun <T> Flow<T>.publish(): ConnectableFlow<T> 

/**
 * Creates a [Flow] of type [T] from this [ConnectableFlow] that automatically connects (i.e. calls
 * [ConnectableFlow.connect]) when the first [numberOfCollectors] observers start collecting (i.e. call [Flow.collect])
 * and automatically cancels this [ConnectableFlow] when the last observer stops collecting.
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @param numberOfCollectors The number of observers that need to start collecting before the connection (re)starts.
 * @return The shared referenced-counted [Flow].
 */
fun <T> ConnectableFlow<T>.refCount(scope: CoroutineScope, numberOfCollectors: Int = 1): Flow<T>

/**
 * Shares this [Flow] of type [T] with multiple observers without restarting when each observer starts
 * collecting. This is the same as calling [Flow.publish] and then [ConnectableFlow.refCount].
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @return The new [Flow] that shares this [Flow].
 */
fun <T> Flow<T>.share(scope: CoroutineScope): Flow<T> = publish().refCount(scope)
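For comparison, later kotlinx.coroutines releases (1.4 and up) ship a `shareIn` operator whose `SharingStarted.WhileSubscribed()` strategy behaves much like the `share(scope)` idea above: the upstream starts with the first collector and stops after the last one leaves. The following is a hedged sketch of that existing operator, not of the functions proposed here. The name `runShareSketch` and the 100 ms delay, which only gives both collectors time to subscribe, are assumptions made for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative only (hypothetical name): two collectors share one execution of the source.
fun runShareSketch(): Triple<List<Int>, List<Int>, Int> = runBlocking {
    var sourceRuns = 0
    val source = flow<Int> {
        sourceRuns++
        delay(100) // let both collectors subscribe before the first value is emitted
        emit(1); emit(2); emit(3)
    }
    // A separate Job, so that the long-lived sharing coroutine does not keep runBlocking alive.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = source.shareIn(sharingScope, SharingStarted.WhileSubscribed())

    val first = async { shared.take(3).toList() }
    val second = async { shared.take(3).toList() }
    val result = Triple(first.await(), second.await(), sourceRuns)
    sharingScope.cancel() // stop sharing explicitly
    result
}
```

Here both collectors see 1, 2, 3 and the source block runs once. Unlike the proposed `refCount`, `shareIn` has no `numberOfCollectors` threshold and returns a `SharedFlow` rather than a plain `Flow`.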

This is my first stab at an initial/draft/try-out implementation:
https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381

Update:
I took autoConnect out, since it is more relevant to 'replay' and 'caching'. If needed, it should be addressed in a separate issue.
