2.x: Inconsistent behavior on throwing UndeliverableException in Observable.merge #5779
Comments
Hi, here are the explanations:
The second
When the
No, that's why
public static <T> Observable<T> mergeFirstErrorOnly(Observable<T>... sources) {
    return Observable.defer(() -> {
        AtomicBoolean once = new AtomicBoolean();
        PublishSubject<Void> errorSubject = PublishSubject.create();
        for (int i = 0; i < sources.length; i++) {
            sources[i] = sources[i].onErrorResumeNext(e -> {
                if (once.compareAndSet(false, true)) {
                    errorSubject.onError(e);
                }
                return Observable.empty();
            });
        }
        return Observable.merge(sources).takeUntil(errorSubject);
    });
} |
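The `once.compareAndSet(false, true)` guard in `mergeFirstErrorOnly` is what lets only the first failing source signal `errorSubject`. The following is a minimal plain-Java sketch of that guard alone, without RxJava. `FirstErrorGate`, `report`, and `race` are hypothetical names introduced for illustration and are not part of any library; the sketch only models the gate, not disposal or `takeUntil`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the "once" flag plus errorSubject pair; not an RxJava class.
final class FirstErrorGate {
    private final AtomicBoolean once = new AtomicBoolean();
    private final AtomicReference<Throwable> delivered = new AtomicReference<>();
    private final AtomicInteger suppressed = new AtomicInteger();

    /** Returns true if this call won the race and its error was forwarded. */
    boolean report(Throwable e) {
        if (once.compareAndSet(false, true)) {
            delivered.set(e);             // plays the role of errorSubject.onError(e)
            return true;
        }
        suppressed.incrementAndGet();     // plays the role of returning Observable.empty()
        return false;
    }

    Throwable delivered() { return delivered.get(); }

    int suppressedCount() { return suppressed.get(); }

    /** Fires the given number of concurrent reports at one gate and returns it. */
    static FirstErrorGate race(int threads) {
        FirstErrorGate gate = new FirstErrorGate();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            final int id = i;
            Thread t = new Thread(() -> {
                try {
                    start.await();        // line all workers up before reporting
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
                gate.report(new Exception("Exception " + id));
            });
            workers.add(t);
            t.start();
        }
        start.countDown();
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return gate;
    }

    public static void main(String[] args) {
        FirstErrorGate gate = race(8);
        System.out.println("delivered: " + gate.delivered().getMessage());
        System.out.println("suppressed: " + gate.suppressedCount());
    }
}
```

Which exception wins depends on thread scheduling, but exactly one is forwarded and the rest are counted as suppressed.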
Hi @akarnokd, thanks for the detailed explanation! I now see that this is expected implementation-wise. However, from an "rxjava-user" point of view, I did not expect that something as simple as
o1.publish((observable) ->
    Observable.merge(
        observable.filter((condition) -> condition),
        observable.filter((condition) -> !condition)
    )
)
would crash on Android (if no custom plugin error handler is installed). I would rather expect Do you think it is worth mentioning in the documentation of |
Closing via #5786. |
Hi @akarnokd! After some usage of the suggested mergeFirstErrorOnly, we found that it still produces an UndeliverableException to the RxJavaPlugins' errorHandler if called from multiple threads.
package com.test;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

public class MergeFirstErrorOnlyTest {
    @Test
    public void mergeFirstErrorOnlyWithConcurrentUsage() {
        final List<Throwable> undeliverableExceptions = new ArrayList<>();
        RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                undeliverableExceptions.add(throwable);
            }
        });
        for (int i = 0; i < 10000; ++i) {
            PublishSubject<String> subject1 = PublishSubject.create();
            PublishSubject<String> subject2 = PublishSubject.create();
            Disposable disposable = Observables
                .mergeFirstErrorOnly(
                    subject1.observeOn(Schedulers.io()),
                    subject2.observeOn(Schedulers.io())
                )
                .subscribe(
                    new Consumer<String>() {
                        @Override
                        public void accept(String s) {
                            // do nothing
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) {
                            // do nothing
                        }
                    }
                );
            subject1.onError(new Exception("Exception 1"));
            subject2.onError(new Exception("Exception 2"));
            disposable.dispose();
        }
        if (!undeliverableExceptions.isEmpty()) {
            undeliverableExceptions.get(0).printStackTrace();
        }
        Assertions.assertThat(undeliverableExceptions).isEmpty();
        RxJavaPlugins.reset();
    }
}

class Observables {
    public static <T> Observable<T> mergeFirstErrorOnly(final Observable<T>... sources) {
        return Observable.defer(new Callable<ObservableSource<? extends T>>() {
            @Override
            public ObservableSource<? extends T> call() {
                final AtomicBoolean once = new AtomicBoolean();
                final PublishSubject<Void> errorSubject = PublishSubject.create();
                for (int i = 0; i < sources.length; i++) {
                    sources[i] = sources[i].onErrorResumeNext(new Function<Throwable, ObservableSource<? extends T>>() {
                        @Override
                        public ObservableSource<? extends T> apply(Throwable e) {
                            if (once.compareAndSet(false, true)) {
                                errorSubject.onError(e);
                            }
                            return Observable.empty();
                        }
                    });
                }
                return Observable.merge(Arrays.asList(sources)).takeUntil(errorSubject);
            }
        });
    }
}
The above fails with results similar to
The exception itself looks like
For now I don't see a way to solve the issue completely using the current RxJava 2 API. |
Something like
public final class PublishSubject<T> extends Subject<T> {
    public void tryOnError(Throwable t) {
        ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        if (subscribers.get() == TERMINATED) {
            RxJavaPlugins.onError(t);
            return;
        }
        error = t;
        for (PublishDisposable<T> s : subscribers.getAndSet(TERMINATED)) {
            s.tryOnError(t);
        }
    }
    static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {
        public void tryOnError(Throwable t) {
            if (!get()) {
                actual.onError(t);
            }
        }
    }
}
I can submit a PR if you're OK with the proposal. |
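To make the difference between the two delivery styles in this proposal concrete, here is a minimal plain-Java sketch, assuming a downstream that can be disposed and a process-wide error handler. `DisposableSink` and all of its members are hypothetical names for illustration only; this is not the `PublishSubject` implementation, and it leaves out termination and subscriber bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical model of a disposable downstream; none of these names are RxJava API.
final class DisposableSink {
    private final AtomicBoolean disposed = new AtomicBoolean();
    private final Consumer<Throwable> downstream;
    private final Consumer<Throwable> globalHandler;

    DisposableSink(Consumer<Throwable> downstream, Consumer<Throwable> globalHandler) {
        this.downstream = downstream;
        this.globalHandler = globalHandler;
    }

    void dispose() {
        disposed.set(true);
    }

    /** Strict style: an error that cannot reach the downstream goes to the global handler. */
    void onError(Throwable t) {
        if (!tryOnError(t)) {
            globalHandler.accept(t);
        }
    }

    /**
     * Lenient style: deliver only if not disposed and report the outcome to the caller.
     * The check and the delivery are not atomic, so a concurrent dispose() can still
     * slip in between them, just as with the "if (!get())" check in the proposal.
     */
    boolean tryOnError(Throwable t) {
        if (disposed.get()) {
            return false;
        }
        downstream.accept(t);
        return true;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        DisposableSink sink = new DisposableSink(
            e -> seen.add("downstream: " + e.getMessage()),
            e -> seen.add("global: " + e.getMessage()));
        sink.tryOnError(new Exception("before dispose"));
        sink.dispose();
        sink.tryOnError(new Exception("dropped"));
        sink.onError(new Exception("after dispose"));
        seen.forEach(System.out::println);
    }
}
```

With the lenient call, a late error is dropped and the caller learns this from the return value; with the strict call, the same late error reaches the global handler.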
If you cancel a The implication of a |
Also looks like you are in control of those
PublishSubject ps1 = PublishSubject.create();
PublishSubject ps2 = PublishSubject.create();
AtomicBoolean once = new AtomicBoolean();
PublishSubject error = PublishSubject.create();
Observable.merge(ps1, ps2).takeUntil(error);
if (once.compareAndSet(false, true)) {
    error.onError(new RuntimeException());
}
This way, the merge cancelling the non-failing |
OK, thanks, I see the point. |
Library version: 2.1.7
As stated in RxJava2 Error Handling, no Throwable should be silently swallowed in 2.x. Throwables that cannot be delivered to a disposed downstream get redirected to RxJavaPlugins' errorHandler.
However, the Observable.merge operator does not behave consistently in this case:
When PublishSubjects are merged and emit onError one after the other, the second exception does not get delivered to RxJavaPlugins' errorHandler.
When a PublishSubject gets merged with itself (after being filtered, for example, which I expect to be a common scenario) and it emits a single onError, the exception gets delivered to both onError and RxJavaPlugins' errorHandler.
The snippet below
produces
I'm still not sure which of the options is correct, as "splitting" an Observable and then merging it back seems to be a common pattern, which will always cause an UndeliverableException in RxJavaPlugins' errorHandler in case of onError.
Observable.mergeDelayError is an option for the "split-merge" pattern, but its user has to make sure that no exception is swallowed on one of the paths, or the exception will not be delivered to the downstream at all.
What do you think would be the correct behavior here?
Do you think it is possible to make Observable.merge not redirect an error to RxJavaPlugins' errorHandler in case of the "split-merge" pattern?