Skip to content

HTTP/3: Connection shutdown improvements #34968

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Aug 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ private void ExtraFeatureSet(Type key, object? value)
feature = ExtraFeatureGet(key);
}

return feature ?? ConnectionFeatures[key];
return feature ?? ConnectionFeatures?[key];
}

set
Expand Down Expand Up @@ -564,7 +564,7 @@ private void ExtraFeatureSet(Type key, object? value)
feature = (TFeature?)(ExtraFeatureGet(typeof(TFeature)));
}

if (feature == null)
if (feature == null && ConnectionFeatures != null)
{
feature = ConnectionFeatures.Get<TFeature>();
}
Expand Down
178 changes: 111 additions & 67 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ internal async ValueTask SendStreamIdAsync(long id)

internal ValueTask<FlushResult> SendGoAway(long id)
{
Log.Http3GoAwayStreamId(_context.ConnectionId, id);
return _frameWriter.WriteGoAway(id);
}

Expand Down Expand Up @@ -343,6 +344,11 @@ private void ProcessSetting(long id, long value)
}
}

internal ValueTask CompleteAsync()
{
return _frameWriter.CompleteAsync();
}

private ValueTask ProcessGoAwayFrameAsync()
{
EnsureSettingsFrame(Http3FrameType.GoAway);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public static string ToFormattedType(Http3FrameType type)
Http3FrameType.CancelPush => "CANCEL_PUSH",
Http3FrameType.Settings => "SETTINGS",
Http3FrameType.PushPromise => "PUSH_PROMISE",
Http3FrameType.GoAway => "GO_AWAY",
Http3FrameType.GoAway => "GOAWAY",
Http3FrameType.MaxPushId => "MAX_PUSH_ID",
_ => type.ToString()
};
Expand Down
8 changes: 5 additions & 3 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,6 @@ protected override void OnRequestProcessingEnded()

private void CompleteStream(bool errored)
{
Debug.Assert(_appCompleted != null);
_appCompleted.SetResult();

if (!EndStreamReceived)
{
if (!errored)
Expand All @@ -395,6 +392,11 @@ private void CompleteStream(bool errored)

TryClose();
}

// Stream will be pooled after app completed.
// Wait to signal app completed after any potential aborts on the stream.
Debug.Assert(_appCompleted != null);
_appCompleted.SetResult();
}

private bool TryClose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ internal interface IKestrelTrace : ILogger

void Http3ConnectionClosing(string connectionId);

void Http3ConnectionClosed(string connectionId, long highestOpenedStreamId);
void Http3ConnectionClosed(string connectionId, long? highestOpenedStreamId);

void Http3StreamAbort(string traceIdentifier, Http3ErrorCode error, ConnectionAbortedException abortReason);

Expand All @@ -99,5 +99,7 @@ internal interface IKestrelTrace : ILogger
void QPackEncodingError(string connectionId, long streamId, Exception ex);

void Http3OutboundControlStreamError(string connectionId, Exception ex);

void Http3GoAwayStreamId(string connectionId, long goAwayStreamId);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.IO;
using System.Net.Http;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2;
Expand Down Expand Up @@ -322,9 +323,9 @@ public void Http3ConnectionClosing(string connectionId)
}

[LoggerMessage(44, LogLevel.Debug, @"Connection id ""{ConnectionId}"" is closed. The last processed stream ID was {HighestOpenedStreamId}.", EventName = "Http3ConnectionClosed")]
private static partial void Http3ConnectionClosed(ILogger logger, string connectionId, long highestOpenedStreamId);
private static partial void Http3ConnectionClosed(ILogger logger, string connectionId, long? highestOpenedStreamId);

public void Http3ConnectionClosed(string connectionId, long highestOpenedStreamId)
public void Http3ConnectionClosed(string connectionId, long? highestOpenedStreamId)
{
Http3ConnectionClosed(_http3Logger, connectionId, highestOpenedStreamId);
}
Expand Down Expand Up @@ -362,6 +363,14 @@ public void Http3FrameSending(string connectionId, long streamId, Http3RawFrame
}
}

[LoggerMessage(50, LogLevel.Debug, @"Connection id ""{ConnectionId}"": Unexpected error when initializing outbound control stream.", EventName = "Http3OutboundControlStreamError")]
private static partial void Http3OutboundControlStreamError(ILogger logger, string connectionId, Exception ex);

public void Http3OutboundControlStreamError(string connectionId, Exception ex)
{
Http3OutboundControlStreamError(_http3Logger, connectionId, ex);
}

[LoggerMessage(51, LogLevel.Debug, @"Connection id ""{ConnectionId}"": QPACK decoding error while decoding headers for stream ID {StreamId}.", EventName = "QPackDecodingError")]
private static partial void QPackDecodingError(ILogger logger, string connectionId, long streamId, Exception ex);

Expand All @@ -378,12 +387,12 @@ public virtual void QPackEncodingError(string connectionId, long streamId, Excep
QPackEncodingError(_http3Logger, connectionId, streamId, ex);
}

[LoggerMessage(50, LogLevel.Debug, @"Connection id ""{ConnectionId}"": Unexpected error when initializing outbound control stream.", EventName = "Http3OutboundControlStreamError")]
private static partial void Http3OutboundControlStreamError(ILogger logger, string connectionId, Exception ex);
[LoggerMessage(53, LogLevel.Debug, @"Connection id ""{ConnectionId}"": GOAWAY stream ID {GoAwayStreamId}.", EventName = "Http3GoAwayHighestOpenedStreamId")]
private static partial void Http3GoAwayStreamId(ILogger logger, string connectionId, long goAwayStreamId);

public void Http3OutboundControlStreamError(string connectionId, Exception ex)
public void Http3GoAwayStreamId(string connectionId, long goAwayStreamId)
{
Http3OutboundControlStreamError(_http3Logger, connectionId, ex);
Http3GoAwayStreamId(_http3Logger, connectionId, goAwayStreamId);
}

public virtual void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public async Task<EndPoint> BindAsync(EndPoint endPoint, MultiplexedConnectionDe
var sslServerAuthenticationOptions = new SslServerAuthenticationOptions
{
ServerCertificate = listenOptions.HttpsOptions.ServerCertificate,
ApplicationProtocols = new List<SslApplicationProtocol>() { new SslApplicationProtocol("h3") }
ApplicationProtocols = new List<SslApplicationProtocol>() { new SslApplicationProtocol("h3"), new SslApplicationProtocol("h3-29") }
};

features.Set(sslServerAuthenticationOptions);
Expand Down
12 changes: 6 additions & 6 deletions src/Servers/Kestrel/Transport.Quic/src/Internal/IQuicTrace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ internal interface IQuicTrace : ILogger
void AcceptedStream(QuicStreamContext streamContext);
void ConnectedStream(QuicStreamContext streamContext);
void ConnectionError(BaseConnectionContext connection, Exception ex);
void ConnectionAborted(BaseConnectionContext connection, Exception ex);
void ConnectionAbort(BaseConnectionContext connection, string reason);
void ConnectionAborted(BaseConnectionContext connection, long errorCode, Exception ex);
void ConnectionAbort(BaseConnectionContext connection, long errorCode, string reason);
void StreamError(QuicStreamContext streamContext, Exception ex);
void StreamPause(QuicStreamContext streamContext);
void StreamResume(QuicStreamContext streamContext);
void StreamShutdownWrite(QuicStreamContext streamContext, string reason);
void StreamAborted(QuicStreamContext streamContext, Exception ex);
void StreamAbort(QuicStreamContext streamContext, string reason);
void StreamAbortRead(QuicStreamContext streamContext, string reason);
void StreamAbortWrite(QuicStreamContext streamContext, string reason);
void StreamAborted(QuicStreamContext streamContext, long errorCode, Exception ex);
void StreamAbort(QuicStreamContext streamContext, long errorCode, string reason);
void StreamAbortRead(QuicStreamContext streamContext, long errorCode, string reason);
void StreamAbortWrite(QuicStreamContext streamContext, long errorCode, string reason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Diagnostics;
using System.Net.Quic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
Expand All @@ -23,12 +24,14 @@ internal partial class QuicConnectionContext : TransportMultiplexedConnection
private long _heartbeatTicks;
private readonly object _poolLock = new object();

private readonly object _shutdownLock = new object();
private readonly QuicConnection _connection;
private readonly QuicTransportContext _context;
private readonly IQuicTrace _log;
private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource();

private Task? _closeTask;
private ExceptionDispatchInfo? _abortReason;

internal const int InitialStreamPoolSize = 5;
internal const int MaxStreamPoolSize = 100;
Expand All @@ -53,7 +56,11 @@ public override async ValueTask DisposeAsync()
{
try
{
_closeTask ??= _connection.CloseAsync(errorCode: 0).AsTask();
lock (_shutdownLock)
{
_closeTask ??= _connection.CloseAsync(errorCode: 0).AsTask();
}

await _closeTask;
}
catch (Exception ex)
Expand All @@ -68,9 +75,18 @@ public override async ValueTask DisposeAsync()

public override void Abort(ConnectionAbortedException abortReason)
{
// dedup calls to abort here.
_log.ConnectionAbort(this, abortReason.Message);
_closeTask = _connection.CloseAsync(errorCode: Error).AsTask();
lock (_shutdownLock)
{
// Check if connection has already been already aborted.
if (_abortReason != null)
{
return;
}

_abortReason = ExceptionDispatchInfo.Capture(abortReason);
_log.ConnectionAbort(this, Error, abortReason.Message);
_closeTask = _connection.CloseAsync(errorCode: Error).AsTask();
}
}

public override async ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -111,22 +127,46 @@ public override void Abort(ConnectionAbortedException abortReason)
catch (QuicConnectionAbortedException ex)
{
// Shutdown initiated by peer, abortive.
_log.ConnectionAborted(this, ex);
Error = ex.ErrorCode;
_log.ConnectionAborted(this, ex.ErrorCode, ex);

ThreadPool.UnsafeQueueUserWorkItem(state =>
{
state.CancelConnectionClosedToken();
},
this,
preferLocal: false);

// Throw error so consumer sees the connection is aborted by peer.
throw new ConnectionResetException(ex.Message, ex);
}
catch (QuicOperationAbortedException)
{
// Shutdown initiated by us
// Shutdown initiated by us.

// Allow for graceful closure.
lock (_shutdownLock)
{
// Connection has been aborted. Throw reason exception.
_abortReason?.Throw();
}
}
catch (OperationCanceledException)
{
// Shutdown initiated by us.

lock (_shutdownLock)
{
// Connection has been aborted. Throw reason exception.
_abortReason?.Throw();
}
}
catch (Exception ex)
{
Debug.Fail($"Unexpected exception in {nameof(QuicConnectionContext)}.{nameof(AcceptAsync)}: {ex}");
throw;
}

// Return null for graceful closure or cancellation.
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void AbortRead(long errorCode, ConnectionAbortedException abortReason)
if (_stream.CanRead)
{
_shutdownReadReason = abortReason;
_log.StreamAbortRead(this, abortReason.Message);
_log.StreamAbortRead(this, errorCode, abortReason.Message);
_stream.AbortRead(errorCode);
}
else
Expand All @@ -51,7 +51,7 @@ public void AbortWrite(long errorCode, ConnectionAbortedException abortReason)
if (_stream.CanWrite)
{
_shutdownWriteReason = abortReason;
_log.StreamAbortWrite(this, abortReason.Message);
_log.StreamAbortWrite(this, errorCode, abortReason.Message);
_stream.AbortWrite(errorCode);
}
else
Expand All @@ -68,7 +68,6 @@ private void InitializeFeatures()
_currentIProtocolErrorCodeFeature = this;
_currentIStreamIdFeature = this;
_currentIStreamAbortFeature = this;
_currentITlsConnectionFeature = _connection._currentITlsConnectionFeature;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public QuicStreamContext(QuicConnectionContext connection, QuicTransportContext
_context = context;
_log = context.Log;
MemoryPool = connection.MemoryPool;
MultiplexedConnectionFeatures = connection.Features;

RemoteEndPoint = connection.RemoteEndPoint;
LocalEndPoint = connection.LocalEndPoint;
Expand Down Expand Up @@ -213,7 +214,7 @@ private async Task DoReceive()
{
// Abort from peer.
Error = ex.ErrorCode;
_log.StreamAborted(this, ex);
_log.StreamAborted(this, ex.ErrorCode, ex);

// This could be ignored if _shutdownReason is already set.
error = new ConnectionResetException(ex.Message, ex);
Expand All @@ -223,8 +224,11 @@ private async Task DoReceive()
catch (QuicOperationAbortedException ex)
{
// AbortRead has been called for the stream.
// Possibily might also get here from connection closing.
// System.Net.Quic exception handling not finalized.
error = new ConnectionAbortedException(ex.Message, ex);
}
catch (QuicConnectionAbortedException ex)
{
// Connection has aborted.
error = ex;
}
catch (Exception ex)
Expand Down Expand Up @@ -316,7 +320,18 @@ private async Task DoSend()
{
// Abort from peer.
Error = ex.ErrorCode;
_log.StreamAborted(this, ex);
_log.StreamAborted(this, ex.ErrorCode, ex);

// This could be ignored if _shutdownReason is already set.
shutdownReason = new ConnectionResetException(ex.Message, ex);

_clientAbort = true;
}
catch (QuicConnectionAbortedException ex)
{
// Abort from peer.
Error = ex.ErrorCode;
_log.StreamAborted(this, ex.ErrorCode, ex);

// This could be ignored if _shutdownReason is already set.
shutdownReason = new ConnectionResetException(ex.Message, ex);
Expand Down Expand Up @@ -359,7 +374,7 @@ public override void Abort(ConnectionAbortedException abortReason)

_serverAborted = true;

_log.StreamAbort(this, abortReason.Message);
_log.StreamAbort(this, Error, abortReason.Message);

lock (_shutdownLock)
{
Expand Down
Loading