diff --git a/src/main/java/io/reactivex/internal/observers/LambdaObserver.java b/src/main/java/io/reactivex/internal/observers/LambdaObserver.java index 95a66ed7e8..bfda5eb703 100644 --- a/src/main/java/io/reactivex/internal/observers/LambdaObserver.java +++ b/src/main/java/io/reactivex/internal/observers/LambdaObserver.java @@ -67,7 +67,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (!isDisposed()) { - dispose(); + lazySet(DisposableHelper.DISPOSED); try { onError.accept(t); } catch (Throwable e) { @@ -80,7 +80,7 @@ public void onError(Throwable t) { @Override public void onComplete() { if (!isDisposed()) { - dispose(); + lazySet(DisposableHelper.DISPOSED); try { onComplete.run(); } catch (Throwable e) { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoFinallyTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoFinallyTest.java index cdc2187141..8e6078b486 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoFinallyTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoFinallyTest.java @@ -15,7 +15,7 @@ import static org.junit.Assert.*; -import java.util.List; +import java.util.*; import org.junit.Test; import org.reactivestreams.*; @@ -438,4 +438,84 @@ public void onComplete() { assertEquals(1, calls); } + + @Test + public void eventOrdering() { + final List list = new ArrayList(); + + Flowable.error(new TestException()) + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + list.add("cancel"); + } + }) + .doFinally(new Action() { + @Override + public void run() throws Exception { + list.add("finally"); + } + }) + .subscribe( + new Consumer() { + @Override + public void accept(Object v) throws Exception { + list.add("onNext"); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + list.add("onError"); + } + }, + new Action() { + @Override + public void run() throws Exception { + list.add("onComplete"); + } + }); + + assertEquals(Arrays.asList("onError", "finally"), list); + } + + @Test + public void eventOrdering2() { + final List list = new ArrayList(); + + Flowable.just(1) + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + list.add("cancel"); + } + }) + .doFinally(new Action() { + @Override + public void run() throws Exception { + list.add("finally"); + } + }) + .subscribe( + new Consumer() { + @Override + public void accept(Object v) throws Exception { + list.add("onNext"); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + list.add("onError"); + } + }, + new Action() { + @Override + public void run() throws Exception { + list.add("onComplete"); + } + }); + + assertEquals(Arrays.asList("onNext", "onComplete", "finally"), list); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java index 8f127b431b..f3f18fe33a 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java @@ -91,6 +91,8 @@ public void testUnsubscribesFromUpstreamFlowable() { public void run() { unsub.set(true); }}) + .ignoreElements() + .toFlowable() .subscribe().dispose(); assertTrue(unsub.get()); @@ -207,6 +209,7 @@ public void testUnsubscribesFromUpstream() { public void run() { unsub.set(true); }}) + .ignoreElements() .subscribe().dispose(); assertTrue(unsub.get()); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoFinallyTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoFinallyTest.java index 7e2aa53213..293c9fc005 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoFinallyTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoFinallyTest.java @@ -15,11 +15,13 @@ import static org.junit.Assert.*; -import java.util.List; +import java.util.*; import org.junit.Test; -import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.TestHelper; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; @@ -442,4 +444,86 @@ public void onComplete() { assertEquals(1, calls); } + + + @Test + public void eventOrdering() { + final List list = new ArrayList(); + + Observable.error(new TestException()) + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + list.add("dispose"); + } + }) + .doFinally(new Action() { + @Override + public void run() throws Exception { + list.add("finally"); + } + }) + .subscribe( + new Consumer() { + @Override + public void accept(Object v) throws Exception { + list.add("onNext"); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + list.add("onError"); + } + }, + new Action() { + @Override + public void run() throws Exception { + list.add("onComplete"); + } + }); + + assertEquals(Arrays.asList("onError", "finally"), list); + } + + @Test + public void eventOrdering2() { + final List list = new ArrayList(); + + Observable.just(1) + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + list.add("dispose"); + } + }) + .doFinally(new Action() { + @Override + public void run() throws Exception { + list.add("finally"); + } + }) + .subscribe( + new Consumer() { + @Override + public void accept(Object v) throws Exception { + list.add("onNext"); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + list.add("onError"); + } + }, + new Action() { + @Override + public void run() throws Exception { + list.add("onComplete"); + } + }); + + assertEquals(Arrays.asList("onNext", "onComplete", "finally"), list); + } + } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java index c4940a68e1..05623556ad 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java @@ -82,12 +82,16 @@ public void testErrorReceivedObservable() { @Test public void testUnsubscribesFromUpstreamObservable() { final AtomicBoolean unsub = new AtomicBoolean(); - Observable.range(1, 10).doOnDispose(new Action() { + Observable.range(1, 10).concatWith(Observable.never()) + .doOnDispose(new Action() { @Override public void run() { unsub.set(true); }}) - .subscribe(); + .ignoreElements() + .toObservable() + .subscribe() + .dispose(); assertTrue(unsub.get()); } @@ -145,12 +149,15 @@ public void testErrorReceived() { @Test public void testUnsubscribesFromUpstream() { final AtomicBoolean unsub = new AtomicBoolean(); - Observable.range(1, 10).doOnDispose(new Action() { + Observable.range(1, 10).concatWith(Observable.never()) + .doOnDispose(new Action() { @Override public void run() { unsub.set(true); }}) - .subscribe(); + .ignoreElements() + .subscribe() + .dispose(); assertTrue(unsub.get()); } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastOneTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastOneTest.java index a74ca367e5..668df9ca30 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastOneTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastOneTest.java @@ -67,8 +67,13 @@ public void run() { unsubscribed.set(true); } }; - Observable.just(1).doOnDispose(unsubscribeAction) - .takeLast(1).subscribe(); + Observable.just(1) + .concatWith(Observable.never()) + .doOnDispose(unsubscribeAction) + .takeLast(1) + .subscribe() + .dispose(); + assertTrue(unsubscribed.get()); }