Skip to content

Commit

Permalink
fix reactor#783 Let VirtualTimeScheduler defer advanceTime if queue i…
Browse files Browse the repository at this point in the history
…s empty

This commit changes the way clock advanceTime is performed when the
`VirtualTimeScheduler`'s task queue is empty, deferring the clock move
until a task is scheduled.

This eliminates ordering issues where a scheduling task (like interval)
would happen AFTER the clock was moved (eg. by a `StepVerifier`'s
`thenAwait`), resulting in the interval being scheduled too far away in
the "future" and never happening.

The commit also moves `RaceTestUtils` from core test sourceset to
reactor-test main sourceset and fixes an issue in
`DefaultVerifierSubscriber` where the current expectation is not
refreshed from the script queue after wait tasks have been extracted.

Reviewed-In: reactor#1127
  • Loading branch information
simonbasle authored May 1, 2018
1 parent d9d76ab commit 66051ff
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import reactor.core.Disposables.CompositeDisposable;
import reactor.core.scheduler.Schedulers;
import reactor.test.FakeDisposable;
import reactor.test.RaceTestUtils;
import reactor.test.util.RaceTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.junit.Test;
import reactor.core.scheduler.Schedulers;
import reactor.test.FakeDisposable;
import reactor.test.RaceTestUtils;
import reactor.test.util.RaceTestUtils;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.junit.Before;
import org.junit.Test;
import reactor.test.RaceTestUtils;
import reactor.test.util.RaceTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import reactor.core.publisher.FluxExpand.ExpandDepthSubscriber;
import reactor.core.publisher.FluxExpand.ExpandDepthSubscription;
import reactor.core.scheduler.Schedulers;
import reactor.test.RaceTestUtils;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.RaceTestUtils;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.util.RaceTestUtils;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.junit.Test;
import reactor.core.Disposable;
import reactor.core.scheduler.Schedulers;
import reactor.test.RaceTestUtils;
import reactor.test.StepVerifier;
import reactor.test.publisher.MonoOperatorTest;
import reactor.test.publisher.TestPublisher;
import reactor.test.util.RaceTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.scheduler.Schedulers;
import reactor.test.RaceTestUtils;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.util.RaceTestUtils;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.scheduler.Schedulers;
import reactor.test.RaceTestUtils;
import reactor.test.util.RaceTestUtils;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import reactor.core.publisher.Operators.MonoSubscriber;
import reactor.core.publisher.Operators.MultiSubscriptionSubscriber;
import reactor.core.publisher.Operators.ScalarSubscription;
import reactor.test.RaceTestUtils;
import reactor.test.util.RaceTestUtils;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import org.junit.Test;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Hooks;
import reactor.test.RaceTestUtils;
import reactor.test.util.RaceTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,9 @@ final void onExpectation(Signal<T> actualSignal) {
setFailure(null, actualSignal, "did not expect: %s", actualSignal);
return;
}
onTaskEvent();
if (onTaskEvent()) {
event = this.script.peek();
}

if (event instanceof DefaultStepVerifierBuilder.SignalConsumeWhileEvent) {
if (consumeWhile(actualSignal, (SignalConsumeWhileEvent<T>) event)) {
Expand Down Expand Up @@ -1322,9 +1324,6 @@ else if (event instanceof SignalEvent) {
}

event = this.script.peek();
if (event == null || !(event instanceof EagerEvent)) {
return;
}

for (; ; ) {
if (event == null || !(event instanceof EagerEvent)) {
Expand Down Expand Up @@ -1469,21 +1468,23 @@ final boolean onSignalCount(Signal<T> actualSignal, SignalCountEvent<T> event) {
return false;
}

void onTaskEvent() {
boolean onTaskEvent() {
Event<T> event;
boolean foundTaskEvents = false;
for (; ; ) {
if (isCancelled()) {
return;
return foundTaskEvents;
}
event = this.script.peek();
if (!(event instanceof TaskEvent)) {
break;
return foundTaskEvents;
}
event = this.script.poll();
if (!(event instanceof TaskEvent)) {
return;
return foundTaskEvents;
}
taskEvents.add((TaskEvent<T>) event);
foundTaskEvents = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -184,6 +186,13 @@ public static void reset() {

volatile long nanoTime;

volatile long deferredNanoTime;
static final AtomicLongFieldUpdater<VirtualTimeScheduler> DEFERRED_NANO_TIME = AtomicLongFieldUpdater.newUpdater(VirtualTimeScheduler.class, "deferredNanoTime");

volatile int advanceTimeWip;
static final AtomicIntegerFieldUpdater<VirtualTimeScheduler> ADVANCE_TIME_WIP =
AtomicIntegerFieldUpdater.newUpdater(VirtualTimeScheduler.class, "advanceTimeWip");

volatile boolean shutdown;

final VirtualTimeWorker directWorker;
Expand All @@ -206,7 +215,7 @@ public void advanceTime() {
* @param delayTime the amount of time to move the {@link VirtualTimeScheduler}'s clock forward
*/
public void advanceTimeBy(Duration delayTime) {
advanceTime(nanoTime + delayTime.toNanos());
advanceTime(delayTime.toNanos());
}

/**
Expand All @@ -218,7 +227,7 @@ public void advanceTimeBy(Duration delayTime) {
public void advanceTimeTo(Instant instant) {
long targetTime = TimeUnit.NANOSECONDS.convert(instant.toEpochMilli(),
TimeUnit.MILLISECONDS);
advanceTime(targetTime);
advanceTime(targetTime - nanoTime);
}

@Override
Expand All @@ -231,7 +240,7 @@ public VirtualTimeWorker createWorker() {

@Override
public long now(TimeUnit unit) {
return unit.convert(nanoTime, TimeUnit.NANOSECONDS);
return unit.convert(nanoTime + deferredNanoTime, TimeUnit.NANOSECONDS);
}

@Override
Expand Down Expand Up @@ -285,22 +294,44 @@ public Disposable schedulePeriodically(Runnable task,
return periodicTask;
}

final void advanceTime(long targetTimeInNanoseconds) {
while (!queue.isEmpty()) {
TimedRunnable current = queue.peek();
if (current.time > targetTimeInNanoseconds) {
break;
final void advanceTime(long timeShiftInNanoseconds) {
Operators.addCap(DEFERRED_NANO_TIME, this, timeShiftInNanoseconds);
drain();
}

final void drain() {
int remainingWork = ADVANCE_TIME_WIP.incrementAndGet(this);
if (remainingWork != 1) {
return;
}
for(;;) {
if (!queue.isEmpty()) {
//resetting for the first time a delayed schedule is called after a deferredNanoTime is set
long targetNanoTime = nanoTime + DEFERRED_NANO_TIME.getAndSet(this, 0);

while (!queue.isEmpty()) {
TimedRunnable current = queue.peek();
if (current == null || current.time > targetNanoTime) {
break;
}
//for the benefit of tasks that call `now()`
// if scheduled time is 0 (immediate) use current virtual time
nanoTime = current.time == 0 ? nanoTime : current.time;
queue.remove();

// Only execute if not unsubscribed
if (!current.scheduler.shutdown) {
current.run.run();
}
}
nanoTime = targetNanoTime;
}
// if scheduled time is 0 (immediate) use current virtual time
nanoTime = current.time == 0 ? nanoTime : current.time;
queue.remove();

// Only execute if not unsubscribed
if (!current.scheduler.shutdown) {
current.run.run();
remainingWork = ADVANCE_TIME_WIP.addAndGet(this, -remainingWork);
if (remainingWork == 0) {
break;
}
}
nanoTime = targetTimeInNanoseconds;
}

static final class TimedRunnable implements Comparable<TimedRunnable> {
Expand Down Expand Up @@ -358,6 +389,8 @@ final class VirtualTimeWorker implements Worker {

volatile boolean shutdown;

VirtualTimeWorker() { }

@Override
public Disposable schedule(Runnable run) {
if (shutdown) {
Expand All @@ -368,7 +401,11 @@ public Disposable schedule(Runnable run) {
run,
COUNTER.getAndIncrement(VirtualTimeScheduler.this));
queue.add(timedTask);
return () -> queue.remove(timedTask);
drain();
return () -> {
queue.remove(timedTask);
drain();
};
}

@Override
Expand All @@ -381,7 +418,11 @@ public Disposable schedule(Runnable run, long delayTime, TimeUnit unit) {
run,
COUNTER.getAndIncrement(VirtualTimeScheduler.this));
queue.add(timedTask);
return () -> queue.remove(timedTask);
drain();
return () -> {
queue.remove(timedTask);
drain();
};
}

@Override
Expand All @@ -390,7 +431,7 @@ public Disposable schedulePeriodically(Runnable task,
long period,
TimeUnit unit) {
final long periodInNanoseconds = unit.toNanos(period);
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstNowNanoseconds = nanoTime;
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

PeriodicTask periodicTask = new PeriodicTask(firstStartInNanoseconds, task,
Expand Down Expand Up @@ -439,7 +480,7 @@ public void run() {

long nextTick;

long nowNanoseconds = now(TimeUnit.NANOSECONDS);
long nowNanoseconds = nanoTime;
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds || nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
nextTick = nowNanoseconds + periodInNanoseconds;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package reactor.test;
package reactor.test.util;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -35,6 +35,18 @@
*/
public class RaceTestUtils {

/**
* Synchronizes the execution of two concurrent state modifications as much as
* possible to test race conditions. The method blocks until the given {@link Predicate}
* matches. It performs a {@link BiPredicate} test at the end to validate the end
* result.
*
* @param initial the initial state
* @param race the state-modification {@link Function}
* @param stopRace the stop condition for the race loop, as a {@link Predicate}
* @param terminate the validation check, as a {@link BiPredicate}
* @return the result of the {@code terminate} check
*/
public static <T> boolean race(T initial, Function<? super T, ? extends T> race,
Predicate<? super T> stopRace,
BiPredicate<? super T, ? super T> terminate) {
Expand Down Expand Up @@ -97,6 +109,13 @@ public static void race(final Runnable r1, final Runnable r2) {
race(r1, r2, Schedulers.single());
}

/**
* Synchronizes the execution of two {@link Runnable} as much as possible
* to test race conditions. The method blocks until both have run to completion.
* @param r1 the first runnable
* @param r2 the second runnable
* @param s the {@link Scheduler} on which to execute the runnables
*/
public static void race(final Runnable r1, final Runnable r2, Scheduler s) {
final AtomicInteger count = new AtomicInteger(2);
final CountDownLatch cdl = new CountDownLatch(2);
Expand Down
Loading

0 comments on commit 66051ff

Please sign in to comment.