Skip to content

[3.x] ScheduledRunnable does not respect ExecutorScheduler.interruptibleWorker #7744

Closed
@vafu

Description

@vafu

Version: 3.1.8

ExecutorScheduler uses ScheduledRunnable for Scheduled tasks with delay. Disposing ScheduledRunnable cancels the set Future here:
https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java#L117
In case you're disposing it from a different thread, mayInterruptIfRunning will be set to true when disposing contrary to ExecutorScheduler#interruptibleWorker value.
Essentially, if you do Schedulers.from(executor, interruptibleWorker = false), you'll have uncleared interrupted flag set to your thread in some cases.

Consider this example:

//Creating an executor that throws if attempting to send an interruptible cancel 
private class ThrowOnInterruptExecutor : ScheduledThreadPoolExecutor(10) {

    override fun <V : Any?> schedule(callable: Callable<V>?, delay: Long, unit: TimeUnit?): ScheduledFuture<V> {
        return CrashOnInterruptFuture(super.schedule(callable, delay, unit))
    }

    class CrashOnInterruptFuture<T>(
        private val delegate: ScheduledFuture<T>
    ) : ScheduledFuture<T> by delegate {

        override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
            return if (mayInterruptIfRunning) {
                throw IllegalStateException("Attempt to interrupt!")
            } else {
                delegate.cancel(mayInterruptIfRunning)
            }
        }
    }
}

fun main() {
    val scheduler = Schedulers.from(ThrowOnInterruptExecutor())
    val worker = scheduler.createWorker()
    val d = worker.schedule({    }, 1, TimeUnit.SECONDS)
    d.dispose()
}

We'll get

Exception in thread "main" java.lang.IllegalStateException: Attempt to interrupt!
	at ThrowOnInterruptExecutor$CrashOnInterruptFuture.cancel(Test.kt:16)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.dispose(ScheduledRunnable.java:117)
	at io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:124)
	at io.reactivex.rxjava3.internal.disposables.SequentialDisposable.dispose(SequentialDisposable.java:72)
	at io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:124)
	at io.reactivex.rxjava3.internal.disposables.SequentialDisposable.dispose(SequentialDisposable.java:72)

Even though the task didn't start yet, we still make an attempt to interrupt. In production i see crashes related to uncleared interrupted flag, which are caused by this.

Should ScheduledRunnable accept interruptible flag and call cancel(interruptibleWorker && async) upon dispose instead?
Thanks.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions