Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ScheduledRunnable to honor interrupt settings from Schedulers.from usage #7745

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks);
ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks, interruptibleWorker);
tasks.add(sr);

if (executor instanceof ScheduledExecutorService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>

private static final long serialVersionUID = -6120223772001106981L;
final Runnable actual;
final boolean interruptOnCancel;

/** Indicates that the parent tracking this task has been notified about its completion. */
static final Object PARENT_DISPOSED = new Object();
Expand All @@ -41,12 +42,26 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>
/**
* Creates a ScheduledRunnable by wrapping the given action and setting
* up the optional parent.
* The underlying future will be interrupted if the task is disposed asynchronously.
* @param actual the runnable to wrap, not-null (not verified)
* @param parent the parent tracking container or null if none
*/
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
this(actual, parent, true);
}

/**
* Creates a ScheduledRunnable by wrapping the given action and setting
* up the optional parent.
* @param actual the runnable to wrap, not-null (not verified)
* @param parent the parent tracking container or null if none
* @param interruptOnCancel if true, the underlying future will be interrupted when disposing
* this task from a different thread than it is running on.
*/
public ScheduledRunnable(Runnable actual, DisposableContainer parent, boolean interruptOnCancel) {
super(3);
this.actual = actual;
this.interruptOnCancel = interruptOnCancel;
this.lazySet(0, parent);
}

Expand Down Expand Up @@ -95,7 +110,7 @@ public void setFuture(Future<?> f) {
return;
}
if (o == ASYNC_DISPOSED) {
f.cancel(true);
f.cancel(interruptOnCancel);
return;
}
if (compareAndSet(FUTURE_INDEX, o, f)) {
Expand All @@ -114,7 +129,7 @@ public void dispose() {
boolean async = get(THREAD_INDEX) != Thread.currentThread();
if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
if (o != null) {
((Future<?>)o).cancel(async);
((Future<?>)o).cancel(async && interruptOnCancel);
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,4 +965,149 @@ public void run() {
exec.shutdown();
}
}

public static class TrackInterruptScheduledExecutor extends ScheduledThreadPoolExecutor {

public final AtomicBoolean interruptReceived = new AtomicBoolean();

public TrackInterruptScheduledExecutor() {
super(10);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return new TrackingScheduledFuture<V>(super.schedule(callable, delay, unit));
}

class TrackingScheduledFuture<V> implements ScheduledFuture<V> {

ScheduledFuture<V> original;

TrackingScheduledFuture(ScheduledFuture<V> original) {
this.original = original;
}

@Override
public long getDelay(TimeUnit unit) {
return original.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
return original.compareTo(o);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (mayInterruptIfRunning) {
interruptReceived.set(true);
}
return original.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return original.isCancelled();
}

@Override
public boolean isDone() {
return original.isDone();
}

@Override
public V get() throws InterruptedException, ExecutionException {
return original.get();
}

@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return get(timeout, unit);
}
}
}

@Test
public void noInterruptBeforeRunningDelayedWorker() throws Throwable {
TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor();

try {
Scheduler sch = Schedulers.from(exec, false);

Worker worker = sch.createWorker();

Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS);

d.dispose();

int i = 150;

while (i-- > 0) {
assertFalse("Task interrupt detected", exec.interruptReceived.get());
Thread.sleep(10);
}

} finally {
exec.shutdownNow();
}
}

@Test
public void hasInterruptBeforeRunningDelayedWorker() throws Throwable {
TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor();

try {
Scheduler sch = Schedulers.from(exec, true);

Worker worker = sch.createWorker();

Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS);

d.dispose();

Thread.sleep(100);
assertTrue("Task interrupt detected", exec.interruptReceived.get());

} finally {
exec.shutdownNow();
}
}

@Test
public void noInterruptAfterRunningDelayedWorker() throws Throwable {
TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor();

try {
Scheduler sch = Schedulers.from(exec, false);

Worker worker = sch.createWorker();
AtomicBoolean taskRun = new AtomicBoolean();

Disposable d = worker.schedule(() -> {
taskRun.set(true);
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
exec.interruptReceived.set(true);
}
}, 100, TimeUnit.MILLISECONDS);

Thread.sleep(150);
;
d.dispose();

int i = 50;

while (i-- > 0) {
assertFalse("Task interrupt detected", exec.interruptReceived.get());
Thread.sleep(10);
}

assertTrue("Task run at all", taskRun.get());

} finally {
exec.shutdownNow();
}
}
}
Loading