Skip to content

Commit

Permalink
fix reactor#1913 expectTimeout as an alternative to expectNoEvent.the…
Browse files Browse the repository at this point in the history
…nCancel

The `expectTimeout(d)` method is a preferred alternative to the
`expectNoEvent(d).thenCancel()` combination. It should do a better job
at avoiding false positives, and is more expressive of the expectation
that a Publisher "never ends" (although it is impossible to meaningfully
assert that without waiting for the heat death of the universe).
  • Loading branch information
simonbasle authored Nov 6, 2019
1 parent f94cb33 commit 7e70124
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -58,6 +59,7 @@
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.ParallelFlux;
import reactor.core.publisher.Signal;
import reactor.test.scheduler.VirtualTimeScheduler;
import reactor.util.Logger;
Expand Down Expand Up @@ -182,15 +184,15 @@ static <T> StepVerifier.FirstStep<T> newVerifier(StepVerifierOptions options,
return new DefaultStepVerifierBuilder<>(options, scenarioSupplier);
}

final SignalEvent<T> defaultFirstStep;
final List<Event<T>> script;
final MessageFormatter messageFormatter;
final long initialRequest;
final Supplier<? extends VirtualTimeScheduler> vtsLookup;
@Nullable
final Supplier<? extends Publisher<? extends T>> sourceSupplier;
private final StepVerifierOptions options;
final SignalEvent<T> defaultFirstStep;
final List<Event<T>> script;
final MessageFormatter messageFormatter;
final long initialRequest;
final Supplier<? extends VirtualTimeScheduler> vtsLookup;
final StepVerifierOptions options;

@Nullable
Supplier<? extends Publisher<? extends T>> sourceSupplier;
long hangCheckRequested;
int requestedFusionMode = -1;
int expectedFusionMode = -1;
Expand Down Expand Up @@ -627,6 +629,29 @@ public DefaultStepVerifier<T> thenCancel() {
return build();
}

@Override
public DefaultStepVerifier<T> expectTimeout(Duration duration) {
if (sourceSupplier == null) {
throw new IllegalStateException("Attempting to call expectTimeout() without a Supplier<Publisher>");
}
Supplier<? extends Publisher<? extends T>> originalSupplier = sourceSupplier;
this.sourceSupplier = () -> Flux.from(originalSupplier.get()).timeout(duration);

WaitEvent<T> timeout = new WaitEvent<>(duration, "expectTimeout-wait");
SignalEvent<T> timeoutVerification = new SignalEvent<>((signal, se) -> {
if (signal.isOnError() && signal.getThrowable() instanceof TimeoutException) {
return Optional.empty();
}
else {
return messageFormatter.failOptional(se, "expected: timeout(%s); actual: %s",
ValueFormatters.DURATION_CONVERTER.apply(duration), signal);
}
}, "expectTimeout");
this.script.add(timeout);
this.script.add(timeoutVerification);
return build();
}

@Override
public Duration verifyError() {
return expectError().verify();
Expand Down
49 changes: 49 additions & 0 deletions reactor-test/src/main/java/reactor/test/StepVerifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,21 @@ interface LastStep {
*/
StepVerifier expectErrorSatisfies(Consumer<Throwable> assertionConsumer);

/**
* Verify that the {@link Publisher} under test doesn't terminate but
* rather times out after the provided {@link Duration} (a timeout implying
* a cancellation of the source).
* <p>
* This is equivalent to appending the {@link reactor.core.publisher.Flux#timeout(Duration) timeout}
* operator to the publisher and expecting a {@link java.util.concurrent.TimeoutException}
* {@link #expectError(Class) onError signal}, while also triggering a {@link Step#thenAwait(Duration) wait}
* to ensure unexpected signals are detected.
*
* @param duration the {@link Duration} for which no new event is expected
* @return the built verification scenario, ready to be verified
*/
StepVerifier expectTimeout(Duration duration);

/**
* Expect the completion signal.
*
Expand Down Expand Up @@ -544,6 +559,27 @@ interface LastStep {
*/
Duration verifyErrorMatches(Predicate<Throwable> predicate);

/**
* Trigger the {@link #verify() verification}, expecting that the {@link Publisher}
* under test doesn't terminate but rather times out after the provided {@link Duration}.
* A timeout implies a cancellation of the source.
* <p>
* This is a convenience method that calls {@link #verify()} in addition to {@link #expectTimeout(Duration)}.
* The later is equivalent to appending the {@link reactor.core.publisher.Flux#timeout(Duration) timeout}
* operator to the publisher and expecting a {@link java.util.concurrent.TimeoutException}
* {@link #verifyError(Class) onError signal}, while also triggering a {@link Step#thenAwait(Duration) wait}
* to ensure unexpected signals are detected.
*
* @param duration the {@link Duration} for which no new event is expected
* @return the actual {@link Duration} the verification took.
*
* @see #expectTimeout(Duration)
* @see #verify()
*/
default Duration verifyTimeout(Duration duration) {
return expectTimeout(duration).verify();
}

/**
* Trigger the {@link #verify() verification}, expecting an error as terminal event
* which gets asserted via assertion(s) provided as a {@link Consumer}.
Expand Down Expand Up @@ -808,6 +844,14 @@ default Step<T> assertNext(Consumer<? super T> assertionConsumer) {
* Expect that no event has been observed by the verifier for the length of
* the provided {@link Duration}. If virtual time is used, this duration is
* verified using the virtual clock.
* <p>
* Note that you should only use this method as the first expectation if you
* actually don't expect a subscription to happen. Use
* {@link FirstStep#expectSubscription()} combined with {@link Step#expectNoEvent(Duration)}
* to work around that.
* <p>
* Also avoid using this method at the end of the set of expectations:
* prefer {@link #expectTimeout(Duration)} rather than {@code expectNoEvent(...).thenCancel()}.
*
* @param duration the duration for which to observe no event has been received
*
Expand Down Expand Up @@ -986,12 +1030,17 @@ interface FirstStep<T> extends Step<T> {
* actually don't expect a subscription to happen. Use
* {@link FirstStep#expectSubscription()} combined with {@link Step#expectNoEvent(Duration)}
* to work around that.
* <p>
* Also avoid using this method at the end of the set of expectations:
* prefer {@link #expectTimeout(Duration)} rather than {@code expectNoEvent(...).thenCancel()}.
*
* @param duration the duration for which to observe no event has been received
*
* @return this builder
* @deprecated should probably always first use {@link #expectSubscription()} or equivalent
*/
@Override
@Deprecated
FirstStep<T> expectNoEvent(Duration duration);

/**
Expand Down
10 changes: 10 additions & 0 deletions reactor-test/src/main/java/reactor/test/ValueFormatters.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.test;

import java.time.Duration;
import java.util.Collection;
import java.util.Spliterator;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -48,6 +49,15 @@ public final class ValueFormatters {

private ValueFormatters() {}

/**
* Default {@link Duration#toString() Duration} {@link ToStringConverter} that removes the PT prefix and
* switches {@link String#toLowerCase() to lower case}.
*/
public static final ToStringConverter DURATION_CONVERTER = new ClassBasedToStringConverter<>(
Duration.class,
d -> true,
d -> d.toString().replaceFirst("PT", "").toLowerCase());

/**
* A generic {@link Object} to {@link String} conversion {@link Function} which is
* also a {@link Predicate}, and which only applies a custom conversion to targets that
Expand Down
97 changes: 75 additions & 22 deletions reactor-test/src/test/java/reactor/test/StepVerifierTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
Expand Down Expand Up @@ -666,12 +667,11 @@ public void verifyDurationTimeout() {
}

@Test
public void verifyNever() {
public void verifyNeverWithExpectTimeout() {
Flux<String> flux = Flux.never();

StepVerifier.create(flux)
.expectSubscription()
.thenCancel()
.expectTimeout(Duration.ofMillis(500))
.verify();
}

Expand Down Expand Up @@ -1059,25 +1059,51 @@ public void verifyVirtualTimeNoEventIntervalError() {
.hasMessageContaining("expectation failed (expected no event: onComplete()");
}

//see https://github.com/reactor/reactor-core/issues/1913
@Test
public void verifyVirtualTimeNoEventNever() {
StepVerifier.withVirtualTime(() -> Mono.never()
.log())
.expectSubscription()
.expectNoEvent(Duration.ofDays(10000))
.thenCancel()
.verify();
public void verifyExpectTimeoutFailsWhenSomeEvent() {
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(() -> StepVerifier.create(Mono.just("foo"))
.expectTimeout(Duration.ofMillis(1300))
.verify())
.withMessage("expectation \"expectTimeout\" failed (expected: timeout(1.3s); actual: onNext(foo))");
}

//see https://github.com/reactor/reactor-core/issues/1913
@Test
public void verifyVirtualTimeNoEventNeverError() {
public void verifyVirtualTimeExpectTimeoutFailsWhenSomeEvent() {
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(() -> StepVerifier.withVirtualTime(() -> Mono.never()
.log())
.expectNoEvent(Duration.ofDays(10000))
.thenCancel()
.isThrownBy(() -> StepVerifier.withVirtualTime(() -> Mono.just("foo"))
.expectTimeout(Duration.ofDays(3))
.verify())
.withMessageStartingWith("expectation failed (expected no event: onSubscribe(");
.withMessage("expectation \"expectTimeout\" failed (expected: timeout(72h); actual: onNext(foo))");
}

@Test
public void verifyExpectTimeoutNever() {
StepVerifier.create(Mono.never())
.expectSubscription()
.expectTimeout(Duration.ofSeconds(1))
.verify();
}

@Test
public void verifyVirtualTimeExpectTimeoutNever() {
StepVerifier.withVirtualTime(Mono::never)
.expectSubscription()
.expectTimeout(Duration.ofDays(10000))
.verify();
}

@Test
public void verifyExpectTimeoutDoesntCareAboutSubscription() {
StepVerifier.withVirtualTime(Mono::never)
.expectTimeout(Duration.ofDays(10000))
.verify();

StepVerifier.create(Mono.never())
.expectTimeout(Duration.ofSeconds(1))
.verify();
}

@Test
Expand Down Expand Up @@ -1198,8 +1224,7 @@ public void verifyCreatedForAllSchedulerUsesVirtualTime() {
public void noSignalRealTime() {
Duration verifyDuration = StepVerifier.create(Mono.never())
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1))
.thenCancel()
.expectTimeout(Duration.ofSeconds(1))
.verify(Duration.ofMillis(1100));

assertThat(verifyDuration.toMillis()).isGreaterThanOrEqualTo(1000L);
Expand All @@ -1209,8 +1234,7 @@ public void noSignalRealTime() {
public void noSignalVirtualTime() {
StepVerifier.withVirtualTime(Mono::never, 1)
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(100))
.thenCancel()
.expectTimeout(Duration.ofSeconds(100))
.verify();
}

Expand All @@ -1225,11 +1249,40 @@ public void longDelayAndNoTermination() {
.expectNext("foo")
.expectNoEvent(Duration.ofSeconds(5))
.expectNextCount(1)
.expectNoEvent(Duration.ofMillis(10))
.thenCancel()
.expectTimeout(Duration.ofHours(10))
.verify();
}

//source: https://stackoverflow.com/questions/58486417/how-to-verify-with-stepverifier-that-provided-mono-did-not-completed
@Test
public void expectTimeoutSmokeTest() {
Mono<String> neverMono = Mono.never();
Mono<String> completingMono = Mono.empty();

StepVerifier.create(neverMono, StepVerifierOptions.create().scenarioName("neverMono should pass"))
.expectTimeout(Duration.ofSeconds(1))
.verify();

StepVerifier shouldFail = StepVerifier.create(completingMono).expectTimeout(Duration.ofSeconds(1));

assertThatExceptionOfType(AssertionError.class)
.isThrownBy(shouldFail::verify)
.withMessage("expectation \"expectTimeout\" failed (expected: timeout(1s); actual: onComplete())");
}

@Test
public void verifyTimeoutSmokeTest() {
Mono<String> neverMono = Mono.never();
Mono<String> completingMono = Mono.empty();

StepVerifier.create(neverMono, StepVerifierOptions.create().scenarioName("neverMono should pass"))
.verifyTimeout(Duration.ofSeconds(1));

assertThatExceptionOfType(AssertionError.class)
.isThrownBy(() -> StepVerifier.create(completingMono).verifyTimeout(Duration.ofSeconds(1)))
.withMessage("expectation \"expectTimeout\" failed (expected: timeout(1s); actual: onComplete())");
}

@Test
public void thenAwaitThenCancelWaitsForDuration() {
Duration verifyDuration = StepVerifier.create(Flux.just("foo", "bar")
Expand Down

0 comments on commit 7e70124

Please sign in to comment.