[API Proposal]: Support specifying global degree of parallelism for Parallel.ForEachAsync by providing a ConcurrencyLimiter #78753

Open

Description

Background and motivation

The maximum degree of parallelism of a Parallel.ForEachAsync invocation is constrained by setting the ParallelOptions.MaxDegreeOfParallelism property. However, this constrains the degree of parallelism per invocation, not globally.

If a method that invokes Parallel.ForEachAsync is itself invoked on multiple threads (e.g., once per received HTTP request), this has a multiplying effect that can overwhelm the server. For example, a web server may use a ConcurrencyLimiter to handle no more than 100 simultaneous requests, each of which invokes a method that calls Parallel.ForEachAsync with ParallelOptions.MaxDegreeOfParallelism also set to 100. The body function passed to Parallel.ForEachAsync may then run with an effective concurrency of up to 10,000.

Ideally, each invocation of Parallel.ForEachAsync could be constrained by a global ConcurrencyLimiter. This could be achieved by adding a ConcurrencyLimiter property to the ParallelOptions class. In the above example, a static ConcurrencyLimiter could be defined in the class containing the method that invokes Parallel.ForEachAsync and assigned to the ParallelOptions instance, imposing a global (process-wide) limit on the number of concurrent body invocations by that call site across all HTTP requests.

Furthermore, other invocations of Parallel.ForEachAsync elsewhere in the code can each set their own global limit, governed by their own ConcurrencyLimiter instances.

Note that while the body function passed to Parallel.ForEachAsync can use a ConcurrencyLimiter internally to limit the global concurrency of that body function, Parallel.ForEachAsync itself would continue to consume items from the source enumerable up to the degree of parallelism specified by ParallelOptions.MaxDegreeOfParallelism. If the source is an IAsyncEnumerable&lt;T&gt; whose items are very large in number and not all in memory (i.e., streamed from network/storage), and Parallel.ForEachAsync consumes items faster than the body function can process them, the server could run out of memory.

However, if the Parallel.ForEachAsync method is ConcurrencyLimiter-aware, it knows to consume the next item from the source enumerable only when there is sufficient capacity to process it.
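For illustration, a sketch of today's workaround using the existing System.Threading.RateLimiting.ConcurrencyLimiter, acquiring a lease inside the body (the `Item` type and `ProcessAsync` method are hypothetical). Note two caveats: the limiter needs a nonzero QueueLimit so waiting bodies queue instead of failing immediately, and ForEachAsync has already pulled each item from the source before the body starts waiting, which is exactly the memory problem described above:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;

public static class Workaround
{
    // Global limit of 100 concurrent bodies across all invocations.
    // With QueueLimit = 0, AcquireAsync would fail immediately whenever
    // no permit is free, so waiting workers must be allowed to queue.
    private static readonly ConcurrencyLimiter s_limiter = new(new()
    {
        PermitLimit = 100,
        QueueLimit = int.MaxValue,
        QueueProcessingOrder = QueueProcessingOrder.OldestFirst
    });

    public static Task ProcessAsync(IAsyncEnumerable<Item> items, CancellationToken cancellationToken) =>
        Parallel.ForEachAsync(items, cancellationToken, async (item, ct) =>
        {
            // Each body waits here for a permit, but ForEachAsync has already
            // dequeued the item from the source enumerable at this point.
            using RateLimitLease lease = await s_limiter.AcquireAsync(1, ct);
            // Handle each item
        });
}

public record Item;
```

A ConcurrencyLimiter-aware ForEachAsync would instead acquire the permit before advancing the enumerator, keeping consumption in step with processing capacity.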

API Proposal

namespace System.Threading.Tasks;

public class ParallelOptions
{
    public ConcurrencyLimiter? ConcurrencyLimiter { get; set; }
}

API Usage

public class RequestHandler
{
  private static readonly ConcurrencyLimiter s_concurrencyLimiter = new(new() 
  { 
    PermitLimit = 100, 
    QueueLimit = 0, 
    QueueProcessingOrder = QueueProcessingOrder.OldestFirst
  });

  public async Task Handle(Request request, CancellationToken cancellationToken)
  {
    await Parallel.ForEachAsync(request.Items, new ParallelOptions
    {
      CancellationToken = cancellationToken,
      ConcurrencyLimiter = s_concurrencyLimiter
    }, async (item, ct) =>
    {
      // Handle each item
    });
  }
}

Alternative Designs

No response

Risks

No response

