diff --git a/benchmarks/Kestrel.Performance/Http1WritingBenchmark.cs b/benchmarks/Kestrel.Performance/Http1WritingBenchmark.cs index a71d06743..f272a24b6 100644 --- a/benchmarks/Kestrel.Performance/Http1WritingBenchmark.cs +++ b/benchmarks/Kestrel.Performance/Http1WritingBenchmark.cs @@ -12,6 +12,7 @@ using Microsoft.AspNetCore.Server.Kestrel.Core; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; +using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal; using Microsoft.AspNetCore.Testing; @@ -113,6 +114,7 @@ private TestHttp1Connection MakeHttp1Connection() ServiceContext = serviceContext, ConnectionFeatures = new FeatureCollection(), MemoryPool = _memoryPool, + TimeoutControl = new TimeoutControl(timeoutHandler: null), Transport = pair.Transport }); diff --git a/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs b/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs index 202676621..c2c94a10c 100644 --- a/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs +++ b/src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs @@ -48,7 +48,7 @@ public Http1OutputProducer( _connectionContext = connectionContext; _log = log; _minResponseDataRateFeature = minResponseDataRateFeature; - _flusher = new TimingPipeFlusher(pipeWriter, timeoutControl); + _flusher = new TimingPipeFlusher(pipeWriter, timeoutControl, log); } public Task WriteDataAsync(ReadOnlySpan buffer, CancellationToken cancellationToken = default) diff --git a/src/Kestrel.Core/Internal/Http2/Http2Connection.cs b/src/Kestrel.Core/Internal/Http2/Http2Connection.cs index 57bcb1eee..859862a54 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2Connection.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2Connection.cs @@ -93,15 +93,27 @@ public Http2Connection(HttpConnectionContext context) var http2Limits = httpLimits.Http2; _context = context; - _frameWriter = new Http2FrameWriter(context.Transport.Output, context.ConnectionContext, this, _outputFlowControl, context.TimeoutControl, context.ConnectionId, context.ServiceContext.Log); + + _frameWriter = new Http2FrameWriter( + context.Transport.Output, + context.ConnectionContext, + this, + _outputFlowControl, + context.TimeoutControl, + httpLimits.MinResponseDataRate, + context.ConnectionId, + context.ServiceContext.Log); + + _hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize); + + var connectionWindow = (uint)http2Limits.InitialConnectionWindowSize; + _inputFlowControl = new InputFlowControl(connectionWindow, connectionWindow / 2); + _serverSettings.MaxConcurrentStreams = (uint)http2Limits.MaxStreamsPerConnection; _serverSettings.MaxFrameSize = (uint)http2Limits.MaxFrameSize; _serverSettings.HeaderTableSize = (uint)http2Limits.HeaderTableSize; - _hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize); _serverSettings.MaxHeaderListSize = (uint)httpLimits.MaxRequestHeadersTotalSize; _serverSettings.InitialWindowSize = (uint)http2Limits.InitialStreamWindowSize; - var connectionWindow = (uint)http2Limits.InitialConnectionWindowSize; - _inputFlowControl = new InputFlowControl(connectionWindow, connectionWindow / 2); } public string ConnectionId => _context.ConnectionId; diff --git a/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs b/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs index abaea5924..e8e3ccaf0 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs @@ -23,22 +23,25 @@ public class Http2FrameWriter // Literal Header Field without Indexing - Indexed Name (Index 8 - :status) private static readonly byte[] _continueBytes = new byte[] { 0x08, 0x03, (byte)'1', (byte)'0', (byte)'0' }; - private uint _maxFrameSize = Http2PeerSettings.MinAllowedMaxFrameSize; - private byte[] _headerEncodingBuffer; - private Http2Frame _outgoingFrame; private readonly object _writeLock = new object(); + private readonly Http2Frame _outgoingFrame; private readonly HPackEncoder _hpackEncoder = new HPackEncoder(); private readonly PipeWriter _outputWriter; - private bool _aborted; private readonly ConnectionContext _connectionContext; private readonly Http2Connection _http2Connection; private readonly OutputFlowControl _connectionOutputFlowControl; private readonly string _connectionId; private readonly IKestrelTrace _log; private readonly ITimeoutControl _timeoutControl; + private readonly MinDataRate _minResponseDataRate; private readonly TimingPipeFlusher _flusher; + private uint _maxFrameSize = Http2PeerSettings.MinAllowedMaxFrameSize; + private byte[] _headerEncodingBuffer; + private long _unflushedBytes; + private bool _completed; + private bool _aborted; public Http2FrameWriter( PipeWriter outputPipeWriter, @@ -46,6 +49,7 @@ public Http2FrameWriter( Http2Connection http2Connection, OutputFlowControl connectionOutputFlowControl, ITimeoutControl timeoutControl, + MinDataRate minResponseDataRate, string connectionId, IKestrelTrace log) { @@ -56,7 +60,8 @@ public Http2FrameWriter( _connectionId = connectionId; _log = log; _timeoutControl = timeoutControl; - _flusher = new TimingPipeFlusher(_outputWriter, timeoutControl); + _minResponseDataRate = minResponseDataRate; + _flusher = new TimingPipeFlusher(_outputWriter, timeoutControl, log); _outgoingFrame = new Http2Frame(); _headerEncodingBuffer = new byte[_maxFrameSize]; } @@ -112,8 +117,11 @@ public Task FlushAsync(IHttpOutputAborter outputAborter, CancellationToken cance { return Task.CompletedTask; } + + var bytesWritten = _unflushedBytes; + _unflushedBytes = 0; - return _flusher.FlushAsync(outputAborter, cancellationToken); + return _flusher.FlushAsync(_minResponseDataRate, bytesWritten, outputAborter, cancellationToken); } } @@ -130,7 +138,7 @@ public Task Write100ContinueAsync(int streamId) _outgoingFrame.PayloadLength = _continueBytes.Length; WriteHeaderUnsynchronized(); _outputWriter.Write(_continueBytes); - return _flusher.FlushAsync(); + return TimeFlushUnsynchronizedAsync(); } } @@ -195,7 +203,7 @@ public Task WriteResponseTrailers(int streamId, HttpResponseTrailers headers) _http2Connection.Abort(new ConnectionAbortedException(hex.Message, hex)); } - return _flusher.FlushAsync(); + return TimeFlushUnsynchronizedAsync(); } } @@ -228,7 +236,7 @@ private void FinishWritingHeaders(int streamId, int payloadLength, bool done) } } - public Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, MinDataRate minRate, ReadOnlySequence data, bool endStream) + public Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, ReadOnlySequence data, bool endStream) { // The Length property of a ReadOnlySequence can be expensive, so we cache the value. var dataLength = data.Length; @@ -244,12 +252,13 @@ public Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, Mi // https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1 if (dataLength != 0 && dataLength > flowControl.Available) { - return WriteDataAsyncAwaited(streamId, minRate, flowControl, data, dataLength, endStream); + return WriteDataAsync(streamId, flowControl, data, dataLength, endStream); } // This cast is safe since if dataLength would overflow an int, it's guaranteed to be greater than the available flow control window. flowControl.Advance((int)dataLength); - return WriteDataUnsynchronizedAsync(streamId, minRate, data, dataLength, endStream); + WriteDataUnsynchronized(streamId, data, dataLength, endStream); + return TimeFlushUnsynchronizedAsync(); } } @@ -262,7 +271,7 @@ public Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, Mi | Padding (*) ... +---------------------------------------------------------------+ */ - private Task WriteDataUnsynchronizedAsync(int streamId, MinDataRate minRate, ReadOnlySequence data, long dataLength, bool endStream) + private void WriteDataUnsynchronized(int streamId, ReadOnlySequence data, long dataLength, bool endStream) { // Note padding is not implemented _outgoingFrame.PrepareData(streamId); @@ -301,17 +310,14 @@ private Task WriteDataUnsynchronizedAsync(int streamId, MinDataRate minRate, Rea } // Plus padding - - return _flusher.FlushAsync(minRate, dataLength); } - private async Task WriteDataAsyncAwaited(int streamId, MinDataRate minRate, StreamOutputFlowControl flowControl, ReadOnlySequence data, long dataLength, bool endStream) + private async Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, ReadOnlySequence data, long dataLength, bool endStream) { while (dataLength > 0) { OutputFlowControlAwaitable availabilityAwaitable; var writeTask = Task.CompletedTask; - int actual; lock (_writeLock) { @@ -320,24 +326,33 @@ private async Task WriteDataAsyncAwaited(int streamId, MinDataRate minRate, Stre break; } - actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityAwaitable); + var actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityAwaitable); if (actual > 0) { - // Don't pass minRate through to the inner WriteData calls. - // We measure this ourselves below so we account for flow control in addition to socket backpressure. if (actual < dataLength) { - writeTask = WriteDataUnsynchronizedAsync(streamId, null, data.Slice(0, actual), actual, endStream: false); + WriteDataUnsynchronized(streamId, data.Slice(0, actual), actual, endStream: false); data = data.Slice(actual); dataLength -= actual; } else { - writeTask = WriteDataUnsynchronizedAsync(streamId, null, data, actual, endStream); + WriteDataUnsynchronized(streamId, data, actual, endStream); dataLength = 0; } + + // Don't call TimeFlushUnsynchronizedAsync() since we time this write while also accounting for + // flow control induced backpressure below. + writeTask = _flusher.FlushAsync(); } + + if (_minResponseDataRate != null) + { + _timeoutControl.BytesWrittenToBuffer(_minResponseDataRate, _unflushedBytes); + } + + _unflushedBytes = 0; } // Avoid timing writes that are already complete. This is likely to happen during the last iteration. @@ -346,9 +361,9 @@ private async Task WriteDataAsyncAwaited(int streamId, MinDataRate minRate, Stre continue; } - if (minRate != null) + if (_minResponseDataRate != null) { - _timeoutControl.StartTimingWrite(minRate, actual); + _timeoutControl.StartTimingWrite(); } // This awaitable releases continuations in FIFO order when the window updates. @@ -360,7 +375,7 @@ private async Task WriteDataAsyncAwaited(int streamId, MinDataRate minRate, Stre await writeTask; - if (minRate != null) + if (_minResponseDataRate != null) { _timeoutControl.StopTimingWrite(); } @@ -389,7 +404,7 @@ public Task WriteWindowUpdateAsync(int streamId, int sizeIncrement) var buffer = _outputWriter.GetSpan(4); Bitshifter.WriteUInt31BigEndian(buffer, (uint)sizeIncrement); _outputWriter.Advance(4); - return _flusher.FlushAsync(); + return TimeFlushUnsynchronizedAsync(); } } @@ -413,7 +428,7 @@ public Task WriteRstStreamAsync(int streamId, Http2ErrorCode errorCode) BinaryPrimitives.WriteUInt32BigEndian(buffer, (uint)errorCode); _outputWriter.Advance(4); - return _flusher.FlushAsync(); + return TimeFlushUnsynchronizedAsync(); } } @@ -443,7 +458,7 @@ public Task WriteSettingsAsync(IList settings) WriteSettings(settings, buffer); _outputWriter.Advance(settingsSize); - return _flusher.FlushAsync(); + return TimeFlushUnsynchronizedAsync(); } } @@ -469,7 +484,7 @@ public Task WriteSettingsAckAsync() _outgoingFrame.PrepareSettings(Http2SettingsFrameFlags.ACK); WriteHeaderUnsynchronized(); - return _flusher.FlushAsync(); + return TimeFlushUnsynchronizedAsync(); } } @@ -497,7 +512,7 @@ public Task WritePingAsync(Http2PingFrameFlags flags, ReadOnlySequence pay _outputWriter.Write(segment.Span); } - return _flusher.FlushAsync(); + return TimeFlushUnsynchronizedAsync(); } } @@ -523,7 +538,7 @@ public Task WriteGoAwayAsync(int lastStreamId, Http2ErrorCode errorCode) BinaryPrimitives.WriteUInt32BigEndian(buffer, (uint)errorCode); _outputWriter.Advance(8); - return _flusher.FlushAsync(); + return TimeFlushUnsynchronizedAsync(); } } @@ -531,6 +546,9 @@ private void WriteHeaderUnsynchronized() { _log.Http2FrameSending(_connectionId, _outgoingFrame); WriteHeader(_outgoingFrame, _outputWriter); + + // We assume the payload will be written prior to the next flush. + _unflushedBytes += Http2FrameReader.HeaderLength + _outgoingFrame.PayloadLength; } /* https://tools.ietf.org/html/rfc7540#section-4.1 @@ -560,6 +578,14 @@ internal static void WriteHeader(Http2Frame frame, PipeWriter output) output.Advance(Http2FrameReader.HeaderLength); } + private Task TimeFlushUnsynchronizedAsync() + { + var bytesWritten = _unflushedBytes; + _unflushedBytes = 0; + + return _flusher.FlushAsync(_minResponseDataRate, bytesWritten); + } + public bool TryUpdateConnectionWindow(int bytes) { lock (_writeLock) diff --git a/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs b/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs index 586ba04c0..09111b1d7 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs @@ -24,7 +24,6 @@ public class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter // This should only be accessed via the FrameWriter. The connection-level output flow control is protected by the // FrameWriter's connection-level write lock. private readonly StreamOutputFlowControl _flowControl; - private readonly MinDataRate _minResponseDataRate; private readonly Http2Stream _stream; private readonly object _dataWriterLock = new object(); private readonly Pipe _dataPipe; @@ -38,17 +37,16 @@ public Http2OutputProducer( Http2FrameWriter frameWriter, StreamOutputFlowControl flowControl, ITimeoutControl timeoutControl, - MinDataRate minResponseDataRate, MemoryPool pool, - Http2Stream stream) + Http2Stream stream, + IKestrelTrace log) { _streamId = streamId; _frameWriter = frameWriter; _flowControl = flowControl; - _minResponseDataRate = minResponseDataRate; _stream = stream; _dataPipe = CreateDataPipe(pool); - _flusher = new TimingPipeFlusher(_dataPipe.Writer, timeoutControl); + _flusher = new TimingPipeFlusher(_dataPipe.Writer, timeoutControl, log); _dataWriteProcessingTask = ProcessDataWrites(); } @@ -212,14 +210,14 @@ private async Task ProcessDataWrites() { if (readResult.Buffer.Length > 0) { - await _frameWriter.WriteDataAsync(_streamId, _flowControl, _minResponseDataRate, readResult.Buffer, endStream: false); + await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: false); } await _frameWriter.WriteResponseTrailers(_streamId, _stream.Trailers); } else { - await _frameWriter.WriteDataAsync(_streamId, _flowControl, _minResponseDataRate, readResult.Buffer, endStream: readResult.IsCompleted); + await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: readResult.IsCompleted); } _dataPipe.Reader.AdvanceTo(readResult.Buffer.End); diff --git a/src/Kestrel.Core/Internal/Http2/Http2Stream.cs b/src/Kestrel.Core/Internal/Http2/Http2Stream.cs index 4f2d54434..13c23e91b 100644 --- a/src/Kestrel.Core/Internal/Http2/Http2Stream.cs +++ b/src/Kestrel.Core/Internal/Http2/Http2Stream.cs @@ -50,9 +50,9 @@ public Http2Stream(Http2StreamContext context) context.FrameWriter, _outputFlowControl, context.TimeoutControl, - context.ServiceContext.ServerOptions.Limits.MinResponseDataRate, context.MemoryPool, - this); + this, + context.ServiceContext.Log); RequestBodyPipe = CreateRequestBodyPipe(context.ServerPeerSettings.InitialWindowSize); Output = _http2Output; diff --git a/src/Kestrel.Core/Internal/Infrastructure/ITimeoutControl.cs b/src/Kestrel.Core/Internal/Infrastructure/ITimeoutControl.cs index 5a1a4e8c8..1bd5692a8 100644 --- a/src/Kestrel.Core/Internal/Infrastructure/ITimeoutControl.cs +++ b/src/Kestrel.Core/Internal/Infrastructure/ITimeoutControl.cs @@ -20,7 +20,8 @@ public interface ITimeoutControl void StopTimingRead(); void BytesRead(long count); - void StartTimingWrite(MinDataRate minRate, long size); + void StartTimingWrite(); void StopTimingWrite(); + void BytesWrittenToBuffer(MinDataRate minRate, long count); } } diff --git a/src/Kestrel.Core/Internal/Infrastructure/TimeoutControl.cs b/src/Kestrel.Core/Internal/Infrastructure/TimeoutControl.cs index bb44c9e2b..5a54b0db0 100644 --- a/src/Kestrel.Core/Internal/Infrastructure/TimeoutControl.cs +++ b/src/Kestrel.Core/Internal/Infrastructure/TimeoutControl.cs @@ -28,7 +28,7 @@ public class TimeoutControl : ITimeoutControl, IConnectionTimeoutFeature private int _concurrentAwaitingReads; private readonly object _writeTimingLock = new object(); - private int _cuncurrentAwaitingWrites; + private int _concurrentAwaitingWrites; private long _writeTimingTimeoutTimestamp; public TimeoutControl(ITimeoutHandler timeoutHandler) @@ -124,7 +124,7 @@ private void CheckForWriteDataRateTimeout(long timestamp) { lock (_writeTimingLock) { - if (_cuncurrentAwaitingWrites > 0 && timestamp > _writeTimingTimeoutTimestamp && !Debugger.IsAttached) + if (_concurrentAwaitingWrites > 0 && timestamp > _writeTimingTimeoutTimestamp && !Debugger.IsAttached) { _timeoutHandler.OnTimeout(TimeoutReason.WriteDataRate); } @@ -230,13 +230,29 @@ public void BytesRead(long count) } } - public void StartTimingWrite(MinDataRate minRate, long size) + public void StartTimingWrite() + { + lock (_writeTimingLock) + { + _concurrentAwaitingWrites++; + } + } + + public void StopTimingWrite() + { + lock (_writeTimingLock) + { + _concurrentAwaitingWrites--; + } + } + + public void BytesWrittenToBuffer(MinDataRate minRate, long count) { lock (_writeTimingLock) { // Add Heartbeat.Interval since this can be called right before the next heartbeat. var currentTimeUpperBound = Interlocked.Read(ref _lastTimestamp) + Heartbeat.Interval.Ticks; - var ticksToCompleteWriteAtMinRate = TimeSpan.FromSeconds(size / minRate.BytesPerSecond).Ticks; + var ticksToCompleteWriteAtMinRate = TimeSpan.FromSeconds(count / minRate.BytesPerSecond).Ticks; // If ticksToCompleteWriteAtMinRate is less than the configured grace period, // allow that write to take up to the grace period to complete. Only add the grace period @@ -255,15 +271,6 @@ public void StartTimingWrite(MinDataRate minRate, long size) var accumulatedWriteTimeoutTimestamp = _writeTimingTimeoutTimestamp + ticksToCompleteWriteAtMinRate; _writeTimingTimeoutTimestamp = Math.Max(singleWriteTimeoutTimestamp, accumulatedWriteTimeoutTimestamp); - _cuncurrentAwaitingWrites++; - } - } - - public void StopTimingWrite() - { - lock (_writeTimingLock) - { - _cuncurrentAwaitingWrites--; } } diff --git a/src/Kestrel.Core/Internal/Infrastructure/TimeoutControlExtensions.cs b/src/Kestrel.Core/Internal/Infrastructure/TimeoutControlExtensions.cs index 931bfb51f..1a2c809ab 100644 --- a/src/Kestrel.Core/Internal/Infrastructure/TimeoutControlExtensions.cs +++ b/src/Kestrel.Core/Internal/Infrastructure/TimeoutControlExtensions.cs @@ -7,18 +7,15 @@ public static class TimeoutControlExtensions { public static void StartDrainTimeout(this ITimeoutControl timeoutControl, MinDataRate minDataRate, long? maxResponseBufferSize) { - // If maxResponseBufferSize has no value, there's no backpressure and we can't reasonably timeout draining. + // If maxResponseBufferSize has no value, there's no backpressure and we can't reasonably time out draining. if (minDataRate == null || maxResponseBufferSize == null) { return; } - // With full backpressure and a connection adapter there could be 2 two pipes buffering. - // We already validate that the buffer size is positive. - // There's no reason to stop timing the write after the connection is closed. - var oneBufferSize = maxResponseBufferSize.Value; - var maxBufferedBytes = oneBufferSize < long.MaxValue / 2 ? oneBufferSize * 2 : long.MaxValue; - timeoutControl.StartTimingWrite(minDataRate, maxBufferedBytes); + // Ensure we have at least the grace period from this point to finish draining the response. + timeoutControl.BytesWrittenToBuffer(minDataRate, 1); + timeoutControl.StartTimingWrite(); } } } diff --git a/src/Kestrel.Core/Internal/Infrastructure/TimingPipeFlusher.cs b/src/Kestrel.Core/Internal/Infrastructure/TimingPipeFlusher.cs index 794b2a343..1825a42bb 100644 --- a/src/Kestrel.Core/Internal/Infrastructure/TimingPipeFlusher.cs +++ b/src/Kestrel.Core/Internal/Infrastructure/TimingPipeFlusher.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http; +using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure { @@ -18,16 +19,19 @@ public class TimingPipeFlusher { private readonly PipeWriter _writer; private readonly ITimeoutControl _timeoutControl; - private readonly object _flushLock = new object(); + private readonly IKestrelTrace _log; + private readonly object _flushLock = new object(); private Task _lastFlushTask = Task.CompletedTask; public TimingPipeFlusher( PipeWriter writer, - ITimeoutControl timeoutControl) + ITimeoutControl timeoutControl, + IKestrelTrace log) { _writer = writer; _timeoutControl = timeoutControl; + _log = log; } public Task FlushAsync() @@ -49,6 +53,11 @@ public Task FlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outpu { var flushValueTask = _writer.FlushAsync(cancellationToken); + if (minRate != null) + { + _timeoutControl.BytesWrittenToBuffer(minRate, count); + } + if (flushValueTask.IsCompletedSuccessfully) { return Task.CompletedTask; @@ -74,20 +83,21 @@ private async Task TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAb { if (minRate != null) { - _timeoutControl.StartTimingWrite(minRate, count); + _timeoutControl.StartTimingWrite(); } try { await _lastFlushTask; } - catch (OperationCanceledException ex) + catch (OperationCanceledException ex) when (outputAborter != null) { outputAborter.Abort(new ConnectionAbortedException(CoreStrings.ConnectionOrStreamAbortedByCancellationToken, ex)); } - catch + catch (Exception ex) { // A canceled token is the only reason flush should ever throw. + _log.LogError(0, ex, $"Unexpected exception in {nameof(TimingPipeFlusher)}.{nameof(TimeFlushAsync)}."); } if (minRate != null) diff --git a/test/Kestrel.Core.Tests/TimeoutControlTests.cs b/test/Kestrel.Core.Tests/TimeoutControlTests.cs index cb4438759..fbe76de81 100644 --- a/test/Kestrel.Core.Tests/TimeoutControlTests.cs +++ b/test/Kestrel.Core.Tests/TimeoutControlTests.cs @@ -284,7 +284,8 @@ public void WriteTimingAbortsConnectionWhenWriteDoesNotCompleteWithMinimumDataRa _timeoutControl.Tick(systemClock.UtcNow); // Should complete within 4 seconds, but the timeout is adjusted by adding Heartbeat.Interval - _timeoutControl.StartTimingWrite(minRate, 400); + _timeoutControl.BytesWrittenToBuffer(minRate, 400); + _timeoutControl.StartTimingWrite(); // Tick just past 4s plus Heartbeat.Interval systemClock.UtcNow += TimeSpan.FromSeconds(4) + Heartbeat.Interval + TimeSpan.FromTicks(1); @@ -304,7 +305,8 @@ public void WriteTimingAbortsConnectionWhenSmallWriteDoesNotCompleteWithinGraceP _timeoutControl.Tick(startTime); // Should complete within 1 second, but the timeout is adjusted by adding Heartbeat.Interval - _timeoutControl.StartTimingWrite(minRate, 100); + _timeoutControl.BytesWrittenToBuffer(minRate, 100); + _timeoutControl.StartTimingWrite(); // Tick just past 1s plus Heartbeat.Interval systemClock.UtcNow += TimeSpan.FromSeconds(1) + Heartbeat.Interval + TimeSpan.FromTicks(1); @@ -331,10 +333,12 @@ public void WriteTimingTimeoutPushedOnConcurrentWrite() _timeoutControl.Tick(systemClock.UtcNow); // Should complete within 5 seconds, but the timeout is adjusted by adding Heartbeat.Interval - _timeoutControl.StartTimingWrite(minRate, 500); + _timeoutControl.BytesWrittenToBuffer(minRate, 500); + _timeoutControl.StartTimingWrite(); // Start a concurrent write after 3 seconds, which should complete within 3 seconds (adjusted by Heartbeat.Interval) - _timeoutControl.StartTimingWrite(minRate, 300); + _timeoutControl.BytesWrittenToBuffer(minRate, 300); + _timeoutControl.StartTimingWrite(); // Tick just past 5s plus Heartbeat.Interval, when the first write should have completed systemClock.UtcNow += TimeSpan.FromSeconds(5) + Heartbeat.Interval + TimeSpan.FromTicks(1); @@ -368,12 +372,12 @@ public void WriteTimingAbortsConnectionWhenRepeatedSmallWritesDoNotCompleteWithM // 5 consecutive 100 byte writes. for (var i = 0; i < numWrites - 1; i++) { - _timeoutControl.StartTimingWrite(minRate, writeSize); - _timeoutControl.StopTimingWrite(); + _timeoutControl.BytesWrittenToBuffer(minRate, writeSize); } // Stall the last write. - _timeoutControl.StartTimingWrite(minRate, writeSize); + _timeoutControl.BytesWrittenToBuffer(minRate, writeSize); + _timeoutControl.StartTimingWrite(); // Move the clock forward Heartbeat.Interval + MinDataRate.GracePeriod + 4 seconds. // The grace period should only be added for the first write. The subsequent 4 100 byte writes should add 1 second each to the timeout given the 100 byte/s min rate. diff --git a/test/Kestrel.InMemory.FunctionalTests/Http2/Http2TestBase.cs b/test/Kestrel.InMemory.FunctionalTests/Http2/Http2TestBase.cs index a54b38236..681c577d8 100644 --- a/test/Kestrel.InMemory.FunctionalTests/Http2/Http2TestBase.cs +++ b/test/Kestrel.InMemory.FunctionalTests/Http2/Http2TestBase.cs @@ -147,6 +147,7 @@ public class Http2TestBase : TestApplicationErrorLoggerLoggedTest, IDisposable, internal DuplexPipe.DuplexPipePair _pair; protected Http2Connection _connection; protected Task _connectionTask; + protected long _bytesReceived; public Http2TestBase() { @@ -1069,6 +1070,7 @@ protected async Task ReceiveFrameAsync(uint maxFrameSize } finally { + _bytesReceived += buffer.Slice(buffer.Start, consumed).Length; _pair.Application.Input.AdvanceTo(consumed, examined); } } @@ -1228,15 +1230,20 @@ public virtual void BytesRead(long count) } - public virtual void StartTimingWrite(MinDataRate minRate, long size) + public virtual void StartTimingWrite() { - _realTimeoutControl.StartTimingWrite(minRate, size); + _realTimeoutControl.StartTimingWrite(); } public virtual void StopTimingWrite() { _realTimeoutControl.StopTimingWrite(); } + + public virtual void BytesWrittenToBuffer(MinDataRate minRate, long size) + { + _realTimeoutControl.BytesWrittenToBuffer(minRate, size); + } } } } diff --git a/test/Kestrel.InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs b/test/Kestrel.InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs index b521b3993..eeccef720 100644 --- a/test/Kestrel.InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs +++ b/test/Kestrel.InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs @@ -182,14 +182,15 @@ public async Task ResponseDrain_SlowerThanMinimumDataRate_AbortsConnection() await WaitForConnectionStopAsync(expectedLastStreamId: 0, ignoreNonGoAwayFrames: false); mockSystemClock.UtcNow += - TimeSpan.FromSeconds(limits.MaxResponseBufferSize.Value * 2 / limits.MinResponseDataRate.BytesPerSecond) + - Heartbeat.Interval; + TimeSpan.FromSeconds(_bytesReceived / limits.MinResponseDataRate.BytesPerSecond) + + limits.MinResponseDataRate.GracePeriod + Heartbeat.Interval - TimeSpan.FromSeconds(.5); + _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(It.IsAny()), Times.Never); _mockConnectionContext.Verify(c => c.Abort(It.IsAny()), Times.Never); - mockSystemClock.UtcNow += TimeSpan.FromTicks(1); + mockSystemClock.UtcNow += TimeSpan.FromSeconds(1); _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(TimeoutReason.WriteDataRate), Times.Once); @@ -284,12 +285,15 @@ await ExpectAsync(Http2FrameType.HEADERS, _timeoutControl.Tick(mockSystemClock.UtcNow); // Don't read data frame to induce "socket" backpressure. - mockSystemClock.UtcNow += limits.MinResponseDataRate.GracePeriod + Heartbeat.Interval; + mockSystemClock.UtcNow += + TimeSpan.FromSeconds((_bytesReceived + _helloWorldBytes.Length) / limits.MinResponseDataRate.BytesPerSecond) + + limits.MinResponseDataRate.GracePeriod + Heartbeat.Interval - TimeSpan.FromSeconds(.5); + _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(It.IsAny()), Times.Never); - mockSystemClock.UtcNow += TimeSpan.FromTicks(1); + mockSystemClock.UtcNow += TimeSpan.FromSeconds(1); _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(TimeoutReason.WriteDataRate), Times.Once); @@ -340,15 +344,16 @@ await ExpectAsync(Http2FrameType.HEADERS, // Complete timing of the request body so we don't induce any unexpected request body rate timeouts. _timeoutControl.Tick(mockSystemClock.UtcNow); - var timeToWriteMaxData = TimeSpan.FromSeconds(_maxData.Length / limits.MinResponseDataRate.BytesPerSecond); + var timeToWriteMaxData = TimeSpan.FromSeconds((_bytesReceived + _maxData.Length) / limits.MinResponseDataRate.BytesPerSecond) + + limits.MinResponseDataRate.GracePeriod + Heartbeat.Interval - TimeSpan.FromSeconds(.5); // Don't read data frame to induce "socket" backpressure. - mockSystemClock.UtcNow += timeToWriteMaxData + Heartbeat.Interval; + mockSystemClock.UtcNow += timeToWriteMaxData; _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(It.IsAny()), Times.Never); - mockSystemClock.UtcNow += TimeSpan.FromTicks(1); + mockSystemClock.UtcNow += TimeSpan.FromSeconds(1); _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(TimeoutReason.WriteDataRate), Times.Once); @@ -404,12 +409,15 @@ await ExpectAsync(Http2FrameType.DATA, _timeoutControl.Tick(mockSystemClock.UtcNow); // Don't send WINDOW_UPDATE to induce flow-control backpressure - mockSystemClock.UtcNow += limits.MinResponseDataRate.GracePeriod + Heartbeat.Interval; + mockSystemClock.UtcNow += + TimeSpan.FromSeconds(_bytesReceived / limits.MinResponseDataRate.BytesPerSecond) + + limits.MinResponseDataRate.GracePeriod + Heartbeat.Interval - TimeSpan.FromSeconds(.5); + _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(It.IsAny()), Times.Never); - mockSystemClock.UtcNow += TimeSpan.FromTicks(1); + mockSystemClock.UtcNow += TimeSpan.FromSeconds(1); _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(TimeoutReason.WriteDataRate), Times.Once); @@ -458,15 +466,16 @@ await ExpectAsync(Http2FrameType.DATA, // Complete timing of the request body so we don't induce any unexpected request body rate timeouts. _timeoutControl.Tick(mockSystemClock.UtcNow); - var timeToWriteMaxData = TimeSpan.FromSeconds(_clientSettings.InitialWindowSize / limits.MinResponseDataRate.BytesPerSecond); + var timeToWriteMaxData = TimeSpan.FromSeconds(_bytesReceived / limits.MinResponseDataRate.BytesPerSecond) + + limits.MinResponseDataRate.GracePeriod + Heartbeat.Interval - TimeSpan.FromSeconds(.5); // Don't send WINDOW_UPDATE to induce flow-control backpressure - mockSystemClock.UtcNow += timeToWriteMaxData + Heartbeat.Interval; + mockSystemClock.UtcNow += timeToWriteMaxData; _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(It.IsAny()), Times.Never); - mockSystemClock.UtcNow += TimeSpan.FromTicks(1); + mockSystemClock.UtcNow += TimeSpan.FromSeconds(1); _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(TimeoutReason.WriteDataRate), Times.Once); @@ -527,17 +536,17 @@ await ExpectAsync(Http2FrameType.DATA, // Complete timing of the request bodies so we don't induce any unexpected request body rate timeouts. _timeoutControl.Tick(mockSystemClock.UtcNow); - var timeToWriteMaxData = TimeSpan.FromSeconds(_clientSettings.InitialWindowSize / limits.MinResponseDataRate.BytesPerSecond); - // Double the timeout for the second stream. - timeToWriteMaxData += timeToWriteMaxData; + var timeToWriteMaxData = TimeSpan.FromSeconds(_bytesReceived / limits.MinResponseDataRate.BytesPerSecond) + + limits.MinResponseDataRate.GracePeriod + Heartbeat.Interval - TimeSpan.FromSeconds(.5); // Don't send WINDOW_UPDATE to induce flow-control backpressure - mockSystemClock.UtcNow += timeToWriteMaxData + Heartbeat.Interval; + mockSystemClock.UtcNow += timeToWriteMaxData; _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(It.IsAny()), Times.Never); - mockSystemClock.UtcNow += TimeSpan.FromTicks(1); + //mockSystemClock.UtcNow += TimeSpan.FromTicks(1); + mockSystemClock.UtcNow += TimeSpan.FromSeconds(1); _timeoutControl.Tick(mockSystemClock.UtcNow); _mockTimeoutHandler.Verify(h => h.OnTimeout(TimeoutReason.WriteDataRate), Times.Once); diff --git a/test/Kestrel.InMemory.FunctionalTests/RequestHeadersTimeoutTests.cs b/test/Kestrel.InMemory.FunctionalTests/RequestHeadersTimeoutTests.cs index 92d74c860..333ce24f8 100644 --- a/test/Kestrel.InMemory.FunctionalTests/RequestHeadersTimeoutTests.cs +++ b/test/Kestrel.InMemory.FunctionalTests/RequestHeadersTimeoutTests.cs @@ -97,6 +97,9 @@ public async Task TimeoutNotResetOnEachRequestLineCharacterReceived() var testContext = new TestServiceContext(LoggerFactory); var heartbeatManager = new HeartbeatManager(testContext.ConnectionManager); + // Disable response rate, so we can finish the send loop without timing out the response. + testContext.ServerOptions.Limits.MinResponseDataRate = null; + using (var server = CreateServer(testContext)) using (var connection = server.CreateConnection()) { diff --git a/test/Kestrel.InMemory.FunctionalTests/ResponseDrainingTests.cs b/test/Kestrel.InMemory.FunctionalTests/ResponseDrainingTests.cs index 432b61c64..d4ebbbb59 100644 --- a/test/Kestrel.InMemory.FunctionalTests/ResponseDrainingTests.cs +++ b/test/Kestrel.InMemory.FunctionalTests/ResponseDrainingTests.cs @@ -59,14 +59,12 @@ await connection.Send( // Wait for the drain timeout to be set. await outputBufferedTcs.Task.DefaultTimeout(); - testContext.MockSystemClock.UtcNow += - Heartbeat.Interval + - TimeSpan.FromSeconds(testContext.ServerOptions.Limits.MaxResponseBufferSize.Value * 2 / minRate.BytesPerSecond); + testContext.MockSystemClock.UtcNow += minRate.GracePeriod + Heartbeat.Interval - TimeSpan.FromSeconds(.5); heartbeatManager.OnHeartbeat(testContext.SystemClock.UtcNow); Assert.Null(transportConnection.AbortReason); - testContext.MockSystemClock.UtcNow += TimeSpan.FromTicks(1); + testContext.MockSystemClock.UtcNow += TimeSpan.FromSeconds(1); heartbeatManager.OnHeartbeat(testContext.SystemClock.UtcNow); Assert.NotNull(transportConnection.AbortReason);