diff --git a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java index 8c99038b15..9600573624 100644 --- a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java +++ b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java @@ -1645,8 +1645,12 @@ void waitTaskEvent() { try { if (event instanceof SubscriptionTaskEvent) { updateRequested(event); + ((TaskEvent) event).run(this); + serializeDrainAndSubscriptionEvent(); + } + else { + ((TaskEvent) event).run(this); } - ((TaskEvent) event).run(this); } catch (Throwable t) { Exceptions.throwIfFatal(t); diff --git a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java index af2660137e..293f1dc9a3 100644 --- a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java +++ b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java @@ -38,6 +38,7 @@ import org.junit.Test; import reactor.core.Fuseable; import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.UnicastProcessor; @@ -2312,4 +2313,34 @@ public void verifyLaterCanVerifyConnectableFlux_withAssertionErrors() { .isThrownBy(() -> deferred2.verify(Duration.ofSeconds(10))) .withMessage("expectation \"expectNext(5)\" failed (expected value: 5; actual value: 3)"); } + + @Test + public void verifyDrainOnRequestInCaseOfFusion() { + MonoProcessor processor = MonoProcessor.create(); + StepVerifier.create(processor, 0) + .expectFusion(Fuseable.ANY) + .then(() -> processor.onNext(1)) + .thenRequest(1) + .expectNext(1) + .verifyComplete(); + } + + @Test + public void verifyDrainOnRequestInCaseOfFusion2() { + ArrayList requests = new ArrayList<>(); + UnicastProcessor processor = UnicastProcessor.create(); + StepVerifier.create(processor.doOnRequest(requests::add), 0) + .expectFusion(Fuseable.ANY) + .then(() -> { + processor.onNext(1); + processor.onComplete(); + }) + .thenRequest(1) + .thenRequest(1) + .thenRequest(1) + .expectNext(1) + .verifyComplete(); + + assertThat(requests).containsExactly(1L, 1L, 1L); + } } \ No newline at end of file