Description
The `share()` operator operates on `Flow<T>` and returns `Flow<T>`. It shall have the following semantics. The resulting flow is cold, but when one collector starts collecting from it, it starts to collect from the upstream flow, activating the emitter upstream. The trick of the `share` operator is that when additional collectors appear downstream, they all "share" the same upstream emitter.
For example, consider the flow:
    val flow = flow {
        var i = 0
        while (true) {
            delay(1000)
            println("Emit $i")
            emit(i++)
        }
    }
If you launch two collectors:
    launch { flow.collect { println("A: got $it") } }
    launch { flow.collect { println("B: got $it") } }
Then you shall see "Emit 0 / A: got 0 / Emit 0 / B: got 0 / Emit 1 / A: got 1 / Emit 1 / B: got 1 / ...".
However, if you change the flow to `val flow = flow { /* same */ }.share()`, then you shall see "Emit 0 / A: got 0 / B: got 0 / Emit 1 / A: got 1 / B: got 1 / ...", that is, one emission gets delivered to both collectors.
Now if you need to artificially "start" the shared flow simply to keep it active, then you can always launch a dummy collector, `launch { flow.collect {} }`, which works as a "reference" that stays active until you cancel the resulting job.
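The "reference" idea can be tried out with a minimal sketch. It assumes the `shareIn` operator and the `SharingStarted.WhileSubscribed()` policy as they later shipped in kotlinx.coroutines, standing in for the `share()` proposed here; the names `sharingScope` and `keepAlive` are illustrative only.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Assumption: a separate scope owns the sharing coroutine.
    val sharingScope = CoroutineScope(coroutineContext + Job())

    val shared = flow {
        var i = 0
        while (true) {
            delay(100)
            println("Emit $i")
            emit(i++)
        }
    }.shareIn(sharingScope, SharingStarted.WhileSubscribed())

    // The dummy collector: it consumes nothing, it only keeps the upstream active.
    val keepAlive = launch { shared.collect {} }

    delay(350)          // the upstream keeps emitting while the reference is held
    keepAlive.cancel()  // dropping the reference removes the last subscriber
    delay(350)          // with WhileSubscribed, the upstream is stopped during this window
    sharingScope.cancel()
}
```

With a different start policy the dummy collector may be unnecessary, so this is only one way to read the "reference" semantics described above.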
TBD: The share operator might need some configuration with respect to how much "history" to keep in memory for "late collectors". So far it seems that one non-negative integer is enough (with zero -- new collectors don't get any history, with one -- only the most recent value, with more -- the specified number of recent values). What is unclear is what the default value shall be (if any).
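A sketch of how a single non-negative integer could express the history size. It assumes the `replay` parameter of `shareIn` as it later shipped in kotlinx.coroutines; the helper `historyForLateCollector` is hypothetical, and `replayCache` is used as a snapshot of what a newly arriving collector would receive first.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: shares a three-element flow, lets it finish, then reports
// the history that a late collector would be given.
suspend fun historyForLateCollector(replay: Int): List<Int> = coroutineScope {
    // Assumption: a separate scope owns the sharing coroutine.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = flowOf(1, 2, 3)
        .shareIn(sharingScope, SharingStarted.Eagerly, replay)

    delay(100)                        // let the upstream emit everything with no collectors
    val history = shared.replayCache  // snapshot of the retained recent values
    sharingScope.cancel()
    history
}

fun main() = runBlocking {
    for (replay in 0..2) {
        println("replay=$replay -> ${historyForLateCollector(replay)}")
    }
}
```

In this sketch zero keeps no history, one keeps only the most recent value, and larger values keep that many recent values, which mirrors the three cases listed above.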
UPDATE: It will actually have to be a `shareIn(scope)` operator. Otherwise, the scope it works in will be bound to the first collector, and when this collector is cancelled it will not be able to maintain emissions to other collectors.
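The scope argument can be illustrated with a short sketch. It assumes the `shareIn(scope, started)` signature and `SharingStarted.WhileSubscribed()` as they later shipped in kotlinx.coroutines; the name `sharingScope` and the timings are illustrative. Because the sharing coroutine lives in its own scope, cancelling the first collector does not interrupt delivery to the second.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val upstream = flow {
        var i = 0
        while (true) {
            delay(1000)
            println("Emit $i")
            emit(i++)
        }
    }

    // The upstream is collected in this scope, not in the scope of any collector.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = upstream.shareIn(sharingScope, SharingStarted.WhileSubscribed())

    val a = launch { shared.collect { println("A: got $it") } }
    val b = launch { shared.collect { println("B: got $it") } }

    delay(2500)
    a.cancel()             // the first collector leaves; B keeps receiving
    delay(1000)
    b.cancel()
    sharingScope.cancel()  // stops the sharing coroutine itself
}
```

Each "Emit" line should appear once per value, however many collectors are attached, and the emissions stop for good only when `sharingScope` is cancelled.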