ExecutorScheduler uses ScheduledRunnable for scheduled tasks with a delay. Disposing a ScheduledRunnable cancels the Future that was set on it, here: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java#L117
If you dispose it from a different thread, mayInterruptIfRunning is set to true on dispose, regardless of the ExecutorScheduler#interruptibleWorker value.
Essentially, if you do Schedulers.from(executor, interruptibleWorker = false), you can still end up with an uncleared interrupted flag on your thread in some cases.
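To make the description above concrete, here is a minimal sketch of the dispose decision as I understand it. It is a simplified model and not the actual RxJava source: `RecordingFuture`, `DisposeModel` and `runner` are hypothetical names, and only `java.util.concurrent` is used.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.atomic.AtomicReference

// Hypothetical future that only records which flag cancel() was called with.
class RecordingFuture : FutureTask<Unit>(Callable { }) {
    var lastMayInterrupt: Boolean? = null
        private set

    override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        lastMayInterrupt = mayInterruptIfRunning
        // Never actually interrupt anything in this model.
        return super.cancel(false)
    }
}

// Simplified model of the dispose logic described in this issue.
class DisposeModel(private val future: Future<*>) {
    // Thread currently running the task; null while the task is still waiting for its delay.
    val runner = AtomicReference<Thread?>(null)

    fun dispose() {
        // "async" means: dispose() is called from a thread other than the one running the task.
        val async = runner.get() !== Thread.currentThread()
        // The interruptibleWorker setting is not consulted here.
        future.cancel(async)
    }
}

fun main() {
    val future = RecordingFuture()
    DisposeModel(future).dispose() // the task has not started, so runner is still null
    println("cancel called with mayInterruptIfRunning=${future.lastMayInterrupt}")
}
```

In this model a task that has not started yet always counts as disposed from a different thread, so the cancel is requested with interruption enabled.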
Consider this example:
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.Callable
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit

// An executor whose futures throw if an interrupting cancel, i.e. cancel(true), is attempted
private class ThrowOnInterruptExecutor : ScheduledThreadPoolExecutor(10) {
    override fun <V : Any?> schedule(callable: Callable<V>?, delay: Long, unit: TimeUnit?): ScheduledFuture<V> {
        return CrashOnInterruptFuture(super.schedule(callable, delay, unit))
    }

    class CrashOnInterruptFuture<T>(
        private val delegate: ScheduledFuture<T>
    ) : ScheduledFuture<T> by delegate {
        override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
            return if (mayInterruptIfRunning) {
                throw IllegalStateException("Attempt to interrupt!")
            } else {
                delegate.cancel(mayInterruptIfRunning)
            }
        }
    }
}

fun main() {
    val scheduler = Schedulers.from(ThrowOnInterruptExecutor())
    val worker = scheduler.createWorker()
    // Schedule a delayed no-op task and dispose it before it has a chance to start
    val d = worker.schedule({ }, 1, TimeUnit.SECONDS)
    d.dispose()
}
We'll get
Exception in thread "main" java.lang.IllegalStateException: Attempt to interrupt!
    at ThrowOnInterruptExecutor$CrashOnInterruptFuture.cancel(Test.kt:16)
    at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.dispose(ScheduledRunnable.java:117)
    at io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:124)
    at io.reactivex.rxjava3.internal.disposables.SequentialDisposable.dispose(SequentialDisposable.java:72)
    at io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:124)
    at io.reactivex.rxjava3.internal.disposables.SequentialDisposable.dispose(SequentialDisposable.java:72)
Even though the task hasn't started yet, an interrupting cancel is still attempted. In production I see crashes related to an uncleared interrupted flag, which are caused by this.
Should ScheduledRunnable accept an interruptible flag and call cancel(interruptibleWorker && async) on dispose instead?
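A minimal sketch of what that proposal would mean for the flag passed to `Future.cancel`. The helper name `mayInterruptOnDispose` is hypothetical and not part of RxJava; it only spells out the suggested `interruptibleWorker && async` condition.

```kotlin
// Hypothetical helper: computes the mayInterruptIfRunning argument under the proposed rule.
// interruptibleWorker: the value the scheduler was created with.
// async: true when dispose() is called from a thread other than the one running the task.
fun mayInterruptOnDispose(interruptibleWorker: Boolean, async: Boolean): Boolean =
    interruptibleWorker && async

fun main() {
    // Print the flag for every combination of the two inputs.
    for (interruptibleWorker in listOf(false, true)) {
        for (async in listOf(false, true)) {
            val flag = mayInterruptOnDispose(interruptibleWorker, async)
            println("interruptibleWorker=$interruptibleWorker async=$async -> cancel($flag)")
        }
    }
}
```

With this rule, a scheduler created with interruptibleWorker = false would never request an interrupting cancel, including during the waiting period before the task starts.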
Thanks.
Thanks for the detailed issue report. I could reproduce the issue.
Indeed, this looks like an oversight in how ScheduledRunnable is used from within Schedulers.from. It only affects the waiting period; once the task runs, the interrupts do not happen.
Version: 3.1.8