From 6858d0dbda642994a5f3eab3f79fdc0a6d6f37b4 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Wed, 3 Jan 2018 12:55:20 -0800 Subject: [PATCH] Convert remaining FluxDelaySequence tests to virtual time --- .../core/publisher/FluxDelaySequenceTest.java | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxDelaySequenceTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxDelaySequenceTest.java index 09c7da42f5..39727e74cd 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxDelaySequenceTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxDelaySequenceTest.java @@ -35,15 +35,17 @@ public class FluxDelaySequenceTest { @Test public void delayFirstInterval() { - Flux> test = Flux.interval(Duration.ofMillis(50)) - .delaySequence(Duration.ofMillis(500)) - .take(33) - .elapsed(); + Supplier>> test = () -> Flux.interval(Duration.ofMillis(50)) + .delaySequence(Duration.ofMillis(500)) + .elapsed() + .take(33); - StepVerifier.create(test) + StepVerifier.withVirtualTime(test) + .thenAwait(Duration.ofMillis(500 + 50)) .recordWith(ArrayList::new) - .assertNext(t2 -> assertThat(t2.getT1()).isGreaterThan(500)) - .thenConsumeWhile(t2 -> t2.getT1() < 70) + .assertNext(t2 -> assertThat(t2.getT1()).isEqualTo(550)) + .thenAwait(Duration.ofMillis(33 * 50)) + .thenConsumeWhile(t2 -> t2.getT1() == 50) .consumeRecordedWith(record -> { assertThat(record.stream().mapToLong(Tuple2::getT2)) .startsWith(0L, 1L, 2L) @@ -56,37 +58,42 @@ public void delayFirstInterval() { @Test public void delayFirstAsymmetricDelays() { - Flux asymmetricDelays = Flux.concat( - Mono.delay(Duration.ofMillis(400)).then(Mono.just(0L)), - Mono.delay(Duration.ofMillis(800)).then(Mono.just(1L)), - Mono.delay(Duration.ofMillis(200)).then(Mono.just(2L)), - Mono.delay(Duration.ofMillis(300)).then(Mono.just(3L)) - ); - Flux> test = asymmetricDelays - .delaySequence(Duration.ofMillis(500)) - .take(33) - .elapsed(); - - Offset OFFSET_80MS = Offset.offset(80L); + Supplier>> test = () -> { + Flux asymmetricDelays = Flux.concat( + Mono.delay(Duration.ofMillis(400)).then(Mono.just(0L)), + Mono.delay(Duration.ofMillis(800)).then(Mono.just(1L)), + Mono.delay(Duration.ofMillis(200)).then(Mono.just(2L)), + Mono.delay(Duration.ofMillis(300)).then(Mono.just(3L)) + ); + + return asymmetricDelays + .delaySequence(Duration.ofMillis(500)) + .take(33) + .elapsed(); + }; - StepVerifier.create(test) + StepVerifier.withVirtualTime(test) //first is delayed (from subscription) by additional 500ms + .thenAwait(Duration.ofMillis(500 + 400)) .assertNext(t2 -> { - assertThat(t2.getT1()).isCloseTo(400L + 500L, OFFSET_80MS); + assertThat(t2.getT1()).isEqualTo(400L + 500L); assertThat(t2.getT2()).isEqualTo(0L); }) //rest follow same delays as in source + .thenAwait(Duration.ofMillis(800)) .assertNext(t2 -> { - assertThat(t2.getT1()).isCloseTo(800L, OFFSET_80MS); + assertThat(t2.getT1()).isEqualTo(800L); assertThat(t2.getT2()).isEqualTo(1L); }) + .thenAwait(Duration.ofMillis(200)) .assertNext(t2 -> { - assertThat(t2.getT1()).isCloseTo(200L, OFFSET_80MS); + assertThat(t2.getT1()).isEqualTo(200L); assertThat(t2.getT2()).isEqualTo(2L); }) + .thenAwait(Duration.ofMillis(300)) .assertNext(t2 -> { - assertThat(t2.getT1()).isCloseTo(300L, OFFSET_80MS); + assertThat(t2.getT1()).isEqualTo(300L); assertThat(t2.getT2()).isEqualTo(3L); }) .verifyComplete();