Skip to content

Easy Unsubscription #277

Closed
Closed
@ZakTaccardi

Description

@ZakTaccardi

One of the best parts of Rx is that unsubscriptions automatically transfer upstream. I can create a complex stream from multiple data sources, transform them, and subscribe to them. Then, I can add them to a CompositeDisposable - which can automatically get cleared during the appropriate lifecycle event, like onDestroyed() (I'm an android developer). It would be really helpful if co-routines could implement this behavior.

val disposables = CompositeDisposable()

// writeable stream in our model layer
val writeableSource: Subject<Int> = PublishSubject.create()

// readable stream. our model layer only exposes readable properties
val readableSource: Observable<Int> = writeableSource

val disposable = readableSource.map {
    println("map executed for: $it")
    it
}
    .subscribe { println(it) }
    .addTo(disposables)

writeableSource.onNext(1)

disposables.clear()

// should be ignored, because we previously called clear()
writeableSource.onNext(2)

// outputs:
// map executed for: 1
// 1

The biggest roadblock to this is that because Channel<T> subscriptions are hot, unsubscriptions do not automatically transfer upstream. Would #254 provide this? More specifically:

// writeable stream in our model layer
val broadcastChannel = BroadcastChannel<Int>(10)

// readable stream
val source: SubscriptionReceiveChannel<Int> = broadcastChannel.openSubscription()

launch {
    source // closing this prevents downstream operators from emitting
        .map { it + 1 }
        .map { it + 2 }
        .also { receiveChannel ->
            // calling `.cancel(null)` on the `ReceiveChannel` returned by the previous
            // `.map` operator above does not close the upstream channel. This can easily
            // cause a memory leak
            receiveChannel.cancel(null)
        }
        // ideally, `.consumeEach()` could somehow be modified to return a `Disposable` like
        // interface that cancels all upstream emissions
        .consumeEach { println(it) }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions