diff --git a/build.gradle b/build.gradle index 251c82b442..dd97ca775d 100644 --- a/build.gradle +++ b/build.gradle @@ -58,6 +58,7 @@ ext { // Testing assertJVersion = '3.9.0' mockitoVersion = '2.10.0' + jUnitParamsVersion = '1.1.1' javadocLinks = ["https://docs.oracle.com/javase/8/docs/api/", "https://docs.oracle.com/javaee/6/api/", @@ -265,7 +266,8 @@ project('reactor-core') { "org.testng:testng:6.8.5", "org.assertj:assertj-core:$assertJVersion", "org.mockito:mockito-core:$mockitoVersion", - "org.openjdk.jol:jol-core:0.9" + "org.openjdk.jol:jol-core:0.9", + "pl.pragmatists:JUnitParams:$jUnitParamsVersion" if ("$compatibleVersion" != "SKIP") { baseline("io.projectreactor:reactor-core:$compatibleVersion") { diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 56dc8a2698..7262045bf4 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -3675,6 +3675,9 @@ public final Flux doOnComplete(Runnable onComplete) { * @see Signal */ public final Flux doOnEach(Consumer> signalConsumer) { + if (this instanceof Fuseable) { + return onAssembly(new FluxDoOnEachFuseable<>(this, signalConsumer)); + } return onAssembly(new FluxDoOnEach<>(this, signalConsumer)); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxDoOnEach.java b/reactor-core/src/main/java/reactor/core/publisher/FluxDoOnEach.java index c47adbb20d..5159dd84d3 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxDoOnEach.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxDoOnEach.java @@ -22,6 +22,8 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Exceptions; +import reactor.core.Fuseable; +import reactor.core.Fuseable.ConditionalSubscriber; import reactor.util.annotation.Nullable; import reactor.util.context.Context; @@ -41,14 +43,28 @@ final class FluxDoOnEach extends FluxOperator { this.onSignal = Objects.requireNonNull(onSignal, "onSignal"); } + @SuppressWarnings("unchecked") + static DoOnEachSubscriber createSubscriber(CoreSubscriber actual, + Consumer> onSignal, boolean fuseable) { + if (fuseable) { + if(actual instanceof ConditionalSubscriber) { + return new DoOnEachFuseableConditionalSubscriber<>((ConditionalSubscriber) actual, onSignal); + } + return new DoOnEachFuseableSubscriber<>(actual, onSignal); + } + + if (actual instanceof ConditionalSubscriber) { + return new DoOnEachConditionalSubscriber<>((ConditionalSubscriber) actual, onSignal); + } + return new DoOnEachSubscriber<>(actual, onSignal); + } + @Override public void subscribe(CoreSubscriber actual) { - //TODO fuseable version? - //TODO conditional version? - source.subscribe(new DoOnEachSubscriber<>(actual, onSignal)); + source.subscribe(createSubscriber(actual, onSignal, false)); } - static final class DoOnEachSubscriber implements InnerOperator, Signal { + static class DoOnEachSubscriber implements InnerOperator, Signal { final CoreSubscriber actual; final Context cachedContext; @@ -57,6 +73,8 @@ static final class DoOnEachSubscriber implements InnerOperator, Signal< T t; Subscription s; + @Nullable + Fuseable.QueueSubscription qs; boolean done; @@ -79,8 +97,11 @@ public void cancel() { @Override public void onSubscribe(Subscription s) { - this.s = s; - actual.onSubscribe(this); + if (Operators.validate(this.s, s)) { + this.s = s; + this.qs = Operators.as(s); + actual.onSubscribe(this); + } } @Override @@ -203,4 +224,105 @@ public String toString() { return "doOnEach_onNext(" + t + ")"; } } + + static class DoOnEachFuseableSubscriber extends DoOnEachSubscriber + implements Fuseable, Fuseable.QueueSubscription { + + boolean syncFused; + + DoOnEachFuseableSubscriber(CoreSubscriber actual, + Consumer> onSignal) { + super(actual, onSignal); + } + + @Override + public int requestFusion(int mode) { + QueueSubscription qs = this.qs; + if (qs != null && (mode & Fuseable.THREAD_BARRIER) == 0) { + int m = qs.requestFusion(mode); + if (m != Fuseable.NONE) { + syncFused = m == Fuseable.SYNC; + } + return m; + } + return Fuseable.NONE; + } + + @Override + public void clear() { + qs.clear(); //throws NPE, but should only be called after onSubscribe on a Fuseable + } + + @Override + public boolean isEmpty() { + return qs == null || qs.isEmpty(); + } + + @Override + @Nullable + public T poll() { + if (qs == null) { + return null; + } + T v = qs.poll(); + if (v == null && syncFused) { + done = true; + try { + onSignal.accept(Signal.complete(cachedContext)); + } + catch (Throwable e) { + throw e; + } + } else if (v != null) { + this.t = v; + onSignal.accept(this); //throws in case of error + } + return v; + } + + @Override + public int size() { + return qs == null ? 0 : qs.size(); + } + } + + static final class DoOnEachConditionalSubscriber extends DoOnEachSubscriber + implements ConditionalSubscriber { + + DoOnEachConditionalSubscriber(ConditionalSubscriber actual, + Consumer> onSignal) { + super(actual, onSignal); + } + + @Override + @SuppressWarnings("unchecked") + public boolean tryOnNext(T t) { + boolean result = ((ConditionalSubscriber)actual).tryOnNext(t); + if (result) { + this.t = t; + onSignal.accept(this); + } + return result; + } + } + + static final class DoOnEachFuseableConditionalSubscriber extends DoOnEachFuseableSubscriber + implements ConditionalSubscriber { + + DoOnEachFuseableConditionalSubscriber(ConditionalSubscriber actual, + Consumer> onSignal) { + super(actual, onSignal); + } + + @Override + @SuppressWarnings("unchecked") + public boolean tryOnNext(T t) { + boolean result = ((ConditionalSubscriber) actual).tryOnNext(t); + if (result) { + this.t = t; + onSignal.accept(this); + } + return result; + } + } } \ No newline at end of file diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxDoOnEachFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxDoOnEachFuseable.java new file mode 100644 index 0000000000..1a764c41c4 --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxDoOnEachFuseable.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.util.Objects; +import java.util.function.Consumer; + +import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; + +/** + * Peek into the lifecycle events and signals of a sequence, {@link Fuseable} version of + * {@link FluxDoOnEach}. + * + * @param the value type + * + * @see Reactive-Streams-Commons + */ +final class FluxDoOnEachFuseable extends FluxOperator implements Fuseable { + + final Consumer> onSignal; + + FluxDoOnEachFuseable(Flux source, Consumer> onSignal) { + super(source); + this.onSignal = Objects.requireNonNull(onSignal, "onSignal"); + } + + @Override + public void subscribe(CoreSubscriber actual) { + this.source.subscribe(FluxDoOnEach.createSubscriber(actual, this.onSignal, true)); + } +} \ No newline at end of file diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index e431c181f8..a1017245c5 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -1774,6 +1774,9 @@ public final Mono doOnSuccess(Consumer onSuccess) { */ public final Mono doOnEach(Consumer> signalConsumer) { Objects.requireNonNull(signalConsumer, "signalConsumer"); + if (this instanceof Fuseable) { + return onAssembly(new MonoDoOnEachFuseable<>(this, signalConsumer)); + } return onAssembly(new MonoDoOnEach<>(this, signalConsumer)); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoDoOnEach.java b/reactor-core/src/main/java/reactor/core/publisher/MonoDoOnEach.java index 9b52a69fe6..b93be53f9f 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoDoOnEach.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoDoOnEach.java @@ -39,8 +39,6 @@ final class MonoDoOnEach extends MonoOperator { @Override public void subscribe(CoreSubscriber actual) { - //TODO fuseable version? - //TODO conditional version? - source.subscribe(new FluxDoOnEach.DoOnEachSubscriber<>(actual, onSignal)); + source.subscribe(FluxDoOnEach.createSubscriber(actual, onSignal, false)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoDoOnEachFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoDoOnEachFuseable.java new file mode 100644 index 0000000000..5b2099e61c --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoDoOnEachFuseable.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.util.Objects; +import java.util.function.Consumer; + +import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; + +/** + * Peek into the lifecycle events and signals of a sequence, {@link reactor.core.Fuseable} + * version of {@link MonoDoOnEach}. + * + * @param the value type + * + * @see Reactive-Streams-Commons + */ +final class MonoDoOnEachFuseable extends MonoOperator implements Fuseable { + + final Consumer> onSignal; + + MonoDoOnEachFuseable(Mono source, Consumer> onSignal) { + super(source); + this.onSignal = Objects.requireNonNull(onSignal, "onSignal"); + } + + @Override + public void subscribe(CoreSubscriber actual) { + source.subscribe(FluxDoOnEach.createSubscriber(actual, onSignal, true)); + } +} diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java index a1a36b4c2b..7a13ff311c 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxDoOnEachTest.java @@ -25,12 +25,20 @@ import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Exceptions; +import reactor.core.Fuseable; +import reactor.core.Fuseable.ConditionalSubscriber; import reactor.core.Scannable; +import reactor.core.publisher.FluxDoOnEach.DoOnEachConditionalSubscriber; +import reactor.core.publisher.FluxDoOnEach.DoOnEachFuseableConditionalSubscriber; +import reactor.core.publisher.FluxPeekFuseableTest.AssertQueueSubscription; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.StepVerifierOptions; @@ -40,6 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; +@RunWith(JUnitParamsRunner.class) public class FluxDoOnEachTest { @Test(expected = NullPointerException.class) @@ -47,8 +56,50 @@ public void nullSource() { new FluxDoOnEach<>(null, null); } + private static final String sourceErrorMessage = "boomSource"; + + private Object[] sourcesError() { + return new Object[] { + new Object[] { Flux.error(new IllegalStateException(sourceErrorMessage)) + .hide() }, + new Object[] { Flux.error(new IllegalStateException(sourceErrorMessage)) + .hide().filter(i -> true) }, + new Object[] { Flux.error(new IllegalStateException(sourceErrorMessage)) }, + new Object[] { Flux.error(new IllegalStateException(sourceErrorMessage)) + .filter(i -> true) } + }; + } + + private Object[] sources12Complete() { + return new Object[] { + new Object[] { Flux.just(1,2).hide() }, + new Object[] { Flux.just(1,2).hide().filter(i -> true) }, + new Object[] { Flux.just(1,2) }, + new Object[] { Flux.just(1,2).filter(i -> true) } + }; + } + + private Object[] sourcesEmpty() { + return new Object[] { + new Object[] { Flux.empty().hide() }, + new Object[] { Flux.empty().hide().filter(i -> true) }, + new Object[] { Flux.empty() }, + new Object[] { Flux.empty().filter(i -> true) } + }; + } + + private Object[] sourcesNever() { + return new Object[] { + new Object[] { Flux.never().hide() }, + new Object[] { Flux.never().hide().filter(i -> true) }, + new Object[] { Flux.never() }, + new Object[] { Flux.never().filter(i -> true) } + }; + } + @Test - public void normal() { + @Parameters(method = "sources12Complete") + public void normal(Flux source) { AssertSubscriber ts = AssertSubscriber.create(); AtomicReference onNext = new AtomicReference<>(); @@ -56,9 +107,7 @@ public void normal() { AtomicBoolean onComplete = new AtomicBoolean(); LongAdder state = new LongAdder(); - Flux.just(1, 2) - .hide() - .doOnEach(s -> { + source.doOnEach(s -> { if (s.isOnNext()) { onNext.set(s.get()); state.increment(); @@ -69,17 +118,17 @@ else if (s.isOnError()) { else if (s.isOnComplete()) { onComplete.set(true); } - }) + }) + .filter(t -> true) .subscribe(ts); - Assert.assertEquals((Integer) 2, onNext.get()); - Assert.assertNull(onError.get()); - Assert.assertTrue(onComplete.get()); + assertThat(onNext).hasValue(2); + assertThat(onError).hasValue(null); + assertThat(onComplete).isTrue(); - Assert.assertEquals(2, state.intValue()); + assertThat(state.intValue()).isEqualTo(2); } - //see https://github.com/reactor/reactor-core/issues/1056 @Test public void fusion() { @@ -100,7 +149,155 @@ public void fusion() { } @Test - public void error() { + public void fusedSync() { + AtomicReference onNext = new AtomicReference<>(); + AtomicReference onError = new AtomicReference<>(); + AtomicBoolean onComplete = new AtomicBoolean(); + LongAdder state = new LongAdder(); + + StepVerifier.create(Flux.just("sync") + .doOnEach(s -> { + if (s.isOnNext()) { + onNext.set(s.get()); + state.increment(); + } + else if (s.isOnError()) { + onError.set(s.getThrowable()); + } + else if (s.isOnComplete()) { + onComplete.set(true); + } + })) + .expectFusion(Fuseable.SYNC, Fuseable.SYNC) + .expectNext("sync") + .verifyComplete(); + + assertThat(onNext).hasValue("sync"); + assertThat(onError).hasValue(null); + assertThat(onComplete).isTrue(); + } + + @Test + public void fusedSyncCallbackError() { + AtomicReference onNext = new AtomicReference<>(); + AtomicReference onError = new AtomicReference<>(); + AtomicBoolean onComplete = new AtomicBoolean(); + LongAdder state = new LongAdder(); + + StepVerifier.create(Flux.just("sync") + .doOnEach(s -> { + if (s.isOnNext()) { + onNext.set(s.get()); + state.increment(); + } + else if (s.isOnError()) { + onError.set(s.getThrowable()); + } + else if (s.isOnComplete()) { + throw new IllegalStateException("boom"); + } + })) + .expectFusion(Fuseable.SYNC, Fuseable.SYNC) + .expectNext("sync") + .verifyErrorMessage("boom"); + + assertThat(onNext).hasValue("sync"); + assertThat(onError.get()).isNull(); + assertThat(onComplete).isFalse(); + } + + @Test + public void fusedAsync() { + AtomicReference onNext = new AtomicReference<>(); + AtomicReference onError = new AtomicReference<>(); + AtomicBoolean onComplete = new AtomicBoolean(); + LongAdder state = new LongAdder(); + + StepVerifier.create(Flux.just("foo") + .publishOn(Schedulers.immediate()) + .map(s -> s + "_async") + .doOnEach(s -> { + if (s.isOnNext()) { + onNext.set(s.get()); + state.increment(); + } + else if (s.isOnError()) { + onError.set(s.getThrowable()); + } + else if (s.isOnComplete()) { + onComplete.set(true); + } + })) + .expectFusion(Fuseable.ASYNC, Fuseable.ASYNC) + .expectNext("foo_async") + .verifyComplete(); + + assertThat(onNext).hasValue("foo_async"); + assertThat(onError).hasValue(null); + assertThat(onComplete).isTrue(); + } + + @Test + public void fusedAsyncCallbackTransientError() { + AtomicReference onNext = new AtomicReference<>(); + AtomicReference onError = new AtomicReference<>(); + AtomicBoolean onComplete = new AtomicBoolean(); + LongAdder state = new LongAdder(); + + StepVerifier.create(Flux.just("foo") + .publishOn(Schedulers.immediate()) + .map(s -> s + "_async") + .doOnEach(s -> { + if (s.isOnNext()) { + onNext.set(s.get()); + state.increment(); + } + else if (s.isOnError()) { + onError.set(s.getThrowable()); + } + else if (s.isOnComplete()) { + throw new IllegalStateException("boom"); + } + })) + .expectFusion(Fuseable.ASYNC, Fuseable.ASYNC) + .expectNext("foo_async") + .verifyErrorMessage("boom"); + + assertThat(onNext).hasValue("foo_async"); + assertThat(onError.get()).isNotNull().hasMessage("boom"); + assertThat(onComplete).isFalse(); + } + + @Test + public void fusedAsyncCallbackErrorsOnTerminal() { + AtomicReference onNext = new AtomicReference<>(); + AtomicReference onError = new AtomicReference<>(); + AtomicBoolean onComplete = new AtomicBoolean(); + LongAdder state = new LongAdder(); + + StepVerifier.create(Flux.just("foo") + .publishOn(Schedulers.immediate()) + .map(s -> s + "_async") + .doOnEach(s -> { + if (s.isOnNext()) { + onNext.set(s.get()); + } + else { + throw new IllegalStateException("boom"); + } + })) + .expectFusion(Fuseable.ASYNC, Fuseable.ASYNC) + .expectNext("foo_async") + .verifyErrorMessage("boom"); + + assertThat(onNext).hasValue("foo_async"); + assertThat(onError.get()).isNull(); + assertThat(onComplete).isFalse(); + } + + @Test + @Parameters(method = "sourcesError") + public void error(Flux source) { AssertSubscriber ts = AssertSubscriber.create(); AtomicReference onNext = new AtomicReference<>(); @@ -108,8 +305,7 @@ public void error() { AtomicBoolean onComplete = new AtomicBoolean(); LongAdder state = new LongAdder(); - Flux.error(new RuntimeException("forced " + "failure")) - .cast(Integer.class) + source .doOnEach(s -> { if (s.isOnNext()) { onNext.set(s.get()); @@ -122,15 +318,19 @@ else if (s.isOnComplete()) { onComplete.set(true); } }) + .filter(t -> true) .subscribe(ts); - Assert.assertNull(onNext.get()); - Assert.assertTrue(onError.get() instanceof RuntimeException); - Assert.assertFalse(onComplete.get()); + assertThat(onNext).hasValue(null); + assertThat(onError.get()).isInstanceOf(IllegalStateException.class) + .hasMessage(sourceErrorMessage); + assertThat(onComplete).isFalse(); + assertThat(state.intValue()).isZero(); } @Test - public void empty() { + @Parameters(method = "sourcesEmpty") + public void empty(Flux source) { AssertSubscriber ts = AssertSubscriber.create(); AtomicReference onNext = new AtomicReference<>(); @@ -138,8 +338,7 @@ public void empty() { AtomicBoolean onComplete = new AtomicBoolean(); LongAdder state = new LongAdder(); - Flux.empty() - .cast(Integer.class) + source .doOnEach(s -> { if (s.isOnNext()) { onNext.set(s.get()); @@ -152,15 +351,18 @@ else if (s.isOnComplete()) { onComplete.set(true); } }) + .filter(t -> true) .subscribe(ts); - Assert.assertNull(onNext.get()); - Assert.assertNull(onError.get()); - Assert.assertTrue(onComplete.get()); + assertThat(onNext).hasValue(null); + assertThat(onError).hasValue(null); + assertThat(onComplete).isTrue(); + assertThat(state.intValue()).isEqualTo(0); } @Test - public void never() { + @Parameters(method = "sourcesNever") + public void never(Flux source) { AssertSubscriber ts = AssertSubscriber.create(); AtomicReference onNext = new AtomicReference<>(); @@ -168,8 +370,7 @@ public void never() { AtomicBoolean onComplete = new AtomicBoolean(); LongAdder state = new LongAdder(); - Flux.never() - .cast(Integer.class) + source .doOnEach(s -> { if (s.isOnNext()) { onNext.set(s.get()); @@ -182,27 +383,31 @@ else if (s.isOnComplete()) { onComplete.set(true); } }) + .filter(t -> true) .subscribe(ts); - Assert.assertNull(onNext.get()); - Assert.assertNull(onError.get()); - Assert.assertFalse(onComplete.get()); + assertThat(onNext).hasValue(null); + assertThat(onError).hasValue(null); + assertThat(onComplete).isFalse(); + assertThat(state.intValue()).isEqualTo(0); } @Test - public void nextCallbackError() { + @Parameters(method = "sources12Complete") + public void nextCallbackError(Flux source) { AssertSubscriber ts = AssertSubscriber.create(); LongAdder state = new LongAdder(); Throwable err = new Exception("test"); - Flux.just(1) + source .doOnEach(s -> { if (s.isOnNext()) { state.increment(); throw Exceptions.propagate(err); } }) + .filter(t -> true) .subscribe(ts); //nominal error path (DownstreamException) @@ -211,20 +416,22 @@ public void nextCallbackError() { } @Test - public void nextCallbackBubbleError() { + @Parameters(method = "sources12Complete") + public void nextCallbackBubbleError(Flux source) { AssertSubscriber ts = AssertSubscriber.create(); LongAdder state = new LongAdder(); Throwable err = new Exception("test"); try { - Flux.just(1) + source .doOnEach(s -> { if (s.isOnNext()) { state.increment(); throw Exceptions.bubble(err); } }) + .filter(t -> true) .subscribe(ts); fail(); @@ -236,19 +443,21 @@ public void nextCallbackBubbleError() { } @Test - public void completeCallbackError() { + @Parameters(method = "sources12Complete") + public void completeCallbackError(Flux source) { AssertSubscriber ts = AssertSubscriber.create(); LongAdder state = new LongAdder(); Throwable err = new Exception("test"); - Flux.just(1) + source .doOnEach(s -> { if (s.isOnComplete()) { state.increment(); throw Exceptions.propagate(err); } }) + .filter(t -> true) .subscribe(ts); ts.assertErrorMessage("test"); @@ -256,29 +465,91 @@ public void completeCallbackError() { } @Test - public void errorCallbackError() { - AssertSubscriber ts = AssertSubscriber.create(); + @Parameters(method = "sourcesError") + public void errorCallbackError(Flux source) { + AssertSubscriber ts = AssertSubscriber.create(); LongAdder state = new LongAdder(); IllegalStateException err = new IllegalStateException("test"); - Flux.error(new IllegalStateException("bar")) - .cast(String.class) + source .doOnEach(s -> { if (s.isOnError()) { state.increment(); throw Exceptions.propagate(err); } }) + .filter(t -> true) .subscribe(ts); ts.assertNoValues(); ts.assertError(IllegalStateException.class); - ts.assertErrorWith(e -> e.getSuppressed()[0].getMessage().equals("bar")); + ts.assertErrorWith(e -> e.getSuppressed()[0].getMessage().equals(sourceErrorMessage)); + ts.assertErrorWith(e -> e.getMessage().equals("test")); Assert.assertEquals(1, state.intValue()); } + @Test + public void conditionalTryOnNext() { + ArrayList> signals = new ArrayList<>(); + ConditionalSubscriber actual = new FluxPeekFuseableTest.ConditionalAssertSubscriber() { + @Override + public boolean tryOnNext(Boolean v) { + super.tryOnNext(v); + return v; + } + }; + DoOnEachConditionalSubscriber test = new DoOnEachConditionalSubscriber<>(actual, signals::add); + AssertQueueSubscription qs = new AssertQueueSubscription<>(); + + test.onSubscribe(qs); + + assertThat(test.tryOnNext(true)).isTrue(); + assertThat(test.tryOnNext(false)).isFalse(); + test.onComplete(); + + assertThat(signals).hasSize(2); + assertThat(signals.get(0)).matches(Signal::isOnNext) + .matches(s -> s.get() == Boolean.TRUE); + assertThat(signals.get(1)).matches(Signal::isOnComplete); + + List actualTryNext = ((FluxPeekFuseableTest.ConditionalAssertSubscriber) actual).next; + assertThat(actualTryNext).hasSize(2); + assertThat(actualTryNext.get(0)).isTrue(); + assertThat(actualTryNext.get(1)).isFalse(); + } + + @Test + public void conditionalFuseableTryOnNext() { + ArrayList> signals = new ArrayList<>(); + ConditionalSubscriber actual = new FluxPeekFuseableTest.ConditionalAssertSubscriber() { + @Override + public boolean tryOnNext(Boolean v) { + super.tryOnNext(v); + return v; + } + }; + DoOnEachFuseableConditionalSubscriber test = new DoOnEachFuseableConditionalSubscriber<>(actual, signals::add); + AssertQueueSubscription qs = new AssertQueueSubscription<>(); + + test.onSubscribe(qs); + + assertThat(test.tryOnNext(true)).isTrue(); + assertThat(test.tryOnNext(false)).isFalse(); + test.onComplete(); + + assertThat(signals).hasSize(2); + assertThat(signals.get(0)).matches(Signal::isOnNext) + .matches(s -> s.get() == Boolean.TRUE); + assertThat(signals.get(1)).matches(Signal::isOnComplete); + + List actualTryNext = ((FluxPeekFuseableTest.ConditionalAssertSubscriber) actual).next; + assertThat(actualTryNext).hasSize(2); + assertThat(actualTryNext.get(0)).isTrue(); + assertThat(actualTryNext.get(1)).isFalse(); + } + @Test public void nextCompleteAndErrorHaveContext() { Context context = Context.of("foo", "bar"); diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoDoOnEachTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoDoOnEachTest.java index 38f167ccd1..07718fac94 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoDoOnEachTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoDoOnEachTest.java @@ -29,6 +29,7 @@ import org.mockito.Mockito; import reactor.core.CoreSubscriber; import reactor.core.Exceptions; +import reactor.core.Scannable; import reactor.test.StepVerifier; import reactor.test.subscriber.AssertSubscriber; import reactor.util.context.Context; @@ -71,6 +72,28 @@ public void usesFluxDoOnEachSubscriber() { assertThat(argumentCaptor.getValue()).isInstanceOf(FluxDoOnEach.DoOnEachSubscriber.class); } + @Test + public void usesFluxDoOnEachConditionalSubscriber() { + AtomicReference ref = new AtomicReference<>(); + Mono source = Mono.just("foo") + .doOnSubscribe(sub -> ref.set(Scannable.from(sub))) + .hide() + .filter(t -> true); + + final MonoDoOnEach test = + new MonoDoOnEach<>(source, s -> { }); + + test.filter(t -> true) + .subscribe(); + + Class expected = FluxDoOnEach.DoOnEachConditionalSubscriber.class; + assertThat(ref.get() + .actuals() + .map(Object::getClass) + ) + .contains(expected); + } + @Test public void normal() { AssertSubscriber ts = AssertSubscriber.create(); @@ -226,6 +249,7 @@ public void consumerBubbleError() { public void nextComplete() { List> signalsAndContext = new ArrayList<>(); Mono.just(1) + .hide() .doOnEach(s -> signalsAndContext.add(Tuples.of(s, s.getContext()))) .subscriberContext(Context.of("foo", "bar")) .subscribe(); diff --git a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java index 76c8e74df9..5cee991eab 100644 --- a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java +++ b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java @@ -894,6 +894,11 @@ final static class DefaultVerifySubscriber this.errorFormatter = errorFormatter; } + @Override + public String toString() { + return "StepVerifier Subscriber"; + } + static Queue> conflateScript(List> script, @Nullable Logger logger) { ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue<>(script); ConcurrentLinkedQueue> conflated = new ConcurrentLinkedQueue<>();