Skip to content

Commit d8fb128

Browse files
committed
pr feedback
1 parent 1d754ff commit d8fb128

File tree

2 files changed

+48
-64
lines changed

2 files changed

+48
-64
lines changed

src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/BufferSegment.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,5 @@ internal static long GetLength(long startPosition, BufferSegment endSegment, int
126126
{
127127
return (endSegment.RunningIndex + (uint)endIndex) - startPosition;
128128
}
129-
130129
}
131130
}

src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/ConcurrentPipeWriter.cs

Lines changed: 48 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,17 @@ internal class ConcurrentPipeWriter : PipeWriter
3232
private int _tailBytesBuffered;
3333
private int _bytesBuffered;
3434

35-
// When _currentFlushTcs is null, the ConcurrentPipeWriter is in pass-through mode. When it's non-null, it means either:
35+
// When _currentFlushTcs is null and _head/_tail is also null, the ConcurrentPipeWriter is in passthrough mode.
36+
// When the ConcurrentPipeWriter is not in passthrough mode, that could be for one of two reasons:
3637
//
37-
// 1. A flush is in progress
38-
// 2. Or the last flush completed between calls to GetMemory/Span() and Advance().
38+
// 1. A flush of the _innerPipeWriter is in progress.
39+
// 2. Or the last flush of the _innerPipeWriter completed between external calls to GetMemory/Span() and Advance().
3940
//
40-
// In either case, we need to manually append buffer segments until _innerPipeWriter.FlushAsync() completes without any
41-
// other committed data remaining and without any calls to Advance() pending. This logic is borrowed from StreamPipeWriter.
41+
// In either case, we need to manually append buffer segments until the loop in the current or next call to FlushAsync()
42+
// flushes all the buffers putting the ConcurrentPipeWriter back into passthrough mode.
43+
// The manual buffer appending logic is borrowed from corefx's StreamPipeWriter.
4244
private TaskCompletionSource<FlushResult> _currentFlushTcs;
43-
private bool _nonPassThroughAdvancePending;
44-
private bool _isFlushing;
45+
private bool _bufferedWritePending;
4546

4647
// We're trusting the Http2FrameWriter to not call into the PipeWriter after calling abort, we don't validate this.
4748
// We will however clean up after any ongoing flush, assuming a flush is in progress.
@@ -53,11 +54,39 @@ public ConcurrentPipeWriter(PipeWriter innerPipeWriter, MemoryPool<byte> pool)
5354
_pool = pool;
5455
}
5556

57+
public override Memory<byte> GetMemory(int sizeHint = 0)
58+
{
59+
lock (_sync)
60+
{
61+
if (_currentFlushTcs == null && _head == null)
62+
{
63+
return _innerPipeWriter.GetMemory(sizeHint);
64+
}
65+
66+
AllocateMemoryUnsynchronized(sizeHint);
67+
return _tailMemory;
68+
}
69+
}
70+
71+
public override Span<byte> GetSpan(int sizeHint = 0)
72+
{
73+
lock (_sync)
74+
{
75+
if (_currentFlushTcs == null && _head == null)
76+
{
77+
return _innerPipeWriter.GetSpan(sizeHint);
78+
}
79+
80+
AllocateMemoryUnsynchronized(sizeHint);
81+
return _tailMemory.Span;
82+
}
83+
}
84+
5685
public override void Advance(int bytes)
5786
{
5887
lock (_sync)
5988
{
60-
if (_currentFlushTcs == null)
89+
if (_currentFlushTcs == null && _head == null)
6190
{
6291
_innerPipeWriter.Advance(bytes);
6392
return;
@@ -71,7 +100,7 @@ public override void Advance(int bytes)
71100
_tailBytesBuffered += bytes;
72101
_bytesBuffered += bytes;
73102
_tailMemory = _tailMemory.Slice(bytes);
74-
_nonPassThroughAdvancePending = false;
103+
_bufferedWritePending = false;
75104
}
76105
}
77106

@@ -99,7 +128,7 @@ public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellation
99128
{
100129
lock (_sync)
101130
{
102-
if (_isFlushing)
131+
if (_currentFlushTcs != null)
103132
{
104133
return new ValueTask<FlushResult>(_currentFlushTcs.Task);
105134
}
@@ -121,11 +150,8 @@ public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellation
121150
return flushTask;
122151
}
123152

124-
_isFlushing = true;
125-
126153
// Use a TCS instead of something resettable so it can be awaited by multiple awaiters.
127-
// _currentFlushTcs might already be set if the last flush that completed, completed between a call to GetMemory() and Advance().
128-
_currentFlushTcs ??= new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously);
154+
_currentFlushTcs = new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously);
129155
return FlushAsyncAwaited(flushTask, cancellationToken);
130156
}
131157
}
@@ -134,12 +160,12 @@ private async ValueTask<FlushResult> FlushAsyncAwaited(ValueTask<FlushResult> fl
134160
{
135161
try
136162
{
137-
var flushResult = await flushTask;
138-
139163
// This while (true) does look scary, but the real continuation condition is at the start of the loop
140-
// so the _sync lock can be acquired.
164+
// after the await, so the _sync lock can be acquired.
141165
while (true)
142166
{
167+
var flushResult = await flushTask;
168+
143169
lock (_sync)
144170
{
145171
if (_bytesBuffered == 0)
@@ -152,8 +178,6 @@ private async ValueTask<FlushResult> FlushAsyncAwaited(ValueTask<FlushResult> fl
152178

153179
flushTask = _innerPipeWriter.FlushAsync(cancellationToken);
154180
}
155-
156-
flushResult = await flushTask;
157181
}
158182
}
159183
catch (Exception ex)
@@ -169,35 +193,6 @@ private async ValueTask<FlushResult> FlushAsyncAwaited(ValueTask<FlushResult> fl
169193
}
170194
}
171195

172-
public override Memory<byte> GetMemory(int sizeHint = 0)
173-
{
174-
lock (_sync)
175-
{
176-
if (_currentFlushTcs == null)
177-
{
178-
return _innerPipeWriter.GetMemory(sizeHint);
179-
}
180-
181-
182-
AllocateMemoryUnsynchronized(sizeHint);
183-
return _tailMemory;
184-
}
185-
}
186-
187-
public override Span<byte> GetSpan(int sizeHint = 0)
188-
{
189-
lock (_sync)
190-
{
191-
if (_currentFlushTcs == null)
192-
{
193-
return _innerPipeWriter.GetSpan(sizeHint);
194-
}
195-
196-
AllocateMemoryUnsynchronized(sizeHint);
197-
return _tailMemory.Span;
198-
}
199-
}
200-
201196
// This is not exposed to end users. Throw so we find out if we ever start calling this.
202197
public override void OnReaderCompleted(Action<Exception, object> callback, object state)
203198
{
@@ -211,7 +206,7 @@ public void Abort()
211206
_aborted = true;
212207

213208
// If we're flushing, the cleanup will happen after the flush.
214-
if (!_isFlushing)
209+
if (_currentFlushTcs == null)
215210
{
216211
CleanupUnsynchronized();
217212
}
@@ -231,7 +226,6 @@ private void CleanupUnsynchronized()
231226
_head = null;
232227
_tail = null;
233228
_tailMemory = null;
234-
_currentFlushTcs = null;
235229
}
236230

237231
private void CopyAndReturnSegmentsUnsynchronized()
@@ -263,7 +257,7 @@ private void CopyAndReturnSegmentsUnsynchronized()
263257
segment = segment.NextSegment;
264258
}
265259

266-
if (_nonPassThroughAdvancePending)
260+
if (_bufferedWritePending)
267261
{
268262
// If an advance is pending, so is a flush, so the _tail segment should still get returned eventually.
269263
_head = _tail;
@@ -281,8 +275,6 @@ private void CopyAndReturnSegmentsUnsynchronized()
281275

282276
private void CompleteFlushUnsynchronized(FlushResult flushResult, Exception flushEx)
283277
{
284-
_isFlushing = false;
285-
286278
if (flushEx != null)
287279
{
288280
_currentFlushTcs.SetException(flushEx);
@@ -292,25 +284,18 @@ private void CompleteFlushUnsynchronized(FlushResult flushResult, Exception flus
292284
_currentFlushTcs.SetResult(flushResult);
293285
}
294286

287+
_currentFlushTcs = null;
288+
295289
if (_aborted)
296290
{
297291
CleanupUnsynchronized();
298292
}
299-
else if (_nonPassThroughAdvancePending)
300-
{
301-
// If there's still another non-passthrough call to Advance pending, we cannot yet switch back to passthrough mode.
302-
_currentFlushTcs = new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously);
303-
}
304-
else
305-
{
306-
_currentFlushTcs = null;
307-
}
308293
}
309294

310295
// The methods below were copied from https://github.com/dotnet/corefx/blob/de3902bb56f1254ec1af4bf7d092fc2c048734cc/src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs
311296
private void AllocateMemoryUnsynchronized(int sizeHint)
312297
{
313-
_nonPassThroughAdvancePending = true;
298+
_bufferedWritePending = true;
314299

315300
if (_head == null)
316301
{

0 commit comments

Comments
 (0)