Skip to content

Commit 9137a7f

Browse files
committed
Amortize HttpRequestStream & DuplexPipeStream ReadAsync calls
1 parent 5b2f3fb commit 9137a7f

File tree

4 files changed

+339
-31
lines changed

4 files changed

+339
-31
lines changed

src/Servers/Kestrel/Core/src/Internal/Http/HttpRequestStream.cs

Lines changed: 98 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,21 @@
33

44
using System;
55
using System.Buffers;
6+
using System.Collections.Generic;
67
using System.IO;
78
using System.Threading;
89
using System.Threading.Tasks;
910
using Microsoft.AspNetCore.Connections;
1011
using Microsoft.AspNetCore.Http.Features;
12+
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
1113

1214
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
1315
{
1416
internal sealed class HttpRequestStream : Stream
1517
{
1618
private readonly HttpRequestPipeReader _pipeReader;
1719
private readonly IHttpBodyControlFeature _bodyControl;
20+
private AsyncEnumerableReader _asyncReader;
1821

1922
public HttpRequestStream(IHttpBodyControlFeature bodyControl, HttpRequestPipeReader pipeReader)
2023
{
@@ -44,12 +47,26 @@ public override int WriteTimeout
4447

4548
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
4649
{
47-
return ReadAsyncWrapper(destination, cancellationToken);
50+
try
51+
{
52+
return ReadAsyncInternal(destination, cancellationToken);
53+
}
54+
catch (ConnectionAbortedException ex)
55+
{
56+
throw new TaskCanceledException("The request was aborted", ex);
57+
}
4858
}
4959

5060
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
5161
{
52-
return ReadAsyncWrapper(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
62+
try
63+
{
64+
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
65+
}
66+
catch (ConnectionAbortedException ex)
67+
{
68+
throw new TaskCanceledException("The request was aborted", ex);
69+
}
5370
}
5471

5572
public override int Read(byte[] buffer, int offset, int count)
@@ -127,23 +144,78 @@ private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationTo
127144
return tcs.Task;
128145
}
129146

130-
private ValueTask<int> ReadAsyncWrapper(Memory<byte> destination, CancellationToken cancellationToken)
147+
private ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
131148
{
149+
if (_asyncReader?.InProgress ?? false)
150+
{
151+
// Throw if there are overlapping reads; throwing unwrapped as it suggests last read was not awaited
152+
// so we surface it directly rather than wrapped in a Task (as this one will likely also not be awaited).
153+
throw new InvalidOperationException("Concurrent reads are not supported; await the " + nameof(ValueTask<int>) + " before starting next read.");
154+
}
155+
132156
try
133157
{
134-
return ReadAsyncInternal(destination, cancellationToken);
158+
while (true)
159+
{
160+
if (!_pipeReader.TryRead(out var result))
161+
{
162+
break;
163+
}
164+
165+
if (result.IsCanceled)
166+
{
167+
throw new OperationCanceledException("The read was canceled");
168+
}
169+
170+
var readableBuffer = result.Buffer;
171+
var readableBufferLength = readableBuffer.Length;
172+
173+
var consumed = readableBuffer.End;
174+
var actual = 0;
175+
try
176+
{
177+
if (readableBufferLength != 0)
178+
{
179+
actual = (int)Math.Min(readableBufferLength, buffer.Length);
180+
181+
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
182+
consumed = slice.End;
183+
slice.CopyTo(buffer.Span);
184+
185+
return new ValueTask<int>(actual);
186+
}
187+
188+
if (result.IsCompleted)
189+
{
190+
return new ValueTask<int>(0);
191+
}
192+
}
193+
finally
194+
{
195+
_pipeReader.AdvanceTo(consumed);
196+
}
197+
}
135198
}
136-
catch (ConnectionAbortedException ex)
199+
catch (Exception ex)
137200
{
138-
throw new TaskCanceledException("The request was aborted", ex);
201+
return new ValueTask<int>(Task.FromException<int>(ex));
202+
}
203+
204+
var asyncReader = _asyncReader;
205+
if (asyncReader is null)
206+
{
207+
_asyncReader = asyncReader = new AsyncEnumerableReader();
208+
asyncReader.Initialize(ReadAsyncAwaited(asyncReader));
139209
}
210+
211+
return asyncReader.ReadAsync(buffer, cancellationToken);
140212
}
141213

142-
private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
214+
private async IAsyncEnumerable<int> ReadAsyncAwaited(AsyncEnumerableReader reader)
143215
{
144216
while (true)
145217
{
146-
var result = await _pipeReader.ReadAsync(cancellationToken);
218+
var result = await _pipeReader.ReadAsync(reader.CancellationToken);
147219

148220
if (result.IsCanceled)
149221
{
@@ -154,30 +226,40 @@ private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, Cancellation
154226
var readableBufferLength = readableBuffer.Length;
155227

156228
var consumed = readableBuffer.End;
229+
var advanced = false;
157230
try
158231
{
159232
if (readableBufferLength != 0)
160233
{
161-
var actual = (int)Math.Min(readableBufferLength, buffer.Length);
234+
var actual = (int)Math.Min(readableBufferLength, reader.Buffer.Length);
162235

163236
var slice = actual == readableBufferLength ? readableBuffer : readableBuffer.Slice(0, actual);
164237
consumed = slice.End;
165-
slice.CopyTo(buffer.Span);
238+
slice.CopyTo(reader.Buffer.Span);
166239

167-
return actual;
240+
// Finally blocks in enumerators aren't excuted prior to the yield return,
241+
// so we advance here
242+
advanced = true;
243+
_pipeReader.AdvanceTo(consumed);
244+
yield return actual;
168245
}
169-
170-
if (result.IsCompleted)
246+
else if (result.IsCompleted)
171247
{
172-
return 0;
248+
// Finally blocks in enumerators aren't excuted prior to the yield return,
249+
// so we advance here
250+
advanced = true;
251+
_pipeReader.AdvanceTo(consumed);
252+
yield return 0;
173253
}
174254
}
175255
finally
176256
{
177-
_pipeReader.AdvanceTo(consumed);
257+
if (!advanced)
258+
{
259+
_pipeReader.AdvanceTo(consumed);
260+
}
178261
}
179262
}
180-
181263
}
182264

183265
/// <inheritdoc />

src/Servers/Kestrel/Core/src/Internal/Http2/Http2MessageBody.cs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,23 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami
6969

7070
public override bool TryRead(out ReadResult readResult)
7171
{
72-
var result = _context.RequestBodyPipe.Reader.TryRead(out readResult);
73-
_readResult = readResult;
74-
CountBytesRead(readResult.Buffer.Length);
72+
TryStart();
73+
74+
var hasResult = _context.RequestBodyPipe.Reader.TryRead(out readResult);
75+
76+
if (hasResult)
77+
{
78+
_readResult = readResult;
79+
80+
CountBytesRead(readResult.Buffer.Length);
81+
82+
if (readResult.IsCompleted)
83+
{
84+
TryStop();
85+
}
86+
}
7587

76-
return result;
88+
return hasResult;
7789
}
7890

7991
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Runtime.CompilerServices;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using System.Threading.Tasks.Sources;
10+
11+
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
12+
{
13+
internal class AsyncEnumerableReader : IValueTaskSource<int>
14+
{
15+
private readonly Action _onCompletedAction;
16+
17+
private ManualResetValueTaskSourceCore<int> _valueTaskSource;
18+
private IAsyncEnumerable<int> _readerSource;
19+
private IAsyncEnumerator<int> _reader;
20+
21+
public Memory<byte> Buffer { get; private set; }
22+
public CancellationToken CancellationToken { get; private set; }
23+
24+
private ValueTaskAwaiter<bool> _readAwaiter;
25+
26+
private volatile bool _inProgress;
27+
public bool InProgress => _inProgress;
28+
29+
public AsyncEnumerableReader()
30+
{
31+
_onCompletedAction = OnCompleted;
32+
}
33+
34+
internal void Initialize(IAsyncEnumerable<int> readerSource)
35+
{
36+
_readerSource = readerSource;
37+
_reader = readerSource.GetAsyncEnumerator();
38+
}
39+
40+
public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
41+
{
42+
if (_readerSource is null)
43+
{
44+
ThrowNotInitialized();
45+
}
46+
47+
if (_inProgress)
48+
{
49+
ThrowConcurrentReadsNotSupported();
50+
}
51+
_inProgress = true;
52+
53+
Buffer = buffer;
54+
CancellationToken = cancellationToken;
55+
56+
var task = _reader.MoveNextAsync();
57+
_readAwaiter = task.GetAwaiter();
58+
59+
return new ValueTask<int>(this, _valueTaskSource.Version);
60+
}
61+
62+
int IValueTaskSource<int>.GetResult(short token)
63+
{
64+
var isValid = token == _valueTaskSource.Version;
65+
try
66+
{
67+
return _valueTaskSource.GetResult(token);
68+
}
69+
finally
70+
{
71+
if (isValid)
72+
{
73+
Buffer = default;
74+
CancellationToken = default;
75+
_inProgress = false;
76+
_valueTaskSource.Reset();
77+
}
78+
}
79+
}
80+
81+
ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token)
82+
=> _valueTaskSource.GetStatus(token);
83+
84+
void IValueTaskSource<int>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
85+
{
86+
if (!InProgress)
87+
{
88+
ThrowNoReadInProgress();
89+
}
90+
91+
_valueTaskSource.OnCompleted(continuation, state, token, flags);
92+
93+
_readAwaiter.UnsafeOnCompleted(_onCompletedAction);
94+
}
95+
96+
private void OnCompleted()
97+
{
98+
try
99+
{
100+
if (_readAwaiter.GetResult())
101+
{
102+
_valueTaskSource.SetResult(_reader.Current);
103+
}
104+
else
105+
{
106+
_valueTaskSource.SetResult(-1);
107+
}
108+
}
109+
catch (Exception ex)
110+
{
111+
// If the GetResult throws for this ReadAsync (e.g. cancellation),
112+
// that will cause all next ReadAsyncs to also throw, so we create
113+
// a fresh unerrored AsyncEnumerable to restore the next ReadAsyncs
114+
// to the normal flow
115+
_reader = _readerSource.GetAsyncEnumerator();
116+
_valueTaskSource.SetException(ex);
117+
}
118+
}
119+
120+
static void ThrowConcurrentReadsNotSupported()
121+
{
122+
throw new InvalidOperationException("Concurrent reads are not supported");
123+
}
124+
125+
static void ThrowNoReadInProgress()
126+
{
127+
throw new InvalidOperationException("No read in progress, await will not complete");
128+
}
129+
130+
static void ThrowNotInitialized()
131+
{
132+
throw new InvalidOperationException(nameof(AsyncEnumerableReader) + " has not been initialized");
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)