Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Commit

Permalink
Measure the rate of all HTTP/2 output (#3067)
Browse files Browse the repository at this point in the history
Prior to this, only the response body counted toward the HTTP/2 response data rate. This PR aligns the HTTP/2 logic closer to the HTTP/1.x logic and measures the rate for all HTTP/2 response data.

This PR also accounts for all response bytes written, not just those that immediately induced backpressure.
  • Loading branch information
halter73 authored Oct 31, 2018
1 parent e958d82 commit d50c0c1
Show file tree
Hide file tree
Showing 15 changed files with 175 additions and 101 deletions.
2 changes: 2 additions & 0 deletions benchmarks/Kestrel.Performance/Http1WritingBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using Microsoft.AspNetCore.Testing;

Expand Down Expand Up @@ -113,6 +114,7 @@ private TestHttp1Connection MakeHttp1Connection()
ServiceContext = serviceContext,
ConnectionFeatures = new FeatureCollection(),
MemoryPool = _memoryPool,
TimeoutControl = new TimeoutControl(timeoutHandler: null),
Transport = pair.Transport
});

Expand Down
2 changes: 1 addition & 1 deletion src/Kestrel.Core/Internal/Http/Http1OutputProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Http1OutputProducer(
_connectionContext = connectionContext;
_log = log;
_minResponseDataRateFeature = minResponseDataRateFeature;
_flusher = new TimingPipeFlusher(pipeWriter, timeoutControl);
_flusher = new TimingPipeFlusher(pipeWriter, timeoutControl, log);
}

public Task WriteDataAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default)
Expand Down
20 changes: 16 additions & 4 deletions src/Kestrel.Core/Internal/Http2/Http2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,27 @@ public Http2Connection(HttpConnectionContext context)
var http2Limits = httpLimits.Http2;

_context = context;
_frameWriter = new Http2FrameWriter(context.Transport.Output, context.ConnectionContext, this, _outputFlowControl, context.TimeoutControl, context.ConnectionId, context.ServiceContext.Log);

_frameWriter = new Http2FrameWriter(
context.Transport.Output,
context.ConnectionContext,
this,
_outputFlowControl,
context.TimeoutControl,
httpLimits.MinResponseDataRate,
context.ConnectionId,
context.ServiceContext.Log);

_hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize);

var connectionWindow = (uint)http2Limits.InitialConnectionWindowSize;
_inputFlowControl = new InputFlowControl(connectionWindow, connectionWindow / 2);

_serverSettings.MaxConcurrentStreams = (uint)http2Limits.MaxStreamsPerConnection;
_serverSettings.MaxFrameSize = (uint)http2Limits.MaxFrameSize;
_serverSettings.HeaderTableSize = (uint)http2Limits.HeaderTableSize;
_hpackDecoder = new HPackDecoder(http2Limits.HeaderTableSize, http2Limits.MaxRequestHeaderFieldSize);
_serverSettings.MaxHeaderListSize = (uint)httpLimits.MaxRequestHeadersTotalSize;
_serverSettings.InitialWindowSize = (uint)http2Limits.InitialStreamWindowSize;
var connectionWindow = (uint)http2Limits.InitialConnectionWindowSize;
_inputFlowControl = new InputFlowControl(connectionWindow, connectionWindow / 2);
}

public string ConnectionId => _context.ConnectionId;
Expand Down
86 changes: 56 additions & 30 deletions src/Kestrel.Core/Internal/Http2/Http2FrameWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,33 @@ public class Http2FrameWriter
// Literal Header Field without Indexing - Indexed Name (Index 8 - :status)
private static readonly byte[] _continueBytes = new byte[] { 0x08, 0x03, (byte)'1', (byte)'0', (byte)'0' };

private uint _maxFrameSize = Http2PeerSettings.MinAllowedMaxFrameSize;
private byte[] _headerEncodingBuffer;
private Http2Frame _outgoingFrame;
private readonly object _writeLock = new object();
private readonly Http2Frame _outgoingFrame;
private readonly HPackEncoder _hpackEncoder = new HPackEncoder();
private readonly PipeWriter _outputWriter;
private bool _aborted;
private readonly ConnectionContext _connectionContext;
private readonly Http2Connection _http2Connection;
private readonly OutputFlowControl _connectionOutputFlowControl;
private readonly string _connectionId;
private readonly IKestrelTrace _log;
private readonly ITimeoutControl _timeoutControl;
private readonly MinDataRate _minResponseDataRate;
private readonly TimingPipeFlusher _flusher;

private uint _maxFrameSize = Http2PeerSettings.MinAllowedMaxFrameSize;
private byte[] _headerEncodingBuffer;
private long _unflushedBytes;

private bool _completed;
private bool _aborted;

public Http2FrameWriter(
PipeWriter outputPipeWriter,
ConnectionContext connectionContext,
Http2Connection http2Connection,
OutputFlowControl connectionOutputFlowControl,
ITimeoutControl timeoutControl,
MinDataRate minResponseDataRate,
string connectionId,
IKestrelTrace log)
{
Expand All @@ -56,7 +60,8 @@ public Http2FrameWriter(
_connectionId = connectionId;
_log = log;
_timeoutControl = timeoutControl;
_flusher = new TimingPipeFlusher(_outputWriter, timeoutControl);
_minResponseDataRate = minResponseDataRate;
_flusher = new TimingPipeFlusher(_outputWriter, timeoutControl, log);
_outgoingFrame = new Http2Frame();
_headerEncodingBuffer = new byte[_maxFrameSize];
}
Expand Down Expand Up @@ -112,8 +117,11 @@ public Task FlushAsync(IHttpOutputAborter outputAborter, CancellationToken cance
{
return Task.CompletedTask;
}

var bytesWritten = _unflushedBytes;
_unflushedBytes = 0;

return _flusher.FlushAsync(outputAborter, cancellationToken);
return _flusher.FlushAsync(_minResponseDataRate, bytesWritten, outputAborter, cancellationToken);
}
}

Expand All @@ -130,7 +138,7 @@ public Task Write100ContinueAsync(int streamId)
_outgoingFrame.PayloadLength = _continueBytes.Length;
WriteHeaderUnsynchronized();
_outputWriter.Write(_continueBytes);
return _flusher.FlushAsync();
return TimeFlushUnsynchronizedAsync();
}
}

Expand Down Expand Up @@ -195,7 +203,7 @@ public Task WriteResponseTrailers(int streamId, HttpResponseTrailers headers)
_http2Connection.Abort(new ConnectionAbortedException(hex.Message, hex));
}

return _flusher.FlushAsync();
return TimeFlushUnsynchronizedAsync();
}
}

Expand Down Expand Up @@ -228,7 +236,7 @@ private void FinishWritingHeaders(int streamId, int payloadLength, bool done)
}
}

public Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, MinDataRate minRate, ReadOnlySequence<byte> data, bool endStream)
public Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, ReadOnlySequence<byte> data, bool endStream)
{
// The Length property of a ReadOnlySequence can be expensive, so we cache the value.
var dataLength = data.Length;
Expand All @@ -244,12 +252,13 @@ public Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, Mi
// https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.1
if (dataLength != 0 && dataLength > flowControl.Available)
{
return WriteDataAsyncAwaited(streamId, minRate, flowControl, data, dataLength, endStream);
return WriteDataAsync(streamId, flowControl, data, dataLength, endStream);
}

// This cast is safe since if dataLength would overflow an int, it's guaranteed to be greater than the available flow control window.
flowControl.Advance((int)dataLength);
return WriteDataUnsynchronizedAsync(streamId, minRate, data, dataLength, endStream);
WriteDataUnsynchronized(streamId, data, dataLength, endStream);
return TimeFlushUnsynchronizedAsync();
}
}

Expand All @@ -262,7 +271,7 @@ public Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, Mi
| Padding (*) ...
+---------------------------------------------------------------+
*/
private Task WriteDataUnsynchronizedAsync(int streamId, MinDataRate minRate, ReadOnlySequence<byte> data, long dataLength, bool endStream)
private void WriteDataUnsynchronized(int streamId, ReadOnlySequence<byte> data, long dataLength, bool endStream)
{
// Note padding is not implemented
_outgoingFrame.PrepareData(streamId);
Expand Down Expand Up @@ -301,17 +310,14 @@ private Task WriteDataUnsynchronizedAsync(int streamId, MinDataRate minRate, Rea
}

// Plus padding

return _flusher.FlushAsync(minRate, dataLength);
}

private async Task WriteDataAsyncAwaited(int streamId, MinDataRate minRate, StreamOutputFlowControl flowControl, ReadOnlySequence<byte> data, long dataLength, bool endStream)
private async Task WriteDataAsync(int streamId, StreamOutputFlowControl flowControl, ReadOnlySequence<byte> data, long dataLength, bool endStream)
{
while (dataLength > 0)
{
OutputFlowControlAwaitable availabilityAwaitable;
var writeTask = Task.CompletedTask;
int actual;

lock (_writeLock)
{
Expand All @@ -320,24 +326,33 @@ private async Task WriteDataAsyncAwaited(int streamId, MinDataRate minRate, Stre
break;
}

actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityAwaitable);
var actual = flowControl.AdvanceUpToAndWait(dataLength, out availabilityAwaitable);

if (actual > 0)
{
// Don't pass minRate through to the inner WriteData calls.
// We measure this ourselves below so we account for flow control in addition to socket backpressure.
if (actual < dataLength)
{
writeTask = WriteDataUnsynchronizedAsync(streamId, null, data.Slice(0, actual), actual, endStream: false);
WriteDataUnsynchronized(streamId, data.Slice(0, actual), actual, endStream: false);
data = data.Slice(actual);
dataLength -= actual;
}
else
{
writeTask = WriteDataUnsynchronizedAsync(streamId, null, data, actual, endStream);
WriteDataUnsynchronized(streamId, data, actual, endStream);
dataLength = 0;
}

// Don't call TimeFlushUnsynchronizedAsync() since we time this write while also accounting for
// flow control induced backpressure below.
writeTask = _flusher.FlushAsync();
}

if (_minResponseDataRate != null)
{
_timeoutControl.BytesWrittenToBuffer(_minResponseDataRate, _unflushedBytes);
}

_unflushedBytes = 0;
}

// Avoid timing writes that are already complete. This is likely to happen during the last iteration.
Expand All @@ -346,9 +361,9 @@ private async Task WriteDataAsyncAwaited(int streamId, MinDataRate minRate, Stre
continue;
}

if (minRate != null)
if (_minResponseDataRate != null)
{
_timeoutControl.StartTimingWrite(minRate, actual);
_timeoutControl.StartTimingWrite();
}

// This awaitable releases continuations in FIFO order when the window updates.
Expand All @@ -360,7 +375,7 @@ private async Task WriteDataAsyncAwaited(int streamId, MinDataRate minRate, Stre

await writeTask;

if (minRate != null)
if (_minResponseDataRate != null)
{
_timeoutControl.StopTimingWrite();
}
Expand Down Expand Up @@ -389,7 +404,7 @@ public Task WriteWindowUpdateAsync(int streamId, int sizeIncrement)
var buffer = _outputWriter.GetSpan(4);
Bitshifter.WriteUInt31BigEndian(buffer, (uint)sizeIncrement);
_outputWriter.Advance(4);
return _flusher.FlushAsync();
return TimeFlushUnsynchronizedAsync();
}
}

Expand All @@ -413,7 +428,7 @@ public Task WriteRstStreamAsync(int streamId, Http2ErrorCode errorCode)
BinaryPrimitives.WriteUInt32BigEndian(buffer, (uint)errorCode);
_outputWriter.Advance(4);

return _flusher.FlushAsync();
return TimeFlushUnsynchronizedAsync();
}
}

Expand Down Expand Up @@ -443,7 +458,7 @@ public Task WriteSettingsAsync(IList<Http2PeerSetting> settings)
WriteSettings(settings, buffer);
_outputWriter.Advance(settingsSize);

return _flusher.FlushAsync();
return TimeFlushUnsynchronizedAsync();
}
}

Expand All @@ -469,7 +484,7 @@ public Task WriteSettingsAckAsync()

_outgoingFrame.PrepareSettings(Http2SettingsFrameFlags.ACK);
WriteHeaderUnsynchronized();
return _flusher.FlushAsync();
return TimeFlushUnsynchronizedAsync();
}
}

Expand Down Expand Up @@ -497,7 +512,7 @@ public Task WritePingAsync(Http2PingFrameFlags flags, ReadOnlySequence<byte> pay
_outputWriter.Write(segment.Span);
}

return _flusher.FlushAsync();
return TimeFlushUnsynchronizedAsync();
}
}

Expand All @@ -523,14 +538,17 @@ public Task WriteGoAwayAsync(int lastStreamId, Http2ErrorCode errorCode)
BinaryPrimitives.WriteUInt32BigEndian(buffer, (uint)errorCode);
_outputWriter.Advance(8);

return _flusher.FlushAsync();
return TimeFlushUnsynchronizedAsync();
}
}

private void WriteHeaderUnsynchronized()
{
_log.Http2FrameSending(_connectionId, _outgoingFrame);
WriteHeader(_outgoingFrame, _outputWriter);

// We assume the payload will be written prior to the next flush.
_unflushedBytes += Http2FrameReader.HeaderLength + _outgoingFrame.PayloadLength;
}

/* https://tools.ietf.org/html/rfc7540#section-4.1
Expand Down Expand Up @@ -560,6 +578,14 @@ internal static void WriteHeader(Http2Frame frame, PipeWriter output)
output.Advance(Http2FrameReader.HeaderLength);
}

private Task TimeFlushUnsynchronizedAsync()
{
var bytesWritten = _unflushedBytes;
_unflushedBytes = 0;

return _flusher.FlushAsync(_minResponseDataRate, bytesWritten);
}

public bool TryUpdateConnectionWindow(int bytes)
{
lock (_writeLock)
Expand Down
12 changes: 5 additions & 7 deletions src/Kestrel.Core/Internal/Http2/Http2OutputProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class Http2OutputProducer : IHttpOutputProducer, IHttpOutputAborter
// This should only be accessed via the FrameWriter. The connection-level output flow control is protected by the
// FrameWriter's connection-level write lock.
private readonly StreamOutputFlowControl _flowControl;
private readonly MinDataRate _minResponseDataRate;
private readonly Http2Stream _stream;
private readonly object _dataWriterLock = new object();
private readonly Pipe _dataPipe;
Expand All @@ -38,17 +37,16 @@ public Http2OutputProducer(
Http2FrameWriter frameWriter,
StreamOutputFlowControl flowControl,
ITimeoutControl timeoutControl,
MinDataRate minResponseDataRate,
MemoryPool<byte> pool,
Http2Stream stream)
Http2Stream stream,
IKestrelTrace log)
{
_streamId = streamId;
_frameWriter = frameWriter;
_flowControl = flowControl;
_minResponseDataRate = minResponseDataRate;
_stream = stream;
_dataPipe = CreateDataPipe(pool);
_flusher = new TimingPipeFlusher(_dataPipe.Writer, timeoutControl);
_flusher = new TimingPipeFlusher(_dataPipe.Writer, timeoutControl, log);
_dataWriteProcessingTask = ProcessDataWrites();
}

Expand Down Expand Up @@ -212,14 +210,14 @@ private async Task ProcessDataWrites()
{
if (readResult.Buffer.Length > 0)
{
await _frameWriter.WriteDataAsync(_streamId, _flowControl, _minResponseDataRate, readResult.Buffer, endStream: false);
await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: false);
}

await _frameWriter.WriteResponseTrailers(_streamId, _stream.Trailers);
}
else
{
await _frameWriter.WriteDataAsync(_streamId, _flowControl, _minResponseDataRate, readResult.Buffer, endStream: readResult.IsCompleted);
await _frameWriter.WriteDataAsync(_streamId, _flowControl, readResult.Buffer, endStream: readResult.IsCompleted);
}

_dataPipe.Reader.AdvanceTo(readResult.Buffer.End);
Expand Down
4 changes: 2 additions & 2 deletions src/Kestrel.Core/Internal/Http2/Http2Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public Http2Stream(Http2StreamContext context)
context.FrameWriter,
_outputFlowControl,
context.TimeoutControl,
context.ServiceContext.ServerOptions.Limits.MinResponseDataRate,
context.MemoryPool,
this);
this,
context.ServiceContext.Log);

RequestBodyPipe = CreateRequestBodyPipe(context.ServerPeerSettings.InitialWindowSize);
Output = _http2Output;
Expand Down
3 changes: 2 additions & 1 deletion src/Kestrel.Core/Internal/Infrastructure/ITimeoutControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public interface ITimeoutControl
void StopTimingRead();
void BytesRead(long count);

void StartTimingWrite(MinDataRate minRate, long size);
void StartTimingWrite();
void StopTimingWrite();
void BytesWrittenToBuffer(MinDataRate minRate, long count);
}
}
Loading

0 comments on commit d50c0c1

Please sign in to comment.