Skip to content

Fix some scaling issues with the global queue in the thread pool #69386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,11 @@ public int Count
}
}

private const int ProcessorsPerAssignableWorkItemQueue = 16;
private static readonly int s_assignableWorkItemQueueCount =
Environment.ProcessorCount <= 32 ? 0 :
(Environment.ProcessorCount + (ProcessorsPerAssignableWorkItemQueue - 1)) / ProcessorsPerAssignableWorkItemQueue;

private bool _loggingEnabled;
private bool _dispatchNormalPriorityWorkFirst;
private int _mayHaveHighPriorityWorkItems;
Expand All @@ -395,6 +400,16 @@ public int Count
internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();
internal readonly ConcurrentQueue<object> highPriorityWorkItems = new ConcurrentQueue<object>();

// SOS's ThreadPool command depends on the following name. The global queue doesn't scale well beyond a point of
// concurrency. Some additional queues may be added and assigned to a limited number of worker threads if necessary to
// help with limiting the concurrency level.
internal readonly ConcurrentQueue<object>[] _assignableWorkItemQueues =
new ConcurrentQueue<object>[s_assignableWorkItemQueueCount];

private readonly LowLevelLock _queueAssignmentLock = new();
private readonly int[] _assignedWorkItemQueueThreadCounts =
s_assignableWorkItemQueueCount > 0 ? new int[s_assignableWorkItemQueueCount] : Array.Empty<int>();

[StructLayout(LayoutKind.Sequential)]
private struct CacheLineSeparated
{
Expand All @@ -409,9 +424,126 @@ private struct CacheLineSeparated

public ThreadPoolWorkQueue()
{
for (int i = 0; i < s_assignableWorkItemQueueCount; i++)
{
_assignableWorkItemQueues[i] = new ConcurrentQueue<object>();
}

RefreshLoggingEnabled();
}

private void AssignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl)
{
Debug.Assert(s_assignableWorkItemQueueCount > 0);

_queueAssignmentLock.Acquire();

// Determine the first queue that has not yet been assigned to the limit of worker threads
int queueIndex = -1;
int minCount = int.MaxValue;
int minCountQueueIndex = 0;
for (int i = 0; i < s_assignableWorkItemQueueCount; i++)
{
int count = _assignedWorkItemQueueThreadCounts[i];
Debug.Assert(count >= 0);
if (count < ProcessorsPerAssignableWorkItemQueue)
{
queueIndex = i;
_assignedWorkItemQueueThreadCounts[queueIndex] = count + 1;
break;
}

if (count < minCount)
{
minCount = count;
minCountQueueIndex = i;
}
}

if (queueIndex < 0)
{
// All queues have been fully assigned. Choose the queue that has been assigned to the least number of worker
// threads.
queueIndex = minCountQueueIndex;
_assignedWorkItemQueueThreadCounts[queueIndex]++;
}

_queueAssignmentLock.Release();

tl.queueIndex = queueIndex;
tl.assignedGlobalWorkItemQueue = _assignableWorkItemQueues[queueIndex];
}

private void TryReassignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl)
{
Debug.Assert(s_assignableWorkItemQueueCount > 0);

int queueIndex = tl.queueIndex;
if (queueIndex == 0)
{
return;
}

if (!_queueAssignmentLock.TryAcquire())
{
return;
}

// If the currently assigned queue is assigned to other worker threads, try to reassign an earlier queue to this
// worker thread if the earlier queue is not assigned to the limit of worker threads
Debug.Assert(_assignedWorkItemQueueThreadCounts[queueIndex] >= 0);
if (_assignedWorkItemQueueThreadCounts[queueIndex] > 1)
{
for (int i = 0; i < queueIndex; i++)
{
if (_assignedWorkItemQueueThreadCounts[i] < ProcessorsPerAssignableWorkItemQueue)
{
_assignedWorkItemQueueThreadCounts[queueIndex]--;
queueIndex = i;
_assignedWorkItemQueueThreadCounts[queueIndex]++;
break;
}
}
}

_queueAssignmentLock.Release();

tl.queueIndex = queueIndex;
tl.assignedGlobalWorkItemQueue = _assignableWorkItemQueues[queueIndex];
}

private void UnassignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl)
{
Debug.Assert(s_assignableWorkItemQueueCount > 0);

int queueIndex = tl.queueIndex;

_queueAssignmentLock.Acquire();
int newCount = --_assignedWorkItemQueueThreadCounts[queueIndex];
_queueAssignmentLock.Release();

Debug.Assert(newCount >= 0);
if (newCount > 0)
{
return;
}

// This queue is not assigned to any other worker threads. Move its work items to the global queue to prevent them
// from being starved for a long duration.
bool movedWorkItem = false;
ConcurrentQueue<object> queue = tl.assignedGlobalWorkItemQueue;
while (_assignedWorkItemQueueThreadCounts[queueIndex] <= 0 && queue.TryDequeue(out object? workItem))
{
workItems.Enqueue(workItem);
movedWorkItem = true;
}

if (movedWorkItem)
{
EnsureThreadRequested();
}
}

public ThreadPoolWorkQueueThreadLocals GetOrCreateThreadLocals() =>
ThreadPoolWorkQueueThreadLocals.threadLocals ?? CreateThreadLocals();

Expand Down Expand Up @@ -479,7 +611,11 @@ public void Enqueue(object callback, bool forceGlobal)
}
else
{
workItems.Enqueue(callback);
ConcurrentQueue<object> queue =
s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
? tl.assignedGlobalWorkItemQueue
: workItems;
queue.Enqueue(callback);
}

EnsureThreadRequested();
Expand Down Expand Up @@ -533,32 +669,53 @@ internal static bool LocalFindAndPop(object callback)
return workItem;
}

// Check for global work items
// Check for work items from the assigned global queue
if (s_assignableWorkItemQueueCount > 0 && tl.assignedGlobalWorkItemQueue.TryDequeue(out workItem))
{
return workItem;
}

// Check for work items from the global queue
if (workItems.TryDequeue(out workItem))
{
return workItem;
}

// Try to steal from other threads' local work items
WorkStealingQueue localWsq = tl.workStealingQueue;
WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
int c = queues.Length;
Debug.Assert(c > 0, "There must at least be a queue for this thread.");
int maxIndex = c - 1;
uint i = tl.random.NextUInt32() % (uint)c;
while (c > 0)
// Check for work items in other assignable global queues
uint randomValue = tl.random.NextUInt32();
if (s_assignableWorkItemQueueCount > 0)
{
i = (i < maxIndex) ? i + 1 : 0;
WorkStealingQueue otherQueue = queues[i];
if (otherQueue != localWsq && otherQueue.CanSteal)
int queueIndex = tl.queueIndex;
int c = s_assignableWorkItemQueueCount;
int maxIndex = c - 1;
for (int i = (int)(randomValue % (uint)c); c > 0; i = i < maxIndex ? i + 1 : 0, c--)
{
workItem = otherQueue.TrySteal(ref missedSteal);
if (workItem != null)
if (i != queueIndex && _assignableWorkItemQueues[i].TryDequeue(out workItem))
{
return workItem;
}
}
c--;
}

// Try to steal from other threads' local work items
{
WorkStealingQueue localWsq = tl.workStealingQueue;
WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
int c = queues.Length;
Debug.Assert(c > 0, "There must at least be a queue for this thread.");
int maxIndex = c - 1;
for (int i = (int)(randomValue % (uint)c); c > 0; i = i < maxIndex ? i + 1 : 0, c--)
{
WorkStealingQueue otherQueue = queues[i];
if (otherQueue != localWsq && otherQueue.CanSteal)
{
workItem = otherQueue.TrySteal(ref missedSteal);
if (workItem != null)
{
return workItem;
}
}
}
}

return null;
Expand Down Expand Up @@ -594,7 +751,19 @@ public static long LocalCount
}
}

public long GlobalCount => (long)highPriorityWorkItems.Count + workItems.Count;
public long GlobalCount
{
get
{
long count = (long)highPriorityWorkItems.Count + workItems.Count;
for (int i = 0; i < s_assignableWorkItemQueueCount; i++)
{
count += _assignableWorkItemQueues[i].Count;
}

return count;
}
}

// Time in ms for which ThreadPoolWorkQueue.Dispatch keeps executing normal work items before either returning from
// Dispatch (if YieldFromDispatchLoop is true), or performing periodic activities
Expand All @@ -612,6 +781,11 @@ internal static bool Dispatch()
ThreadPoolWorkQueue workQueue = ThreadPool.s_workQueue;
ThreadPoolWorkQueueThreadLocals tl = workQueue.GetOrCreateThreadLocals();

if (s_assignableWorkItemQueueCount > 0)
{
workQueue.AssignWorkItemQueue(tl);
}

// Before dequeuing the first work item, acknowledge that the thread request has been satisfied
workQueue.MarkThreadRequestSatisfied();

Expand All @@ -625,7 +799,12 @@ internal static bool Dispatch()
if (dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal)
{
workQueue._dispatchNormalPriorityWorkFirst = !dispatchNormalPriorityWorkFirst;
workQueue.workItems.TryDequeue(out workItem);
ConcurrentQueue<object> queue =
s_assignableWorkItemQueueCount > 0 ? tl.assignedGlobalWorkItemQueue : workQueue.workItems;
if (!queue.TryDequeue(out workItem) && s_assignableWorkItemQueueCount > 0)
{
workQueue.workItems.TryDequeue(out workItem);
}
}

if (workItem == null)
Expand All @@ -635,6 +814,11 @@ internal static bool Dispatch()

if (workItem == null)
{
if (s_assignableWorkItemQueueCount > 0)
{
workQueue.UnassignWorkItemQueue(tl);
}

//
// No work.
// If we missed a steal, though, there may be more work in the queue.
Expand Down Expand Up @@ -689,6 +873,11 @@ internal static bool Dispatch()

if (workItem == null)
{
if (s_assignableWorkItemQueueCount > 0)
{
workQueue.UnassignWorkItemQueue(tl);
}

//
// No work.
// If we missed a steal, though, there may be more work in the queue.
Expand Down Expand Up @@ -753,6 +942,10 @@ internal static bool Dispatch()
// processing work items.
tl.TransferLocalWork();
tl.isProcessingHighPriorityWorkItems = false;
if (s_assignableWorkItemQueueCount > 0)
{
workQueue.UnassignWorkItemQueue(tl);
}
return false;
}

Expand All @@ -769,9 +962,20 @@ internal static bool Dispatch()
// The runtime-specific thread pool implementation requires the Dispatch loop to return to the VM
// periodically to let it perform its own work
tl.isProcessingHighPriorityWorkItems = false;
if (s_assignableWorkItemQueueCount > 0)
{
workQueue.UnassignWorkItemQueue(tl);
}
return true;
}

if (s_assignableWorkItemQueueCount > 0)
{
// Due to hill climbing, over time arbitrary worker threads may stop working and eventually unbalance the
// queue assignments. Periodically try to reassign a queue to keep the assigned queues busy.
workQueue.TryReassignWorkItemQueue(tl);
}

// This method will continue to dispatch work items. Refresh the start tick count for the next dispatch
// quantum and do some periodic activities.
startTickCount = currentTickCount;
Expand Down Expand Up @@ -823,6 +1027,8 @@ internal sealed class ThreadPoolWorkQueueThreadLocals
public static ThreadPoolWorkQueueThreadLocals? threadLocals;

public bool isProcessingHighPriorityWorkItems;
public int queueIndex;
public ConcurrentQueue<object> assignedGlobalWorkItemQueue;
public readonly ThreadPoolWorkQueue workQueue;
public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
public readonly Thread currentThread;
Expand All @@ -831,6 +1037,7 @@ internal sealed class ThreadPoolWorkQueueThreadLocals

public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
{
assignedGlobalWorkItemQueue = tpq.workItems;
workQueue = tpq;
workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue();
ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue);
Expand Down Expand Up @@ -1406,6 +1613,15 @@ internal static IEnumerable<object> GetQueuedWorkItems()
yield return workItem;
}

// Enumerate assignable global queues
foreach (ConcurrentQueue<object> queue in s_workQueue._assignableWorkItemQueues)
{
foreach (object workItem in queue)
{
yield return workItem;
}
}

// Enumerate global queue
foreach (object workItem in s_workQueue.workItems)
{
Expand Down