Skip to content

[8.0] Add a config flag to experiment with some work item prioritization changes in cases involving a lot of sync-over-async #103984

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,17 @@ public int Count
}
}

#if CORECLR
// This config var can be used to enable an experimental mode that may reduce the effects of some priority inversion
// issues seen in cases involving a lot of sync-over-async. See EnqueueForPrioritizationExperiment() for more
// information. The mode is experimental and may change in the future.
//
// The value comes from AppContextConfigHelper.GetBooleanConfig(), which is given the setting name
// "System.Threading.ThreadPool.PrioritizationExperiment" and the environment variable name
// "DOTNET_ThreadPool_PrioritizationExperiment". The default is false, so the mode is opt-in. The field is static
// readonly, so the value does not change after the field has been initialized.
// NOTE(review): the precedence between the two names is decided inside GetBooleanConfig() - confirm there.
internal static readonly bool s_prioritizationExperiment =
AppContextConfigHelper.GetBooleanConfig(
"System.Threading.ThreadPool.PrioritizationExperiment",
"DOTNET_ThreadPool_PrioritizationExperiment",
defaultValue: false);
#endif

private const int ProcessorsPerAssignableWorkItemQueue = 16;
private static readonly int s_assignableWorkItemQueueCount =
Environment.ProcessorCount <= 32 ? 0 :
Expand All @@ -394,6 +405,11 @@ public int Count
internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();
internal readonly ConcurrentQueue<object> highPriorityWorkItems = new ConcurrentQueue<object>();

#if CORECLR
// Global queue used only by the prioritization experiment. The queue is allocated only when
// s_prioritizationExperiment is true; otherwise the field holds null (the '!' merely suppresses the nullable
// warning). Code that touches this field is expected to check s_prioritizationExperiment first.
internal readonly ConcurrentQueue<object> lowPriorityWorkItems =
s_prioritizationExperiment ? new ConcurrentQueue<object>() : null!;
#endif

// SOS's ThreadPool command depends on the following name. The global queue doesn't scale well beyond a point of
// concurrency. Some additional queues may be added and assigned to a limited number of worker threads if necessary to
// help with limiting the concurrency level.
Expand Down Expand Up @@ -598,23 +614,68 @@ public void Enqueue(object callback, bool forceGlobal)
if (_loggingEnabled && FrameworkEventSource.Log.IsEnabled())
FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);

ThreadPoolWorkQueueThreadLocals? tl;
if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
#if CORECLR
if (s_prioritizationExperiment)
{
tl.workStealingQueue.LocalPush(callback);
EnqueueForPrioritizationExperiment(callback, forceGlobal);
}
else
#endif
{
ConcurrentQueue<object> queue =
s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
? tl.assignedGlobalWorkItemQueue
: workItems;
queue.Enqueue(callback);
ThreadPoolWorkQueueThreadLocals? tl;
if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
{
tl.workStealingQueue.LocalPush(callback);
}
else
{
ConcurrentQueue<object> queue =
s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
? tl.assignedGlobalWorkItemQueue
: workItems;
queue.Enqueue(callback);
}
}

EnsureThreadRequested();
}

#if CORECLR
// Enqueue path used when the prioritization experiment is enabled. Picks one of four destinations for the
// callback: the current thread's local work-stealing queue, the low-priority global queue, the thread's assigned
// global queue, or the default global queue. Kept out-of-line so that the experimental path does not get inlined
// into its caller.
[MethodImpl(MethodImplOptions.NoInlining)]
private void EnqueueForPrioritizationExperiment(object callback, bool forceGlobal)
{
    ThreadPoolWorkQueueThreadLocals? locals = ThreadPoolWorkQueueThreadLocals.threadLocals;

    if (locals == null)
    {
        // The current thread has no thread pool thread-locals, so there is no local queue to push to and the
        // work item has to go to a global queue regardless of forceGlobal.
        //
        // This is a rough and experimental attempt at identifying work items that should be lower priority than
        // other global work items (even ones that haven't been queued yet). Such work items go to a low-priority
        // global queue that is checked after all other global queues. A work item may queue another work item
        // that belongs to the same set of work, and as a global work item the second one would typically land
        // behind other global work items. In some cases involving a lot of sync-over-async, that can
        // significantly delay worker threads from getting unblocked.
        ConcurrentQueue<object> destination =
            callback is QueueUserWorkItemCallbackBase ? lowPriorityWorkItems : workItems;
        destination.Enqueue(callback);
        return;
    }

    if (!forceGlobal)
    {
        // Thread-locals are available and the caller did not insist on a global queue
        locals.workStealingQueue.LocalPush(callback);
        return;
    }

    // Forced-global enqueue from a thread that has thread-locals: use its assigned global queue when assignable
    // queues are in use, otherwise the default global queue
    ConcurrentQueue<object> globalQueue =
        s_assignableWorkItemQueueCount > 0 ? locals.assignedGlobalWorkItemQueue : workItems;
    globalQueue.Enqueue(callback);
}
#endif

public void EnqueueAtHighPriority(object workItem)
{
Debug.Assert((workItem is IThreadPoolWorkItem) ^ (workItem is Task));
Expand Down Expand Up @@ -691,6 +752,14 @@ internal static bool LocalFindAndPop(object callback)
}
}

#if CORECLR
// Check for low-priority work items
if (s_prioritizationExperiment && lowPriorityWorkItems.TryDequeue(out workItem))
{
return workItem;
}
#endif

// Try to steal from other threads' local work items
{
WorkStealingQueue localWsq = tl.workStealingQueue;
Expand Down Expand Up @@ -750,6 +819,13 @@ public long GlobalCount
get
{
long count = (long)highPriorityWorkItems.Count + workItems.Count;
#if CORECLR
if (s_prioritizationExperiment)
{
count += lowPriorityWorkItems.Count;
}
#endif

for (int i = 0; i < s_assignableWorkItemQueueCount; i++)
{
count += _assignableWorkItemQueues[i].Count;
Expand Down Expand Up @@ -1639,6 +1715,17 @@ internal static IEnumerable<object> GetQueuedWorkItems()
yield return workItem;
}

#if CORECLR
if (ThreadPoolWorkQueue.s_prioritizationExperiment)
{
// Enumerate low-priority global queue
foreach (object workItem in s_workQueue.lowPriorityWorkItems)
{
yield return workItem;
}
}
#endif

// Enumerate each local queue
foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.WorkStealingQueueList.Queues)
{
Expand Down
83 changes: 83 additions & 0 deletions src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,89 @@ public void ThreadPoolMinMaxThreadsEventTest()
}).Dispose();
}

// Smoke test for the DOTNET_ThreadPool_PrioritizationExperiment config var: with the var set to "1", queues three
// kinds of work items (global from a non-thread-pool thread, global from a thread pool work item, and local from
// thread pool work items) and waits until every one of them has run.
[ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))]
public static void PrioritizationExperimentConfigVarTest()
{
    // The outer remote process exists only so that the environment variable set below does not contaminate the
    // main test process' environment
    RemoteExecutor.Invoke(() =>
    {
        // The inner process, which runs the actual test, inherits this config var
        Environment.SetEnvironmentVariable("DOTNET_ThreadPool_PrioritizationExperiment", "1");

        RemoteExecutor.Invoke(() =>
        {
            const int WorkItemCountPerKind = 100;
            const int LocalEnqueuerCount = 10;
            const int ExpectedCompletionCount = WorkItemCountPerKind * 3;

            var start = new ManualResetEvent(false);
            var allDone = new AutoResetEvent(false);
            int completionCount = 0;

            // Shared by all three kinds of work; signals once the last expected work item has run
            Action<int> countCompletion = _ =>
            {
                int completedSoFar = Interlocked.Increment(ref completionCount);
                if (completedSoFar == ExpectedCompletionCount)
                {
                    allDone.Set();
                }
            };

            // Kind 1: global work enqueued from a non-thread-pool thread
            var globalEnqueuerThread = new Thread(() =>
            {
                start.CheckedWait();

                for (int queued = 0; queued < WorkItemCountPerKind; queued++)
                {
                    ThreadPool.UnsafeQueueUserWorkItem(countCompletion, 0, preferLocal: false);
                }
            })
            {
                IsBackground = true
            };
            globalEnqueuerThread.Start();

            // Kind 2: global work enqueued from a thread pool worker thread
            ThreadPool.UnsafeQueueUserWorkItem(
                _ =>
                {
                    start.CheckedWait();

                    for (int queued = 0; queued < WorkItemCountPerKind; queued++)
                    {
                        ThreadPool.UnsafeQueueUserWorkItem(countCompletion, 0, preferLocal: false);
                    }
                },
                0,
                preferLocal: false);

            // Kind 3: local work enqueued from thread pool worker threads. A helper thread queues the enqueuers,
            // and each enqueuer queues an equal share of the local work items.
            var localEnqueuerThread = new Thread(() =>
            {
                // The per-enqueuer share must divide the total evenly for the completion count to add up
                Assert.True(WorkItemCountPerKind / LocalEnqueuerCount * LocalEnqueuerCount == WorkItemCountPerKind);

                Action<int> enqueueLocalShare = _ =>
                {
                    for (int queued = 0; queued < WorkItemCountPerKind / LocalEnqueuerCount; queued++)
                    {
                        ThreadPool.UnsafeQueueUserWorkItem(countCompletion, 0, preferLocal: true);
                    }
                };

                start.CheckedWait();

                for (int enqueuer = 0; enqueuer < LocalEnqueuerCount; enqueuer++)
                {
                    ThreadPool.UnsafeQueueUserWorkItem(enqueueLocalShare, 0, preferLocal: false);
                }
            })
            {
                IsBackground = true
            };
            localEnqueuerThread.Start();

            // Release all producers at once, then wait for the last work item to report completion
            start.Set();
            allDone.CheckedWait();
        }).Dispose();
    }).Dispose();
}

// Condition for tests that need both threading support and the ability to launch a remote test process
public static bool IsThreadingAndRemoteExecutorSupported
{
    get
    {
        return PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported;
    }
}

Expand Down
Loading