Skip to content

Commit

Permalink
fix reactor#2008 Add onErrorContinue support to FluxFlattenIterable
Browse files Browse the repository at this point in the history
  • Loading branch information
Miel Donkers authored and simonbasle committed Jan 13, 2020
1 parent 0953bcf commit a2fc0e3
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 8 deletions.
12 changes: 12 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -5066,6 +5066,12 @@ public final <R> Flux<R> flatMap(
* @param mapper the {@link Function} to transform input sequence into N {@link Iterable}
* @param <R> the merged output sequence type
*
* @reactor.errorMode This operator supports {@link #onErrorContinue(BiConsumer) resuming on errors}
* (including when fusion is enabled). Exceptions thrown by the consumer are passed to
* the {@link #onErrorContinue(BiConsumer)} error consumer (the value consumer
* is not invoked, as the source element will be part of the sequence). The onNext
* signal is then propagated as normal.
*
* @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux}
*/
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper) {
Expand All @@ -5091,6 +5097,12 @@ public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<
* @param prefetch the maximum in-flight elements from each inner {@link Iterable} sequence
* @param <R> the merged output sequence type
*
* @reactor.errorMode This operator supports {@link #onErrorContinue(BiConsumer) resuming on errors}
* (including when fusion is enabled). Exceptions thrown by the consumer are passed to
* the {@link #onErrorContinue(BiConsumer)} error consumer (the value consumer
* is not invoked, as the source element will be part of the sequence). The onNext
* signal is then propagated as normal.
*
* @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux}
*/
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,14 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> act
}
catch (Throwable ex) {
Context ctx = actual.currentContext();
Operators.error(actual, Operators.onOperatorError(ex, ctx));
Throwable e_ = Operators.onNextError(v, ex, ctx);
Operators.onDiscard(v, ctx);
if (e_ != null) {
Operators.error(actual, e_);
}
else {
Operators.complete(actual);
}
return null;
}

Expand Down Expand Up @@ -341,8 +347,11 @@ void drainAsync() {
catch (Throwable exc) {
it = null;
Context ctx = actual.currentContext();
onError(Operators.onOperatorError(s, exc, t, ctx));
Throwable e_ = Operators.onNextError(t, exc, ctx, s);
Operators.onDiscard(t, ctx);
if (e_ != null) {
onError(e_);
}
continue;
}

Expand Down Expand Up @@ -525,9 +534,13 @@ void drainSync() {
catch (Throwable exc) {
current = null;
Context ctx = actual.currentContext();
a.onError(Operators.onOperatorError(s, exc, t, ctx));
Throwable e_ = Operators.onNextError(t, exc, ctx, s);
Operators.onDiscard(t, ctx);
return;
if (e_ != null) {
a.onError(e_);
return;
}
continue;
}

if (!b) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

package reactor.core.publisher;

import static org.assertj.core.api.Java6Assertions.assertThat;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
Expand All @@ -28,18 +29,16 @@

import org.junit.Test;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.Scannable.Attr;
import reactor.core.Scannable.Attr;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.concurrent.Queues;

import static org.assertj.core.api.Java6Assertions.assertThat;

public class FluxFlattenIterableTest extends FluxOperatorTest<String, String> {

@Override
Expand Down Expand Up @@ -396,4 +395,128 @@ public void syncDrainWithPollFailure() {
.expectErrorMessage("boom")
.verify(Duration.ofSeconds(1));
}

@Test
public void errorModeContinueNullPublisherNotFused() {
Flux<Integer> test = Flux
.just(1, 2)
.hide()
.flatMapIterable(f -> {
if (f == 1) {
return null;
}
return Arrays.asList(f);
})
.onErrorContinue(OnNextFailureStrategyTest::drop);

StepVerifier.create(test)
.expectNoFusionSupport()
.expectNext(2)
.expectComplete()
.verifyThenAssertThat()
.hasDropped(1)
.hasDroppedErrors(1);
}

@Test
public void errorModeContinueInternalErrorNotFused() {
Flux<Integer> test = Flux
.just(1, 2)
.hide()
.flatMapIterable(f -> {
if (f == 1) {
throw new IllegalStateException("boom");
}
return Arrays.asList(f);
})
.onErrorContinue(OnNextFailureStrategyTest::drop);

StepVerifier.create(test)
.expectNoFusionSupport()
.expectNext(2)
.expectComplete()
.verifyThenAssertThat()
.hasDropped(1)
.hasDroppedErrors(1);
}

@Test
public void errorModeContinueSingleElementNotFused() {
Flux<Integer> test = Flux
.just(1)
.hide()
.flatMapIterable(f -> {
if (f == 1) {
throw new IllegalStateException("boom");
}
return Arrays.asList(f);
})
.onErrorContinue(OnNextFailureStrategyTest::drop);

StepVerifier.create(test)
.expectNoFusionSupport()
.expectComplete()
.verifyThenAssertThat()
.hasDropped(1)
.hasDroppedErrors(1);
}

@Test
public void errorModeContinueNullPublisherFused() {
Flux<Integer> test = Flux
.just(1, 2)
.flatMapIterable(f -> {
if (f == 1) {
return null;
}
return Arrays.asList(f);
})
.onErrorContinue(OnNextFailureStrategyTest::drop);

StepVerifier.create(test)
.expectNext(2)
.expectComplete()
.verifyThenAssertThat()
.hasDropped(1)
.hasDroppedErrors(1);
}

@Test
public void errorModeContinueInternalErrorFused() {
Flux<Integer> test = Flux
.just(1, 2)
.flatMapIterable(f -> {
if (f == 1) {
throw new IllegalStateException("boom");
}
return Arrays.asList(f);
})
.onErrorContinue(OnNextFailureStrategyTest::drop);

StepVerifier.create(test)
.expectNext(2)
.expectComplete()
.verifyThenAssertThat()
.hasDropped(1)
.hasDroppedErrors(1);
}

@Test
public void errorModeContinueSingleElementFused() {
Flux<Integer> test = Flux
.just(1)
.flatMapIterable(f -> {
if (f == 1) {
throw new IllegalStateException("boom");
}
return Arrays.asList(f);
})
.onErrorContinue(OnNextFailureStrategyTest::drop);

StepVerifier.create(test)
.expectComplete()
.verifyThenAssertThat()
.hasDropped(1)
.hasDroppedErrors(1);
}
}

0 comments on commit a2fc0e3

Please sign in to comment.