Skip to content

Commit

Permalink
Reduce nanoTime usage; extend tck tests
Browse files Browse the repository at this point in the history
  • Loading branch information
DougLea committed Jan 23, 2025
1 parent 1f0a5cf commit 97a2920
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 166 deletions.
88 changes: 42 additions & 46 deletions src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -3449,14 +3449,17 @@ final void remove(DelayedTask<?> task) {
}

public final void run() {
try {
ThreadLocalRandom.localInit();
heap = new DelayedTask<?>[INITIAL_HEAP_CAPACITY];
runScheduler(pool);
cancelAll();
} finally {
schedulerState = STOPPED;
pool.tryTerminate(false, false);
ForkJoinPool p;
if ((p = pool) != null) {
try {
ThreadLocalRandom.localInit();
heap = new DelayedTask<?>[INITIAL_HEAP_CAPACITY];
runScheduler(p);
cancelAll();
} finally {
schedulerState = STOPPED;
p.tryTerminate(false, false);
}
}
}

Expand All @@ -3470,8 +3473,7 @@ private void runScheduler(ForkJoinPool p) {
removedPeriodic = true;
removePeriodicTasks();
}
if (waitTime == 0L &&
(schedulerState & (ACTIVE | WORKING)) == 0 &&
if ((schedulerState & (ACTIVE | WORKING)) == 0 &&
(p.tryTerminate(false, false) & STOP) != 0L)
break;
}
Expand All @@ -3494,8 +3496,6 @@ else if (working || (state & WORKING) == 0 ||
}
else {
idle = false;
if (waitTime != 0L) // 1 usec minumum timed wait
waitTime = Math.max(waitTime, 1000L);
if (!Thread.interrupted() &&
(schedulerState & ACTIVE) == 0)
U.park(false, waitTime);
Expand Down Expand Up @@ -3565,51 +3565,48 @@ private int processPending(ForkJoinPool p, DelayedTask<?>[] h, int hs) {
int s = hs;
if (p != null && h != null) {
for (;;) {
DelayedTask<?> t = null, next;
if (removals != null)
t = (DelayedTask<?>)
U.getAndSetReference(this, REMOVALS, null);
else if (additions != null)
t = (DelayedTask<?>)
U.getAndSetReference(this, ADDITIONS, null);
DelayedTask<?> t =
(removals != null) ?
(DelayedTask<?>)U.getAndSetReference(this, REMOVALS, null):
(additions != null) ?
(DelayedTask<?>)U.getAndSetReference(this, ADDITIONS, null):
null;
if (t == null)
break;
long now = now();
do {
int idx, cap = h.length, newCap;
for (;;) {
DelayedTask<?> next; int idx, cap, newCap;
if ((next = t.nextPending) != null)
t.nextPending = null;
if ((idx = t.heapIndex) < 0) {
if (t.nextDelay != 0L && p.isShutdown())
t.trySetCancelled();
else if (t.status < 0)
;
else if (t.when - now <= 0L)
pushReadyTask(t, p);
else {
if (s >= cap && (newCap = cap << 1) > cap) {
else if (t.status >= 0) {
if (s >= (cap = h.length) &&
(newCap = cap << 1) > cap) {
try {
h = heap = Arrays.copyOf(heap, newCap);
heap = h = Arrays.copyOf(heap, newCap);
cap = h.length;
} catch (Error | RuntimeException ex) {
}
}
if (s >= cap) // can't grow
if (s >= cap || cap <= 0) // can't grow
t.trySetCancelled();
else if (s > 0)
s = heapAdd(h, s, t);
else if (cap > 0) {
else {
h[0] = t;
s = 1;
}
}
}
else if (idx < s && s <= cap && h[idx] == t) {
else if (idx < s && idx < h.length && h[idx] == t) {
h[idx] = null;
t.heapIndex = -1;
s = (s > 1) ? heapReplace(h, idx, s) : 0;
}
} while ((t = next) != null);
if ((t = next) == null)
break;
}
}
}
if (s != hs)
Expand All @@ -3620,18 +3617,18 @@ else if (idx < s && s <= cap && h[idx] == t) {
private static int heapAdd(DelayedTask<?>[] h,
int s, DelayedTask<?> t) {
if (h != null && s >= 0 && s < h.length && t != null) {
DelayedTask<?> u, p;
DelayedTask<?> u, par;
while (s > 0 && (u = h[s - 1]) != null && u.status < 0) {
u.heapIndex = -1; // clear trailing cancelled tasks
h[--s] = null;
}
int k = s++, parent, ck;
int k = s++, pk;
long d = t.when;
while (k > 0 && (p = h[parent = (k - 1) >>> 1]) != null &&
(p.status < 0 || d < p.when)) {
p.heapIndex = k;
h[k] = p;
k = parent;
while (k > 0 && (par = h[pk = (k - 1) >>> 1]) != null &&
(d < par.when || par.status < 0)) {
par.heapIndex = k;
h[k] = par;
k = pk;
}
t.heapIndex = k;
h[k] = t;
Expand All @@ -3654,21 +3651,21 @@ private static int heapReplace(DelayedTask<?>[] h, int k, int s) {
}
}
if (t != null) {
int child, right; DelayedTask<?> c, r;
int ck, rk; DelayedTask<?> c, r;
long d = t.when, rd;
while ((child = (k << 1) + 1) < s && (c = h[child]) != null) {
while ((ck = (k << 1) + 1) < s && (c = h[ck]) != null) {
long cd = (c.status < 0) ? Long.MAX_VALUE : c.when;
if ((right = child + 1) < s && (r = h[right]) != null &&
if ((rk = ck + 1) < s && (r = h[rk]) != null &&
r.status >= 0 && (rd = r.when) < cd) {
cd = rd;
c = r;
child = right;
ck = rk;
}
if (d <= cd)
break;
c.heapIndex = k;
h[k] = c;
k = child;
k = ck;
}
t.heapIndex = k;
h[k] = t;
Expand All @@ -3681,7 +3678,6 @@ private void removePeriodicTasks() {
DelayedTask<?>[] h; int s;
if ((s = heapSize) > 0 && (h = heap) != null && h.length >= s) {
DelayedTask<?> t; int stat;
long now = System.nanoTime();
for (int i = 0; i < s && (t = h[s]) != null; ) {
if ((stat = t.status) < 0 || t.nextDelay != 0L) {
h[i] = null;
Expand Down
153 changes: 33 additions & 120 deletions test/jdk/java/util/concurrent/tck/ForkJoinPool20Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,29 @@ public Boolean realCall() {
}
}

/**
* delayed schedule of callable successfully executes after delay
* even if shutdown.
*/
public void testSchedule1b() throws Exception {
final ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
final long startTime = System.nanoTime();
final CountDownLatch done = new CountDownLatch(1);
Callable<Boolean> task = new CheckedCallable<>() {
public Boolean realCall() {
done.countDown();
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
return Boolean.TRUE;
}};
Future<Boolean> f = p.schedule(task, timeoutMillis(), MILLISECONDS);
p.shutdown();
assertSame(Boolean.TRUE, f.get());
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
assertEquals(0L, done.getCount());
}
}

/**
* delayed schedule of runnable successfully executes after delay
*/
Expand Down Expand Up @@ -378,6 +401,7 @@ static class RunnableCounter implements Runnable {
* scheduleAtFixedRate executes series of tasks at given rate.
* Eventually, it must hold that:
* cycles - 1 <= elapsedMillis/delay < cycles
* Additionally, periodic tasks are not run after shutdown.
*/
public void testFixedRateSequence() throws InterruptedException {
final ForkJoinPool p = new ForkJoinPool(4);
Expand All @@ -392,20 +416,21 @@ public void testFixedRateSequence() throws InterruptedException {
p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS);
final int totalDelayMillis = (cycles - 1) * delay;
await(done, totalDelayMillis + LONG_DELAY_MS);
periodicTask.cancel(true);
final long elapsedMillis = millisElapsedSince(startTime);
assertTrue(elapsedMillis >= totalDelayMillis);
if (elapsedMillis <= cycles * delay)
return;
// else retry with longer delay
periodicTask.cancel(true); // retry with longer delay
}
fail("unexpected execution rate");
}
}
/**
* scheduleWithFixedDelay executes series of tasks with given period.
* Eventually, it must hold that each task starts at least delay and at
* most 2 * delay after the termination of the previous task.
* scheduleWithFixedDelay executes series of tasks with given
* period. Eventually, it must hold that each task starts at
* least delay and at most 2 * delay after the termination of the
* previous task. Additionally, periodic tasks are not run after
* shutdown.
*/
public void testFixedDelaySequence() throws InterruptedException {
final ForkJoinPool p = new ForkJoinPool(1);
Expand Down Expand Up @@ -436,12 +461,11 @@ public void realRun() {
p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS);
final int totalDelayMillis = (cycles - 1) * delay;
await(done, totalDelayMillis + cycles * LONG_DELAY_MS);
periodicTask.cancel(true);
final long elapsedMillis = millisElapsedSince(startTime);
assertTrue(elapsedMillis >= totalDelayMillis);
if (!tryLongerDelay.get())
return;
// else retry with longer delay
periodicTask.cancel(true); // retry with longer delay
}
fail("unexpected execution rate");
}
Expand Down Expand Up @@ -497,9 +521,9 @@ public void testSubmittedTasksRejectedWhenShutdown() throws InterruptedException
done.countDown(); // release blocking tasks
assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));

// assertTaskSubmissionsAreRejected(p);
}
}

/**
* A fixed delay task with overflowing period should not prevent a
* one-shot task from executing.
Expand All @@ -520,6 +544,7 @@ public void testScheduleWithFixedDelay_overflow() throws Exception {
await(immediateDone);
}
}

/**
* shutdownNow cancels tasks that were not run
*/
Expand All @@ -540,116 +565,4 @@ public void testShutdownNow_delayedTasks() throws InterruptedException {
assertTrue(p.isTerminated());
}


/**
* Periodic tasks are nt run after shutdown and
* delayed tasks keep running after shutdown.
*/
// @SuppressWarnings("FutureReturnValueIgnored")
// public void testShutdown_cancellation() throws Exception {
// final int poolSize = 4;
// final ForkJoinPool p = new ForkJoinPool(poolSize);
// final ThreadLocalRandom rnd = ThreadLocalRandom.current();
// final long delay = 1;
// final int rounds = 2;

// // Strategy: Wedge the pool with one wave of "blocker" tasks,
// // then add a second wave that waits in the queue until unblocked.
// final AtomicInteger ran = new AtomicInteger(0);
// final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
// final CountDownLatch unblock = new CountDownLatch(1);
// final RuntimeException exception = new RuntimeException();

// class Task implements Runnable {
// public void run() {
// try {
// ran.getAndIncrement();
// poolBlocked.countDown();
// await(unblock);
// } catch (Throwable fail) { threadUnexpectedException(fail); }
// }
// }

// class PeriodicTask extends Task {
// PeriodicTask(int rounds) { this.rounds = rounds; }
// int rounds;
// public void run() {
// if (--rounds == 0) super.run();
// // throw exception to surely terminate this periodic task,
// // but in a separate execution and in a detectable way.
// if (rounds == -1) throw exception;
// }
// }

// Runnable task = new Task();

// List<Future<?>> immediates = new ArrayList<>();
// List<Future<?>> delayeds = new ArrayList<>();
// List<Future<?>> periodics = new ArrayList<>();

// immediates.add(p.submit(task));
// delayeds.add(p.schedule(task, delay, MILLISECONDS));
// periodics.add(p.scheduleAtFixedRate(
// new PeriodicTask(rounds), delay, 1, MILLISECONDS));
// periodics.add(p.scheduleWithFixedDelay(
// new PeriodicTask(rounds), delay, 1, MILLISECONDS));

// await(poolBlocked);

// assertEquals(poolSize, ran.get());

// // Add second wave of tasks.
// immediates.add(p.submit(task));
// delayeds.add(p.schedule(task, delay, MILLISECONDS));
// periodics.add(p.scheduleAtFixedRate(
// new PeriodicTask(rounds), delay, 1, MILLISECONDS));
// periodics.add(p.scheduleWithFixedDelay(
// new PeriodicTask(rounds), delay, 1, MILLISECONDS));

// assertEquals(poolSize, ran.get());

// immediates.forEach(
// f -> assertTrue(
// (!(f instanceof ScheduledFuture) ||
// ((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L)));

// Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream)
// .forEach(f -> assertFalse(f.isDone()));

// try { p.shutdown(); } catch (SecurityException ok) { return; }
// assertTrue(p.isShutdown());
// assertFalse(p.isTerminated());

// assertThrows(
// RejectedExecutionException.class,
// () -> p.submit(task),
// () -> p.schedule(task, 1, SECONDS),
// () -> p.scheduleAtFixedRate(
// new PeriodicTask(1), 1, 1, SECONDS),
// () -> p.scheduleWithFixedDelay(
// new PeriodicTask(2), 1, 1, SECONDS));

// immediates.forEach(f -> assertFalse(f.isDone()));

// assertFalse(delayeds.get(0).isDone());
// assertFalse(delayeds.get(1).isDone());
// periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone()));
// periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled()));

// unblock.countDown(); // Release all pool threads

// assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
// assertTrue(p.isTerminated());

// Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream)
// .forEach(f -> assertTrue(f.isDone()));

// for (Future<?> f : immediates) assertNull(f.get());

// assertNull(delayeds.get(0).get());
// assertNull(delayeds.get(1).get());
// periodics.forEach(f -> assertTrue(f.isCancelled()));

// }

}

0 comments on commit 97a2920

Please sign in to comment.