Skip to content

Commit

Permalink
Convert remaining FluxDelaySequence tests to virtual time
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Jan 3, 2018
1 parent c5eead2 commit 6858d0d
Showing 1 changed file with 31 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ public class FluxDelaySequenceTest {

@Test
public void delayFirstInterval() {
Flux<Tuple2<Long, Long>> test = Flux.interval(Duration.ofMillis(50))
.delaySequence(Duration.ofMillis(500))
.take(33)
.elapsed();
Supplier<Flux<Tuple2<Long, Long>>> test = () -> Flux.interval(Duration.ofMillis(50))
.delaySequence(Duration.ofMillis(500))
.elapsed()
.take(33);

StepVerifier.create(test)
StepVerifier.withVirtualTime(test)
.thenAwait(Duration.ofMillis(500 + 50))
.recordWith(ArrayList::new)
.assertNext(t2 -> assertThat(t2.getT1()).isGreaterThan(500))
.thenConsumeWhile(t2 -> t2.getT1() < 70)
.assertNext(t2 -> assertThat(t2.getT1()).isEqualTo(550))
.thenAwait(Duration.ofMillis(33 * 50))
.thenConsumeWhile(t2 -> t2.getT1() == 50)
.consumeRecordedWith(record -> {
assertThat(record.stream().mapToLong(Tuple2::getT2))
.startsWith(0L, 1L, 2L)
Expand All @@ -56,37 +58,42 @@ public void delayFirstInterval() {

@Test
public void delayFirstAsymmetricDelays() {
Flux<Long> asymmetricDelays = Flux.concat(
Mono.delay(Duration.ofMillis(400)).then(Mono.just(0L)),
Mono.delay(Duration.ofMillis(800)).then(Mono.just(1L)),
Mono.delay(Duration.ofMillis(200)).then(Mono.just(2L)),
Mono.delay(Duration.ofMillis(300)).then(Mono.just(3L))
);

Flux<Tuple2<Long, Long>> test = asymmetricDelays
.delaySequence(Duration.ofMillis(500))
.take(33)
.elapsed();

Offset<Long> OFFSET_80MS = Offset.offset(80L);
Supplier<Flux<Tuple2<Long, Long>>> test = () -> {
Flux<Long> asymmetricDelays = Flux.concat(
Mono.delay(Duration.ofMillis(400)).then(Mono.just(0L)),
Mono.delay(Duration.ofMillis(800)).then(Mono.just(1L)),
Mono.delay(Duration.ofMillis(200)).then(Mono.just(2L)),
Mono.delay(Duration.ofMillis(300)).then(Mono.just(3L))
);

return asymmetricDelays
.delaySequence(Duration.ofMillis(500))
.take(33)
.elapsed();
};

StepVerifier.create(test)
StepVerifier.withVirtualTime(test)
//first is delayed (from subscription) by additional 500ms
.thenAwait(Duration.ofMillis(500 + 400))
.assertNext(t2 -> {
assertThat(t2.getT1()).isCloseTo(400L + 500L, OFFSET_80MS);
assertThat(t2.getT1()).isEqualTo(400L + 500L);
assertThat(t2.getT2()).isEqualTo(0L);
})
//rest follow same delays as in source
.thenAwait(Duration.ofMillis(800))
.assertNext(t2 -> {
assertThat(t2.getT1()).isCloseTo(800L, OFFSET_80MS);
assertThat(t2.getT1()).isEqualTo(800L);
assertThat(t2.getT2()).isEqualTo(1L);
})
.thenAwait(Duration.ofMillis(200))
.assertNext(t2 -> {
assertThat(t2.getT1()).isCloseTo(200L, OFFSET_80MS);
assertThat(t2.getT1()).isEqualTo(200L);
assertThat(t2.getT2()).isEqualTo(2L);
})
.thenAwait(Duration.ofMillis(300))
.assertNext(t2 -> {
assertThat(t2.getT1()).isCloseTo(300L, OFFSET_80MS);
assertThat(t2.getT1()).isEqualTo(300L);
assertThat(t2.getT2()).isEqualTo(3L);
})
.verifyComplete();
Expand Down

0 comments on commit 6858d0d

Please sign in to comment.