diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java index 5aaf364441..b04bcaf9c8 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java @@ -13,6 +13,8 @@ package io.reactivex.internal.operators.flowable; +import java.util.NoSuchElementException; + import org.reactivestreams.*; import io.reactivex.internal.subscriptions.*; @@ -21,15 +23,18 @@ public final class FlowableElementAt extends AbstractFlowableWithUpstream { final long index; final T defaultValue; - public FlowableElementAt(Publisher source, long index, T defaultValue) { + final boolean errorOnFewer; + + public FlowableElementAt(Publisher source, long index, T defaultValue, boolean errorOnFewer) { super(source); this.index = index; this.defaultValue = defaultValue; + this.errorOnFewer = errorOnFewer; } @Override protected void subscribeActual(Subscriber s) { - source.subscribe(new ElementAtSubscriber(s, index, defaultValue)); + source.subscribe(new ElementAtSubscriber(s, index, defaultValue, errorOnFewer)); } static final class ElementAtSubscriber extends DeferredScalarSubscription implements Subscriber { @@ -38,6 +43,7 @@ static final class ElementAtSubscriber extends DeferredScalarSubscription final long index; final T defaultValue; + final boolean errorOnFewer; Subscription s; @@ -45,10 +51,11 @@ static final class ElementAtSubscriber extends DeferredScalarSubscription boolean done; - ElementAtSubscriber(Subscriber actual, long index, T defaultValue) { + ElementAtSubscriber(Subscriber actual, long index, T defaultValue, boolean errorOnFewer) { super(actual); this.index = index; this.defaultValue = defaultValue; + this.errorOnFewer = errorOnFewer; } @Override @@ -91,7 +98,11 @@ public void onComplete() { done = true; T v = defaultValue; if (v == null) { - actual.onComplete(); + if (errorOnFewer) { + actual.onError(new NoSuchElementException()); + } else { + actual.onComplete(); + } } else { complete(v); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java index b2ee24f2c0..15b4f69887 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java @@ -38,7 +38,7 @@ protected void subscribeActual(MaybeObserver s) { @Override public Flowable fuseToFlowable() { - return RxJavaPlugins.onAssembly(new FlowableElementAt(source, index, null)); + return RxJavaPlugins.onAssembly(new FlowableElementAt(source, index, null, false)); } static final class ElementAtSubscriber implements Subscriber, Disposable { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java index a53c6421d3..4207c68e4b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java @@ -42,7 +42,7 @@ protected void subscribeActual(SingleObserver s) { @Override public Flowable fuseToFlowable() { - return RxJavaPlugins.onAssembly(new FlowableElementAt(source, index, defaultValue)); + return RxJavaPlugins.onAssembly(new FlowableElementAt(source, index, defaultValue, true)); } static final class ElementAtSubscriber implements Subscriber, Disposable { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java index 4e217b94e8..2319264fe6 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java @@ -13,6 +13,8 @@ package io.reactivex.internal.operators.observable; +import java.util.NoSuchElementException; + import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; @@ -21,20 +23,24 @@ public final class ObservableElementAt extends AbstractObservableWithUpstream { final long index; final T defaultValue; - public ObservableElementAt(ObservableSource source, long index, T defaultValue) { + final boolean errorOnFewer; + + public ObservableElementAt(ObservableSource source, long index, T defaultValue, boolean errorOnFewer) { super(source); this.index = index; this.defaultValue = defaultValue; + this.errorOnFewer = errorOnFewer; } @Override public void subscribeActual(Observer t) { - source.subscribe(new ElementAtObserver(t, index, defaultValue)); + source.subscribe(new ElementAtObserver(t, index, defaultValue, errorOnFewer)); } static final class ElementAtObserver implements Observer, Disposable { final Observer actual; final long index; final T defaultValue; + final boolean errorOnFewer; Disposable s; @@ -42,10 +48,11 @@ static final class ElementAtObserver implements Observer, Disposable { boolean done; - ElementAtObserver(Observer actual, long index, T defaultValue) { + ElementAtObserver(Observer actual, long index, T defaultValue, boolean errorOnFewer) { this.actual = actual; this.index = index; this.defaultValue = defaultValue; + this.errorOnFewer = errorOnFewer; } @Override @@ -99,10 +106,14 @@ public void onComplete() { if (!done) { done = true; T v = defaultValue; - if (v != null) { - actual.onNext(v); + if (v == null && errorOnFewer) { + actual.onError(new NoSuchElementException()); + } else { + if (v != null) { + actual.onNext(v); + } + actual.onComplete(); } - actual.onComplete(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtMaybe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtMaybe.java index 5a5dc024bb..e68f2c299b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtMaybe.java @@ -33,7 +33,7 @@ public void subscribeActual(MaybeObserver t) { @Override public Observable fuseToObservable() { - return RxJavaPlugins.onAssembly(new ObservableElementAt(source, index, null)); + return RxJavaPlugins.onAssembly(new ObservableElementAt(source, index, null, false)); } static final class ElementAtObserver implements Observer, Disposable { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java index e572b987ff..f38961d40e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java @@ -39,7 +39,7 @@ public void subscribeActual(SingleObserver t) { @Override public Observable fuseToObservable() { - return RxJavaPlugins.onAssembly(new ObservableElementAt(source, index, defaultValue)); + return RxJavaPlugins.onAssembly(new ObservableElementAt(source, index, defaultValue, true)); } static final class ElementAtObserver implements Observer, Disposable { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFirstTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFirstTest.java index f5db3a9d65..3a6c968d49 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFirstTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFirstTest.java @@ -544,4 +544,45 @@ public void firstOrErrorError() { .assertErrorMessage("error") .assertError(RuntimeException.class); } + + @Test + public void firstOrErrorNoElementFlowable() { + Flowable.empty() + .firstOrError() + .toFlowable() + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void firstOrErrorOneElementFlowable() { + Flowable.just(1) + .firstOrError() + .toFlowable() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void firstOrErrorMultipleElementsFlowable() { + Flowable.just(1, 2, 3) + .firstOrError() + .toFlowable() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void firstOrErrorErrorFlowable() { + Flowable.error(new RuntimeException("error")) + .firstOrError() + .toFlowable() + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableLastTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableLastTest.java index d38e0a717e..a00d4e5ebc 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableLastTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableLastTest.java @@ -357,4 +357,22 @@ public void error() { .test() .assertFailure(TestException.class); } + + @Test + public void errorLastOrErrorFlowable() { + Flowable.error(new TestException()) + .lastOrError() + .toFlowable() + .test() + .assertFailure(TestException.class); + } + + @Test + public void emptyLastOrErrorFlowable() { + Flowable.empty() + .lastOrError() + .toFlowable() + .test() + .assertFailure(NoSuchElementException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFirstTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFirstTest.java index 9c0df1c5ec..c6dbe3ce3f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFirstTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFirstTest.java @@ -541,4 +541,45 @@ public void firstOrErrorError() { .assertErrorMessage("error") .assertError(RuntimeException.class); } + + @Test + public void firstOrErrorNoElementObservable() { + Observable.empty() + .firstOrError() + .toObservable() + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } + + @Test + public void firstOrErrorOneElementObservable() { + Observable.just(1) + .firstOrError() + .toObservable() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void firstOrErrorMultipleElementsObservable() { + Observable.just(1, 2, 3) + .firstOrError() + .toObservable() + .test() + .assertNoErrors() + .assertValue(1); + } + + @Test + public void firstOrErrorErrorObservable() { + Observable.error(new RuntimeException("error")) + .firstOrError() + .toObservable() + .test() + .assertNoValues() + .assertErrorMessage("error") + .assertError(RuntimeException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java index ed20ed08b2..499f303f76 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableLastTest.java @@ -358,4 +358,22 @@ public void error() { .test() .assertFailure(TestException.class); } + + @Test + public void errorLastOrErrorObservable() { + Observable.error(new TestException()) + .lastOrError() + .toObservable() + .test() + .assertFailure(TestException.class); + } + + @Test + public void emptyLastOrErrorObservable() { + Observable.empty() + .lastOrError() + .toObservable() + .test() + .assertFailure(NoSuchElementException.class); + } }