-
Notifications
You must be signed in to change notification settings - Fork 10.4k
Amortize HttpRequestStream & DuplexPipeStream ReadAsync calls #11942
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,18 +3,21 @@ | |
|
||
using System; | ||
using System.Buffers; | ||
using System.Collections.Generic; | ||
using System.IO; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Microsoft.AspNetCore.Connections; | ||
using Microsoft.AspNetCore.Http.Features; | ||
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure; | ||
|
||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http | ||
{ | ||
internal sealed class HttpRequestStream : Stream | ||
{ | ||
private readonly HttpRequestPipeReader _pipeReader; | ||
private readonly IHttpBodyControlFeature _bodyControl; | ||
private AsyncEnumerableReader _asyncReader; | ||
|
||
public HttpRequestStream(IHttpBodyControlFeature bodyControl, HttpRequestPipeReader pipeReader) | ||
{ | ||
|
@@ -44,12 +47,26 @@ public override int WriteTimeout | |
|
||
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default) | ||
{ | ||
return ReadAsyncWrapper(destination, cancellationToken); | ||
try | ||
{ | ||
return ReadAsyncInternal(destination, cancellationToken); | ||
} | ||
catch (ConnectionAbortedException ex) | ||
{ | ||
throw new TaskCanceledException("The request was aborted", ex); | ||
} | ||
} | ||
|
||
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | ||
{ | ||
return ReadAsyncWrapper(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask(); | ||
try | ||
{ | ||
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask(); | ||
} | ||
catch (ConnectionAbortedException ex) | ||
{ | ||
throw new TaskCanceledException("The request was aborted", ex); | ||
} | ||
} | ||
|
||
public override int Read(byte[] buffer, int offset, int count) | ||
|
@@ -127,23 +144,78 @@ private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationTo | |
return tcs.Task; | ||
} | ||
|
||
private ValueTask<int> ReadAsyncWrapper(Memory<byte> destination, CancellationToken cancellationToken) | ||
private ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken) | ||
{ | ||
if (_asyncReader?.InProgress ?? false) | ||
{ | ||
// Throw if there are overlapping reads; throwing unwrapped as it suggests last read was not awaited | ||
// so we surface it directly rather than wrapped in a Task (as this one will likely also not be awaited). | ||
throw new InvalidOperationException("Concurrent reads are not supported; await the " + nameof(ValueTask<int>) + " before starting next read."); | ||
} | ||
|
||
try | ||
{ | ||
return ReadAsyncInternal(destination, cancellationToken); | ||
while (true) | ||
{ | ||
if (!_pipeReader.TryRead(out var result)) | ||
{ | ||
break; | ||
} | ||
|
||
if (result.IsCanceled) | ||
{ | ||
throw new OperationCanceledException("The read was canceled"); | ||
} | ||
|
||
var readableBuffer = result.Buffer; | ||
var readableBufferLength = readableBuffer.Length; | ||
|
||
var consumed = readableBuffer.End; | ||
var actual = 0; | ||
try | ||
{ | ||
if (readableBufferLength != 0) | ||
{ | ||
actual = (int)Math.Min(readableBufferLength, buffer.Length); | ||
|
||
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual); | ||
consumed = slice.End; | ||
slice.CopyTo(buffer.Span); | ||
|
||
return new ValueTask<int>(actual); | ||
} | ||
|
||
if (result.IsCompleted) | ||
{ | ||
return new ValueTask<int>(0); | ||
} | ||
} | ||
finally | ||
{ | ||
_pipeReader.AdvanceTo(consumed); | ||
} | ||
} | ||
} | ||
catch (ConnectionAbortedException ex) | ||
catch (Exception ex) | ||
{ | ||
throw new TaskCanceledException("The request was aborted", ex); | ||
return new ValueTask<int>(Task.FromException<int>(ex)); | ||
} | ||
|
||
var asyncReader = _asyncReader; | ||
if (asyncReader is null) | ||
{ | ||
_asyncReader = asyncReader = new AsyncEnumerableReader(); | ||
benaadams marked this conversation as resolved.
Show resolved
Hide resolved
|
||
asyncReader.Initialize(ReadAsyncAwaited(asyncReader)); | ||
} | ||
|
||
return asyncReader.ReadAsync(buffer, cancellationToken); | ||
} | ||
|
||
private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken) | ||
private async IAsyncEnumerable<int> ReadAsyncAwaited(AsyncEnumerableReader reader) | ||
{ | ||
while (true) | ||
{ | ||
var result = await _pipeReader.ReadAsync(cancellationToken); | ||
var result = await _pipeReader.ReadAsync(reader.CancellationToken); | ||
|
||
if (result.IsCanceled) | ||
{ | ||
|
@@ -154,30 +226,40 @@ private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, Cancellation | |
var readableBufferLength = readableBuffer.Length; | ||
|
||
var consumed = readableBuffer.End; | ||
var advanced = false; | ||
try | ||
{ | ||
if (readableBufferLength != 0) | ||
{ | ||
var actual = (int)Math.Min(readableBufferLength, buffer.Length); | ||
var actual = (int)Math.Min(readableBufferLength, reader.Buffer.Length); | ||
|
||
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual); | ||
consumed = slice.End; | ||
slice.CopyTo(buffer.Span); | ||
slice.CopyTo(reader.Buffer.Span); | ||
|
||
return actual; | ||
// Finally blocks in enumerators aren't excuted prior to the yield return, | ||
// so we advance here | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm... that seems like it will be a bug farm in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is that bad. The finally still runs given an uncaught exception or the completion of the method. I think it would be more weird if the finally did run for every yield return. |
||
advanced = true; | ||
_pipeReader.AdvanceTo(consumed); | ||
yield return actual; | ||
} | ||
|
||
if (result.IsCompleted) | ||
else if (result.IsCompleted) | ||
{ | ||
return 0; | ||
// Finally blocks in enumerators aren't excuted prior to the yield return, | ||
// so we advance here | ||
advanced = true; | ||
_pipeReader.AdvanceTo(consumed); | ||
yield return 0; | ||
} | ||
} | ||
finally | ||
{ | ||
_pipeReader.AdvanceTo(consumed); | ||
if (!advanced) | ||
{ | ||
_pipeReader.AdvanceTo(consumed); | ||
} | ||
} | ||
} | ||
|
||
} | ||
|
||
/// <inheritdoc /> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,11 +69,23 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami | |
|
||
public override bool TryRead(out ReadResult readResult) | ||
{ | ||
var result = _context.RequestBodyPipe.Reader.TryRead(out readResult); | ||
_readResult = readResult; | ||
CountBytesRead(readResult.Buffer.Length); | ||
TryStart(); | ||
jkotalik marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @halter73 note: had to include this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It definitely feels better to call TryStart here. It does important stuff that isn't even timing related like checking that the request body size isn't larger than what's allowed. What test failures were you seeing specifically. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One example was the following though there were a couple different types
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah. That makes sense. If you start reading a request body with TryRead, and then call ReadAsync, Kestrel will reset _readTimingBytesRead to 0 on the first ReadAsync call ignoring the bytes counted by TryRead. I wonder how common using both TryRead and ReadAsync on the same RequestBody will be before we do it implicitly ReadAsyncInternal as part of this PR. We might want to backport this part of the change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah; is correct Added isolated change for Http2 #13305 |
||
|
||
var hasResult = _context.RequestBodyPipe.Reader.TryRead(out readResult); | ||
|
||
if (hasResult) | ||
{ | ||
_readResult = readResult; | ||
|
||
CountBytesRead(readResult.Buffer.Length); | ||
|
||
if (readResult.IsCompleted) | ||
{ | ||
TryStop(); | ||
} | ||
} | ||
|
||
return result; | ||
return hasResult; | ||
} | ||
|
||
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Runtime.CompilerServices; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using System.Threading.Tasks.Sources; | ||
|
||
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure | ||
{ | ||
internal class AsyncEnumerableReader : IValueTaskSource<int> | ||
{ | ||
private readonly Action _onCompletedAction; | ||
|
||
private ManualResetValueTaskSourceCore<int> _valueTaskSource; | ||
private IAsyncEnumerable<int> _readerSource; | ||
private IAsyncEnumerator<int> _reader; | ||
|
||
public Memory<byte> Buffer { get; private set; } | ||
public CancellationToken CancellationToken { get; private set; } | ||
|
||
private ValueTaskAwaiter<bool> _readAwaiter; | ||
|
||
private volatile bool _inProgress; | ||
public bool InProgress => _inProgress; | ||
|
||
public AsyncEnumerableReader() | ||
{ | ||
_onCompletedAction = OnCompleted; | ||
} | ||
|
||
internal void Initialize(IAsyncEnumerable<int> readerSource) | ||
{ | ||
_readerSource = readerSource; | ||
_reader = readerSource.GetAsyncEnumerator(); | ||
} | ||
|
||
public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken) | ||
{ | ||
if (_readerSource is null) | ||
{ | ||
ThrowNotInitialized(); | ||
} | ||
|
||
if (_inProgress) | ||
{ | ||
ThrowConcurrentReadsNotSupported(); | ||
} | ||
_inProgress = true; | ||
|
||
Buffer = buffer; | ||
CancellationToken = cancellationToken; | ||
|
||
var task = _reader.MoveNextAsync(); | ||
_readAwaiter = task.GetAwaiter(); | ||
|
||
return new ValueTask<int>(this, _valueTaskSource.Version); | ||
} | ||
|
||
int IValueTaskSource<int>.GetResult(short token) | ||
{ | ||
var isValid = token == _valueTaskSource.Version; | ||
try | ||
{ | ||
return _valueTaskSource.GetResult(token); | ||
} | ||
finally | ||
{ | ||
if (isValid) | ||
{ | ||
Buffer = default; | ||
CancellationToken = default; | ||
_inProgress = false; | ||
_valueTaskSource.Reset(); | ||
} | ||
} | ||
} | ||
|
||
ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token) | ||
=> _valueTaskSource.GetStatus(token); | ||
|
||
void IValueTaskSource<int>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) | ||
{ | ||
if (!InProgress) | ||
{ | ||
ThrowNoReadInProgress(); | ||
} | ||
|
||
_valueTaskSource.OnCompleted(continuation, state, token, flags); | ||
|
||
_readAwaiter.UnsafeOnCompleted(_onCompletedAction); | ||
} | ||
|
||
private void OnCompleted() | ||
{ | ||
try | ||
{ | ||
if (_readAwaiter.GetResult()) | ||
{ | ||
_valueTaskSource.SetResult(_reader.Current); | ||
} | ||
else | ||
{ | ||
_valueTaskSource.SetResult(-1); | ||
} | ||
} | ||
catch (Exception ex) | ||
{ | ||
// If the GetResult throws for this ReadAsync (e.g. cancellation), | ||
// that will cause all next ReadAsyncs to also throw, so we create | ||
// a fresh unerrored AsyncEnumerable to restore the next ReadAsyncs | ||
// to the normal flow | ||
_reader = _readerSource.GetAsyncEnumerator(); | ||
_valueTaskSource.SetException(ex); | ||
} | ||
} | ||
|
||
static void ThrowConcurrentReadsNotSupported() | ||
{ | ||
throw new InvalidOperationException("Concurrent reads are not supported"); | ||
} | ||
|
||
static void ThrowNoReadInProgress() | ||
{ | ||
throw new InvalidOperationException("No read in progress, await will not complete"); | ||
} | ||
|
||
static void ThrowNotInitialized() | ||
{ | ||
throw new InvalidOperationException(nameof(AsyncEnumerableReader) + " has not been initialized"); | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.