Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.x] ScheduledRunnable does not respect ExecutorScheduler.interruptibleWorker #7744

Closed
vafu opened this issue Aug 9, 2024 · 1 comment · Fixed by #7745
Closed

[3.x] ScheduledRunnable does not respect ExecutorScheduler.interruptibleWorker #7744

vafu opened this issue Aug 9, 2024 · 1 comment · Fixed by #7745

Comments

@vafu
Copy link

vafu commented Aug 9, 2024

Version: 3.1.8

ExecutorScheduler uses ScheduledRunnable for Scheduled tasks with delay. Disposing ScheduledRunnable cancels the set Future here:
https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java#L117
In case you're disposing it from a different thread, mayInterruptIfRunning will be set to true when disposing contrary to ExecutorScheduler#interruptibleWorker value.
Essentially, if you do Schedulers.from(executor, interruptibleWorker = false), you'll have uncleared interrupted flag set to your thread in some cases.

Consider this example:

//Creating an executor that throws if attempting to send an interruptible cancel 
private class ThrowOnInterruptExecutor : ScheduledThreadPoolExecutor(10) {

    override fun <V : Any?> schedule(callable: Callable<V>?, delay: Long, unit: TimeUnit?): ScheduledFuture<V> {
        return CrashOnInterruptFuture(super.schedule(callable, delay, unit))
    }

    class CrashOnInterruptFuture<T>(
        private val delegate: ScheduledFuture<T>
    ) : ScheduledFuture<T> by delegate {

        override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
            return if (mayInterruptIfRunning) {
                throw IllegalStateException("Attempt to interrupt!")
            } else {
                delegate.cancel(mayInterruptIfRunning)
            }
        }
    }
}

fun main() {
    val scheduler = Schedulers.from(ThrowOnInterruptExecutor())
    val worker = scheduler.createWorker()
    val d = worker.schedule({    }, 1, TimeUnit.SECONDS)
    d.dispose()
}

We'll get

Exception in thread "main" java.lang.IllegalStateException: Attempt to interrupt!
	at ThrowOnInterruptExecutor$CrashOnInterruptFuture.cancel(Test.kt:16)
	at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.dispose(ScheduledRunnable.java:117)
	at io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:124)
	at io.reactivex.rxjava3.internal.disposables.SequentialDisposable.dispose(SequentialDisposable.java:72)
	at io.reactivex.rxjava3.internal.disposables.DisposableHelper.dispose(DisposableHelper.java:124)
	at io.reactivex.rxjava3.internal.disposables.SequentialDisposable.dispose(SequentialDisposable.java:72)

Even though the task didn't start yet, we still make an attempt to interrupt. In production i see crashes related to uncleared interrupted flag, which are caused by this.

Should ScheduledRunnable accept interruptible flag and call cancel(interruptibleWorker && async) upon dispose instead?
Thanks.

@akarnokd
Copy link
Member

Thanks for the detailed issue report. I could reproduce the issue.

Indeed this looks like an oversight in the use of the ScheduledRunnable from within Schedulers.from. It only affects the waiting period, when the task runs, the interrupts do not happen.

I'll post a fix shortly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants