Skip to content

Commit

Permalink
2.x: fix firstOrError back conversions not signalling NSE
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Nov 28, 2016
1 parent b238cc8 commit 8429dc7
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.reactivex.internal.operators.flowable;

import java.util.NoSuchElementException;

import org.reactivestreams.*;

import io.reactivex.internal.subscriptions.*;
Expand All @@ -21,15 +23,18 @@
public final class FlowableElementAt<T> extends AbstractFlowableWithUpstream<T, T> {
final long index;
final T defaultValue;
public FlowableElementAt(Publisher<T> source, long index, T defaultValue) {
final boolean errorOnFewer;

public FlowableElementAt(Publisher<T> source, long index, T defaultValue, boolean errorOnFewer) {
super(source);
this.index = index;
this.defaultValue = defaultValue;
this.errorOnFewer = errorOnFewer;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new ElementAtSubscriber<T>(s, index, defaultValue));
source.subscribe(new ElementAtSubscriber<T>(s, index, defaultValue, errorOnFewer));
}

static final class ElementAtSubscriber<T> extends DeferredScalarSubscription<T> implements Subscriber<T> {
Expand All @@ -38,17 +43,19 @@ static final class ElementAtSubscriber<T> extends DeferredScalarSubscription<T>

final long index;
final T defaultValue;
final boolean errorOnFewer;

Subscription s;

long count;

boolean done;

ElementAtSubscriber(Subscriber<? super T> actual, long index, T defaultValue) {
ElementAtSubscriber(Subscriber<? super T> actual, long index, T defaultValue, boolean errorOnFewer) {
super(actual);
this.index = index;
this.defaultValue = defaultValue;
this.errorOnFewer = errorOnFewer;
}

@Override
Expand Down Expand Up @@ -91,7 +98,11 @@ public void onComplete() {
done = true;
T v = defaultValue;
if (v == null) {
actual.onComplete();
if (errorOnFewer) {
actual.onError(new NoSuchElementException());
} else {
actual.onComplete();
}
} else {
complete(v);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ protected void subscribeActual(MaybeObserver<? super T> s) {

@Override
public Flowable<T> fuseToFlowable() {
return RxJavaPlugins.onAssembly(new FlowableElementAt<T>(source, index, null));
return RxJavaPlugins.onAssembly(new FlowableElementAt<T>(source, index, null, false));
}

static final class ElementAtSubscriber<T> implements Subscriber<T>, Disposable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected void subscribeActual(SingleObserver<? super T> s) {

@Override
public Flowable<T> fuseToFlowable() {
return RxJavaPlugins.onAssembly(new FlowableElementAt<T>(source, index, defaultValue));
return RxJavaPlugins.onAssembly(new FlowableElementAt<T>(source, index, defaultValue, true));
}

static final class ElementAtSubscriber<T> implements Subscriber<T>, Disposable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.reactivex.internal.operators.observable;

import java.util.NoSuchElementException;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
Expand All @@ -21,31 +23,36 @@
public final class ObservableElementAt<T> extends AbstractObservableWithUpstream<T, T> {
final long index;
final T defaultValue;
public ObservableElementAt(ObservableSource<T> source, long index, T defaultValue) {
final boolean errorOnFewer;

public ObservableElementAt(ObservableSource<T> source, long index, T defaultValue, boolean errorOnFewer) {
super(source);
this.index = index;
this.defaultValue = defaultValue;
this.errorOnFewer = errorOnFewer;
}
@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new ElementAtObserver<T>(t, index, defaultValue));
source.subscribe(new ElementAtObserver<T>(t, index, defaultValue, errorOnFewer));
}

static final class ElementAtObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> actual;
final long index;
final T defaultValue;
final boolean errorOnFewer;

Disposable s;

long count;

boolean done;

ElementAtObserver(Observer<? super T> actual, long index, T defaultValue) {
ElementAtObserver(Observer<? super T> actual, long index, T defaultValue, boolean errorOnFewer) {
this.actual = actual;
this.index = index;
this.defaultValue = defaultValue;
this.errorOnFewer = errorOnFewer;
}

@Override
Expand Down Expand Up @@ -99,10 +106,14 @@ public void onComplete() {
if (!done) {
done = true;
T v = defaultValue;
if (v != null) {
actual.onNext(v);
if (v == null && errorOnFewer) {
actual.onError(new NoSuchElementException());
} else {
if (v != null) {
actual.onNext(v);
}
actual.onComplete();
}
actual.onComplete();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void subscribeActual(MaybeObserver<? super T> t) {

@Override
public Observable<T> fuseToObservable() {
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, null));
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, null, false));
}

static final class ElementAtObserver<T> implements Observer<T>, Disposable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void subscribeActual(SingleObserver<? super T> t) {

@Override
public Observable<T> fuseToObservable() {
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, defaultValue));
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, defaultValue, true));
}

static final class ElementAtObserver<T> implements Observer<T>, Disposable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,45 @@ public void firstOrErrorError() {
.assertErrorMessage("error")
.assertError(RuntimeException.class);
}

@Test
public void firstOrErrorNoElementFlowable() {
Flowable.empty()
.firstOrError()
.toFlowable()
.test()
.assertNoValues()
.assertError(NoSuchElementException.class);
}

@Test
public void firstOrErrorOneElementFlowable() {
Flowable.just(1)
.firstOrError()
.toFlowable()
.test()
.assertNoErrors()
.assertValue(1);
}

@Test
public void firstOrErrorMultipleElementsFlowable() {
Flowable.just(1, 2, 3)
.firstOrError()
.toFlowable()
.test()
.assertNoErrors()
.assertValue(1);
}

@Test
public void firstOrErrorErrorFlowable() {
Flowable.error(new RuntimeException("error"))
.firstOrError()
.toFlowable()
.test()
.assertNoValues()
.assertErrorMessage("error")
.assertError(RuntimeException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,22 @@ public void error() {
.test()
.assertFailure(TestException.class);
}

@Test
public void errorLastOrErrorFlowable() {
Flowable.error(new TestException())
.lastOrError()
.toFlowable()
.test()
.assertFailure(TestException.class);
}

@Test
public void emptyLastOrErrorFlowable() {
Flowable.empty()
.lastOrError()
.toFlowable()
.test()
.assertFailure(NoSuchElementException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,4 +541,45 @@ public void firstOrErrorError() {
.assertErrorMessage("error")
.assertError(RuntimeException.class);
}

@Test
public void firstOrErrorNoElementObservable() {
Observable.empty()
.firstOrError()
.toObservable()
.test()
.assertNoValues()
.assertError(NoSuchElementException.class);
}

@Test
public void firstOrErrorOneElementObservable() {
Observable.just(1)
.firstOrError()
.toObservable()
.test()
.assertNoErrors()
.assertValue(1);
}

@Test
public void firstOrErrorMultipleElementsObservable() {
Observable.just(1, 2, 3)
.firstOrError()
.toObservable()
.test()
.assertNoErrors()
.assertValue(1);
}

@Test
public void firstOrErrorErrorObservable() {
Observable.error(new RuntimeException("error"))
.firstOrError()
.toObservable()
.test()
.assertNoValues()
.assertErrorMessage("error")
.assertError(RuntimeException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,22 @@ public void error() {
.test()
.assertFailure(TestException.class);
}

@Test
public void errorLastOrErrorObservable() {
Observable.error(new TestException())
.lastOrError()
.toObservable()
.test()
.assertFailure(TestException.class);
}

@Test
public void emptyLastOrErrorObservable() {
Observable.empty()
.lastOrError()
.toObservable()
.test()
.assertFailure(NoSuchElementException.class);
}
}

0 comments on commit 8429dc7

Please sign in to comment.