Skip to content

Commit

Permalink
fix reactor#2042 collect() discards on consumer error when fused
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle authored Feb 17, 2020
1 parent 20113d5 commit 5e39c93
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1566,10 +1566,9 @@ public MonoSubscriber(CoreSubscriber<? super O> actual) {
@Override
public void cancel() {
O v = value;
if (STATE.getAndSet(this, CANCELLED) <= HAS_REQUEST_NO_VALUE) {
discard(v);
}
value = null;
STATE.set(this, CANCELLED);
discard(v);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.Test;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
Expand Down Expand Up @@ -154,6 +155,39 @@ public void discardElementOnAccumulatorFailure() {
.hasDiscardedExactly(1);
}

@Test
public void discardElementAndBufferOnAccumulatorLateFailure() {
Flux.just(1, 2, 3, 4)
.hide()
.collect(ArrayList::new, (l, t) -> {
if (t == 3) {
throw new IllegalStateException("accumulator: boom");
}
l.add(t);
})
.as(StepVerifier::create)
.expectErrorMessage("accumulator: boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}

@Test
public void discardElementAndBufferOnAccumulatorLateFailure_fused() {
Flux.just(1, 2, 3, 4)
.collect(ArrayList::new, (l, t) -> {
if (t == 3) {
throw new IllegalStateException("accumulator: boom");
}
l.add(t);
})
.as(StepVerifier::create)
//WARNING: we need to request fusion so this expectFusion is important
.expectFusion(Fuseable.ASYNC)
.expectErrorMessage("accumulator: boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3);
}

@Test
public void discardListElementsOnError() {
Mono<List<Integer>> test =
Expand Down

0 comments on commit 5e39c93

Please sign in to comment.