Skip to content

Amortize HttpRequestStream & DuplexPipeStream ReadAsync calls #11942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 98 additions & 16 deletions src/Servers/Kestrel/Core/src/Internal/Http/HttpRequestStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
internal sealed class HttpRequestStream : Stream
{
private readonly HttpRequestPipeReader _pipeReader;
private readonly IHttpBodyControlFeature _bodyControl;
private AsyncEnumerableReader _asyncReader;

public HttpRequestStream(IHttpBodyControlFeature bodyControl, HttpRequestPipeReader pipeReader)
{
Expand Down Expand Up @@ -44,12 +47,26 @@ public override int WriteTimeout

public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
return ReadAsyncWrapper(destination, cancellationToken);
try
{
return ReadAsyncInternal(destination, cancellationToken);
}
catch (ConnectionAbortedException ex)
{
throw new TaskCanceledException("The request was aborted", ex);
}
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return ReadAsyncWrapper(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
try
{
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}
catch (ConnectionAbortedException ex)
{
throw new TaskCanceledException("The request was aborted", ex);
}
}

public override int Read(byte[] buffer, int offset, int count)
Expand Down Expand Up @@ -127,23 +144,78 @@ private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationTo
return tcs.Task;
}

private ValueTask<int> ReadAsyncWrapper(Memory<byte> destination, CancellationToken cancellationToken)
private ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
{
if (_asyncReader?.InProgress ?? false)
{
// Throw if there are overlapping reads; throwing unwrapped as it suggests last read was not awaited
// so we surface it directly rather than wrapped in a Task (as this one will likely also not be awaited).
throw new InvalidOperationException("Concurrent reads are not supported; await the " + nameof(ValueTask<int>) + " before starting next read.");
}

try
{
return ReadAsyncInternal(destination, cancellationToken);
while (true)
{
if (!_pipeReader.TryRead(out var result))
{
break;
}

if (result.IsCanceled)
{
throw new OperationCanceledException("The read was canceled");
}

var readableBuffer = result.Buffer;
var readableBufferLength = readableBuffer.Length;

var consumed = readableBuffer.End;
var actual = 0;
try
{
if (readableBufferLength != 0)
{
actual = (int)Math.Min(readableBufferLength, buffer.Length);

var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
consumed = slice.End;
slice.CopyTo(buffer.Span);

return new ValueTask<int>(actual);
}

if (result.IsCompleted)
{
return new ValueTask<int>(0);
}
}
finally
{
_pipeReader.AdvanceTo(consumed);
}
}
}
catch (ConnectionAbortedException ex)
catch (Exception ex)
{
throw new TaskCanceledException("The request was aborted", ex);
return new ValueTask<int>(Task.FromException<int>(ex));
}

var asyncReader = _asyncReader;
if (asyncReader is null)
{
_asyncReader = asyncReader = new AsyncEnumerableReader();
asyncReader.Initialize(ReadAsyncAwaited(asyncReader));
}

return asyncReader.ReadAsync(buffer, cancellationToken);
}

private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
private async IAsyncEnumerable<int> ReadAsyncAwaited(AsyncEnumerableReader reader)
{
while (true)
{
var result = await _pipeReader.ReadAsync(cancellationToken);
var result = await _pipeReader.ReadAsync(reader.CancellationToken);

if (result.IsCanceled)
{
Expand All @@ -154,30 +226,40 @@ private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, Cancellation
var readableBufferLength = readableBuffer.Length;

var consumed = readableBuffer.End;
var advanced = false;
try
{
if (readableBufferLength != 0)
{
var actual = (int)Math.Min(readableBufferLength, buffer.Length);
var actual = (int)Math.Min(readableBufferLength, reader.Buffer.Length);

var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
consumed = slice.End;
slice.CopyTo(buffer.Span);
slice.CopyTo(reader.Buffer.Span);

return actual;
// Finally blocks in enumerators aren't excuted prior to the yield return,
// so we advance here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... that seems like it will be a bug farm in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is that bad. The finally still runs given an uncaught exception or the completion of the method. I think it would be more weird if the finally did run for every yield return.

advanced = true;
_pipeReader.AdvanceTo(consumed);
yield return actual;
}

if (result.IsCompleted)
else if (result.IsCompleted)
{
return 0;
// Finally blocks in enumerators aren't excuted prior to the yield return,
// so we advance here
advanced = true;
_pipeReader.AdvanceTo(consumed);
yield return 0;
}
}
finally
{
_pipeReader.AdvanceTo(consumed);
if (!advanced)
{
_pipeReader.AdvanceTo(consumed);
}
}
}

}

/// <inheritdoc />
Expand Down
20 changes: 16 additions & 4 deletions src/Servers/Kestrel/Core/src/Internal/Http2/Http2MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,23 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami

public override bool TryRead(out ReadResult readResult)
{
var result = _context.RequestBodyPipe.Reader.TryRead(out readResult);
_readResult = readResult;
CountBytesRead(readResult.Buffer.Length);
TryStart();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@halter73 note: had to include this Http2MessageBody change for TryStart/TryStop to prevent test failures.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It definitely feels better to call TryStart here. It does important stuff that isn't even timing related like checking that the request body size isn't larger than what's allowed.

What test failures were you seeing specifically.

Copy link
Member Author

@benaadams benaadams Aug 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One example was the following though there were a couple different types

Error message
Moq.MockException :
Expected invocation on the mock should never have been performed, but was 32 times: 
h => h.OnTimeout(It.IsAny<TimeoutReason>())

Configured setups:
 ITimeoutHandler h => h.OnTimeout(It.IsAny<TimeoutReason>())

Performed invocations: 
ITimeoutHandler.OnTimeout(TimeoutReason.ReadDataRate)
ITimeoutHandler.OnTimeout(TimeoutReason.ReadDataRate)
ITimeoutHandler.OnTimeout(TimeoutReason.ReadDataRate)
ITimeoutHandler.OnTimeout(TimeoutReason.ReadDataRate)
ITimeoutHandler.OnTimeout(TimeoutReason.ReadDataRate)
...

Stack trace
   at Moq.Mock.VerifyCalls(Mock targetMock, InvocationShape expectation, LambdaExpression expression, Times times, String failMessage)
   at Moq.Mock.Verify[T](Mock`1 mock, Expression`1 expression, Times times, String failMessage)
   at Moq.Mock`1.Verify(Expression`1 expression, Times times)
   at Moq.Mock`1.Verify(Expression`1 expression, Func`1 times)
   at Microsoft.AspNetCore.Server.Kestrel.Core.Tests.Http2TimeoutTests.DATA_Received_TooSlowlyOnSecondStream_AbortsConnectionAfterNonAdditiveRateTimeout() in /_/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TimeoutTests.cs:line 775
--- End of stack trace from previous location where exception was thrown ---

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. That makes sense. If you start reading a request body with TryRead, and then call ReadAsync, Kestrel will reset _readTimingBytesRead to 0 on the first ReadAsync call ignoring the bytes counted by TryRead.

I wonder how common using both TryRead and ReadAsync on the same RequestBody will be before we do it implicitly ReadAsyncInternal as part of this PR. We might want to backport this part of the change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Http1ContentLengthMessageBody need this?

Http1ChunkedEncodingMessageBody has it; but ContentLength doesn't have the same pattern (TryRead earlier and TryStop)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah; is correct TryStop is in CreateReadResultFromConnectionReadResult

Added isolated change for Http2 #13305


var hasResult = _context.RequestBodyPipe.Reader.TryRead(out readResult);

if (hasResult)
{
_readResult = readResult;

CountBytesRead(readResult.Buffer.Length);

if (readResult.IsCompleted)
{
TryStop();
}
}

return result;
return hasResult;
}

public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
{
internal class AsyncEnumerableReader : IValueTaskSource<int>
{
private readonly Action _onCompletedAction;

private ManualResetValueTaskSourceCore<int> _valueTaskSource;
private IAsyncEnumerable<int> _readerSource;
private IAsyncEnumerator<int> _reader;

public Memory<byte> Buffer { get; private set; }
public CancellationToken CancellationToken { get; private set; }

private ValueTaskAwaiter<bool> _readAwaiter;

private volatile bool _inProgress;
public bool InProgress => _inProgress;

public AsyncEnumerableReader()
{
_onCompletedAction = OnCompleted;
}

internal void Initialize(IAsyncEnumerable<int> readerSource)
{
_readerSource = readerSource;
_reader = readerSource.GetAsyncEnumerator();
}

public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
if (_readerSource is null)
{
ThrowNotInitialized();
}

if (_inProgress)
{
ThrowConcurrentReadsNotSupported();
}
_inProgress = true;

Buffer = buffer;
CancellationToken = cancellationToken;

var task = _reader.MoveNextAsync();
_readAwaiter = task.GetAwaiter();

return new ValueTask<int>(this, _valueTaskSource.Version);
}

int IValueTaskSource<int>.GetResult(short token)
{
var isValid = token == _valueTaskSource.Version;
try
{
return _valueTaskSource.GetResult(token);
}
finally
{
if (isValid)
{
Buffer = default;
CancellationToken = default;
_inProgress = false;
_valueTaskSource.Reset();
}
}
}

ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token)
=> _valueTaskSource.GetStatus(token);

void IValueTaskSource<int>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
if (!InProgress)
{
ThrowNoReadInProgress();
}

_valueTaskSource.OnCompleted(continuation, state, token, flags);

_readAwaiter.UnsafeOnCompleted(_onCompletedAction);
}

private void OnCompleted()
{
try
{
if (_readAwaiter.GetResult())
{
_valueTaskSource.SetResult(_reader.Current);
}
else
{
_valueTaskSource.SetResult(-1);
}
}
catch (Exception ex)
{
// If the GetResult throws for this ReadAsync (e.g. cancellation),
// that will cause all next ReadAsyncs to also throw, so we create
// a fresh unerrored AsyncEnumerable to restore the next ReadAsyncs
// to the normal flow
_reader = _readerSource.GetAsyncEnumerator();
_valueTaskSource.SetException(ex);
}
}

static void ThrowConcurrentReadsNotSupported()
{
throw new InvalidOperationException("Concurrent reads are not supported");
}

static void ThrowNoReadInProgress()
{
throw new InvalidOperationException("No read in progress, await will not complete");
}

static void ThrowNotInitialized()
{
throw new InvalidOperationException(nameof(AsyncEnumerableReader) + " has not been initialized");
}
}
}
Loading