From d250ae7e90424f56cc267a96435a10eec99a8dcc Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Sat, 15 Oct 2016 14:44:59 +0200 Subject: [PATCH] 2.x: Fix Flowable.elementAt on empty sources. Plus sync tests (#4707) --- .../operators/flowable/FlowableElementAt.java | 2 +- .../flowable/FlowableElementAtTest.java | 64 ++++++++++++++++++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java index 69faf7916e..5aaf364441 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java @@ -87,7 +87,7 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (index <= count && !done) { + if (!done) { done = true; T v = defaultValue; if (v == null) { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java index f3ed4ec1f7..dfa0da02ea 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java @@ -15,10 +15,15 @@ import static org.junit.Assert.*; +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import java.util.List; import java.util.NoSuchElementException; import org.junit.Test; - -import io.reactivex.Flowable; +import org.reactivestreams.*; public class FlowableElementAtTest { @@ -175,4 +180,59 @@ public void elementAtOrErrorIndex1OnEmptySource() { .test() .assertFailure(NoSuchElementException.class); } + + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable o) throws Exception { + return o.elementAt(0).toFlowable(); + } + }); + } + + @Test + public void elementAtIndex1WithDefaultOnEmptySourceObservable() { + Flowable.empty() + .elementAt(1, 10) + .toFlowable() + .test() + .assertResult(10); + } + + @Test + public void errorFlowable() { + Flowable.error(new TestException()) + .elementAt(1, 10) + .toFlowable() + .test() + .assertFailure(TestException.class); + } + + @Test + public void badSource() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber subscriber) { + subscriber.onSubscribe(new BooleanSubscription()); + + subscriber.onNext(1); + subscriber.onNext(2); + subscriber.onError(new TestException()); + subscriber.onComplete(); + } + } + .elementAt(0) + .toFlowable() + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } }