Closed
Description
One of the best parts of Rx is that unsubscriptions automatically transfer upstream. I can create a complex stream from multiple data sources, transform them, and subscribe to them. Then, I can add them to a CompositeDisposable
- which can automatically get cleared during the appropriate lifecycle event, like onDestroyed()
(I'm an android developer). It would be really helpful if co-routines could implement this behavior.
val disposables = CompositeDisposable()
// writeable stream in our model layer
val writeableSource: Subject<Int> = PublishSubject.create()
// readable stream. our model layer only exposes readable properties
val readableSource: Observable<Int> = writeableSource
val disposable = readableSource.map {
println("map executed for: $it")
it
}
.subscribe { println(it) }
.addTo(disposables)
writeableSource.onNext(1)
disposables.clear()
// should be ignored, because we previously called clear()
writeableSource.onNext(2)
// outputs:
// map executed for: 1
// 1
The biggest roadblock to this is that because Channel<T>
subscriptions are hot, unsubscriptions do not automatically transfer upstream. Would #254 provide this? More specifically:
// writeable stream in our model layer
val broadcastChannel = BroadcastChannel<Int>(10)
// readable stream
val source: SubscriptionReceiveChannel<Int> = broadcastChannel.openSubscription()
launch {
source // closing this prevents downstream operators from emitting
.map { it + 1 }
.map { it + 2 }
.also { receiveChannel ->
// calling `.cancel(null)` on the `ReceiveChannel` returned by the previous
// `.map` operator above does not close the upstream channel. This can easily
// cause a memory leak
receiveChannel.cancel(null)
}
// ideally, `.consumeEach()` could somehow be modified to return a `Disposable` like
// interface that cancels all upstream emissions
.consumeEach { println(it) }
}
Metadata
Metadata
Assignees
Labels
No labels