Description
Enhancement: Add ConnectableFlow to the Flow API.
Each time an observer of a Flow starts collecting, the source of the Flow is executed, much like a call to subscribe of a Flowable in RxJava executes the Flowable's source.
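To make the cold behaviour concrete, here is a minimal sketch that counts how often a flow's source block runs when two observers collect it one after the other. The function name `countSourceExecutions` and the emitted values are made up for illustration, and the sketch assumes a kotlinx.coroutines version that provides the `flow { }` builder.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns how many times the source block was executed.
fun countSourceExecutions(): Int = runBlocking {
    var executions = 0
    val coldFlow = flow<String> {
        executions++ // this block runs again for every collector
        emit("a")
        emit("b")
    }
    coldFlow.collect { println("observer 1: $it") }
    coldFlow.collect { println("observer 2: $it") }
    executions
}
```

Calling `countSourceExecutions()` returns 2: one execution of the source per call to collect.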
The proposed change defers the execution of the source of the Flow until a specific point in time, possibly after one or more observers have started collecting the 'shared' Flow.
The use-case for deferring the execution of the source of a Flow is for (cold) Flows whose data-source is a resource that should not be started/created or stopped/destroyed by each and every call to collect and should be explicitly managed by a call to a function (connect, for example) instead. It differs from using broadcastIn by the fact that publish will return a Flow, not a BroadcastChannel.
E.g.
val dataFlow = flowViaChannel<MyData> { channel ->
    val resource = getResource(channel)
    channel.invokeOnClose {
        resource.close()
    }
    resource.startReceivingData()
}
val sharedFlow = dataFlow.publish()
...
...
val observer1 = launch {
    sharedFlow.collect { ... }
}
...
...
val observer2 = launch {
    sharedFlow.collect { ... }
}
...
// Start the flow right now.
val connection = sharedFlow.connect(scope)
...
...
...
// Cancel the flow here.
// Note that when 'scope' is cancelled, this 'connection' would be canceled as well.
connection.close()
...
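The timeline in the example above (observers first, then an explicit connect) can be approximated with a small runnable sketch. Everything here is an assumption made for illustration: `PublishSketch`, `awaitCollectors`, `upstreamRuns` and `runPublishSketch` are hypothetical names, the sketch is built on `MutableSharedFlow` (available only in later kotlinx.coroutines versions), and it returns a plain `Job` instead of the `Connection` proposed in this issue. It also does not propagate completion or errors of the source to the observers, which is why the observers use `take(3)`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper, not the proposed ConnectableFlow and not the gist's code.
class PublishSketch<T>(private val upstream: Flow<T>) {
    // No replay, no buffer: values emitted while nobody is subscribed are dropped.
    private val hub = MutableSharedFlow<T>()

    // The flow observers collect; collecting it never executes the upstream source.
    val sharedFlow: Flow<T> get() = hub

    // Suspends until at least `count` observers are collecting.
    suspend fun awaitCollectors(count: Int) {
        hub.subscriptionCount.first { it >= count }
    }

    // Executes the upstream source once, in `scope`; cancel the Job to stop it.
    fun connect(scope: CoroutineScope): Job =
        scope.launch { upstream.collect { hub.emit(it) } }
}

// Counts how many times the source block has been executed.
var upstreamRuns = 0

fun runPublishSketch(): Pair<List<Int>, List<Int>> = runBlocking {
    val published = PublishSketch(flow<Int> {
        upstreamRuns++
        emit(1)
        emit(2)
        emit(3)
    })
    val observer1 = async { published.sharedFlow.take(3).toList() }
    val observer2 = async { published.sharedFlow.take(3).toList() }
    published.awaitCollectors(2)              // nothing has been emitted so far
    val connection = published.connect(this)  // start the source right now
    val received = observer1.await() to observer2.await()
    connection.cancel()
    received
}
```

In this sketch both observers receive 1, 2, 3 while the source block runs only once, because the source is executed by `connect` and not by the calls to collect.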
I propose creating these new classes and extension functions, or something similar (they are modeled after RxJava's ConnectableObservable):
/**
 * A [Flow] of type [T] that only starts emitting values after its [connect] method is called.
 *
 * If this flow's [Connection] is still connected, the current [Connection] will be returned when
 * [connect] is called and the flow will not be restarted.
 *
 * When its [collect] method is called, this flow will not immediately start collecting. Only after
 * [connect] is called do the emission and actual collecting of values start.
 */
interface ConnectableFlow<out T> : Flow<T> {
    /**
     * Connects this shared [Flow] to start emitting values.
     *
     * @param scope The [CoroutineScope] in which the emissions will take place.
     * @return The [Connection] that can be closed to stop this shared [Flow].
     */
    fun connect(scope: CoroutineScope): Connection
}
and
/**
 * A connection returned by a call to [ConnectableFlow.connect].
 */
interface Connection {
    /**
     * Returns true if this connection is connected and active.
     */
    suspend fun isConnected(): Boolean

    /**
     * Closes this connection in an orderly fashion.
     */
    suspend fun close()
}

/**
 * Publishes and shares an upstream [Flow] of type [T] and returns a [ConnectableFlow] of type [T].
 *
 * The upstream [Flow] begins emissions only after [ConnectableFlow.connect] has been called.
 *
 * @return The [ConnectableFlow] that represents the shared [Flow] of this receiver.
 */
fun <T> Flow<T>.publish(): ConnectableFlow<T>

/**
 * Creates a [Flow] of type [T] from this [ConnectableFlow] that automatically connects (i.e. calls
 * [ConnectableFlow.connect]) when the first [numberOfCollectors] observers start collecting (i.e. call [Flow.collect])
 * and automatically cancels this [ConnectableFlow] when the last observer stops collecting.
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @param numberOfCollectors The number of observers that need to start collecting before the connection (re)starts.
 * @return The shared reference-counted [Flow].
 */
fun <T> ConnectableFlow<T>.refCount(scope: CoroutineScope, numberOfCollectors: Int = 1): Flow<T> =

/**
 * Shares this [Flow] of type [T] with multiple observers without restarting when each observer starts
 * collecting. This is the same as calling [Flow.publish] and then [ConnectableFlow.refCount].
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @return The new [Flow] that shares this [Flow].
 */
fun <T> Flow<T>.share(scope: CoroutineScope): Flow<T> = publish().refCount(scope)
This is my first stab at an initial/draft/try-out implementation:
https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381
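For comparison, the share/refCount behaviour described above (connect when the first observer arrives, disconnect when the last one leaves) roughly resembles what `shareIn` with `SharingStarted.WhileSubscribed()` does in later kotlinx.coroutines versions. The sketch below is only an approximation under that assumption, not the gist's code; `runShareSketch` and `upstreamStarts` are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how many times the source block has been started.
var upstreamStarts = 0

fun runShareSketch(): List<List<Int>> = runBlocking {
    val source = flow<Int> {
        upstreamStarts++
        var next = 0
        while (true) {
            emit(next++)
            delay(10)
        }
    }
    // Separate Job so the sharing coroutine can be stopped explicitly at the end.
    val sharingJob = Job()
    // Upstream starts with the first observer and stops after the last one leaves.
    val shared = source.shareIn(this + sharingJob, SharingStarted.WhileSubscribed())
    val observer1 = async { shared.take(3).toList() }
    val observer2 = async { shared.take(3).toList() }
    val received = listOf(observer1.await(), observer2.await())
    sharingJob.cancelAndJoin()
    received
}
```

Here two overlapping observers each receive three values while the source is started only once, which is the effect `share(scope)` is meant to provide.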
Update:
I took autoConnect out: This is more for 'replay' and 'caching'. If needed, this should be addressed in a separate issue.