Skip to content

Commit

Permalink
2.x: Fix Flowable.elementAt on empty sources. Plus sync tests (#4707)
Browse files Browse the repository at this point in the history
  • Loading branch information
vanniktech authored and akarnokd committed Oct 15, 2016
1 parent 212db45 commit d250ae7
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if (index <= count && !done) {
if (!done) {
done = true;
T v = defaultValue;
if (v == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@

import static org.junit.Assert.*;

import io.reactivex.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.List;
import java.util.NoSuchElementException;
import org.junit.Test;

import io.reactivex.Flowable;
import org.reactivestreams.*;

public class FlowableElementAtTest {

Expand Down Expand Up @@ -175,4 +180,59 @@ public void elementAtOrErrorIndex1OnEmptySource() {
.test()
.assertFailure(NoSuchElementException.class);
}


@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeFlowable(new Function<Flowable<Object>, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Flowable<Object> o) throws Exception {
return o.elementAt(0).toFlowable();
}
});
}

@Test
public void elementAtIndex1WithDefaultOnEmptySourceObservable() {
Flowable.empty()
.elementAt(1, 10)
.toFlowable()
.test()
.assertResult(10);
}

@Test
public void errorFlowable() {
Flowable.error(new TestException())
.elementAt(1, 10)
.toFlowable()
.test()
.assertFailure(TestException.class);
}

@Test
public void badSource() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new Flowable<Integer>() {
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());

subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new TestException());
subscriber.onComplete();
}
}
.elementAt(0)
.toFlowable()
.test()
.assertResult(1);

TestHelper.assertError(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
}

0 comments on commit d250ae7

Please sign in to comment.