Description
Version: 3.1.8
ExecutorScheduler uses ScheduledRunnable for scheduled tasks with a delay. Disposing a ScheduledRunnable cancels the Future that was set on it, here:
https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java#L117
If you dispose it from a thread other than the one running the task, mayInterruptIfRunning is set to true on cancel, regardless of the ExecutorScheduler#interruptibleWorker value.
Essentially, if you do Schedulers.from(executor, interruptibleWorker = false), you can still end up with an uncleared interrupted flag on your thread in some cases.
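For readers less familiar with the flag, here is a minimal sketch of what mayInterruptIfRunning changes when a task is already running. It uses only java.util.concurrent, not RxJava, and the helper name wasInterrupted is made up for this illustration.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: starts a blocking task, cancels it while it is running,
// and reports whether the worker thread observed an interrupt.
fun wasInterrupted(mayInterruptIfRunning: Boolean): Boolean {
    val executor = Executors.newSingleThreadExecutor()
    val started = CountDownLatch(1)
    val finished = CountDownLatch(1)
    val interrupted = AtomicBoolean(false)
    try {
        val future = executor.submit(Runnable {
            started.countDown()
            try {
                Thread.sleep(200) // stands in for blocking work
            } catch (e: InterruptedException) {
                interrupted.set(true)
            } finally {
                finished.countDown()
            }
        })
        started.await()                      // make sure the task is running
        future.cancel(mayInterruptIfRunning) // the flag under discussion
        finished.await(5, TimeUnit.SECONDS)  // let the task body exit
        return interrupted.get()
    } finally {
        executor.shutdownNow()
    }
}
```

With cancel(true) the worker thread is interrupted in the middle of its work; with cancel(false) the task body runs to completion undisturbed, which is what I would expect from a non-interruptible worker.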
Consider this example:
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.Callable
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit

// An executor whose futures throw if cancel() is called with mayInterruptIfRunning = true
private class ThrowOnInterruptExecutor : ScheduledThreadPoolExecutor(10) {
    override fun <V : Any?> schedule(callable: Callable<V>?, delay: Long, unit: TimeUnit?): ScheduledFuture<V> {
        return CrashOnInterruptFuture(super.schedule(callable, delay, unit))
    }

    class CrashOnInterruptFuture<T>(
        private val delegate: ScheduledFuture<T>
    ) : ScheduledFuture<T> by delegate {
        override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
            return if (mayInterruptIfRunning) {
                throw IllegalStateException("Attempt to interrupt!")
            } else {
                delegate.cancel(mayInterruptIfRunning)
            }
        }
    }
}

fun main() {
    // No interruptibleWorker argument is passed here
    val scheduler = Schedulers.from(ThrowOnInterruptExecutor())
    val worker = scheduler.createWorker()
    val d = worker.schedule({ }, 1, TimeUnit.SECONDS)
    // Disposed from the main thread, well before the 1 second delay elapses
    d.dispose()
}
We'll get:
Exception in thread "main" java.lang.IllegalStateException: Attempt to interrupt!
at ThrowOnInterruptExecutor$CrashOnInterruptFuture.cancel(Test.kt:16)
at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.dispose(ScheduledRunnable.java:117)
at io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:124)
at io.reactivex.rxjava3.internal.disposables.SequentialDisposable.dispose(SequentialDisposable.java:72)
at io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:124)
at io.reactivex.rxjava3.internal.disposables.SequentialDisposable.dispose(SequentialDisposable.java:72)
Even though the task hasn't started yet, dispose still attempts an interrupting cancel. In production I see crashes related to an uncleared interrupted flag, which are caused by this.
Should ScheduledRunnable accept an interruptible flag and call cancel(interruptibleWorker && async) upon dispose instead?
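To make the suggestion concrete, here is a rough sketch of the idea. It is not RxJava's actual class: CancellableTaskSketch, its runner parameter, and RecordingFuture are hypothetical names invented for this illustration, and the real ScheduledRunnable tracks its state differently.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Future
import java.util.concurrent.FutureTask

// Hypothetical stand-in for ScheduledRunnable; not RxJava's actual implementation.
class CancellableTaskSketch(
    private val future: Future<*>,
    private val interruptibleWorker: Boolean,
    private val runner: Thread? = null // thread running the task, null if not started
) {
    fun dispose() {
        // "async" means dispose is called from a thread other than the runner.
        val async = runner !== Thread.currentThread()
        // Proposed change: only interrupt when the worker opted in.
        future.cancel(interruptibleWorker && async)
    }
}

// Test double that records the flag passed to cancel().
class RecordingFuture : FutureTask<Unit>(Callable { }) {
    var lastFlag: Boolean? = null

    override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        lastFlag = mayInterruptIfRunning
        return super.cancel(mayInterruptIfRunning)
    }
}

fun main() {
    val nonInterruptible = RecordingFuture()
    CancellableTaskSketch(nonInterruptible, interruptibleWorker = false).dispose()
    println(nonInterruptible.lastFlag)

    val interruptible = RecordingFuture()
    CancellableTaskSketch(interruptible, interruptibleWorker = true).dispose()
    println(interruptible.lastFlag)
}
```

In this sketch, a task that has not started has no runner, so async is true, yet cancel only receives true when interruptibleWorker is also true.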
Thanks.