Skip to content

Commit

Permalink
FlowableScanSeed - prevent post-terminal events
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Dec 3, 2016
1 parent 846afd3 commit 9918a4f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ static final class ScanSeedSubscriber<T, R> extends SinglePostCompleteSubscriber
private static final long serialVersionUID = -1776795561228106469L;

final BiFunction<R, ? super T, R> accumulator;

boolean done = false;

ScanSeedSubscriber(Subscriber<? super R> actual, BiFunction<R, ? super T, R> accumulator, R value) {
super(actual);
Expand Down Expand Up @@ -80,12 +82,20 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (done) {
return;
}
done = true;
value = null;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
complete(value);
}
}
Expand Down
64 changes: 64 additions & 0 deletions src/test/java/io/reactivex/flowable/FlowableScanTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package io.reactivex.flowable;

import java.util.HashMap;
import java.util.concurrent.Callable;

import org.junit.Test;

import io.reactivex.Flowable;
import io.reactivex.flowable.FlowableEventStream.Event;
import io.reactivex.functions.*;

Expand All @@ -41,4 +43,66 @@ public void accept(HashMap<String, String> v) {
}
});
}

@Test
public void testFlowableScanSeedDoesNotEmitErrorTwiceIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
Burst.item(1).error(e).scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
throw e;
}})
.test()
.assertNoValues()
.assertError(e);
}

@Test
public void testFlowableScanSeedDoesNotEmitTerminalEventTwiceIfScanFunctionThrows() {
final RuntimeException e = new RuntimeException();
Burst.item(1).create().scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer n1, Integer n2) throws Exception {
throw e;
}})
.test()
.assertNoValues()
.assertError(e);
}

@Test
public void testFlowableScanSeedCompletesNormally() {
Flowable.just(1,2,3).scan(0, new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}})
.test()
.assertValues(0, 1, 3, 6)
.assertComplete();
}

@Test
public void testFlowableScanSeedWhenScanSeedProviderThrows() {
final RuntimeException e = new RuntimeException();
Flowable.just(1,2,3).scanWith(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
throw e;
}
},
new BiFunction<Integer, Integer, Integer>() {

@Override
public Integer apply(Integer t1, Integer t2) throws Exception {
return t1 + t2;
}
})
.test()
.assertError(e)
.assertNoValues();
}
}

0 comments on commit 9918a4f

Please sign in to comment.