Skip to content

Commit

Permalink
Fix Http 1.1's read-ahead task management (#103257)
Browse files Browse the repository at this point in the history
* Fix Http 1.1's read-ahead task management

* Fix comment typo
  • Loading branch information
MihaZupan authored Jun 22, 2024
1 parent 50e1604 commit 3526759
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)
{
if (connection is not null || _http11Connections.TryPop(out connection))
{
// If the connection is new, this check will always succeed as there is no scavenging task pending.
if (!connection.TryOwnScavengingTaskCompletion())
{
goto DisposeConnection;
}

// TryDequeueWaiter will prune completed requests from the head of the queue,
// so it's possible for it to return false even though we checked that Count != 0.
bool success = _http11RequestQueue.TryDequeueWaiter(this, out waiter);
Expand Down Expand Up @@ -144,10 +150,19 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)
// and set the _http11RequestQueueIsEmptyAndNotDisposed flag to false, followed by multiple
// returning connections observing the flag and calling into this method before we clear the flag.
// This should be a relatively rare case, so the added contention should be minimal.

// We took ownership of the scavenging task completion.
// If we can't return the completion (the task already completed), we must dispose the connection.
if (!connection.TryReturnScavengingTaskCompletionOwnership())
{
goto DisposeConnection;
}

_http11Connections.Push(connection);
}
else
{
// We may be out of available connections, check if we should inject a new one.
CheckForHttp11ConnectionInjection();
}

Expand All @@ -163,11 +178,31 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)
// before signaling the waiter. This is intentional, as the fact that
// this method was called indicates that the connection is either new,
// or was just returned to the pool and is still in a good state.
//
// We must, however, take ownership of the scavenging task completion as
// there is a small chance that such a task was started if the connection
// was briefly returned to the pool.
return;
}

// The request was already cancelled or handled by a different connection.

// We took ownership of the scavenging task completion.
// If we can't return the completion (the task already completed), we must dispose the connection.
if (!connection.TryReturnScavengingTaskCompletionOwnership())
{
goto DisposeConnection;
}

// Loop again to try to find another request to signal, or return the connection.
continue;

DisposeConnection:
// The scavenging task completed before we assigned a request to the connection.
// We've received EOF/erroneous data and the connection is not usable anymore.
// Throw it away and try again.
connection.Dispose();
connection = null;
}

if (_disposed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ internal sealed partial class HttpConnection : HttpConnectionBase
private const int ReadAheadTask_NotStarted = 0;
private const int ReadAheadTask_Started = 1;
private const int ReadAheadTask_CompletionReserved = 2;
private const int ReadAheadTask_Completed = 3;
private int _readAheadTaskStatus;
private ValueTask<int> _readAheadTask;
private ArrayBuffer _readBuffer;
Expand Down Expand Up @@ -118,8 +119,11 @@ private void Dispose(bool disposing)
}
}

private bool ReadAheadTaskHasStarted =>
_readAheadTaskStatus != ReadAheadTask_NotStarted;

/// <summary>Prepare an idle connection to be used for a new request.
/// The caller MUST call SendAsync afterwards if this method returns true.</summary>
/// The caller MUST call SendAsync afterwards if this method returns true, or dispose the connection if it returns false.</summary>
/// <param name="async">Indicates whether the coming request will be sync or async.</param>
/// <returns>True if connection can be used, false if it is invalid due to a timeout or receiving EOF or unexpected data.</returns>
public bool PrepareForReuse(bool async)
Expand All @@ -133,7 +137,9 @@ public bool PrepareForReuse(bool async)
// If the read-ahead task is completed, then we've received either EOF or erroneous data the connection, so it's not usable.
if (ReadAheadTaskHasStarted)
{
return TryOwnReadAheadTaskCompletion();
Debug.Assert(_readAheadTaskStatus is ReadAheadTask_Started or ReadAheadTask_Completed);

return Interlocked.Exchange(ref _readAheadTaskStatus, ReadAheadTask_CompletionReserved) == ReadAheadTask_Started;
}

// Check to see if we've received anything on the connection; if we have, that's
Expand Down Expand Up @@ -177,6 +183,35 @@ public bool PrepareForReuse(bool async)
}
}

/// <summary>Takes ownership of the scavenging task completion if it was started.
/// The caller MUST call either SendAsync or return the completion ownership afterwards if this method returns true, or dispose the connection if it returns false.</summary>
public bool TryOwnScavengingTaskCompletion()
{
Debug.Assert(_readAheadTaskStatus != ReadAheadTask_CompletionReserved);

return !ReadAheadTaskHasStarted
|| Interlocked.Exchange(ref _readAheadTaskStatus, ReadAheadTask_CompletionReserved) == ReadAheadTask_Started;
}

/// <summary>Returns ownership of the scavenging task completion if it was started.
/// The caller MUST Dispose the connection afterwards if this method returns false.</summary>
public bool TryReturnScavengingTaskCompletionOwnership()
{
Debug.Assert(_readAheadTaskStatus != ReadAheadTask_Started);

if (!ReadAheadTaskHasStarted ||
Interlocked.Exchange(ref _readAheadTaskStatus, ReadAheadTask_Started) == ReadAheadTask_CompletionReserved)
{
return true;
}

// The read-ahead task has started, and we failed to transition back to Started.
// This means that the read-ahead task has completed, and we can't reuse the connection. The caller must dispose it.
// We're still responsible for observing potential exceptions thrown by the read-ahead task to avoid leaking unobserved exceptions.
LogExceptions(_readAheadTask.AsTask());
return false;
}

/// <summary>Check whether a currently idle connection is still usable, or should be scavenged.</summary>
/// <returns>True if connection can be used, false if it is invalid due to a timeout or receiving EOF or unexpected data.</returns>
public override bool CheckUsabilityOnScavenge()
Expand All @@ -187,21 +222,7 @@ public override bool CheckUsabilityOnScavenge()
}

// We may already have a read-ahead task if we did a previous scavenge and haven't used the connection since.
EnsureReadAheadTaskHasStarted();

// If the read-ahead task is completed, then we've received either EOF or erroneous data the connection, so it's not usable.
return !_readAheadTask.IsCompleted;
}

private bool ReadAheadTaskHasStarted =>
_readAheadTaskStatus != ReadAheadTask_NotStarted;

private bool TryOwnReadAheadTaskCompletion() =>
Interlocked.CompareExchange(ref _readAheadTaskStatus, ReadAheadTask_CompletionReserved, ReadAheadTask_Started) == ReadAheadTask_Started;

private void EnsureReadAheadTaskHasStarted()
{
if (_readAheadTaskStatus == ReadAheadTask_NotStarted)
if (!ReadAheadTaskHasStarted)
{
Debug.Assert(_readAheadTask == default);

Expand All @@ -212,6 +233,9 @@ private void EnsureReadAheadTaskHasStarted()
#pragma warning restore CA2012
}

// If the read-ahead task is completed, then we've received either EOF or erroneous data the connection, so it's not usable.
return !_readAheadTask.IsCompleted;

async ValueTask<int> ReadAheadWithZeroByteReadAsync()
{
Debug.Assert(_readAheadTask == default);
Expand All @@ -231,19 +255,26 @@ async ValueTask<int> ReadAheadWithZeroByteReadAsync()
// PrepareForReuse will check TryOwnReadAheadTaskCompletion before calling into SendAsync.
// If we can own the completion from within the read-ahead task, it means that PrepareForReuse hasn't been called yet.
// In that case we've received EOF/erroneous data before we sent the request headers, and the connection can't be reused.
if (TryOwnReadAheadTaskCompletion())
if (TransitionToCompletedAndTryOwnCompletion())
{
if (NetEventSource.Log.IsEnabled()) Trace("Read-ahead task observed data before the request was sent.");
}

return read;
}
catch (Exception error) when (TryOwnReadAheadTaskCompletion())
catch (Exception error) when (TransitionToCompletedAndTryOwnCompletion())
{
if (NetEventSource.Log.IsEnabled()) Trace($"Error performing read ahead: {error}");

return 0;
}

bool TransitionToCompletedAndTryOwnCompletion()
{
Debug.Assert(_readAheadTaskStatus is ReadAheadTask_Started or ReadAheadTask_CompletionReserved);

return Interlocked.Exchange(ref _readAheadTaskStatus, ReadAheadTask_Completed) == ReadAheadTask_Started;
}
}
}

Expand Down Expand Up @@ -497,7 +528,8 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, boo
{
Debug.Assert(_currentRequest == null, $"Expected null {nameof(_currentRequest)}.");
Debug.Assert(_readBuffer.ActiveLength == 0, "Unexpected data in read buffer");
Debug.Assert(_readAheadTaskStatus != ReadAheadTask_Started);
Debug.Assert(_readAheadTaskStatus != ReadAheadTask_Started,
"The caller should have called PrepareForReuse or TryOwnScavengingTaskCompletion if the connection was idle on the pool.");
MarkConnectionAsNotIdle();
Expand Down

0 comments on commit 3526759

Please sign in to comment.