Skip to content

Accurately count only newly examined bytes #12639

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ public override void AdvanceTo(SequencePosition consumed)

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
var dataLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, consumed).Length;
OnAdvance(_readResult, consumed, examined);
_requestBodyPipe.Reader.AdvanceTo(consumed, examined);
OnDataRead(dataLength);
}

public override bool TryRead(out ReadResult readResult)
Expand All @@ -63,6 +62,7 @@ public override bool TryReadInternal(out ReadResult readResult)
var boolResult = _requestBodyPipe.Reader.TryRead(out _readResult);

readResult = _readResult;
CountBytesRead(readResult.Buffer.Length);

if (_readResult.IsCompleted)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ internal sealed class Http1ContentLengthMessageBody : Http1MessageBody
private bool _readCompleted;
private bool _isReading;
private int _userCanceled;
private long _totalExaminedInPreviousReadResult;
private bool _finalAdvanceCalled;

public Http1ContentLengthMessageBody(bool keepAlive, long contentLength, Http1Connection context)
Expand Down Expand Up @@ -85,6 +84,8 @@ public override async ValueTask<ReadResult> ReadAsyncInternal(CancellationToken
if (Interlocked.Exchange(ref _userCanceled, 0) == 1)
{
// Ignore the readResult if it wasn't by the user.
CreateReadResultFromConnectionReadResult();

break;
}
else
Expand Down Expand Up @@ -156,6 +157,7 @@ public override bool TryReadInternal(out ReadResult readResult)
CreateReadResultFromConnectionReadResult();

readResult = _readResult;
CountBytesRead(readResult.Buffer.Length);

return true;
}
Expand All @@ -174,11 +176,11 @@ public override Task ConsumeAsync()

private void CreateReadResultFromConnectionReadResult()
{
if (_readResult.Buffer.Length >= _inputLength + _totalExaminedInPreviousReadResult)
if (_readResult.Buffer.Length >= _inputLength + _examinedUnconsumedBytes)
{
_readCompleted = true;
_readResult = new ReadResult(
_readResult.Buffer.Slice(0, _inputLength + _totalExaminedInPreviousReadResult),
_readResult.Buffer.Slice(0, _inputLength + _examinedUnconsumedBytes),
_readResult.IsCanceled && Interlocked.Exchange(ref _userCanceled, 0) == 1,
_readCompleted);
}
Expand Down Expand Up @@ -217,18 +219,8 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami
return;
}

var consumedLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, consumed).Length;
var examinedLength = consumedLength + _readResult.Buffer.Slice(consumed, examined).Length;

_inputLength -= OnAdvance(_readResult, consumed, examined);
_context.Input.AdvanceTo(consumed, examined);

var newlyExamined = examinedLength - _totalExaminedInPreviousReadResult;

OnDataRead(newlyExamined);
_totalExaminedInPreviousReadResult += newlyExamined;
_inputLength -= newlyExamined;

_totalExaminedInPreviousReadResult -= consumedLength;
}

protected override void OnReadStarting()
Expand Down
73 changes: 70 additions & 3 deletions src/Servers/Kestrel/Core/src/Internal/Http/MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal abstract class MessageBody
protected bool _timingEnabled;
protected bool _backpressure;
protected long _alreadyTimedBytes;
protected long _examinedUnconsumedBytes;

protected MessageBody(HttpProtocol context)
{
Expand Down Expand Up @@ -165,16 +166,82 @@ protected ValueTask<ReadResult> StartTimingReadAsync(ValueTask<ReadResult> readA
return readAwaitable;
}

protected void StopTimingRead(long bytesRead)
protected void CountBytesRead(long bytesInReadResult)
{
_context.TimeoutControl.BytesRead(bytesRead - _alreadyTimedBytes);
_alreadyTimedBytes = 0;
var numFirstSeenBytes = bytesInReadResult - _alreadyTimedBytes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@halter73 Should this then update _alreadyTimedBytes?

var numFirstSeenBytes = bytesInReadResult - _alreadyTimedBytes;

if (numFirstSeenBytes > 0)
{
    _context.TimeoutControl.BytesRead(numFirstSeenBytes);
    _alreadyTimedBytes = bytesInReadResult ;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nvm I think its ok

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. We count all the bytes in the ReadResult towards rate calculations, examined or not. Then in Advance, if we don't consume everything, we count all the unconsumed and unexamined bytes towards _alreadyTimedBytes, since both the unconsumed and unexamined bytes will be present in the next read result. What got you looking at this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was getting test failures with #11942 (comment) when I removed all the changes that weren't directly related to the streams; as assumed this superseded them.

Issue wasn't with counting though; needed TryStart/TryStop in Http2MessageBody


if (numFirstSeenBytes > 0)
{
_context.TimeoutControl.BytesRead(numFirstSeenBytes);
}
}

protected void StopTimingRead(long bytesInReadResult)
{
CountBytesRead(bytesInReadResult);

if (_backpressure)
{
_backpressure = false;
_context.TimeoutControl.StopTimingRead();
}
}

protected long OnAdvance(ReadResult readResult, SequencePosition consumed, SequencePosition examined)
{
// This code path is fairly hard to understand so let's break it down with an example
// ReadAsync returns a ReadResult of length 50.
// Advance(25, 40). The examined length would be 40 and consumed length would be 25.
// _totalExaminedInPreviousReadResult starts at 0. newlyExamined is 40.
// OnDataRead is called with length 40.
// _totalExaminedInPreviousReadResult is now 40 - 25 = 15.

// The next call to ReadAsync returns 50 again
// Advance(5, 5) is called
// newlyExamined is 5 - 15, or -10.
// Update _totalExaminedInPreviousReadResult to 10 as we consumed 5.

// The next call to ReadAsync returns 50 again
// _totalExaminedInPreviousReadResult is 10
// Advance(50, 50) is called
// newlyExamined = 50 - 10 = 40
// _totalExaminedInPreviousReadResult is now 50
// _totalExaminedInPreviousReadResult is finally 0 after subtracting consumedLength.

long examinedLength, consumedLength, totalLength;

if (consumed.Equals(examined))
{
examinedLength = readResult.Buffer.Slice(readResult.Buffer.Start, examined).Length;
consumedLength = examinedLength;
}
else
{
consumedLength = readResult.Buffer.Slice(readResult.Buffer.Start, consumed).Length;
examinedLength = consumedLength + readResult.Buffer.Slice(consumed, examined).Length;
}

if (examined.Equals(readResult.Buffer.End))
{
totalLength = examinedLength;
}
else
{
totalLength = readResult.Buffer.Length;
}

var newlyExamined = examinedLength - _examinedUnconsumedBytes;

if (newlyExamined > 0)
{
OnDataRead(newlyExamined);
_examinedUnconsumedBytes += newlyExamined;
}

_examinedUnconsumedBytes -= consumedLength;
_alreadyTimedBytes = totalLength - consumedLength;

return newlyExamined;
}
}
}
45 changes: 2 additions & 43 deletions src/Servers/Kestrel/Core/src/Internal/Http2/Http2MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ internal sealed class Http2MessageBody : MessageBody
{
private readonly Http2Stream _context;
private ReadResult _readResult;
private long _alreadyExaminedInNextReadResult;

private Http2MessageBody(Http2Stream context)
: base(context)
Expand Down Expand Up @@ -64,55 +63,15 @@ public override void AdvanceTo(SequencePosition consumed)

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
// This code path is fairly hard to understand so let's break it down with an example
// ReadAsync returns a ReadResult of length 50.
// Advance(25, 40). The examined length would be 40 and consumed length would be 25.
// _totalExaminedInPreviousReadResult starts at 0. newlyExamined is 40.
// OnDataRead is called with length 40.
// _totalExaminedInPreviousReadResult is now 40 - 25 = 15.

// The next call to ReadAsync returns 50 again
// Advance(5, 5) is called
// newlyExamined is 5 - 15, or -10.
// Update _totalExaminedInPreviousReadResult to 10 as we consumed 5.

// The next call to ReadAsync returns 50 again
// _totalExaminedInPreviousReadResult is 10
// Advance(50, 50) is called
// newlyExamined = 50 - 10 = 40
// _totalExaminedInPreviousReadResult is now 50
// _totalExaminedInPreviousReadResult is finally 0 after subtracting consumedLength.

long examinedLength;
long consumedLength;
if (consumed.Equals(examined))
{
examinedLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, examined).Length;
consumedLength = examinedLength;
}
else
{
consumedLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, consumed).Length;
examinedLength = consumedLength + _readResult.Buffer.Slice(consumed, examined).Length;
}

OnAdvance(_readResult, consumed, examined);
_context.RequestBodyPipe.Reader.AdvanceTo(consumed, examined);

var newlyExamined = examinedLength - _alreadyExaminedInNextReadResult;

if (newlyExamined > 0)
{
OnDataRead(newlyExamined);
_alreadyExaminedInNextReadResult += newlyExamined;
}

_alreadyExaminedInNextReadResult -= consumedLength;
}

public override bool TryRead(out ReadResult readResult)
{
var result = _context.RequestBodyPipe.Reader.TryRead(out readResult);
_readResult = readResult;
CountBytesRead(readResult.Buffer.Length);

return result;
}
Expand Down