Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Inconsistent behavior on throwing UndeliverableException in Observable.merge #5779

Closed
tony-root opened this issue Dec 27, 2017 · 9 comments

Comments

@tony-root
Copy link

Library version: 2.1.7

As stated in RxJava2 Error Handling, no Throwable should be silently swallowed in 2.x. Throwables that cannot be delivered to a disposed downstream get redirected to RxJavaPlugins' errorHandler.

However, Observable.merge operator does not behave consistently in this case:

  • If two different PublishSubjects are merged, and emit onError one after the other, the second exception does not get delivered to RxJavaPlugins' errorHandler.
  • If one PublishSubject get merged with itself (after being filtered for example, which I expect to be a common scenario), and it emit single onError, the exception emitted gets delivered to both onError and RxJavaPlugins' errorHandler.

The snippet below

public class MergeErrorsExample {
    public static void main(String[] args) {
        RxJavaPlugins.setErrorHandler((error) ->
                printError("RxJavaPlugins.errorHandler", error)
        );

        mergeTwoDifferentObservablesDoesNotThrowUndeliverable();
        mergeSameObservableThrowsUndeliverable();
        mergeSameObservableWithPublishThrowsUndeliverable();
    }

    static void mergeTwoDifferentObservablesDoesNotThrowUndeliverable() {
        System.out.println("Merge two different PublishSubjects DOES NOT throw UndeliverableException");
        PublishSubject<Object> ps1 = PublishSubject.create();
        PublishSubject<Object> ps2 = PublishSubject.create();

        Observable.merge(ps1, ps2).subscribe(
                (next) -> System.out.println("onNext " + next),
                (error) -> printError("onError", error),
                () -> System.out.println("onComplete")
        );

        ps1.onError(new RuntimeException("ps1 exception"));
        ps2.onError(new RuntimeException("ps2 exception"));
        System.out.println();
    }

    static void mergeSameObservableThrowsUndeliverable() {
        System.out.println("Merge same PublishSubject throws UndeliverableException");
        PublishSubject<Boolean> ps1 = PublishSubject.create();

        Observable.merge(
                ps1.filter((condition) -> condition),
                ps1.filter((condition) -> !condition)
        ).subscribe(
                (next) -> System.out.println("onNext " + next),
                (error) -> printError("onError", error),
                () -> System.out.println("onComplete")
        );

        ps1.onError(new RuntimeException("ps1 exception"));
        System.out.println();
    }

    static void mergeSameObservableWithPublishThrowsUndeliverable() {
        System.out.println("Merge same Observable with publish() throws UndeliverableException");
        Observable<Boolean> o1 = Observable.error(new RuntimeException("o1 exception"));

        o1.publish((observable) ->
                Observable.merge(
                        observable.filter((condition) -> condition),
                        observable.filter((condition) -> !condition)
                )
        ).subscribe(
                (next) -> System.out.println("onNext " + next),
                (error) -> printError("onError", error),
                () -> System.out.println("onComplete")
        );
        System.out.println();
    }

    private static void printError(String message, Throwable t) {
        System.out.println(message + ": [" + t.getClass().getSimpleName() + "] " + t.getMessage());
    }
}

produces

Merge two different PublishSubjects DOES NOT throw UndeliverableException
onError: [RuntimeException] ps1 exception

Merge same PublishSubject throws UndeliverableException
onError: [RuntimeException] ps1 exception
RxJavaPlugins.errorHandler: [UndeliverableException] java.lang.RuntimeException: ps1 exception

Merge same Observable with publish() throws UndeliverableException
onError: [RuntimeException] o1 exception
RxJavaPlugins.errorHandler: [UndeliverableException] java.lang.RuntimeException: o1 exception

I'm still not sure which of the options is correct, as "splitting" an Observable and then merging it back seems to be a common pattern, which will always cause an UndeliverableException in RxJavaPlugins' errorHandler in case of onError.
Observable.mergeDelayErrors is an option for "split-merge" pattern, but the user of it has to make sure he does not swallow an exception on one of the paths, or the exception will not be delivered to the downstream at all.

What do you think would be the correct behavior here?
Do you think it is possible to make Observable.merge not redirect an error to RxJavaPlugins' errorHandler in case of "split-merge" pattern?

@akarnokd
Copy link
Member

Hi, here are the explanations:

  • mergeTwoDifferentObservablesDoesNotThrowUndeliverable

The second PublishSubject gets unsubscribed before you call onError thus there is no Observer at that moment to receive an error. A Subject can't know if it will never be subscribed again thus signalling an undeliverable error is likely too much.

  • mergeSameObservableThrowsUndeliverable

When the PublishSubject emits an error, it uses the last snapshot of available Observers and tries to emit the error to them. Here, the second Observer gets unsubscribed during the emission loop, and therefore, the error gets routed to the plugin error handler.

  • mergeSameObservableWithPublishThrowsUndeliverable

publish() operates with the same snapshot-emission logic as PublishSubject and the same effect applies.

Do you think it is possible to make Observable.merge not redirect an error to RxJavaPlugins' errorHandler in case of "split-merge" pattern?

No, that's why mergeDelayError exists which will get you all errors in a composite. The other option is to not let the errors reach merge. If you want only the very first error and ignore the rest, you could use some additional logic:

public static <T> Observable<T> mergeFirstErrorOnly(Observable<T>... sources) {
    return Observable.defer(() -> {
        AtomicBoolean once = new AtomicBoolean();
        PublishSubject<Void> errorSubject = PublishSubject.create();
        for (int i = 0; i < sources.length; i++) {
            sources[i] = sources[i].onErrorResumeNext(e -> {
                if (once.compareAndSet(false, true)) {
                    errorSubject.onError(e);
                }
                return Observable.empty();
            });
        }
        return Observable.merge(sources).takeUntil(errorSubject);
    });
}

@tony-root
Copy link
Author

tony-root commented Dec 27, 2017

Hi @akarnokd,

Thanks for the detailed explanation!
mergeDelayError is indeed what we ended up with.

I now see that it is expected implementation-wise, however from "rxjava-user" point of view I did not expect that something as simple as

        o1.publish((observable) ->
                Observable.merge(
                        observable.filter((condition) -> condition),
                        observable.filter((condition) -> !condition)
                )
        )

would crash on Android (if no custom plugin error handler is installed). I would rather expect merge to behave similar to your mergeFirstErrorOnly implementation.

Do you think it is worth mentioning in the documentation of merge operator at least?

@akarnokd
Copy link
Member

See #5780 and #5781 for the proposed JavaDoc improvements.

@akarnokd
Copy link
Member

akarnokd commented Jan 8, 2018

Closing via #5786.

@tony-root
Copy link
Author

Hi @akarnokd !

After some usage of the suggested mergeFirstErrorOnly we found that it still produces an UndeliverableException to the RxJavaPlugins' errorHandler if called from multiple threads.
Below in the complete test that reproduces it:

package com.test;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import org.assertj.core.api.Assertions;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

public class MergeFirstErrorOnlyTest {

    @Test
    public void mergeFirstErrorOnlyWithConcurrentUsage() {

        final List<Throwable> undeliverableExceptions = new ArrayList<>();

        RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                undeliverableExceptions.add(throwable);
            }
        });

        for (int i = 0; i < 10000; ++i) {
            PublishSubject<String> subject1 = PublishSubject.create();
            PublishSubject<String> subject2 = PublishSubject.create();

            Disposable disposable = Observables
                    .mergeFirstErrorOnly(
                            subject1.observeOn(Schedulers.io()),
                            subject2.observeOn(Schedulers.io())
                    )
                    .subscribe(
                            new Consumer<String>() {
                                @Override
                                public void accept(String s) {
                                    // do nothing
                                }
                            },
                            new Consumer<Throwable>() {
                                @Override
                                public void accept(Throwable throwable) {
                                    // do nothing
                                }
                            }
                    );

            subject1.onError(new Exception("Exception 1"));
            subject2.onError(new Exception("Exception 2"));

            disposable.dispose();
        }

        if (!undeliverableExceptions.isEmpty()) {
            undeliverableExceptions.get(0).printStackTrace();
        }

        Assertions.assertThat(undeliverableExceptions).isEmpty();

        RxJavaPlugins.reset();
    }
}

class Observables {
    public static <T> Observable<T> mergeFirstErrorOnly(final Observable<T>... sources) {
        return Observable.defer(new Callable<ObservableSource<? extends T>>() {
            @Override
            public ObservableSource<? extends T> call() {
                final AtomicBoolean once = new AtomicBoolean();
                final PublishSubject<Void> errorSubject = PublishSubject.create();
                for (int i = 0; i < sources.length; i++) {
                    sources[i] = sources[i].onErrorResumeNext(new Function<Throwable, ObservableSource<? extends T>>() {
                        @Override
                        public ObservableSource<? extends T> apply(Throwable e) {
                            if (once.compareAndSet(false, true)) {
                                errorSubject.onError(e);
                            }
                            return Observable.empty();
                        }
                    });
                }
                return Observable.merge(Arrays.asList(sources)).takeUntil(errorSubject);
            }
        });
    }
}

The above fails with results similar to

java.lang.AssertionError: 
Expecting empty but was:<[io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Exception 2,
    io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Exception 2]>

	at com.test.MergeFirstErrorOnlyTest.mergeFirstErrorOnlyWithConcurrentUsage(MergeFirstErrorOnlyTest.java:68)

The exception itself looks like

io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Exception 2
	at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
	at io.reactivex.subjects.PublishSubject$PublishDisposable.onError(PublishSubject.java:315)
	at io.reactivex.subjects.PublishSubject.onError(PublishSubject.java:245)
	at com.test.MergeFirstErrorOnlyTest.mergeFirstErrorOnlyWithConcurrentUsage(MergeFirstErrorOnlyTest.java:59)

For now I don't see a way to solve the issue completely using the current RxJava 2 API.
Looks like the publish subject itself is missing something similar to tryOnError, WDYT?

@tony-root
Copy link
Author

Something like

public final class PublishSubject<T> extends Subject<T> {
    
    public void tryOnError(Throwable t) {
        ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        if (subscribers.get() == TERMINATED) {
            RxJavaPlugins.onError(t);
            return;
        }
        error = t;

        for (PublishDisposable<T> s : subscribers.getAndSet(TERMINATED)) {
            s.tryOnError(t);
        }
    }

    static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {

        public void tryOnError(Throwable t) {
            if (!get()) {
                actual.onError(t);
            }
        }
    }
}

I can submit a PR if you're OK with the proposal.
Still, I totally understand that I might be missing some deeper implications of such change so please advise.

@akarnokd
Copy link
Member

If you cancel a PublishSubject while it is about to emit an error, the global error handler is involved. Your racing test runs into this case. This property is described in the JavaDoc.

The implication of a tryOnError is that it reports false when the error was not delivered, but when there are more than one source, it is unclear what the result should be if some have not received the error.

@akarnokd
Copy link
Member

Also looks like you are in control of those Subjects. In that case, don't error each individual subject, but use a separate one:

PublishSubject ps1 = PublishSubject.create();
PublishSubject ps2 = PublishSubject.create();

AtomicBoolean once = new AtomicBoolean();
PublishSubject error = PublishSubject.create();

Observable.merge(ps1, ps2).takeUntil(error);

if (once.compareAndSet(false, true)) {
    error.onError(new RuntimeException());
}

This way, the merge cancelling the non-failing ps1 and ps2 won't signal error to the global handler.

@tony-root
Copy link
Author

tony-root commented Mar 15, 2018

OK, thanks, I see the point.
Will indeed try to route errors from all streams to a single PublishSubject's onError call, guarded by AtomicBoolean.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants