Skip to content

Commit 57bd35a

Browse files
authored
[8.0] Add a config flag to experiment with some work item prioritization changes in cases involving a lot of sync-over-async (#103984)
* [8.0] Add a config flag to experiment with some work item prioritization changes in cases involving a lot of sync-over-async Some services that use a lot of sync-over-async were seen to experience stalls due to some priority inversion issues in work items that get queued to the thread pool. For instance, a work item W1 queues another work item W2 to the global queue and blocks waiting for a task to complete, where W2 would need to run in order to complete the task, but W2 is queued behind a number of other work items that operate like W1, and this sometimes leads to long-duration stalls. This change adds an experimental config option that when enabled, enqueues some kinds of work items to a new low-priority global queue that is checked after all other global queues. This was seen to help in some cases.
1 parent 787a48b commit 57bd35a

File tree

2 files changed

+178
-8
lines changed

2 files changed

+178
-8
lines changed

src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs

Lines changed: 95 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,17 @@ public int Count
381381
}
382382
}
383383

384+
#if CORECLR
385+
// This config var can be used to enable an experimental mode that may reduce the effects of some priority inversion
386+
// issues seen in cases involving a lot of sync-over-async. See EnqueueForPrioritizationExperiment() for more
387+
// information. The mode is experimental and may change in the future.
388+
internal static readonly bool s_prioritizationExperiment =
389+
AppContextConfigHelper.GetBooleanConfig(
390+
"System.Threading.ThreadPool.PrioritizationExperiment",
391+
"DOTNET_ThreadPool_PrioritizationExperiment",
392+
defaultValue: false);
393+
#endif
394+
384395
private const int ProcessorsPerAssignableWorkItemQueue = 16;
385396
private static readonly int s_assignableWorkItemQueueCount =
386397
Environment.ProcessorCount <= 32 ? 0 :
@@ -394,6 +405,11 @@ public int Count
394405
internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();
395406
internal readonly ConcurrentQueue<object> highPriorityWorkItems = new ConcurrentQueue<object>();
396407

408+
#if CORECLR
409+
internal readonly ConcurrentQueue<object> lowPriorityWorkItems =
410+
s_prioritizationExperiment ? new ConcurrentQueue<object>() : null!;
411+
#endif
412+
397413
// SOS's ThreadPool command depends on the following name. The global queue doesn't scale well beyond a point of
398414
// concurrency. Some additional queues may be added and assigned to a limited number of worker threads if necessary to
399415
// help with limiting the concurrency level.
@@ -598,23 +614,68 @@ public void Enqueue(object callback, bool forceGlobal)
598614
if (_loggingEnabled && FrameworkEventSource.Log.IsEnabled())
599615
FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);
600616

601-
ThreadPoolWorkQueueThreadLocals? tl;
602-
if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
617+
#if CORECLR
618+
if (s_prioritizationExperiment)
603619
{
604-
tl.workStealingQueue.LocalPush(callback);
620+
EnqueueForPrioritizationExperiment(callback, forceGlobal);
605621
}
606622
else
623+
#endif
607624
{
608-
ConcurrentQueue<object> queue =
609-
s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
610-
? tl.assignedGlobalWorkItemQueue
611-
: workItems;
612-
queue.Enqueue(callback);
625+
ThreadPoolWorkQueueThreadLocals? tl;
626+
if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
627+
{
628+
tl.workStealingQueue.LocalPush(callback);
629+
}
630+
else
631+
{
632+
ConcurrentQueue<object> queue =
633+
s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
634+
? tl.assignedGlobalWorkItemQueue
635+
: workItems;
636+
queue.Enqueue(callback);
637+
}
613638
}
614639

615640
EnsureThreadRequested();
616641
}
617642

643+
#if CORECLR
644+
[MethodImpl(MethodImplOptions.NoInlining)]
645+
private void EnqueueForPrioritizationExperiment(object callback, bool forceGlobal)
646+
{
647+
ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
648+
if (!forceGlobal && tl != null)
649+
{
650+
tl.workStealingQueue.LocalPush(callback);
651+
return;
652+
}
653+
654+
ConcurrentQueue<object> queue;
655+
656+
// This is a rough and experimental attempt at identifying work items that should be lower priority than other
657+
// global work items (even ones that haven't been queued yet), and to queue them to a low-priority global queue that
658+
// is checked after all other global queues. In some cases, a work item may queue another work item that is part of
659+
// the same set of work. For global work items, the second work item would typically get queued behind other global
660+
// work items. In some cases involving a lot of sync-over-async, that can significantly delay worker threads from
661+
// getting unblocked.
662+
if (tl == null && callback is QueueUserWorkItemCallbackBase)
663+
{
664+
queue = lowPriorityWorkItems;
665+
}
666+
else if (s_assignableWorkItemQueueCount > 0 && tl != null)
667+
{
668+
queue = tl.assignedGlobalWorkItemQueue;
669+
}
670+
else
671+
{
672+
queue = workItems;
673+
}
674+
675+
queue.Enqueue(callback);
676+
}
677+
#endif
678+
618679
public void EnqueueAtHighPriority(object workItem)
619680
{
620681
Debug.Assert((workItem is IThreadPoolWorkItem) ^ (workItem is Task));
@@ -691,6 +752,14 @@ internal static bool LocalFindAndPop(object callback)
691752
}
692753
}
693754

755+
#if CORECLR
756+
// Check for low-priority work items
757+
if (s_prioritizationExperiment && lowPriorityWorkItems.TryDequeue(out workItem))
758+
{
759+
return workItem;
760+
}
761+
#endif
762+
694763
// Try to steal from other threads' local work items
695764
{
696765
WorkStealingQueue localWsq = tl.workStealingQueue;
@@ -750,6 +819,13 @@ public long GlobalCount
750819
get
751820
{
752821
long count = (long)highPriorityWorkItems.Count + workItems.Count;
822+
#if CORECLR
823+
if (s_prioritizationExperiment)
824+
{
825+
count += lowPriorityWorkItems.Count;
826+
}
827+
#endif
828+
753829
for (int i = 0; i < s_assignableWorkItemQueueCount; i++)
754830
{
755831
count += _assignableWorkItemQueues[i].Count;
@@ -1639,6 +1715,17 @@ internal static IEnumerable<object> GetQueuedWorkItems()
16391715
yield return workItem;
16401716
}
16411717

1718+
#if CORECLR
1719+
if (ThreadPoolWorkQueue.s_prioritizationExperiment)
1720+
{
1721+
// Enumerate low-priority global queue
1722+
foreach (object workItem in s_workQueue.lowPriorityWorkItems)
1723+
{
1724+
yield return workItem;
1725+
}
1726+
}
1727+
#endif
1728+
16421729
// Enumerate each local queue
16431730
foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.WorkStealingQueueList.Queues)
16441731
{

src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,6 +1160,89 @@ public void ThreadPoolMinMaxThreadsEventTest()
11601160
}).Dispose();
11611161
}
11621162

1163+
[ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))]
1164+
public static void PrioritizationExperimentConfigVarTest()
1165+
{
1166+
// Avoid contaminating the main process' environment
1167+
RemoteExecutor.Invoke(() =>
1168+
{
1169+
// The actual test process below will inherit the config var
1170+
Environment.SetEnvironmentVariable("DOTNET_ThreadPool_PrioritizationExperiment", "1");
1171+
1172+
RemoteExecutor.Invoke(() =>
1173+
{
1174+
const int WorkItemCountPerKind = 100;
1175+
1176+
int completedWorkItemCount = 0;
1177+
var allWorkItemsCompleted = new AutoResetEvent(false);
1178+
Action<int> workItem = _ =>
1179+
{
1180+
if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * 3)
1181+
{
1182+
allWorkItemsCompleted.Set();
1183+
}
1184+
};
1185+
1186+
var startTest = new ManualResetEvent(false);
1187+
1188+
var t = new Thread(() =>
1189+
{
1190+
// Enqueue global work from a non-thread-pool thread
1191+
1192+
startTest.CheckedWait();
1193+
1194+
for (int i = 0; i < WorkItemCountPerKind; i++)
1195+
{
1196+
ThreadPool.UnsafeQueueUserWorkItem(workItem, 0, preferLocal: false);
1197+
}
1198+
});
1199+
t.IsBackground = true;
1200+
t.Start();
1201+
1202+
ThreadPool.UnsafeQueueUserWorkItem(
1203+
_ =>
1204+
{
1205+
// Enqueue global work from a thread pool worker thread
1206+
1207+
startTest.CheckedWait();
1208+
1209+
for (int i = 0; i < WorkItemCountPerKind; i++)
1210+
{
1211+
ThreadPool.UnsafeQueueUserWorkItem(workItem, 0, preferLocal: false);
1212+
}
1213+
},
1214+
0,
1215+
preferLocal: false);
1216+
1217+
t = new Thread(() =>
1218+
{
1219+
// Enqueue local work from thread pool worker threads
1220+
1221+
Assert.True(WorkItemCountPerKind / 10 * 10 == WorkItemCountPerKind);
1222+
Action<int> localWorkItemEnqueuer = _ =>
1223+
{
1224+
for (int i = 0; i < WorkItemCountPerKind / 10; i++)
1225+
{
1226+
ThreadPool.UnsafeQueueUserWorkItem(workItem, 0, preferLocal: true);
1227+
}
1228+
};
1229+
1230+
startTest.CheckedWait();
1231+
1232+
for (int i = 0; i < 10; i++)
1233+
{
1234+
ThreadPool.UnsafeQueueUserWorkItem(localWorkItemEnqueuer, 0, preferLocal: false);
1235+
}
1236+
});
1237+
t.IsBackground = true;
1238+
t.Start();
1239+
1240+
startTest.Set();
1241+
allWorkItemsCompleted.CheckedWait();
1242+
}).Dispose();
1243+
}).Dispose();
1244+
}
1245+
11631246
public static bool IsThreadingAndRemoteExecutorSupported =>
11641247
PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported;
11651248

0 commit comments

Comments
 (0)