Skip to content

Commit 8d4f53d

Browse files
kouvel authored and Jo Shields committed
Fix a race condition in the thread pool (dotnet#68171)
* Fix a race condition in the thread pool

There is a case where, on a work-stealing queue, both `LocalPop()` and `TrySteal()` may fail when running concurrently, and lead to a case where there is a work item but no threads are released to process it. Fixed to always ensure that there's a thread request when there was a missed steal.

Also, when `LocalPop()` fails, the thread does not attempt to pop anymore, and that can be an issue if that thread is the last thread to look for work items. Fixed to always check the local queue.

Fixes dotnet#67545
1 parent 4576be0 commit 8d4f53d

File tree

2 files changed

+85
-37
lines changed

2 files changed

+85
-37
lines changed

src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,6 @@ public void Enqueue(object callback, bool forceGlobal)
476476
if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
477477
{
478478
tl.workStealingQueue.LocalPush(callback);
479-
tl.workState |= ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems;
480479
}
481480
else
482481
{
@@ -510,30 +509,21 @@ internal static bool LocalFindAndPop(object callback)
510509
public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
511510
{
512511
// Check for local work items
513-
object? workItem;
514-
ThreadPoolWorkQueueThreadLocals.WorkState tlWorkState = tl.workState;
515-
if ((tlWorkState & ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems) != 0)
512+
object? workItem = tl.workStealingQueue.LocalPop();
513+
if (workItem != null)
516514
{
517-
workItem = tl.workStealingQueue.LocalPop();
518-
if (workItem != null)
519-
{
520-
return workItem;
521-
}
522-
523-
Debug.Assert(tlWorkState == tl.workState);
524-
tl.workState = tlWorkState &= ~ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems;
515+
return workItem;
525516
}
526517

527518
// Check for high-priority work items
528-
if ((tlWorkState & ThreadPoolWorkQueueThreadLocals.WorkState.IsProcessingHighPriorityWorkItems) != 0)
519+
if (tl.isProcessingHighPriorityWorkItems)
529520
{
530521
if (highPriorityWorkItems.TryDequeue(out workItem))
531522
{
532523
return workItem;
533524
}
534525

535-
Debug.Assert(tlWorkState == tl.workState);
536-
tl.workState = tlWorkState &= ~ThreadPoolWorkQueueThreadLocals.WorkState.IsProcessingHighPriorityWorkItems;
526+
tl.isProcessingHighPriorityWorkItems = false;
537527
}
538528
else if (
539529
_mayHaveHighPriorityWorkItems != 0 &&
@@ -579,14 +569,14 @@ private bool TryStartProcessingHighPriorityWorkItemsAndDequeue(
579569
ThreadPoolWorkQueueThreadLocals tl,
580570
[MaybeNullWhen(false)] out object workItem)
581571
{
582-
Debug.Assert((tl.workState & ThreadPoolWorkQueueThreadLocals.WorkState.IsProcessingHighPriorityWorkItems) == 0);
572+
Debug.Assert(!tl.isProcessingHighPriorityWorkItems);
583573

584574
if (!highPriorityWorkItems.TryDequeue(out workItem))
585575
{
586576
return false;
587577
}
588578

589-
tl.workState |= ThreadPoolWorkQueueThreadLocals.WorkState.IsProcessingHighPriorityWorkItems;
579+
tl.isProcessingHighPriorityWorkItems = true;
590580
_mayHaveHighPriorityWorkItems = 1;
591581
return true;
592582
}
@@ -632,8 +622,7 @@ internal static bool Dispatch()
632622
// take over the thread, sustaining starvation. For example, when worker threads are continually starved,
633623
// high-priority work items may always be queued and normal-priority work items may not get a chance to run.
634624
bool dispatchNormalPriorityWorkFirst = workQueue._dispatchNormalPriorityWorkFirst;
635-
if (dispatchNormalPriorityWorkFirst &&
636-
(tl.workState & ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems) == 0)
625+
if (dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal)
637626
{
638627
workQueue._dispatchNormalPriorityWorkFirst = !dispatchNormalPriorityWorkFirst;
639628
workQueue.workItems.TryDequeue(out workItem);
@@ -670,7 +659,7 @@ internal static bool Dispatch()
670659
// reason that may have a dependency on other queued work items.
671660
workQueue.EnsureThreadRequested();
672661

673-
// After this point, this method is no longer responsible for ensuring thread requests
662+
// After this point, this method is no longer responsible for ensuring thread requests except for missed steals
674663
}
675664

676665
// Has the desire for logging changed since the last time we entered?
@@ -700,8 +689,18 @@ internal static bool Dispatch()
700689

701690
if (workItem == null)
702691
{
703-
// May have missed a steal, but this method is not responsible for ensuring thread requests anymore. See
704-
// the dequeue before the loop.
692+
//
693+
// No work.
694+
// If we missed a steal, though, there may be more work in the queue.
695+
// Instead of looping around and trying again, we'll just request another thread. Hopefully the thread
696+
// that owns the contended work-stealing queue will pick up its own workitems in the meantime,
697+
// which will be more efficient than this thread doing it anyway.
698+
//
699+
if (missedSteal)
700+
{
701+
workQueue.EnsureThreadRequested();
702+
}
703+
705704
return true;
706705
}
707706
}
@@ -753,7 +752,7 @@ internal static bool Dispatch()
753752
// to ensure that they would not be heavily delayed. Tell the caller that this thread was requested to stop
754753
// processing work items.
755754
tl.TransferLocalWork();
756-
tl.ResetWorkItemProcessingState();
755+
tl.isProcessingHighPriorityWorkItems = false;
757756
return false;
758757
}
759758

@@ -769,7 +768,7 @@ internal static bool Dispatch()
769768
{
770769
// The runtime-specific thread pool implementation requires the Dispatch loop to return to the VM
771770
// periodically to let it perform its own work
772-
tl.ResetWorkItemProcessingState();
771+
tl.isProcessingHighPriorityWorkItems = false;
773772
return true;
774773
}
775774

@@ -823,7 +822,7 @@ internal sealed class ThreadPoolWorkQueueThreadLocals
823822
[ThreadStatic]
824823
public static ThreadPoolWorkQueueThreadLocals? threadLocals;
825824

826-
public WorkState workState;
825+
public bool isProcessingHighPriorityWorkItems;
827826
public readonly ThreadPoolWorkQueue workQueue;
828827
public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
829828
public readonly Thread currentThread;
@@ -839,16 +838,12 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
839838
threadLocalCompletionCountObject = ThreadPool.GetOrCreateThreadLocalCompletionCountObject();
840839
}
841840

842-
public void ResetWorkItemProcessingState() => workState &= ~WorkState.IsProcessingHighPriorityWorkItems;
843-
844841
public void TransferLocalWork()
845842
{
846843
while (workStealingQueue.LocalPop() is object cb)
847844
{
848845
workQueue.Enqueue(cb, forceGlobal: true);
849846
}
850-
851-
workState &= ~WorkState.MayHaveLocalWorkItems;
852847
}
853848

854849
~ThreadPoolWorkQueueThreadLocals()
@@ -860,13 +855,6 @@ public void TransferLocalWork()
860855
ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
861856
}
862857
}
863-
864-
[Flags]
865-
public enum WorkState
866-
{
867-
MayHaveLocalWorkItems = 1 << 0,
868-
IsProcessingHighPriorityWorkItems = 1 << 1
869-
}
870858
}
871859

872860
// A strongly typed callback for ThreadPoolTypedWorkItemQueue<T, TCallback>.
@@ -948,7 +936,7 @@ void IThreadPoolWorkItem.Execute()
948936
// yield to the thread pool after some time. The threshold used is half of the thread pool's dispatch quantum,
949937
// which the thread pool uses for doing periodic work.
950938
if (++completedCount == uint.MaxValue ||
951-
(tl.workState & ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems) != 0 ||
939+
tl.workStealingQueue.CanSteal ||
952940
(uint)(Environment.TickCount - startTimeMs) >= ThreadPoolWorkQueue.DispatchQuantumMs / 2 ||
953941
!_workItems.TryDequeue(out workItem))
954942
{

src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System.Collections.Generic;
55
using System.Diagnostics;
6+
using System.IO;
67
using System.Linq;
78
using System.Reflection;
89
using System.Threading.Tasks;
@@ -1020,6 +1021,65 @@ public static void CooperativeBlockingWithProcessingThreadsAndGoalThreadsAndAddW
10201021
}).Dispose();
10211022
}
10221023

1024+
[ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))]
1025+
public void FileStreamFlushAsyncThreadPoolDeadlockTest()
1026+
{
1027+
// This test was occasionally causing the deadlock described in https://github.com/dotnet/runtime/pull/68171. Run it
1028+
// in a remote process to test it with a dedicated thread pool.
1029+
RemoteExecutor.Invoke(async () =>
1030+
{
1031+
const int OneKibibyte = 1 << 10;
1032+
const int FourKibibytes = OneKibibyte << 2;
1033+
const int FileSize = 1024;
1034+
1035+
string destinationFilePath = null;
1036+
try
1037+
{
1038+
destinationFilePath = CreateFileWithRandomContent(FileSize);
1039+
1040+
static string CreateFileWithRandomContent(int fileSize)
1041+
{
1042+
string filePath = Path.GetTempFileName();
1043+
File.WriteAllBytes(filePath, CreateArray(fileSize));
1044+
return filePath;
1045+
}
1046+
1047+
static byte[] CreateArray(int count)
1048+
{
1049+
var result = new byte[count];
1050+
const int Seed = 12345;
1051+
var random = new Random(Seed);
1052+
random.NextBytes(result);
1053+
return result;
1054+
}
1055+
1056+
for (int j = 0; j < 100; j++)
1057+
{
1058+
using var fileStream =
1059+
new FileStream(
1060+
destinationFilePath,
1061+
FileMode.Create,
1062+
FileAccess.Write,
1063+
FileShare.Read,
1064+
FourKibibytes,
1065+
FileOptions.None);
1066+
for (int i = 0; i < FileSize; i++)
1067+
{
1068+
fileStream.WriteByte(default);
1069+
await fileStream.FlushAsync();
1070+
}
1071+
}
1072+
}
1073+
finally
1074+
{
1075+
if (!string.IsNullOrEmpty(destinationFilePath) && File.Exists(destinationFilePath))
1076+
{
1077+
File.Delete(destinationFilePath);
1078+
}
1079+
}
1080+
}).Dispose();
1081+
}
1082+
10231083
public static bool IsThreadingAndRemoteExecutorSupported =>
10241084
PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported;
10251085
}

0 commit comments

Comments
 (0)