Description
I’m observing behavior that seems strange to me when I use an Observable constructed in the following way.
I’m using Observable.using to create an observable based on some resource. In Observable.using, I pass a function as sourceSupplier, which returns an observable created using Observable.create.
Then I apply the takeUntil operator to the observable created by Observable.using to combine it with another observable, stopSubject, which in this example is a BehaviorSubject.
class Resource {
    boolean isClosed = false;
}

BehaviorSubject<Object> stopSubject = BehaviorSubject.create();
stopSubject.onNext(1);

Observable
    .using(
        Resource::new,
        (resource) -> Observable.create(emitter -> {
            ...
        }),
        (resource) -> {
            resource.isClosed = true;
        }
    )
    .takeUntil(stopSubject)
I expect that when the callback passed to Observable.create is invoked, the resource created by Observable.using has not yet been disposed, and the emitter passed to the callback is not yet disposed either. I verify this in the tests of the attached TakeUntilTest class.
These expectations hold true if I emit a value into stopSubject after subscribing to the observable under test (see the test otherObservableEmitsValueAfterSubscription).
However, if I emit a value into stopSubject before subscribing to the observable under test, the following happens:
The callback passed to Observable.create is invoked, but by that time the resource has already been disposed, and the emitter passed to the callback is already in the disposed state. This is verified in the test otherObservableHasValueInAdvance.
I looked into the RxJava implementation, and I believe the reason for this behavior is as follows:
In its subscribeActual method, the ObservableTakeUntil class first subscribes to the other observable and only afterwards subscribes to the source observable:
@Override
public void subscribeActual(Observer<? super T> child) {
    TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<>(child);
    child.onSubscribe(parent);
    other.subscribe(parent.otherObserver);
    source.subscribe(parent);
}
In my test, stopSubject acts as the other observable, and since it already has a value in advance, subscribing to it immediately triggers the TakeUntilMainObserver.otherComplete method, which writes a special DISPOSED value into the TakeUntilMainObserver.upstream field (using the helper method DisposableHelper.dispose):
void otherComplete() {
    DisposableHelper.dispose(upstream);
    HalfSerializer.onComplete(downstream, this, error);
}
The subsequent subscription to the source observable in ObservableTakeUntil.subscribeActual calls subscribeActual on ObservableCreate, which first calls onSubscribe on the passed observer, and only then invokes the source callback:
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
The call to observer.onSubscribe inside ObservableCreate ends up calling TakeUntilMainObserver.onSubscribe, which tries to set the passed disposable into the upstream field using the helper method DisposableHelper.setOnce:
@Override
public void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(upstream, d);
}
But since the upstream field already contains the special DISPOSED value, the assignment doesn’t happen, and instead DisposableHelper.setOnce calls dispose on the passed disposable. In this case, the disposable is an instance of ObservableUsing$UsingObserver, which proceeds to dispose the resource:
@Override
public void dispose() {
    if (eager) {
        disposeResource();
        upstream.dispose();
        upstream = DisposableHelper.DISPOSED;
    } else {
        upstream.dispose();
        upstream = DisposableHelper.DISPOSED;
        disposeResource();
    }
}
Therefore, when control returns to ObservableCreate.subscribeActual after the onSubscribe call, and the source callback is finally invoked, the resource has already been disposed, and the emitter passed to the callback (CreateEmitter) is already in the disposed state.
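To make the ordering easier to follow, the sequence described above can be modelled without RxJava. This is only a minimal sketch under stated assumptions, not the library's code: SimpleDisposable, the DISPOSED sentinel, the simplified dispose and setOnce helpers, and the run method are hypothetical stand-ins for Disposable, DisposableHelper.DISPOSED, DisposableHelper.dispose, DisposableHelper.setOnce, and the subscription chain. As far as I can tell, the real setOnce also reports a protocol violation when the field holds a value other than DISPOSED; that part is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class TakeUntilOrderingSketch {

    /** Hypothetical stand-in for a disposable; not the RxJava interface. */
    interface SimpleDisposable {
        void dispose();
    }

    /** Sentinel playing the role of DisposableHelper.DISPOSED. */
    static final SimpleDisposable DISPOSED = () -> { };

    /** Simplified dispose: swap in the sentinel and dispose whatever was stored. */
    static void dispose(AtomicReference<SimpleDisposable> field) {
        SimpleDisposable previous = field.getAndSet(DISPOSED);
        if (previous != null && previous != DISPOSED) {
            previous.dispose();
        }
    }

    /** Simplified setOnce: if the field is already occupied, dispose the newcomer. */
    static boolean setOnce(AtomicReference<SimpleDisposable> field, SimpleDisposable d) {
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            return false;
        }
        return true;
    }

    /** Replays the subscription order and returns the events in the order they occur. */
    static List<String> run(boolean otherHasValueInAdvance) {
        List<String> events = new ArrayList<>();
        AtomicReference<SimpleDisposable> upstream = new AtomicReference<>();
        boolean[] resourceClosed = { false };

        // Step 1: takeUntil subscribes to "other" first. If it already holds
        // a value, otherComplete runs and marks upstream as DISPOSED.
        if (otherHasValueInAdvance) {
            dispose(upstream);
            events.add("otherComplete");
        }

        // Step 2: takeUntil subscribes to the source. The create step calls
        // onSubscribe before the callback, which reaches setOnce with the
        // using-observer as the disposable.
        SimpleDisposable usingObserver = () -> {
            resourceClosed[0] = true;
            events.add("resource disposed");
        };
        setOnce(upstream, usingObserver);

        // Step 3: only now is the create callback invoked.
        events.add("callback invoked, resource closed = " + resourceClosed[0]);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In this model, run(true) records the resource disposal before the callback entry, which mirrors what otherObservableHasValueInAdvance observes, while run(false) reaches the callback with the resource still open.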
I believe this current behavior is incorrect, and in the test otherObservableHasValueInAdvance, one of the following should happen:
• Either the callback passed to Observable.create should not be invoked at all,
• Or the callback should be invoked before the resource is disposed and the emitter is transitioned to the disposed state.