Observable.create may call source callback with already disposed emitter #7862

Open
@feerbach

Description

I’m observing behavior that seems strange to me when I use an Observable constructed in the following way.

I’m using Observable.using to create an observable based on some resource. In Observable.using, I pass a function as sourceSupplier, which returns an observable created using Observable.create.

Then I apply the takeUntil operator to the observable created by Observable.using, combining it with another observable, stopSubject, which in this example is a BehaviorSubject.

        class Resource {
            boolean isClosed = false;
        }

        BehaviorSubject<Object> stopSubject = BehaviorSubject.create();

        stopSubject.onNext(1);

        Observable
                .using(
                        Resource::new,
                        (resource) -> Observable.create(emitter -> {
                            ...
                        }),
                        (resource) -> {
                            resource.isClosed = true;
                        }
                )
                .takeUntil(stopSubject)

I expect that when the callback passed to Observable.create is invoked, the resource created by Observable.using has not yet been disposed, and the emitter passed to the callback is not yet disposed either. I verify this in the tests of the attached TakeUntilTest class.

TakeUntilTest.java.zip

These expectations hold true if I emit a value into stopSubject after subscribing to the observable under test (see the test otherObservableEmitsValueAfterSubscription).

However, if I emit a value into stopSubject before subscribing to the observable under test, the following happens:
The callback passed to Observable.create is invoked, but by that time the resource has already been disposed, and the emitter passed to the callback is already in the disposed state. This is verified in the test otherObservableHasValueInAdvance.

I looked into the RxJava implementation, and I believe the reason for this behavior is as follows:

In its subscribeActual method, the ObservableTakeUntil class first subscribes to the other observable and only afterwards subscribes to the source observable:

    @Override
    public void subscribeActual(Observer<? super T> child) {
        TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<>(child);
        child.onSubscribe(parent);

        other.subscribe(parent.otherObserver);
        source.subscribe(parent);
    }

In my test, stopSubject acts as the other observable. Since it already holds a value, subscribing to it immediately triggers the TakeUntilMainObserver.otherComplete method, which writes a special DISPOSED value into the TakeUntilMainObserver.upstream field (using the helper method DisposableHelper.dispose):

        void otherComplete() {
            DisposableHelper.dispose(upstream);
            HalfSerializer.onComplete(downstream, this, error);
        }
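
To make the ordering concrete, here is a small stand-alone sketch that does not use RxJava at all. `LatestValueSubject` is a hypothetical, heavily simplified stand-in for BehaviorSubject (it only models the synchronous replay of the latest value on subscribe), and `run` mirrors the "other first, then source" order of ObservableTakeUntil.subscribeActual. All names in it are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy illustration of replay-on-subscribe ordering; not RxJava code. */
class ReplayOrderSketch {

    /** Hypothetical, simplified model of a subject that replays its latest value. */
    static final class LatestValueSubject<T> {
        private T latest; // null means "no value yet"
        private final List<Consumer<T>> observers = new ArrayList<>();

        void onNext(T value) {
            latest = value;
            for (Consumer<T> o : observers) {
                o.accept(value);
            }
        }

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
            if (latest != null) {
                observer.accept(latest); // synchronous replay during subscribe
            }
        }
    }

    /** Mirrors the order in subscribeActual: subscribe to other first, then to source. */
    static List<String> run(boolean emitBeforeSubscribe) {
        List<String> events = new ArrayList<>();
        LatestValueSubject<Integer> stop = new LatestValueSubject<>();
        if (emitBeforeSubscribe) {
            stop.onNext(1);
        }

        stop.subscribe(v -> events.add("other signalled")); // other.subscribe(...)
        events.add("source subscribed");                     // source.subscribe(...)

        if (!emitBeforeSubscribe) {
            stop.onNext(1);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

When the value is emitted in advance, the "other signalled" event is recorded before "source subscribed", which corresponds to otherComplete running before the source is subscribed.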

The subsequent subscription to the source observable in ObservableTakeUntil.subscribeActual calls subscribeActual on ObservableCreate, which first calls onSubscribe on the passed observer, and only then invokes the source callback:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

The call to observer.onSubscribe inside ObservableCreate ends up calling TakeUntilMainObserver.onSubscribe, which tries to set the passed disposable into the upstream field using the helper method DisposableHelper.setOnce:

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(upstream, d);
        }

But since the upstream field already contains the special DISPOSED value, the assignment doesn’t happen, and instead DisposableHelper.setOnce calls dispose on the passed disposable. In this case, the disposable is an instance of ObservableUsing$UsingObserver, which proceeds to dispose the resource:

        @Override
        public void dispose() {
            if (eager) {
                disposeResource();
                upstream.dispose();
                upstream = DisposableHelper.DISPOSED;
            } else {
                upstream.dispose();
                upstream = DisposableHelper.DISPOSED;
                disposeResource();
            }
        }
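
The set-once behavior described above can be modelled in isolation. The following is a simplified sketch, not RxJava's actual DisposableHelper: it assumes a sentinel-based `dispose` and a compare-and-set based `setOnce`, and it leaves out the protocol-violation reporting that the real helper performs. The `Disposable` interface, `FlagDisposable`, and `lateUpstreamIsDisposed` are names made up for this example.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Stand-alone model of set-once disposable semantics; not RxJava's actual code. */
class SetOnceSketch {

    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    /** Sentinel marking a field whose owner has already been disposed. */
    static final Disposable DISPOSED = new Disposable() {
        public void dispose() { }
        public boolean isDisposed() { return true; }
    };

    /** Hypothetical disposable that only records whether dispose() was called. */
    static final class FlagDisposable implements Disposable {
        private volatile boolean disposed;
        public void dispose() { disposed = true; }
        public boolean isDisposed() { return disposed; }
    }

    /** Swaps in the sentinel and disposes whatever was held before. */
    static void dispose(AtomicReference<Disposable> field) {
        Disposable previous = field.getAndSet(DISPOSED);
        if (previous != null && previous != DISPOSED) {
            previous.dispose();
        }
    }

    /** Stores d if the field is empty; otherwise disposes d and returns false. */
    static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        if (field.compareAndSet(null, d)) {
            return true;
        }
        d.dispose(); // field already occupied or disposed: the late arrival is disposed at once
        return false;
    }

    /** Reproduces the order from this issue: dispose first, onSubscribe second. */
    static boolean lateUpstreamIsDisposed() {
        AtomicReference<Disposable> upstream = new AtomicReference<>();
        dispose(upstream);                   // corresponds to otherComplete() running first
        FlagDisposable late = new FlagDisposable();
        setOnce(upstream, late);             // corresponds to onSubscribe(d) arriving afterwards
        return late.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(lateUpstreamIsDisposed());
    }
}
```

In this model the disposable that arrives after the field was disposed ends up disposed itself, which is the step that, in the real chain, reaches UsingObserver.dispose and closes the resource.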

Therefore, when control returns to ObservableCreate.subscribeActual after onSubscribe is called, and the source callback is finally invoked, the resource has already been disposed, and the emitter passed to the callback (CreateEmitter) is already in the disposed state.

I believe the current behavior is incorrect, and in the test otherObservableHasValueInAdvance, one of the following should happen:
• Either the callback passed to Observable.create should not be invoked at all,
• Or the callback should be invoked before the resource is disposed and the emitter is transitioned to the disposed state.
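
As an illustration of the first option only, the sketch below models a create-style subscribe with a disposed check between onSubscribe and the callback. It is a toy model under stated assumptions, not a patch against ObservableCreate: `ToyEmitter`, `subscribe`, and the `guard` flag are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a create-style subscribe with a hypothetical disposed-check guard. */
class GuardedCreateSketch {

    /** Minimal emitter: only tracks whether it has been disposed. */
    static final class ToyEmitter {
        private boolean disposed;
        void dispose() { disposed = true; }
        boolean isDisposed() { return disposed; }
    }

    /**
     * onSubscribe stands for observer.onSubscribe(parent); callback stands for the
     * function passed to Observable.create. The guard is the hypothetical part.
     */
    static void subscribe(Consumer<ToyEmitter> onSubscribe,
                          Consumer<ToyEmitter> callback,
                          boolean guard) {
        ToyEmitter parent = new ToyEmitter();
        onSubscribe.accept(parent);
        if (guard && parent.isDisposed()) {
            return; // downstream disposed during onSubscribe: skip the callback
        }
        callback.accept(parent);
    }

    /** Runs the scenario where downstream disposes the emitter inside onSubscribe. */
    static List<String> run(boolean guard) {
        List<String> log = new ArrayList<>();
        subscribe(ToyEmitter::dispose,
                  e -> log.add("callback invoked, disposed=" + e.isDisposed()),
                  guard);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // current behavior in this model
        System.out.println(run(true));  // behavior with the hypothetical guard
    }
}
```

Without the guard the callback runs with an already disposed emitter; with it the callback is skipped. In user code, a comparable check can be made at the top of the callback with emitter.isDisposed(), although that would not prevent the resource from being created and disposed.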
