Skip to content

Commit

Permalink
Showing 2 changed files with 11 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -123,8 +123,7 @@ public void every50msThenErrorDelaysError() {

StepVerifier.withVirtualTime(test)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(50))
.expectNoEvent(Duration.ofMillis(1000))
.expectNoEvent(Duration.ofMillis(1050))
.expectNext(0L)
.expectNoEvent(Duration.ofMillis(50))
.expectNext(1L)
Original file line number Diff line number Diff line change
@@ -186,7 +186,10 @@ public static void reset() {

volatile boolean shutdown;

final VirtualTimeWorker directWorker;

protected VirtualTimeScheduler() {
directWorker = createWorker();
}

/**
@@ -236,15 +239,15 @@ public Disposable schedule(Runnable task) {
if (shutdown) {
throw Exceptions.failWithRejected();
}
return createWorker().schedule(task);
return directWorker.schedule(task);
}

@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
if (shutdown) {
throw Exceptions.failWithRejected();
}
return createWorker().schedule(task, delay, unit);
return directWorker.schedule(task, delay, unit);
}

@Override
@@ -274,11 +277,9 @@ public Disposable schedulePeriodically(Runnable task,
throw Exceptions.failWithRejected();
}

final Worker w = createWorker();

PeriodicDirectTask periodicTask = new PeriodicDirectTask(task, w);
PeriodicDirectTask periodicTask = new PeriodicDirectTask(task);

w.schedulePeriodically(periodicTask, initialDelay, period, unit);
directWorker.schedulePeriodically(periodicTask, initialDelay, period, unit);

return periodicTask;
}
@@ -379,7 +380,7 @@ public Disposable schedule(Runnable run, long delayTime, TimeUnit unit) {
run,
COUNTER.getAndIncrement(VirtualTimeScheduler.this));
queue.add(timedTask);

advanceTime();
return () -> queue.remove(timedTask);
}

@@ -397,7 +398,7 @@ public Disposable schedulePeriodically(Runnable task,
periodInNanoseconds);

replace(periodicTask, schedule(periodicTask, initialDelay, unit));

advanceTime();
return periodicTask;
}

@@ -488,13 +489,10 @@ static class PeriodicDirectTask implements Runnable, Disposable {

final Runnable run;

final Scheduler.Worker worker;

volatile boolean disposed;

PeriodicDirectTask(Runnable run, Worker worker) {
PeriodicDirectTask(Runnable run) {
this.run = run;
this.worker = worker;
}

@Override
@@ -505,7 +503,6 @@ public void run() {
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
worker.dispose();
throw Exceptions.propagate(ex);
}
}
@@ -514,7 +511,6 @@ public void run() {
@Override
public void dispose() {
disposed = true;
worker.dispose();
}
}

0 comments on commit 81f7f2f

Please sign in to comment.