Deadlock when using TaskPoolScheduler recursively. #2203

@JakenVeina

Bug

Which library version?

System.Reactive 6.0.1

What are the platform(s), environment(s) and related component version(s)?

Windows 10 x64

Do you have a code snippet or project that reproduces the problem?

The following code consistently produces a deadlock, within 10 seconds or so of runtime:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace Test;

public static class EntryPoint
{
    public static void Main()
    {
        using var subscriptionA = BuildLoop("A")
            .Subscribe(Console.WriteLine);

        using var subscriptionB = BuildLoop("B")
            .Subscribe(Console.WriteLine);

        Console.ReadKey();
    }

    public static IObservable<string> BuildLoop(string name)
        => Observable.Interval(TimeSpan.FromMilliseconds(2))
            .Select(count => $"Loop {name}: Iteration {count}")
            .CustomOperator(scheduler: TaskPoolScheduler.Default);

    public static IObservable<T> CustomOperator<T>(
            this    IObservable<T>  source,
                    IScheduler      scheduler)
        => Observable.Create<T>(observer
            => new CustomOperatorSubscription<T>(
                source:     source,
                observer:   observer,
                scheduler:  scheduler));

    private sealed class CustomOperatorSubscription<T>
        : IDisposable
    {
        public CustomOperatorSubscription(
            IObservable<T>  source,
            IObserver<T>    observer,
            IScheduler      scheduler)
        {
            _scheduler              = scheduler;
            _synchronizationGate    = new();

            _sourceSubscription = source
                .Synchronize(_synchronizationGate)
                .Do(value => Process())
                .SubscribeSafe(observer);
        }

        public void Dispose()
        {
            lock (_synchronizationGate)
            {
                _sourceSubscription.Dispose();
            }
        }

        private void Process()
            => _scheduler.Schedule(
                state:      this,
                dueTime:    _scheduler.Now + TimeSpan.FromMilliseconds(2),
                action:     static (_, @this) =>
                {
                    lock (@this._synchronizationGate)
                    {
                        @this.Process();
                    }

                    return Disposable.Empty;
                });

        private readonly IScheduler     _scheduler;
        private readonly IDisposable    _sourceSubscription;
        private readonly object         _synchronizationGate;
    }
}

What is the use case or problem?

Even though there are two entirely separate "loop" subscriptions here, each using its own separate _synchronizationGate object for locking and .Synchronize(), the use of the shared TaskPoolScheduler.Default scheduler instance results in a deadlock.

The deadlock arises from the fact that TaskPoolScheduler implements locking internally (inherited from LocalScheduler) here, and the fact that TaskPoolScheduler will sometimes execute previously-scheduled actions synchronously, within calls to .Schedule(), due to the use of TaskContinuationOptions.ExecuteSynchronously here and here.
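
To illustrate the inlining behavior in isolation, here is a minimal sketch that uses only the TPL, not System.Reactive. The class name InlineContinuationDemo is hypothetical. It attaches a continuation with TaskContinuationOptions.ExecuteSynchronously to an already-completed task and records which thread ran it. In that situation the runtime is allowed to run the continuation inside the ContinueWith() call itself, which is the pattern visible in the call stacks further down.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class InlineContinuationDemo
{
    // Returns the managed thread IDs of the caller and of the continuation.
    public static (int Caller, int Continuation) Run()
    {
        var callerThreadId = Environment.CurrentManagedThreadId;
        var continuationThreadId = -1;

        // The antecedent is already complete, so the continuation is eligible
        // to be invoked inline, before ContinueWith() returns to the caller.
        var continuation = Task.CompletedTask.ContinueWith(
            _ => { continuationThreadId = Environment.CurrentManagedThreadId; },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        continuation.Wait();
        return (callerThreadId, continuationThreadId);
    }
}
```

When the two IDs match, the continuation ran on the calling thread, so any lock the caller held at that moment was still held while the continuation executed. Inlining is an optimization rather than a guarantee, so the IDs may occasionally differ.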

Obviously, this code does not directly represent a useful scenario; it's an extremely trimmed-down version of the production code that encountered this issue. Details about the real-world scenario can be found here. The biggest difference is probably that actions do not always re-schedule themselves; there is decision logic that ensures only one background action is scheduled at a time, per operator subscription.

Only the TaskPoolScheduler exhibits this behavior. Switching to other multi-threading schedulers like DefaultScheduler or ThreadPoolScheduler eliminates the issue.

What is the expected outcome?

My expectation was that calling IScheduler.Schedule() on a multi-threading scheduler would be a "fire-and-forget" operation: there would be no risk of the call accessing resources that might be locked by another thread, except for those used internally by the scheduler. Thus, it would be safe to implement a "loop" by having scheduler actions re-schedule themselves after their work is complete.

What is the actual outcome?

The way the deadlock presents is as follows:

  • Thread 1 enters Lock A
  • Thread 1 enters Scheduler Lock
  • Thread 1 adds Action #1 to scheduler queue
  • Thread 1 leaves Scheduler Lock
  • Thread 1 leaves Lock A
  • Thread 2 enters Lock B
  • Thread 2 enters Scheduler Lock
  • Thread 3 enters Lock A
  • Thread 3 waits for Scheduler Lock
  • Thread 2 adds Action #2 to scheduler queue
  • Thread 2 sees that Action #1 is due and invokes it
  • Thread 2 waits for Lock A, due to Action #1

We now have Thread 2 holding a lock that Thread 3 is waiting on, and Thread 3 holding a lock that Thread 2 is waiting on.
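
The final state of that sequence is a classic lock-order inversion. The following sketch reproduces just that cycle without System.Reactive and without actually hanging, by using Monitor.TryEnter() with a timeout in place of a blocking lock. The names LockCycleDemo, gateA and schedulerLock are hypothetical stand-ins for "Lock A" and "Scheduler Lock" above; this is not the library's code.

```csharp
using System;
using System.Threading;

public static class LockCycleDemo
{
    // Returns true when both threads failed to take their second lock,
    // i.e. when each was waiting on a lock held by the other.
    public static bool Run()
    {
        var gateA = new object();          // stands in for "Lock A"
        var schedulerLock = new object();  // stands in for "Scheduler Lock"
        using var bothHoldFirst = new Barrier(2);
        using var bothAttempted = new Barrier(2);
        var thread2GotGateA = true;
        var thread3GotSchedulerLock = true;

        // Plays "Thread 2": holds the scheduler lock, then wants Lock A.
        var thread2 = new Thread(() =>
        {
            lock (schedulerLock)
            {
                bothHoldFirst.SignalAndWait();
                thread2GotGateA = Monitor.TryEnter(gateA, TimeSpan.FromMilliseconds(100));
                if (thread2GotGateA) Monitor.Exit(gateA);
                bothAttempted.SignalAndWait();
            }
        });

        // Plays "Thread 3": holds Lock A, then wants the scheduler lock.
        var thread3 = new Thread(() =>
        {
            lock (gateA)
            {
                bothHoldFirst.SignalAndWait();
                thread3GotSchedulerLock = Monitor.TryEnter(schedulerLock, TimeSpan.FromMilliseconds(100));
                if (thread3GotSchedulerLock) Monitor.Exit(schedulerLock);
                bothAttempted.SignalAndWait();
            }
        });

        thread2.Start();
        thread3.Start();
        thread2.Join();
        thread3.Join();

        return !thread2GotGateA && !thread3GotSchedulerLock;
    }
}
```

The barriers only make the interleaving deterministic for the demo. In the real repro the same interleaving depends on timing, which would explain why the deadlock takes a few seconds to appear.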

The root cause of this, I would say, is the fact that the actions being invoked by the scheduler will themselves invoke the scheduler, and the scheduler will sometimes synchronously-invoke actions other than the one that was just scheduled.

What is the stacktrace of the exception(s) if any?

No exceptions, but here are the call stacks during a deadlock:

Thread 16216:

 	[Deadlocked, double-click or press enter to view all thread stacks]	
 	[Waiting on lock owned by Thread 28752, double-click or press enter to switch to thread]	
>	Sandbox.dll!Test.EntryPoint.CustomOperatorSubscription<string>.Process.AnonymousMethod__2_0(System.Reactive.Concurrency.IScheduler _, Test.EntryPoint.CustomOperatorSubscription<string> this) Line 67	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.WorkItem<Test.EntryPoint.CustomOperatorSubscription<string>>.InvokeCore(System.Reactive.Concurrency.IScheduler scheduler) Line 482	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.WorkItem.Invoke(System.Reactive.Concurrency.IScheduler scheduler) Line 449	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.ExecuteNextShortTermWorkItem(System.Reactive.Concurrency.IScheduler scheduler, System.IDisposable cancel) Line 260	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.ScheduleShortTermWork.AnonymousMethod__22_0(System.Reactive.Concurrency.IScheduler self, (System.Reactive.Concurrency.LocalScheduler this, System.Reactive.Disposables.SingleAssignmentDisposable d) tuple) Line 197	C#
 	System.Reactive.dll!System.Reactive.Concurrency.TaskPoolScheduler.SlowlyScheduledWorkItem<(System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable)>..ctor.AnonymousMethod__4_0(System.Threading.Tasks.Task _, object thisObject) Line 99	C#
 	System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot, System.Threading.Thread threadPoolThread)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.ThreadPoolTaskScheduler.TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.TaskScheduler.TryRunInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.TaskContinuation.InlineIfPossibleOrElseQueue(System.Threading.Tasks.Task task, bool needsProtection)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.ContinueWithTaskContinuation.Run(System.Threading.Tasks.Task completedTask, bool canInlineContinuationTask)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.ContinueWithCore(System.Threading.Tasks.Task continuationTask, System.Threading.Tasks.TaskScheduler scheduler, System.Threading.CancellationToken cancellationToken, System.Threading.Tasks.TaskContinuationOptions options)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.ContinueWith(System.Action<System.Threading.Tasks.Task, object> continuationAction, object state, System.Threading.Tasks.TaskScheduler scheduler, System.Threading.CancellationToken cancellationToken, System.Threading.Tasks.TaskContinuationOptions continuationOptions)	Unknown
 	System.Reactive.dll!System.Reactive.Concurrency.TaskPoolScheduler.SlowlyScheduledWorkItem<(System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable)>.SlowlyScheduledWorkItem(System.Reactive.Concurrency.TaskPoolScheduler scheduler, (System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable) state, System.TimeSpan dueTime, System.Func<System.Reactive.Concurrency.IScheduler, (System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable), System.IDisposable> action) Line 92	C#
 	System.Reactive.dll!System.Reactive.Concurrency.TaskPoolScheduler.ScheduleSlow<(System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable)>((System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable) state, System.TimeSpan dueTime, System.Func<System.Reactive.Concurrency.IScheduler, (System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable), System.IDisposable> action) Line 213	C#
 	System.Reactive.dll!System.Reactive.Concurrency.TaskPoolScheduler.Schedule<(System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable)>((System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable) state, System.TimeSpan dueTime, System.Func<System.Reactive.Concurrency.IScheduler, (System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable), System.IDisposable> action) Line 208	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.ScheduleShortTermWork(System.Reactive.Concurrency.LocalScheduler.WorkItem item) Line 197	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.Enqueue<Test.EntryPoint.CustomOperatorSubscription<string>>(Test.EntryPoint.CustomOperatorSubscription<string> state, System.DateTimeOffset dueTime, System.Func<System.Reactive.Concurrency.IScheduler, Test.EntryPoint.CustomOperatorSubscription<string>, System.IDisposable> action) Line 157	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.Schedule<Test.EntryPoint.CustomOperatorSubscription<string>>(Test.EntryPoint.CustomOperatorSubscription<string> state, System.DateTimeOffset dueTime, System.Func<System.Reactive.Concurrency.IScheduler, Test.EntryPoint.CustomOperatorSubscription<string>, System.IDisposable> action) Line 61	C#
 	Sandbox.dll!Test.EntryPoint.CustomOperatorSubscription<string>.Process() Line 62	C#
 	Sandbox.dll!Test.EntryPoint.CustomOperatorSubscription<string>.Process.AnonymousMethod__2_0(System.Reactive.Concurrency.IScheduler _, Test.EntryPoint.CustomOperatorSubscription<string> this) Line 69	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.WorkItem<Test.EntryPoint.CustomOperatorSubscription<string>>.InvokeCore(System.Reactive.Concurrency.IScheduler scheduler) Line 482	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.WorkItem.Invoke(System.Reactive.Concurrency.IScheduler scheduler) Line 449	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.ExecuteNextShortTermWorkItem(System.Reactive.Concurrency.IScheduler scheduler, System.IDisposable cancel) Line 260	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.ScheduleShortTermWork.AnonymousMethod__22_0(System.Reactive.Concurrency.IScheduler self, (System.Reactive.Concurrency.LocalScheduler this, System.Reactive.Disposables.SingleAssignmentDisposable d) tuple) Line 197	C#
 	System.Reactive.dll!System.Reactive.Concurrency.TaskPoolScheduler.SlowlyScheduledWorkItem<(System.Reactive.Concurrency.LocalScheduler, System.Reactive.Disposables.SingleAssignmentDisposable)>..ctor.AnonymousMethod__4_0(System.Threading.Tasks.Task _, object thisObject) Line 99	C#
 	System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot, System.Threading.Thread threadPoolThread)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.ThreadPoolTaskScheduler.TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.TaskScheduler.TryRunInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.TaskContinuation.InlineIfPossibleOrElseQueue(System.Threading.Tasks.Task task, bool needsProtection)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.ContinueWithTaskContinuation.Run(System.Threading.Tasks.Task completedTask, bool canInlineContinuationTask)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.RunContinuations(object continuationObject)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.TrySetResult()	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.DelayPromise.CompleteTimedOut()	Unknown
 	System.Private.CoreLib.dll!System.Threading.TimerQueueTimer.Fire(bool isThreadPool)	Unknown
 	System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch()	Unknown
 	System.Private.CoreLib.dll!System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()	Unknown

Thread 28752:

 	[Deadlocked, double-click or press enter to view all thread stacks]	
 	[Waiting on lock owned by Thread 16216, double-click or press enter to switch to thread]	
>	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.ScheduleShortTermWork(System.Reactive.Concurrency.LocalScheduler.WorkItem item) Line 175	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.Enqueue<Test.EntryPoint.CustomOperatorSubscription<string>>(Test.EntryPoint.CustomOperatorSubscription<string> state, System.DateTimeOffset dueTime, System.Func<System.Reactive.Concurrency.IScheduler, Test.EntryPoint.CustomOperatorSubscription<string>, System.IDisposable> action) Line 157	C#
 	System.Reactive.dll!System.Reactive.Concurrency.LocalScheduler.Schedule<Test.EntryPoint.CustomOperatorSubscription<string>>(Test.EntryPoint.CustomOperatorSubscription<string> state, System.DateTimeOffset dueTime, System.Func<System.Reactive.Concurrency.IScheduler, Test.EntryPoint.CustomOperatorSubscription<string>, System.IDisposable> action) Line 61	C#
 	Sandbox.dll!Test.EntryPoint.CustomOperatorSubscription<string>.Process() Line 62	C#
 	Sandbox.dll!Test.EntryPoint.CustomOperatorSubscription<string>..ctor.AnonymousMethod__0_0(string value) Line 49	C#
 	System.Reactive.dll!System.Reactive.Linq.ObservableImpl.Do<string>.OnNext._.OnNext(string value) Line 38	C#
 	System.Reactive.dll!System.Reactive.Sink<string>.ForwardOnNext(string value) Line 49	C#
 	System.Reactive.dll!System.Reactive.Concurrency.Synchronize<string>._.OnNext(string value) Line 41	C#
 	System.Reactive.dll!System.Reactive.Sink<string>.ForwardOnNext(string value) Line 49	C#
 	System.Reactive.dll!System.Reactive.Linq.ObservableImpl.Select<long, string>.Selector._.OnNext(long value) Line 47	C#
 	System.Reactive.dll!System.Reactive.Sink<long>.ForwardOnNext(long value) Line 49	C#
 	System.Reactive.dll!System.Reactive.Linq.ObservableImpl.Timer.Periodic._.Tick() Line 168	C#
 	System.Reactive.dll!System.Reactive.Linq.ObservableImpl.Timer.Periodic._.Run.AnonymousMethod__4_0(System.Reactive.Linq.ObservableImpl.Timer.Periodic._ this) Line 141	C#
 	System.Reactive.dll!System.Reactive.Concurrency.Scheduler.SchedulePeriodic.AnonymousMethod__67_0((System.Reactive.Linq.ObservableImpl.Timer.Periodic._ state, System.Action<System.Reactive.Linq.ObservableImpl.Timer.Periodic._> action) t) Line 79	C#
 	System.Reactive.dll!System.Reactive.Concurrency.DefaultScheduler.PeriodicallyScheduledWorkItem<(System.Reactive.Linq.ObservableImpl.Timer.Periodic._, System.Action<System.Reactive.Linq.ObservableImpl.Timer.Periodic._>)>.Tick.AnonymousMethod__5_0(System.Reactive.Concurrency.DefaultScheduler.PeriodicallyScheduledWorkItem<(System.Reactive.Linq.ObservableImpl.Timer.Periodic._, System.Action<System.Reactive.Linq.ObservableImpl.Timer.Periodic._>)> closureWorkItem) Line 127	C#
 	System.Reactive.dll!System.Reactive.Concurrency.AsyncLock.Wait.AnonymousMethod__5_0(System.Delegate actionObject, object stateObject) Line 53	C#
 	System.Reactive.dll!System.Reactive.Concurrency.AsyncLock.Wait(object state, System.Delegate delegate, System.Action<System.Delegate, object> action) Line 93	C#
 	System.Reactive.dll!System.Reactive.Concurrency.AsyncLock.Wait<System.Reactive.Concurrency.DefaultScheduler.PeriodicallyScheduledWorkItem<(System.Reactive.Linq.ObservableImpl.Timer.Periodic._, System.Action<System.Reactive.Linq.ObservableImpl.Timer.Periodic._>)>>(System.Reactive.Concurrency.DefaultScheduler.PeriodicallyScheduledWorkItem<(System.Reactive.Linq.ObservableImpl.Timer.Periodic._, System.Action<System.Reactive.Linq.ObservableImpl.Timer.Periodic._>)> state, System.Action<System.Reactive.Concurrency.DefaultScheduler.PeriodicallyScheduledWorkItem<(System.Reactive.Linq.ObservableImpl.Timer.Periodic._, System.Action<System.Reactive.Linq.ObservableImpl.Timer.Periodic._>)>> action) Line 53	C#
 	System.Reactive.dll!System.Reactive.Concurrency.DefaultScheduler.PeriodicallyScheduledWorkItem<(System.Reactive.Linq.ObservableImpl.Timer.Periodic._, System.Action<System.Reactive.Linq.ObservableImpl.Timer.Periodic._>)>.Tick() Line 125	C#
 	System.Reactive.dll!System.Reactive.Concurrency.ConcurrencyAbstractionLayerImpl.PeriodicTimer.Tick() Line 209	C#
 	System.Reactive.dll!System.Reactive.Concurrency.ConcurrencyAbstractionLayerImpl.PeriodicTimer..ctor.AnonymousMethod__2_0(object this) Line 206	C#
 	System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)	Unknown
 	System.Private.CoreLib.dll!System.Threading.TimerQueueTimer.Fire(bool isThreadPool)	Unknown
 	System.Private.CoreLib.dll!System.Threading.TimerQueue.FireNextTimers()	Unknown
 	System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch()	Unknown
 	System.Private.CoreLib.dll!System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()	Unknown

Possible Solutions

Disable synchronous continuation

I was able to clone System.Reactive and demonstrate that changing TaskContinuationOptions.ExecuteSynchronously to TaskContinuationOptions.RunContinuationsAsynchronously here and here eliminates the issue. This seems heavy-handed to me, as it means TaskPoolScheduler can no longer leverage one of the fundamental benefits of scheduling with Tasks.

Disable synchronous continuation of previously-scheduled actions

Instead of just swapping to TaskContinuationOptions.RunContinuationsAsynchronously all the time, perhaps it could be used selectively, such that the only action that might be invoked synchronously within .Schedule() is the one actively being scheduled, i.e. the way Task continuations generally work. I dunno if this would really be useful.

Release locks before invoking actions

The locking within TaskPoolScheduler seems to be related to accessing the internal queue, so in theory it would be possible to pull actions out of that queue, and then unlock before invoking them, so there's no risk of re-entrant deadlocking. Off the top of my head, I would say that maybe this would allow for out-of-order invocation of actions. Not sure if that's currently a guarantee.
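
As a rough sketch of that idea, assuming a plain FIFO queue rather than the scheduler's actual internal data structure, the hypothetical UnlockedInvokeQueue below takes the lock only long enough to dequeue one item and invokes it after the lock has been released. An action can therefore call back into Enqueue(), or block on some unrelated lock, without the queue's lock being held.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class UnlockedInvokeQueue
{
    private readonly object _gate = new();
    private readonly Queue<Action> _items = new();

    // Exposed only so the demo can verify that actions run outside the lock.
    public bool IsGateHeldByCurrentThread => Monitor.IsEntered(_gate);

    public void Enqueue(Action action)
    {
        lock (_gate)
        {
            _items.Enqueue(action);
        }
    }

    // Invokes queued actions until the queue is empty and returns how many ran.
    public int Drain()
    {
        var invoked = 0;
        while (true)
        {
            Action next;
            lock (_gate)
            {
                if (_items.Count == 0)
                {
                    return invoked;
                }
                next = _items.Dequeue();
            }

            // The lock has been released here, so re-entrant Enqueue() calls
            // and unrelated locks taken by the action cannot form a cycle with it.
            next();
            invoked++;
        }
    }
}
```

With a single thread calling Drain(), actions still run in FIFO order. If several threads drain concurrently, two dequeued actions can run in either order, which is the out-of-order concern raised above.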

Update documentation

If no real fix is feasible, I think it should at least be stated in documentation somewhere that recursive scheduling and/or locking while scheduling is not recommended for TaskPoolScheduler.

I'll be happy to work a PR for this, if a fix can be decided on.
