
Race condition in ParallelForEachAsync when exceptions are thrown #57

Open
@danylofitel

Description


ParallelForEachAsync sometimes does not throw an exception even though individual iterations threw exceptions.

It can be reproduced with the following code (derived from the scenario in which I noticed it):

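using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dasync.Collections; // assumed: namespace that provides ParallelForEachAsync

while (true)
{
    Console.WriteLine();
    IReadOnlyList<int> input = Enumerable.Range(0, 10).ToList();
    ConcurrentQueue<int> output = new ConcurrentQueue<int>();

    try
    {
        await input.ParallelForEachAsync(
            async item =>
            {
                // The iteration for item 0 always fails.
                if (item == 0)
                {
                    throw new AggregateException(new Exception("Individual task failed."));
                }

                await Task.Delay(1);
                output.Enqueue(item);
            },
            maxDegreeOfParallelism: 10,
            cancellationToken: default);
    }
    catch (Exception)
    {
        // Expected outcome: the loop rethrew the failure, so try again.
        continue;
    }

    // Reached only when ParallelForEachAsync completed without throwing.
    Console.WriteLine($"No exception. {input.Count} - {output.Count}.");
}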

The cancellation token is never canceled, and the default values breakLoopOnException: false and gracefulBreak: true are used. Because the iteration for item 0 always throws, the expectation is that the remaining iterations run to completion and ParallelForEachAsync then throws an exception.
However, the loop above eventually reaches an iteration in which no exception is thrown even though only 9 items were added to the queue (i.e. the final Console.WriteLine() statement is reached).

In the ParallelForEachAsync implementation, both the main task that schedules individual iterations and the continuations of the individual iteration tasks call OnOperationComplete() on ParallelForEachContext. OnOperationComplete() adds the exception to the list of tracked exceptions (if one was supplied), releases the semaphore, and at the very end calls CompleteLoopNow() if all tasks have completed or a loop break was requested with gracefulBreak set to false. In this specific case nothing was canceled, so it was only called once all tasks finished.

public void OnOperationComplete(Exception exceptionIfFailed = null)
{
    // Add exception to the list
    // Release the semaphore

    if ((_semaphore.CurrentCount == _maxDegreeOfParallelism + 1) || (IsLoopBreakRequested && !_gracefulBreak))
        CompleteLoopNow();
}

The problem occurs when the last few tasks release the semaphore at nearly the same time. Because each task releases the semaphore and only afterwards re-reads CurrentCount, the _semaphore.CurrentCount == _maxDegreeOfParallelism + 1 condition can evaluate to true for more than one task, so CompleteLoopNow() can be called more than once.
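This is a check-then-act race. A minimal sketch of an atomic alternative, assuming each completing operation performs exactly one Release() and no further Wait() happens once the last operations finish: SemaphoreSlim.Release() returns the count from before that particular release, so exactly one caller observes the transition to the final count (in the library that would be previous + 1 == _maxDegreeOfParallelism + 1). ReleaseTransitionDemo and CountCompletions are hypothetical names used only for illustration, not part of the library.

```csharp
using System.Threading;

// Hypothetical demo (not library code): N threads each release a semaphore once,
// and only the thread whose Release() produced the final count "completes the loop".
public static class ReleaseTransitionDemo
{
    public static int CountCompletions(int workers)
    {
        using var semaphore = new SemaphoreSlim(0, workers);
        using var startLine = new Barrier(workers);
        int completions = 0;
        var threads = new Thread[workers];

        for (int i = 0; i < workers; i++)
        {
            threads[i] = new Thread(() =>
            {
                // Line all threads up so the releases happen as close together as possible.
                startLine.SignalAndWait();

                // Release() atomically returns the count *before* this release, so exactly
                // one thread sees previous + 1 == workers. Re-reading CurrentCount after
                // Release(), as OnOperationComplete() does, can be true for several threads.
                int previous = semaphore.Release();
                if (previous + 1 == workers)
                    Interlocked.Increment(ref completions);
            });
            threads[i].Start();
        }

        foreach (var thread in threads)
            thread.Join();

        return completions; // always 1
    }
}
```

Dedicated threads are used instead of the thread pool so that the Barrier cannot deadlock waiting for participants that were never scheduled.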

public void CompleteLoopNow()
{
    Console.WriteLine("CompleteLoopNow - Start");
    _cancellationTokenRegistration.Dispose();

    try
    {
        if (_semaphore != null)
            _semaphore.Dispose();
    }
    catch
    {
    }

    var exceptions = ReadExceptions();
    var aggregatedException = exceptions?.Count > 0 ? new ParallelForEachException(exceptions) : null;

    if (_cancellationToken.IsCancellationRequested)
    {
        Console.WriteLine("CompleteLoopNow - OperationCanceledException");
        _ = _completionTcs.TrySetException(
            new OperationCanceledException(
                new OperationCanceledException().Message,
                aggregatedException,
                _cancellationToken));
    }
    else if (exceptions?.Count > 0)
    {
        Console.WriteLine("CompleteLoopNow - TrySetException");
        _ = _completionTcs.TrySetException(aggregatedException);
    }
    else
    {
        Console.WriteLine("CompleteLoopNow - TrySetResult");
        _ = _completionTcs.TrySetResult(null);
    }
}

This means that multiple tasks can also call ReadExceptions() concurrently.

public List<Exception> ReadExceptions()
{
    Console.WriteLine("ReadExceptions - Start");
    bool lockTaken = false;
    while (!lockTaken)
        _exceptionListLock.Enter(ref lockTaken);
    try
    {
        Console.WriteLine("ReadExceptions - Returning");
        return _exceptionList;
    }
    finally
    {
        _exceptionList = null;
        _exceptionListLock.Exit(useMemoryBarrier: false);
        Console.WriteLine("ReadExceptions - End");
    }
}

However, the finally block sets the exception list to null, so the first task to call it gets the full list of exceptions back and every subsequent task gets null. CompleteLoopNow() then allows a task that received null to call TrySetResult() before the task that received the real exception list calls TrySetException().
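The exception is lost rather than merely delayed because a TaskCompletionSource transitions to a final state only once: whichever of TrySetResult() or TrySetException() runs first wins, and the later call simply returns false. The sketch below replays the interleaving from the log (task B finishing first) on a plain TaskCompletionSource; LostExceptionDemo and Replay are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class LostExceptionDemo
{
    public static (bool resultSet, bool exceptionSet, TaskStatus status) Replay()
    {
        var completionTcs = new TaskCompletionSource<object>();

        // Task B got null from ReadExceptions(), so it reports success first.
        bool resultSet = completionTcs.TrySetResult(null);

        // Task A got the real exception list, but the task has already completed,
        // so this call has no effect and the exception never reaches the caller.
        bool exceptionSet = completionTcs.TrySetException(
            new Exception("Individual task failed."));

        return (resultSet, exceptionSet, completionTcs.Task.Status);
    }
}
```

Replay() reports that the result was set, the exception was not, and the task ended in the RanToCompletion state, which matches the "No exception" outcome of the repro.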

I debugged with the same Console.WriteLine statements as above, and in the cases where ParallelForEachAsync() did not throw, I saw the following output:

CompleteLoopNow - Start              // Task A entering CompleteLoopNow()
ReadExceptions - Start               // Task A entering ReadExceptions()
ReadExceptions - Returning           // Task A returning a full list of exceptions
ReadExceptions - End                 // Task A setting the list of exceptions to null
CompleteLoopNow - Start              // Task B entering CompleteLoopNow()
ReadExceptions - Start               // Task B entering ReadExceptions()
ReadExceptions - Returning           // Task B returning null as the list of exceptions
ReadExceptions - End                 // Task B setting the list of exceptions to null
CompleteLoopNow - TrySetResult       // Task B setting result on the task completion source since it got null from ReadExceptions()
CompleteLoopNow - TrySetException    // Task A setting exception on the task completion source since it got a non-empty list of exceptions from ReadExceptions()

I'm not sure whether ReadExceptions() needs to reset the exception list to null. One possible reason is to prevent a race when the loop is canceled: continuations of tasks that are still running can keep adding exceptions to the list while CompleteLoopNow() is using the same list instance returned by ReadExceptions(). However, that case can also be handled by returning a copy of the exception list from ReadExceptions(), e.g.:

public List<Exception> ReadExceptions()
{
    bool lockTaken = false;
    while (!lockTaken)
        _exceptionListLock.Enter(ref lockTaken);
    try
    {
        // Return a copy, so the list being returned will not be modified
        // by tasks that are still running if the loop was canceled
        return new List<Exception>(_exceptionList ?? Enumerable.Empty<Exception>());
    }
    finally
    {
        _exceptionListLock.Exit(useMemoryBarrier: false);
    }
}

Another option is to ensure that CompleteLoopNow() runs only once, even if several tasks call it.
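A minimal sketch of that option, assuming a hypothetical int field (called _loopCompleted here) is added to ParallelForEachContext: Interlocked.Exchange lets only the first caller through, so the exception list is read exactly once and every later caller returns immediately. CompletionGuard, TryEnterCompletion and CompletionGuardDemo are illustrative names, not part of the library.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a guard at the top of CompleteLoopNow().
public sealed class CompletionGuard
{
    private int _loopCompleted; // 0 = not completed yet, 1 = completion already started

    // Returns true only for the first caller; every later or concurrent caller gets false.
    public bool TryEnterCompletion() =>
        Interlocked.Exchange(ref _loopCompleted, 1) == 0;
}

public static class CompletionGuardDemo
{
    public static int CountEntries(int callers)
    {
        var guard = new CompletionGuard();
        int entries = 0;

        // Simulate several continuations reaching CompleteLoopNow() at the same time.
        Parallel.For(0, callers, _ =>
        {
            if (guard.TryEnterCompletion())
                Interlocked.Increment(ref entries); // the body of CompleteLoopNow() would run here
        });

        return entries; // always 1
    }
}
```

Inside CompleteLoopNow() this would amount to returning early when TryEnterCompletion() is false, before the registration and semaphore are disposed and before ReadExceptions() is called.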
