Description
Background and motivation
The maximum degree of parallelism of a `Parallel.ForEachAsync` invocation is constrained by setting the `ParallelOptions.MaxDegreeOfParallelism` property. However, this constrains the degree of parallelism per invocation, not globally. If a method that invokes `Parallel.ForEachAsync` is itself invoked on multiple threads (e.g., per received HTTP request), this has a multiplying effect and can overwhelm the server. For example, a web server may use a `ConcurrencyLimiter` to constrain itself to handling no more than 100 simultaneous requests, each of which invokes a method that calls `Parallel.ForEachAsync` with `ParallelOptions.MaxDegreeOfParallelism` also set to 100. This results in the `body` delegate passed to `Parallel.ForEachAsync` being invoked with a maximum effective concurrency of 10,000 (100 × 100).
Ideally, each invocation of `Parallel.ForEachAsync` could be constrained using a global `ConcurrencyLimiter` for that invocation. This could be achieved by adding a `ConcurrencyLimiter` property to the `ParallelOptions` class. In the above example, a static/global `ConcurrencyLimiter` could then be defined in the class that contains the method that invokes `Parallel.ForEachAsync` and passed via the `ParallelOptions` parameter, providing a global (process-wide) limit on the number of concurrent tasks started by that invocation of `Parallel.ForEachAsync` across all HTTP requests.

Furthermore, other invocations of `Parallel.ForEachAsync` elsewhere in the code can each set their own global limits, each governed by its own `ConcurrencyLimiter` instance.
Note that while the `body` delegate passed to `Parallel.ForEachAsync` can use a `ConcurrencyLimiter` internally to limit its own global concurrency, `Parallel.ForEachAsync` would continue to consume items from the given `source` enumerable up to the degree of parallelism specified by `ParallelOptions.MaxDegreeOfParallelism`. Thus if the `source` is an `IAsyncEnumerable<T>`, the items being enumerated are very large in number and not all in memory (i.e., streamed from network/storage), and `Parallel.ForEachAsync` consumes items from `source` faster than the `body` delegate can process them, this could lead to the server running out of memory. However, if `Parallel.ForEachAsync` is `ConcurrencyLimiter`-aware, it knows to consume the next item from `source` only when there is sufficient capacity to process it.
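A minimal sketch of the workaround described above, acquiring a lease inside the `body` delegate (the `Worker` and `ProcessAsync` names are illustrative, not part of the proposal):

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;

public static class Worker
{
    // Global (process-wide) limit shared by all invocations of ProcessAsync.
    private static readonly ConcurrencyLimiter s_limiter = new(new ConcurrencyLimiterOptions
    {
        PermitLimit = 100,
        // A large queue lets body invocations wait for a permit rather than fail.
        QueueLimit = int.MaxValue,
        QueueProcessingOrder = QueueProcessingOrder.OldestFirst
    });

    public static Task ProcessAsync<T>(IAsyncEnumerable<T> source, CancellationToken cancellationToken) =>
        Parallel.ForEachAsync(source, cancellationToken, async (item, ct) =>
        {
            // Each body invocation waits for a global permit before doing work.
            using RateLimitLease lease = await s_limiter.AcquireAsync(1, ct);
            if (!lease.IsAcquired)
            {
                return; // Queue was full; handle as appropriate.
            }

            // ... process item ...
        });
}
```

This caps the work actually performed at 100 concurrent items, but `Parallel.ForEachAsync` still pulls items from `source` up to `MaxDegreeOfParallelism` ahead of the limiter, which is exactly the over-consumption problem a limiter-aware `Parallel.ForEachAsync` would avoid.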
API Proposal
```csharp
namespace System.Threading.Tasks;

public class ParallelOptions
{
    public ConcurrencyLimiter ConcurrencyLimiter { get; set; }
}
```
API Usage
```csharp
public class RequestHandler
{
    private static readonly ConcurrencyLimiter s_concurrencyLimiter = new(new()
    {
        PermitLimit = 100,
        QueueLimit = 0,
        QueueProcessingOrder = QueueProcessingOrder.OldestFirst
    });

    public async Task Handle(Request request, CancellationToken cancellationToken)
    {
        await Parallel.ForEachAsync(request.Items, new ParallelOptions
        {
            CancellationToken = cancellationToken,
            ConcurrencyLimiter = s_concurrencyLimiter
        }, async (item, cancellationToken) =>
        {
            // Handle each item
        });
    }
}
```
Alternative Designs
No response
Risks
No response