2.x Observable.combineLatestDelayError delivers Error after completion #4986

Closed
@StanislavChumarin

Observable.combineLatestDelayError delivers the error event after the complete event has already been emitted, so the error is treated as unhandled.
I checked this on rx.Observable, io.reactivex.Observable, and Flowable.

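    // Imports for the 2.x tests; the 1.x test uses fully qualified rx.* names.
    import io.reactivex.Flowable;
    import io.reactivex.Observable;
    import io.reactivex.observers.TestObserver;
    import io.reactivex.subscribers.TestSubscriber;
    import java.util.Arrays;
    import org.junit.Test;

    @Test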
    public void testCombine() {
        rx.observers.TestSubscriber<Integer> testSubscriber = rx.observers.TestSubscriber.create();

        rx.Observable<Long> emptyObservable = rx.Observable.empty();
        rx.Observable<Object> errorObservable = rx.Observable.error(new Exception());

        rx.Observable.combineLatestDelayError(
                Arrays.asList(
                        emptyObservable
                                .doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
                                .doOnTerminate(() -> System.out.println("emptyObservable: doFinally")),
                        errorObservable
                                .doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
                                .doOnTerminate(() -> System.out.println("errorObservable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doOnTerminate(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testSubscriber);

        testSubscriber.awaitTerminalEvent();
    }

    @Test
    public void testCombine2() {
        TestObserver<Integer> testObserver = TestObserver.create();

        Observable<Long> emptyObservable = Observable.empty();
        Observable<Object> errorObservable = Observable.error(new Exception());

        Observable.combineLatestDelayError(
                Arrays.asList(
                        emptyObservable
                                .doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
                                .doFinally(() -> System.out.println("emptyObservable: doFinally")),
                        errorObservable
                                .doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
                                .doFinally(() -> System.out.println("errorObservable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testObserver);

        testObserver.awaitTerminalEvent();
    }

    @Test
    public void testCombine2Flowable() {
        TestSubscriber<Integer> testObserver = TestSubscriber.create();

        Flowable<Integer> emptyFlowable = Flowable.empty();
        Flowable<Object> errorFlowable = Flowable.error(new Exception());

        Flowable.combineLatestDelayError(
                Arrays.asList(
                        emptyFlowable
                                .doOnEach(integerNotification -> System.out.println("emptyFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("emptyFlowable: doFinally")),
                        errorFlowable
                                .doOnEach(integerNotification -> System.out.println("errorFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("errorFlowable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testObserver);

        testObserver.awaitTerminalEvent();
    }

Output:
testCombine

emptyObservable: [rx.Notification@2b4a2ec7 OnCompleted]
emptyObservable: doFinally
combineLatestDelayError: [rx.Notification@2b4a2ec7 OnCompleted]
combineLatestDelayError: doFinally

testCombine2

emptyObservable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyObservable: doFinally
errorObservable: OnErrorNotification[java.lang.Exception]
errorObservable: doFinally
java.lang.Exception
	at com.myproject.Test.testCombine2(Test.java:298)
	// not really important Stacktrace
Exception in thread "main" java.lang.Exception
	at com.myproject.Test.testCombine2(Test.java:298)
	// repeat of not important Stacktrace

testCombine2Flowable

emptyFlowable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyFlowable: doFinally

If the error emitter comes first in the list, or if a timer is used instead of empty, everything works as expected.
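The dependence on source order can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions, not RxJava's implementation: the class `TerminalOrderSketch`, its nested `Downstream`, and the methods `sourceError` and `sourceCompletedEmpty` are hypothetical names invented for illustration. The model assumes that a source completing without emitting terminates the combined sequence, that an error is held back until then, and that anything arriving after the terminal event cannot be delivered. In RxJava 2.x such undeliverable errors go to `RxJavaPlugins.onError`, which by default prints the stack trace and forwards the error to the thread's uncaught exception handler; that is consistent with the doubled stack trace in the testCombine2 output.

```java
import java.util.ArrayList;
import java.util.List;

public class TerminalOrderSketch {

    /** Hypothetical stand-in for a downstream observer plus a global error hook. */
    static final class Downstream {
        final List<String> events = new ArrayList<>();           // what the observer sees
        final List<Throwable> undeliverable = new ArrayList<>(); // what leaks to the global hook
        private Throwable delayed;   // error held back in "delay error" mode
        private boolean terminated;  // true once a terminal event was delivered

        /** A source failed: hold the error if still active, otherwise it is undeliverable. */
        void sourceError(Throwable t) {
            if (terminated) {
                undeliverable.add(t);
            } else {
                delayed = t;
            }
        }

        /** A source completed without emitting: no combination is possible, so terminate. */
        void sourceCompletedEmpty() {
            if (terminated) {
                return;
            }
            terminated = true;
            events.add(delayed != null ? "onError" : "onComplete");
        }
    }

    public static void main(String[] args) {
        // Order from the report: the empty source terminates before the failing one.
        Downstream emptyFirst = new Downstream();
        emptyFirst.sourceCompletedEmpty();
        emptyFirst.sourceError(new Exception("boom"));
        System.out.println(emptyFirst.events + " undeliverable=" + emptyFirst.undeliverable.size());

        // Swapped order: the failing source terminates first.
        Downstream errorFirst = new Downstream();
        errorFirst.sourceError(new Exception("boom"));
        errorFirst.sourceCompletedEmpty();
        System.out.println(errorFirst.events + " undeliverable=" + errorFirst.undeliverable.size());
    }
}
```

In the first ordering the model completes and leaves one undeliverable error; in the second the error is delivered as the terminal event and nothing leaks. The model does not attempt to explain the 1.x and Flowable outputs, where no error is printed at all.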

I also noticed a difference in the order of events between 1.x and 2.x. Is that correct?
