diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 36d3b03765..bfc2b2a83d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -5066,6 +5066,12 @@ public final Flux flatMap( * @param mapper the {@link Function} to transform input sequence into N {@link Iterable} * @param the merged output sequence type * + * @reactor.errorMode This operator supports {@link #onErrorContinue(BiConsumer) resuming on errors} + * (including when fusion is enabled). Exceptions thrown by the consumer are passed to + * the {@link #onErrorContinue(BiConsumer)} error consumer (the value consumer + * is not invoked, as the source element will be part of the sequence). The onNext + * signal is then propagated as normal. + * * @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux} */ public final Flux flatMapIterable(Function> mapper) { @@ -5091,6 +5097,12 @@ public final Flux flatMapIterable(Function the merged output sequence type * + * @reactor.errorMode This operator supports {@link #onErrorContinue(BiConsumer) resuming on errors} + * (including when fusion is enabled). Exceptions thrown by the consumer are passed to + * the {@link #onErrorContinue(BiConsumer)} error consumer (the value consumer + * is not invoked, as the source element will be part of the sequence). The onNext + * signal is then propagated as normal. + * * @return a concatenation of the values from the Iterables obtained from each element in this {@link Flux} */ public final Flux flatMapIterable(Function> mapper, int prefetch) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java index 3b7cf8f748..30da88db9a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java @@ -98,8 +98,14 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act } catch (Throwable ex) { Context ctx = actual.currentContext(); - Operators.error(actual, Operators.onOperatorError(ex, ctx)); + Throwable e_ = Operators.onNextError(v, ex, ctx); Operators.onDiscard(v, ctx); + if (e_ != null) { + Operators.error(actual, e_); + } + else { + Operators.complete(actual); + } return null; } @@ -341,8 +347,11 @@ void drainAsync() { catch (Throwable exc) { it = null; Context ctx = actual.currentContext(); - onError(Operators.onOperatorError(s, exc, t, ctx)); + Throwable e_ = Operators.onNextError(t, exc, ctx, s); Operators.onDiscard(t, ctx); + if (e_ != null) { + onError(e_); + } continue; } @@ -525,9 +534,13 @@ void drainSync() { catch (Throwable exc) { current = null; Context ctx = actual.currentContext(); - a.onError(Operators.onOperatorError(s, exc, t, ctx)); + Throwable e_ = Operators.onNextError(t, exc, ctx, s); Operators.onDiscard(t, ctx); - return; + if (e_ != null) { + a.onError(e_); + return; + } + continue; } if (!b) { diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java index a907d581bb..2d65bf6b41 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java @@ -16,10 +16,11 @@ package reactor.core.publisher; +import static org.assertj.core.api.Java6Assertions.assertThat; + import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.function.Function; @@ -28,18 +29,16 @@ import org.junit.Test; import org.reactivestreams.Subscription; + import reactor.core.CoreSubscriber; import reactor.core.Fuseable; import reactor.core.Scannable; import reactor.core.Scannable.Attr; -import reactor.core.Scannable.Attr; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; import reactor.test.subscriber.AssertSubscriber; import reactor.util.concurrent.Queues; -import static org.assertj.core.api.Java6Assertions.assertThat; - public class FluxFlattenIterableTest extends FluxOperatorTest { @Override @@ -396,4 +395,128 @@ public void syncDrainWithPollFailure() { .expectErrorMessage("boom") .verify(Duration.ofSeconds(1)); } + + @Test + public void errorModeContinueNullPublisherNotFused() { + Flux test = Flux + .just(1, 2) + .hide() + .flatMapIterable(f -> { + if (f == 1) { + return null; + } + return Arrays.asList(f); + }) + .onErrorContinue(OnNextFailureStrategyTest::drop); + + StepVerifier.create(test) + .expectNoFusionSupport() + .expectNext(2) + .expectComplete() + .verifyThenAssertThat() + .hasDropped(1) + .hasDroppedErrors(1); + } + + @Test + public void errorModeContinueInternalErrorNotFused() { + Flux test = Flux + .just(1, 2) + .hide() + .flatMapIterable(f -> { + if (f == 1) { + throw new IllegalStateException("boom"); + } + return Arrays.asList(f); + }) + .onErrorContinue(OnNextFailureStrategyTest::drop); + + StepVerifier.create(test) + .expectNoFusionSupport() + .expectNext(2) + .expectComplete() + .verifyThenAssertThat() + .hasDropped(1) + .hasDroppedErrors(1); + } + + @Test + public void errorModeContinueSingleElementNotFused() { + Flux test = Flux + .just(1) + .hide() + .flatMapIterable(f -> { + if (f == 1) { + throw new IllegalStateException("boom"); + } + return Arrays.asList(f); + }) + .onErrorContinue(OnNextFailureStrategyTest::drop); + + StepVerifier.create(test) + .expectNoFusionSupport() + .expectComplete() + .verifyThenAssertThat() + .hasDropped(1) + .hasDroppedErrors(1); + } + + @Test + public void errorModeContinueNullPublisherFused() { + Flux test = Flux + .just(1, 2) + .flatMapIterable(f -> { + if (f == 1) { + return null; + } + return Arrays.asList(f); + }) + .onErrorContinue(OnNextFailureStrategyTest::drop); + + StepVerifier.create(test) + .expectNext(2) + .expectComplete() + .verifyThenAssertThat() + .hasDropped(1) + .hasDroppedErrors(1); + } + + @Test + public void errorModeContinueInternalErrorFused() { + Flux test = Flux + .just(1, 2) + .flatMapIterable(f -> { + if (f == 1) { + throw new IllegalStateException("boom"); + } + return Arrays.asList(f); + }) + .onErrorContinue(OnNextFailureStrategyTest::drop); + + StepVerifier.create(test) + .expectNext(2) + .expectComplete() + .verifyThenAssertThat() + .hasDropped(1) + .hasDroppedErrors(1); + } + + @Test + public void errorModeContinueSingleElementFused() { + Flux test = Flux + .just(1) + .flatMapIterable(f -> { + if (f == 1) { + throw new IllegalStateException("boom"); + } + return Arrays.asList(f); + }) + .onErrorContinue(OnNextFailureStrategyTest::drop); + + StepVerifier.create(test) + .expectComplete() + .verifyThenAssertThat() + .hasDropped(1) + .hasDroppedErrors(1); + } }