Skip to content

Commit

Permalink
Rename DelayedTask to ScheduledForkJoinTask; misc other improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
DougLea committed Feb 12, 2025
1 parent c839299 commit f1394c4
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 172 deletions.
194 changes: 106 additions & 88 deletions src/java.base/share/classes/java/util/concurrent/DelayScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ final class DelayScheduler extends Thread {

/*
* A DelayScheduler maintains a binary heap based on trigger times
* (field DelayedTask.when) along with a pending queue of tasks
* submitted by other threads. When ready, tasks are relayed to
* the pool (or run directly if in task.isImmediate).
* (field ScheduledForkJoinTask.when) along with a pending queue
* of tasks submitted by other threads. When ready, tasks are
* relayed to the pool (or run directly if in task.isImmediate).
*
* To reduce memory contention, the heap is maintained solely via
* local variables in method loop() (forcing noticeable code
Expand All @@ -69,19 +69,19 @@ final class DelayScheduler extends Thread {
* pending tasks (and/or shutdown actions) to process, otherwise
* parking either indefinitely or until the next task
* deadline. Incoming pending tasks ensure active status,
* unparking if necessary. The scheduler thread sets status to inactive
* when apparently no work, and then rechecks before actually
* parking. The active field takes on a negative value on
* termination, as a sentinel used in pool tryTerminate checks as
* well as to suppress reactivation while terminating.
* unparking if necessary. The scheduler thread sets status to
* inactive when there is apparently no work, and then rechecks
* before actually parking. The active field takes on a negative
* value on termination, as a sentinel used in pool tryTerminate
* checks as well as to suppress reactivation while terminating.
*
* The implementation is designed to accommodate usages in which
* many or even most tasks are cancelled before executing (mainly
* IO-based timeouts). Cancellations are added to the pending
* queue in method DelayedTask.cancel(), to remove them from the
* heap. (This requires some safeguards to deal with tasks
* cancelled while they are still pending.) In addition, the heap
* replace method removes any cancelled tasks seen while
* queue in method ScheduledForkJoinTask.cancel(), to remove them
* from the heap. (This requires some safeguards to deal with
* tasks cancelled while they are still pending.) In addition,
* the heap replace method removes any cancelled tasks seen while
* performing sift-down operations, in which case elements are
* removed even before processing the removal request (which is
* then a no-op).
Expand All @@ -104,7 +104,7 @@ final class DelayScheduler extends Thread {

private static final int INITIAL_HEAP_CAPACITY = 1 << 6;
private ForkJoinPool pool; // read once and detached upon starting
volatile DelayedTask<?> pending; // for submited adds and removes
volatile ScheduledForkJoinTask<?> pending; // for submited adds and removes
volatile int active; // 0: inactive, -1: stopped, +1: running
int restingSize; // written only before parking

Expand All @@ -118,7 +118,7 @@ final class DelayScheduler extends Thread {
ACTIVE = U.objectFieldOffset(klass, "active");
PENDING = U.objectFieldOffset(klass, "pending");
long ns = System.nanoTime(); // ensure negative to avoid overflow
nanoTimeOffset = Long.MIN_VALUE + (ns < 0L ? ns : 0L);
nanoTimeOffset = Long.MIN_VALUE + Math.min(ns, 0L);
}

DelayScheduler(ForkJoinPool p, String name) {
Expand All @@ -135,7 +135,8 @@ static final long now() {
}

/**
* Ensure active, unparking if necessary, unless stopped
* Ensure active, unparking if necessary, unless stopped.
* Returns the status as observed prior to activating
*/
final int ensureActive() {
int state;
Expand All @@ -148,11 +149,11 @@ final int ensureActive() {
* Inserts the task to pending queue, to add, remove, or ignore
* depending on task status when processed.
*/
final void pend(DelayedTask<?> task) {
DelayedTask<?> f = pending;
final void pend(ScheduledForkJoinTask<?> task) {
ScheduledForkJoinTask<?> f = pending;
if (task != null) {
do {} while (
f != (f = (DelayedTask<?>)
f != (f = (ScheduledForkJoinTask<?>)
U.compareAndExchangeReference(
this, PENDING, task.nextPending = f, task)));
ensureActive();
Expand All @@ -178,8 +179,8 @@ final int approximateSize() {
*/
public final void run() {
ForkJoinPool p = pool;
pool = null; // detach
if (p != null) {
pool = null; // detach
if (p != null) { // currently always true
try {
loop(p);
} finally {
Expand All @@ -200,14 +201,15 @@ public final void run() {
*/
private void loop(ForkJoinPool p) {
p.onDelaySchedulerStart();
DelayedTask<?>[] h = new DelayedTask<?>[INITIAL_HEAP_CAPACITY];
for (int n = 0;;) { // n is heap size
DelayedTask<?> t; int runStatus;
ScheduledForkJoinTask<?>[] h = // initial heap array
new ScheduledForkJoinTask<?>[INITIAL_HEAP_CAPACITY];
int cap = h.length, n = 0, prevRunStatus = 0; // n is heap size
for (;;) { // loop until stopped
ScheduledForkJoinTask<?> t; int runStatus;
while (pending != null && // process pending tasks
(t = (DelayedTask<?>)
(t = (ScheduledForkJoinTask<?>)
U.getAndSetReference(this, PENDING, null)) != null) {
DelayedTask<?> next;
int cap = h.length;
ScheduledForkJoinTask<?> next;
do {
next = t.nextPending;
long d = t.when;
Expand All @@ -220,12 +222,13 @@ private void loop(ForkJoinPool p) {
n = replace(h, i, n);
}
else if (stat >= 0) {
DelayedTask<?> parent; int pk; DelayedTask<?>[] nh;
if (n >= cap || n < 0) // couldn't resize
t.trySetCancelled();
else {
int k = n++;
while (k > 0 && // sift up
else { // add and sift up
ScheduledForkJoinTask<?> parent;
ScheduledForkJoinTask<?>[] nh;
int k = n++, pk, nc;
while (k > 0 &&
(parent = h[pk = (k - 1) >>> 1]) != null &&
(parent.when > d)) {
parent.heapIndex = k;
Expand All @@ -234,36 +237,40 @@ else if (stat >= 0) {
}
t.heapIndex = k;
h[k] = t;
if (n >= cap && (nh = growHeap(h, cap)) != null)
cap = (h = nh).length;
if (n >= cap && (nh = growHeap(h, cap)) != null &&
(nc = nh.length) > cap) {
cap = nc; // else keep using old array
h = nh;
}
}
}
} while ((t = next) != null);
}

if ((runStatus = p.delaySchedulerRunStatus()) != 0 &&
(n = tryStop(p, h, n, runStatus)) < 0)
break;
if ((runStatus = p.delaySchedulerRunStatus()) != 0) {
if ((n = tryStop(p, h, n, runStatus, prevRunStatus)) < 0)
break;
prevRunStatus = runStatus;
}

long parkTime = 0L; // zero for untimed park
if (n > 0 && h.length > 0) {
do { // submit ready tasks
DelayedTask<?> f; int stat;
if ((f = h[0]) != null) {
long d = f.when - now();
if ((stat = f.status) >= 0 && d > 0L) {
parkTime = d;
break;
}
f.heapIndex = -1;
if (stat >= 0) {
if (f.isImmediate)
f.doExec();
else
p.executeReadyDelayedTask(f);
}
while (n > 0 && h.length > 0) { // submit ready tasks
ScheduledForkJoinTask<?> f; int stat;
if ((f = h[0]) != null) {
long d = f.when - now();
if ((stat = f.status) >= 0 && d > 0L) {
parkTime = d;
break;
}
f.heapIndex = -1;
if (stat >= 0) {
if (f.isImmediate)
f.doExec();
else
p.executeReadyScheduledTask(f);
}
} while ((n = replace(h, 0, n)) > 0);
}
n = replace(h, 0, n);
}

if (pending == null) {
Expand All @@ -278,12 +285,12 @@ else if (stat >= 0) {
}

/**
* Tries to reallocate the heap array, returning existing
* array on failure.
* Tries to reallocate the heap array; returning null on failure
*/
private DelayedTask<?>[] growHeap(DelayedTask<?>[] h, int cap) {
int newCap = cap << 1;
DelayedTask<?>[] nh = h;
private ScheduledForkJoinTask<?>[] growHeap(ScheduledForkJoinTask<?>[] h,
int cap) {
int newCap = cap << 1;
ScheduledForkJoinTask<?>[] nh = null;
if (h != null && h.length == cap && cap < newCap) {
try {
nh = Arrays.copyOf(h, newCap);
Expand All @@ -298,11 +305,11 @@ private DelayedTask<?>[] growHeap(DelayedTask<?>[] h, int cap) {
* cancelled nodes found while doing so.
* @return current heap size
*/
private static int replace(DelayedTask<?>[] h, int k, int n) {
if (h != null && n <= h.length) {
while (k >= 0 && k < n) {
int alsoReplace = -1; // nonnegative if cancelled task seen
DelayedTask<?> t = null, u;
private static int replace(ScheduledForkJoinTask<?>[] h, int k, int n) {
if (h != null && h.length >= n) {
while (k >= 0 && n > k) {
int alsoReplace = -1; // non-negative if cancelled task seen
ScheduledForkJoinTask<?> t = null, u;
long d = 0L;
while (--n > k) { // find uncancelled replacement
if ((u = h[n]) != null) {
Expand All @@ -316,8 +323,9 @@ private static int replace(DelayedTask<?>[] h, int k, int n) {
}
}
if (t != null) { // sift down
int ck, rk; long cd, rd; DelayedTask<?> c, r;
while ((ck = (k << 1) + 1) < n && (c = h[ck]) != null) {
int ck, rk; long cd, rd; ScheduledForkJoinTask<?> c, r;
while ((ck = (k << 1) + 1) < n && ck >= 0 &&
(c = h[ck]) != null) {
cd = c.when;
if (c.status < 0 && alsoReplace < 0) {
alsoReplace = ck; // at most one per pass
Expand Down Expand Up @@ -351,19 +359,19 @@ else if (rd < cd) { // use right child

/**
* Call only when pool is shutdown. If called when not stopping,
* removes tasks according to policy not already done so, and if
* not empty or pool not terminating, returns. Otherwise, cancels
* all tasks in heap and pending queue.
* removes tasks according to policy if not already done so, and
* if not empty or pool not terminating, returns. Otherwise,
* cancels all tasks in heap and pending queue.
* @return negative if stop, else current heap size.
*/
private int tryStop(ForkJoinPool p, DelayedTask<?>[] h, int n,
int runStatus) {
if (runStatus > 0 && p != null) {
private int tryStop(ForkJoinPool p, ScheduledForkJoinTask<?>[] h, int n,
int runStatus, int prevRunStatus) {
if (runStatus > 0) {
if (n > 0) {
if (runStatus > 1)
n = cancelAll(h, n);
else if (h != null && h.length >= n) {
DelayedTask<?> t; int stat;
else if (runStatus != prevRunStatus && h != null && h.length >= n) {
ScheduledForkJoinTask<?> t; int stat; // remove periodic tasks
for (int i = n - 1; i >= 0; --i) {
if ((t = h[i]) != null &&
((stat = t.status) < 0 || t.nextDelay != 0L)) {
Expand All @@ -375,21 +383,21 @@ else if (h != null && h.length >= n) {
}
}
}
if (n > 0 || !p.tryStopIfEnabled())
if (n > 0 || p == null || !p.tryStopIfEnabled())
return n;
}
if (n > 0)
cancelAll(h, n);
for (DelayedTask<?> a = (DelayedTask<?>)
for (ScheduledForkJoinTask<?> a = (ScheduledForkJoinTask<?>)
U.getAndSetReference(this, PENDING, null);
a != null; a = a.nextPending)
a.trySetCancelled(); // clear pending requests
return -1;
}

private int cancelAll(DelayedTask<?>[] h, int n) {
private int cancelAll(ScheduledForkJoinTask<?>[] h, int n) {
if (h != null && h.length >= n) {
DelayedTask<?> t;
ScheduledForkJoinTask<?> t;
for (int i = 0; i < n; ++i) {
if ((t = h[i]) != null) {
h[i] = null;
Expand All @@ -405,37 +413,43 @@ private int cancelAll(DelayedTask<?>[] h, int n) {
* Task class for DelayScheduler operations
*/
@SuppressWarnings("serial")
static final class DelayedTask<T> extends ForkJoinTask.InterruptibleTask<T>
static final class ScheduledForkJoinTask<T>
extends ForkJoinTask.InterruptibleTask<T>
implements ScheduledFuture<T> {
final Runnable runnable; // only one of runnable or callable nonnull
final Runnable runnable; // only one of runnable or callable nonnull
final Callable<? extends T> callable;
final ForkJoinPool pool;
T result;
DelayedTask<?> nextPending; // for DelayScheduler submissions
final long nextDelay; // 0: once; <0: fixedDelay; >0: fixedRate
ScheduledForkJoinTask<?> nextPending; // for DelayScheduler submissions
long when; // nanoTime-based trigger time
final long nextDelay; // 0: once; <0: fixedDelay; >0: fixedRate
int heapIndex; // if non-negative, index on heap
final boolean isImmediate; // run by scheduler vs submitted when ready

public DelayedTask(Runnable runnable, Callable<T> callable, ForkJoinPool pool,
boolean isImmediate, long nextDelay, long delay) {
public ScheduledForkJoinTask(long when, long nextDelay, boolean isImmediate,
Runnable runnable, Callable<T> callable,
ForkJoinPool pool) {
heapIndex = -1;
this.when = delay; // offset by now() on schedule()
this.when = when;
this.isImmediate = isImmediate;
this.nextDelay = nextDelay;
this.runnable = runnable;
this.callable = callable;
this.pool = pool;
this.isImmediate = isImmediate;
this.nextDelay = nextDelay;
}

public final void schedule() {
pool.scheduleDelayedTask(this, when);
public void schedule() { // relay to pool, to allow independent use
pool.scheduleDelayedTask(this);
}

@Override
public final T getRawResult() { return result; }
@Override
public final void setRawResult(T v) { result = v; }
@Override
final Object adaptee() { return (runnable != null) ? runnable : callable; }

@Override
final T compute() throws Exception {
Callable<? extends T> c; Runnable r;
T res = null;
Expand All @@ -446,6 +460,7 @@ else if ((c = callable) != null)
return res;
}

@Override
final boolean postExec() { // resubmit if periodic
long d; ForkJoinPool p; DelayScheduler ds;
if ((d = nextDelay) != 0L && status >= 0 &&
Expand All @@ -464,6 +479,7 @@ final boolean postExec() { // resubmit if periodic
return true;
}

@Override
public final boolean cancel(boolean mayInterruptIfRunning) {
int s; boolean isCancelled; Thread t;
ForkJoinPool p; DelayScheduler ds;
Expand All @@ -490,7 +506,9 @@ public final long getDelay(TimeUnit unit) {
return unit.convert(when - DelayScheduler.now(), NANOSECONDS);
}
public int compareTo(Delayed other) { // never used internally
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
long diff = (other instanceof ScheduledForkJoinTask<?> t) ?
when - t.when : // avoid nanoTime calls and conversions
getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
}
Expand Down
Loading

0 comments on commit f1394c4

Please sign in to comment.