diff --git a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java index 9688dc746d..8c99038b15 100644 --- a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java +++ b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java @@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; @@ -58,6 +59,7 @@ import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; +import reactor.core.publisher.ParallelFlux; import reactor.core.publisher.Signal; import reactor.test.scheduler.VirtualTimeScheduler; import reactor.util.Logger; @@ -182,15 +184,15 @@ static StepVerifier.FirstStep newVerifier(StepVerifierOptions options, return new DefaultStepVerifierBuilder<>(options, scenarioSupplier); } - final SignalEvent defaultFirstStep; - final List> script; - final MessageFormatter messageFormatter; - final long initialRequest; - final Supplier vtsLookup; - @Nullable - final Supplier> sourceSupplier; - private final StepVerifierOptions options; + final SignalEvent defaultFirstStep; + final List> script; + final MessageFormatter messageFormatter; + final long initialRequest; + final Supplier vtsLookup; + final StepVerifierOptions options; + @Nullable + Supplier> sourceSupplier; long hangCheckRequested; int requestedFusionMode = -1; int expectedFusionMode = -1; @@ -627,6 +629,29 @@ public DefaultStepVerifier thenCancel() { return build(); } + @Override + public DefaultStepVerifier expectTimeout(Duration duration) { + if (sourceSupplier == null) { + throw new IllegalStateException("Attempting to call expectTimeout() without a Supplier"); + } + Supplier> originalSupplier = sourceSupplier; + this.sourceSupplier = () -> Flux.from(originalSupplier.get()).timeout(duration); + + WaitEvent timeout = new WaitEvent<>(duration, "expectTimeout-wait"); + SignalEvent timeoutVerification = new SignalEvent<>((signal, se) -> { + if (signal.isOnError() && signal.getThrowable() instanceof TimeoutException) { + return Optional.empty(); + } + else { + return messageFormatter.failOptional(se, "expected: timeout(%s); actual: %s", + ValueFormatters.DURATION_CONVERTER.apply(duration), signal); + } + }, "expectTimeout"); + this.script.add(timeout); + this.script.add(timeoutVerification); + return build(); + } + @Override public Duration verifyError() { return expectError().verify(); diff --git a/reactor-test/src/main/java/reactor/test/StepVerifier.java b/reactor-test/src/main/java/reactor/test/StepVerifier.java index 2bba0f0ede..0be4af865a 100644 --- a/reactor-test/src/main/java/reactor/test/StepVerifier.java +++ b/reactor-test/src/main/java/reactor/test/StepVerifier.java @@ -450,6 +450,21 @@ interface LastStep { */ StepVerifier expectErrorSatisfies(Consumer assertionConsumer); + /** + * Verify that the {@link Publisher} under test doesn't terminate but + * rather times out after the provided {@link Duration} (a timeout implying + * a cancellation of the source). + *

+ * This is equivalent to appending the {@link reactor.core.publisher.Flux#timeout(Duration) timeout} + * operator to the publisher and expecting a {@link java.util.concurrent.TimeoutException} + * {@link #expectError(Class) onError signal}, while also triggering a {@link Step#thenAwait(Duration) wait} + * to ensure unexpected signals are detected. + * + * @param duration the {@link Duration} for which no new event is expected + * @return the built verification scenario, ready to be verified + */ + StepVerifier expectTimeout(Duration duration); + /** * Expect the completion signal. * @@ -544,6 +559,27 @@ interface LastStep { */ Duration verifyErrorMatches(Predicate predicate); + /** + * Trigger the {@link #verify() verification}, expecting that the {@link Publisher} + * under test doesn't terminate but rather times out after the provided {@link Duration}. + * A timeout implies a cancellation of the source. + *

+ * This is a convenience method that calls {@link #verify()} in addition to {@link #expectTimeout(Duration)}. + * The later is equivalent to appending the {@link reactor.core.publisher.Flux#timeout(Duration) timeout} + * operator to the publisher and expecting a {@link java.util.concurrent.TimeoutException} + * {@link #verifyError(Class) onError signal}, while also triggering a {@link Step#thenAwait(Duration) wait} + * to ensure unexpected signals are detected. + * + * @param duration the {@link Duration} for which no new event is expected + * @return the actual {@link Duration} the verification took. + * + * @see #expectTimeout(Duration) + * @see #verify() + */ + default Duration verifyTimeout(Duration duration) { + return expectTimeout(duration).verify(); + } + /** * Trigger the {@link #verify() verification}, expecting an error as terminal event * which gets asserted via assertion(s) provided as a {@link Consumer}. @@ -808,6 +844,14 @@ default Step assertNext(Consumer assertionConsumer) { * Expect that no event has been observed by the verifier for the length of * the provided {@link Duration}. If virtual time is used, this duration is * verified using the virtual clock. + *

+ * Note that you should only use this method as the first expectation if you + * actually don't expect a subscription to happen. Use + * {@link FirstStep#expectSubscription()} combined with {@link Step#expectNoEvent(Duration)} + * to work around that. + *

+ * Also avoid using this method at the end of the set of expectations: + * prefer {@link #expectTimeout(Duration)} rather than {@code expectNoEvent(...).thenCancel()}. * * @param duration the duration for which to observe no event has been received * @@ -986,12 +1030,17 @@ interface FirstStep extends Step { * actually don't expect a subscription to happen. Use * {@link FirstStep#expectSubscription()} combined with {@link Step#expectNoEvent(Duration)} * to work around that. + *

+ * Also avoid using this method at the end of the set of expectations: + * prefer {@link #expectTimeout(Duration)} rather than {@code expectNoEvent(...).thenCancel()}. * * @param duration the duration for which to observe no event has been received * * @return this builder + * @deprecated should probably always first use {@link #expectSubscription()} or equivalent */ @Override + @Deprecated FirstStep expectNoEvent(Duration duration); /** diff --git a/reactor-test/src/main/java/reactor/test/ValueFormatters.java b/reactor-test/src/main/java/reactor/test/ValueFormatters.java index 67b1686814..2394fa335e 100644 --- a/reactor-test/src/main/java/reactor/test/ValueFormatters.java +++ b/reactor-test/src/main/java/reactor/test/ValueFormatters.java @@ -16,6 +16,7 @@ package reactor.test; +import java.time.Duration; import java.util.Collection; import java.util.Spliterator; import java.util.function.BiFunction; @@ -48,6 +49,15 @@ public final class ValueFormatters { private ValueFormatters() {} + /** + * Default {@link Duration#toString() Duration} {@link ToStringConverter} that removes the PT prefix and + * switches {@link String#toLowerCase() to lower case}. + */ + public static final ToStringConverter DURATION_CONVERTER = new ClassBasedToStringConverter<>( + Duration.class, + d -> true, + d -> d.toString().replaceFirst("PT", "").toLowerCase()); + /** * A generic {@link Object} to {@link String} conversion {@link Function} which is * also a {@link Predicate}, and which only applies a custom conversion to targets that diff --git a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java index 26cf07eab7..af2660137e 100644 --- a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java +++ b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.LockSupport; +import java.util.function.Function; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -666,12 +667,11 @@ public void verifyDurationTimeout() { } @Test - public void verifyNever() { + public void verifyNeverWithExpectTimeout() { Flux flux = Flux.never(); StepVerifier.create(flux) - .expectSubscription() - .thenCancel() + .expectTimeout(Duration.ofMillis(500)) .verify(); } @@ -1059,25 +1059,51 @@ public void verifyVirtualTimeNoEventIntervalError() { .hasMessageContaining("expectation failed (expected no event: onComplete()"); } + //see https://github.com/reactor/reactor-core/issues/1913 @Test - public void verifyVirtualTimeNoEventNever() { - StepVerifier.withVirtualTime(() -> Mono.never() - .log()) - .expectSubscription() - .expectNoEvent(Duration.ofDays(10000)) - .thenCancel() - .verify(); + public void verifyExpectTimeoutFailsWhenSomeEvent() { + assertThatExceptionOfType(AssertionError.class) + .isThrownBy(() -> StepVerifier.create(Mono.just("foo")) + .expectTimeout(Duration.ofMillis(1300)) + .verify()) + .withMessage("expectation \"expectTimeout\" failed (expected: timeout(1.3s); actual: onNext(foo))"); } + //see https://github.com/reactor/reactor-core/issues/1913 @Test - public void verifyVirtualTimeNoEventNeverError() { + public void verifyVirtualTimeExpectTimeoutFailsWhenSomeEvent() { assertThatExceptionOfType(AssertionError.class) - .isThrownBy(() -> StepVerifier.withVirtualTime(() -> Mono.never() - .log()) - .expectNoEvent(Duration.ofDays(10000)) - .thenCancel() + .isThrownBy(() -> StepVerifier.withVirtualTime(() -> Mono.just("foo")) + .expectTimeout(Duration.ofDays(3)) .verify()) - .withMessageStartingWith("expectation failed (expected no event: onSubscribe("); + .withMessage("expectation \"expectTimeout\" failed (expected: timeout(72h); actual: onNext(foo))"); + } + + @Test + public void verifyExpectTimeoutNever() { + StepVerifier.create(Mono.never()) + .expectSubscription() + .expectTimeout(Duration.ofSeconds(1)) + .verify(); + } + + @Test + public void verifyVirtualTimeExpectTimeoutNever() { + StepVerifier.withVirtualTime(Mono::never) + .expectSubscription() + .expectTimeout(Duration.ofDays(10000)) + .verify(); + } + + @Test + public void verifyExpectTimeoutDoesntCareAboutSubscription() { + StepVerifier.withVirtualTime(Mono::never) + .expectTimeout(Duration.ofDays(10000)) + .verify(); + + StepVerifier.create(Mono.never()) + .expectTimeout(Duration.ofSeconds(1)) + .verify(); } @Test @@ -1198,8 +1224,7 @@ public void verifyCreatedForAllSchedulerUsesVirtualTime() { public void noSignalRealTime() { Duration verifyDuration = StepVerifier.create(Mono.never()) .expectSubscription() - .expectNoEvent(Duration.ofSeconds(1)) - .thenCancel() + .expectTimeout(Duration.ofSeconds(1)) .verify(Duration.ofMillis(1100)); assertThat(verifyDuration.toMillis()).isGreaterThanOrEqualTo(1000L); @@ -1209,8 +1234,7 @@ public void noSignalRealTime() { public void noSignalVirtualTime() { StepVerifier.withVirtualTime(Mono::never, 1) .expectSubscription() - .expectNoEvent(Duration.ofSeconds(100)) - .thenCancel() + .expectTimeout(Duration.ofSeconds(100)) .verify(); } @@ -1225,11 +1249,40 @@ public void longDelayAndNoTermination() { .expectNext("foo") .expectNoEvent(Duration.ofSeconds(5)) .expectNextCount(1) - .expectNoEvent(Duration.ofMillis(10)) - .thenCancel() + .expectTimeout(Duration.ofHours(10)) .verify(); } + //source: https://stackoverflow.com/questions/58486417/how-to-verify-with-stepverifier-that-provided-mono-did-not-completed + @Test + public void expectTimeoutSmokeTest() { + Mono neverMono = Mono.never(); + Mono completingMono = Mono.empty(); + + StepVerifier.create(neverMono, StepVerifierOptions.create().scenarioName("neverMono should pass")) + .expectTimeout(Duration.ofSeconds(1)) + .verify(); + + StepVerifier shouldFail = StepVerifier.create(completingMono).expectTimeout(Duration.ofSeconds(1)); + + assertThatExceptionOfType(AssertionError.class) + .isThrownBy(shouldFail::verify) + .withMessage("expectation \"expectTimeout\" failed (expected: timeout(1s); actual: onComplete())"); + } + + @Test + public void verifyTimeoutSmokeTest() { + Mono neverMono = Mono.never(); + Mono completingMono = Mono.empty(); + + StepVerifier.create(neverMono, StepVerifierOptions.create().scenarioName("neverMono should pass")) + .verifyTimeout(Duration.ofSeconds(1)); + + assertThatExceptionOfType(AssertionError.class) + .isThrownBy(() -> StepVerifier.create(completingMono).verifyTimeout(Duration.ofSeconds(1))) + .withMessage("expectation \"expectTimeout\" failed (expected: timeout(1s); actual: onComplete())"); + } + @Test public void thenAwaitThenCancelWaitsForDuration() { Duration verifyDuration = StepVerifier.create(Flux.just("foo", "bar")