From ce40bd0ede042bae112f695df0c4b07561663048 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Thu, 5 Dec 2019 18:21:56 +0200 Subject: [PATCH] StepVerifier fix For Fusion and request after onNext (#1982) Bug explanation: if we have ASYNC fusion and onNext(null) appears when there is nothing yet requested (for instance, StepVerifier.create(flux, 0)) then when request appears over `thenRequest(n)` the drain loop will NOT be invoked so the actual enqueued element will not be delivered and we will get a hanging test. Signed-off-by: Oleh Dokuka g --- .../test/DefaultStepVerifierBuilder.java | 6 +++- .../java/reactor/test/StepVerifierTests.java | 31 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java index 80419a80a1..6e545230eb 100644 --- a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java +++ b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java @@ -1570,8 +1570,12 @@ void waitTaskEvent() { try { if (event instanceof SubscriptionTaskEvent) { updateRequested(event); + ((TaskEvent) event).run(this); + serializeDrainAndSubscriptionEvent(); + } + else { + ((TaskEvent) event).run(this); } - ((TaskEvent) event).run(this); } catch (Throwable t) { Exceptions.throwIfFatal(t); diff --git a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java index a573c56ffd..65afd8d373 100644 --- a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java +++ b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java @@ -36,6 +36,7 @@ import org.junit.Test; import reactor.core.Fuseable; import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.UnicastProcessor; @@ -2101,4 +2102,34 @@ public void virtualTimeNoEventExpectationButError() { .withMessage("Unexpected error during a no-event expectation: java.lang.IllegalStateException: boom") .withCause(new IllegalStateException("boom")); } + + @Test + public void verifyDrainOnRequestInCaseOfFusion() { + MonoProcessor processor = MonoProcessor.create(); + StepVerifier.create(processor, 0) + .expectFusion(Fuseable.ANY) + .then(() -> processor.onNext(1)) + .thenRequest(1) + .expectNext(1) + .verifyComplete(); + } + + @Test + public void verifyDrainOnRequestInCaseOfFusion2() { + ArrayList requests = new ArrayList<>(); + UnicastProcessor processor = UnicastProcessor.create(); + StepVerifier.create(processor.doOnRequest(requests::add), 0) + .expectFusion(Fuseable.ANY) + .then(() -> { + processor.onNext(1); + processor.onComplete(); + }) + .thenRequest(1) + .thenRequest(1) + .thenRequest(1) + .expectNext(1) + .verifyComplete(); + + assertThat(requests).containsExactly(1L, 1L, 1L); + } } \ No newline at end of file