Skip to content

Commit e727101

Browse files
authored
Work around potential race in PipeWriter Redux (#11065)
- Make sure we always await the last flush task before calling FlushAsync again instead of preemptively calling FlushAsync and checking to see if the ValueTask is incomplete before bothering to acquire the _flushLock - This now acquires the _flushLock fore every call to Response.Body.Write whereas this only happened for truly async writes before. - I don't think this is a big concern since this should normally be uncontested, and DefaultPipeWriter.FlushAsync/GetResult already acquire a lock.
1 parent 9023696 commit e727101

24 files changed

+270
-60
lines changed

src/Servers/Kestrel/Core/src/Internal/Infrastructure/TimingPipeFlusher.cs

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@ internal class TimingPipeFlusher
2222
private readonly IKestrelTrace _log;
2323

2424
private readonly object _flushLock = new object();
25-
private Task<FlushResult> _lastFlushTask = null;
25+
26+
// This field should only be get or set under the _flushLock. This is a ValueTask that was either:
27+
// 1. The default value where "IsCompleted" is true
28+
// 2. Created by an async method
29+
// 3. Constructed explicitely from a completed result
30+
// This means it should be safe to await a single _lastFlushTask instance multiple times.
31+
private ValueTask<FlushResult> _lastFlushTask;
2632

2733
public TimingPipeFlusher(
2834
PipeWriter writer,
@@ -51,35 +57,41 @@ public ValueTask<FlushResult> FlushAsync(MinDataRate minRate, long count)
5157

5258
public ValueTask<FlushResult> FlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
5359
{
54-
var flushValueTask = _writer.FlushAsync(cancellationToken);
60+
// https://github.com/dotnet/corefxlab/issues/1334
61+
// Pipelines don't support multiple awaiters on flush.
62+
lock (_flushLock)
63+
{
64+
if (_lastFlushTask.IsCompleted)
65+
{
66+
_lastFlushTask = TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
67+
}
68+
else
69+
{
70+
_lastFlushTask = AwaitLastFlushAndTimeFlushAsync(_lastFlushTask, minRate, count, outputAborter, cancellationToken);
71+
}
72+
73+
return _lastFlushTask;
74+
}
75+
}
76+
77+
private ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
78+
{
79+
var pipeFlushTask = _writer.FlushAsync(cancellationToken);
5580

5681
if (minRate != null)
5782
{
5883
_timeoutControl.BytesWrittenToBuffer(minRate, count);
5984
}
6085

61-
if (flushValueTask.IsCompletedSuccessfully)
86+
if (pipeFlushTask.IsCompletedSuccessfully)
6287
{
63-
return new ValueTask<FlushResult>(flushValueTask.Result);
88+
return new ValueTask<FlushResult>(pipeFlushTask.Result);
6489
}
6590

66-
// https://github.com/dotnet/corefxlab/issues/1334
67-
// Pipelines don't support multiple awaiters on flush.
68-
// While it's acceptable to call PipeWriter.FlushAsync again before the last FlushAsync completes,
69-
// it is not acceptable to attach a new continuation (via await, AsTask(), etc..). In this case,
70-
// we find previous flush Task which still accounts for any newly committed bytes and await that.
71-
lock (_flushLock)
72-
{
73-
if (_lastFlushTask == null || _lastFlushTask.IsCompleted)
74-
{
75-
_lastFlushTask = flushValueTask.AsTask();
76-
}
77-
78-
return TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
79-
}
91+
return TimeFlushAsyncAwaited(pipeFlushTask, minRate, count, outputAborter, cancellationToken);
8092
}
8193

82-
private async ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
94+
private async ValueTask<FlushResult> TimeFlushAsyncAwaited(ValueTask<FlushResult> pipeFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
8395
{
8496
if (minRate != null)
8597
{
@@ -88,7 +100,7 @@ private async ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long co
88100

89101
try
90102
{
91-
return await _lastFlushTask;
103+
return await pipeFlushTask;
92104
}
93105
catch (OperationCanceledException ex) when (outputAborter != null)
94106
{
@@ -111,5 +123,11 @@ private async ValueTask<FlushResult> TimeFlushAsync(MinDataRate minRate, long co
111123

112124
return default;
113125
}
126+
127+
private async ValueTask<FlushResult> AwaitLastFlushAndTimeFlushAsync(ValueTask<FlushResult> lastFlushTask, MinDataRate minRate, long count, IHttpOutputAborter outputAborter, CancellationToken cancellationToken)
128+
{
129+
await lastFlushTask;
130+
return await TimeFlushAsync(minRate, count, outputAborter, cancellationToken);
131+
}
114132
}
115133
}

src/Servers/Kestrel/Core/test/Http1ConnectionTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class Http1ConnectionTests : IDisposable
4242

4343
public Http1ConnectionTests()
4444
{
45-
_pipelineFactory = MemoryPoolFactory.Create();
45+
_pipelineFactory = SlabMemoryPoolFactory.Create();
4646
var options = new PipeOptions(_pipelineFactory, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
4747
var pair = DuplexPipe.CreateConnectionPair(options, options);
4848

src/Servers/Kestrel/Core/test/HttpResponseHeadersTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class HttpResponseHeadersTests
2121
[Fact]
2222
public void InitialDictionaryIsEmpty()
2323
{
24-
using (var memoryPool = MemoryPoolFactory.Create())
24+
using (var memoryPool = SlabMemoryPoolFactory.Create())
2525
{
2626
var options = new PipeOptions(memoryPool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
2727
var pair = DuplexPipe.CreateConnectionPair(options, options);

src/Servers/Kestrel/Core/test/OutputProducerTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class OutputProducerTests : IDisposable
2121

2222
public OutputProducerTests()
2323
{
24-
_memoryPool = MemoryPoolFactory.Create();
24+
_memoryPool = SlabMemoryPoolFactory.Create();
2525
}
2626

2727
public void Dispose()

src/Servers/Kestrel/Core/test/PipelineExtensionTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class PipelineExtensionTests : IDisposable
1616
private const int _ulongMaxValueLength = 20;
1717

1818
private readonly Pipe _pipe;
19-
private readonly MemoryPool<byte> _memoryPool = MemoryPoolFactory.Create();
19+
private readonly MemoryPool<byte> _memoryPool = SlabMemoryPoolFactory.Create();
2020

2121
public PipelineExtensionTests()
2222
{

src/Servers/Kestrel/Core/test/StartLineTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ public void AuthorityForms(string rawTarget, string path, string query)
499499

500500
public StartLineTests()
501501
{
502-
MemoryPool = MemoryPoolFactory.Create();
502+
MemoryPool = SlabMemoryPoolFactory.Create();
503503
var options = new PipeOptions(MemoryPool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
504504
var pair = DuplexPipe.CreateConnectionPair(options, options);
505505
Transport = pair.Transport;

src/Servers/Kestrel/Core/test/TestHelpers/TestInput.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class TestInput : IDisposable
2323

2424
public TestInput()
2525
{
26-
_memoryPool = MemoryPoolFactory.Create();
26+
_memoryPool = SlabMemoryPoolFactory.Create();
2727
var options = new PipeOptions(pool: _memoryPool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false);
2828
var pair = DuplexPipe.CreateConnectionPair(options, options);
2929
Transport = pair.Transport;
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.IO.Pipelines;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
8+
using Moq;
9+
using Xunit;
10+
11+
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
12+
{
13+
public class TimingPipeFlusherTests
14+
{
15+
[Fact]
16+
public async Task IfFlushIsCalledAgainBeforeTheLastFlushCompletedItWaitsForTheLastCall()
17+
{
18+
var mockPipeWriter = new Mock<PipeWriter>();
19+
var pipeWriterFlushTcsArray = new[] {
20+
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
21+
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
22+
new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously),
23+
};
24+
var pipeWriterFlushCallCount = 0;
25+
26+
mockPipeWriter.Setup(p => p.FlushAsync(CancellationToken.None)).Returns(() =>
27+
{
28+
return new ValueTask<FlushResult>(pipeWriterFlushTcsArray[pipeWriterFlushCallCount++].Task);
29+
});
30+
31+
var timingPipeFlusher = new TimingPipeFlusher(mockPipeWriter.Object, null, null);
32+
33+
var flushTask0 = timingPipeFlusher.FlushAsync();
34+
var flushTask1 = timingPipeFlusher.FlushAsync();
35+
var flushTask2 = timingPipeFlusher.FlushAsync();
36+
37+
Assert.False(flushTask0.IsCompleted);
38+
Assert.False(flushTask1.IsCompleted);
39+
Assert.False(flushTask2.IsCompleted);
40+
Assert.Equal(1, pipeWriterFlushCallCount);
41+
42+
pipeWriterFlushTcsArray[0].SetResult(default);
43+
await flushTask0.AsTask().DefaultTimeout();
44+
45+
Assert.True(flushTask0.IsCompleted);
46+
Assert.False(flushTask1.IsCompleted);
47+
Assert.False(flushTask2.IsCompleted);
48+
Assert.True(pipeWriterFlushCallCount <= 2);
49+
50+
pipeWriterFlushTcsArray[1].SetResult(default);
51+
await flushTask1.AsTask().DefaultTimeout();
52+
53+
Assert.True(flushTask0.IsCompleted);
54+
Assert.True(flushTask1.IsCompleted);
55+
Assert.False(flushTask2.IsCompleted);
56+
Assert.True(pipeWriterFlushCallCount <= 3);
57+
58+
pipeWriterFlushTcsArray[2].SetResult(default);
59+
await flushTask2.AsTask().DefaultTimeout();
60+
61+
Assert.True(flushTask0.IsCompleted);
62+
Assert.True(flushTask1.IsCompleted);
63+
Assert.True(flushTask2.IsCompleted);
64+
Assert.Equal(3, pipeWriterFlushCallCount);
65+
}
66+
}
67+
}

src/Servers/Kestrel/Transport.Libuv/src/LibuvTransportOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class LibuvTransportOptions
3131

3232
public long? MaxWriteBufferSize { get; set; } = 64 * 1024;
3333

34-
internal Func<MemoryPool<byte>> MemoryPoolFactory { get; set; } = System.Buffers.MemoryPoolFactory.Create;
34+
internal Func<MemoryPool<byte>> MemoryPoolFactory { get; set; } = System.Buffers.SlabMemoryPoolFactory.Create;
3535

3636
private static int ProcessorThreadCount
3737
{

src/Servers/Kestrel/Transport.Libuv/test/LibuvOutputConsumerTests.cs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class LibuvOutputConsumerTests : IDisposable
4141

4242
public LibuvOutputConsumerTests()
4343
{
44-
_memoryPool = MemoryPoolFactory.Create();
44+
_memoryPool = SlabMemoryPoolFactory.Create();
4545
_mockLibuv = new MockLibuv();
4646

4747
var context = new TestLibuvTransportContext();
@@ -560,15 +560,15 @@ await Task.Run(async () =>
560560
Assert.False(task1Waits.IsFaulted);
561561

562562
// following tasks should wait.
563-
var task3Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token);
563+
var task2Canceled = outputProducer.WriteDataAsync(fullBuffer, cancellationToken: abortedSource.Token);
564564

565565
// Give time for tasks to percolate
566566
await _mockLibuv.OnPostTask;
567567

568-
// Third task is not completed
569-
Assert.False(task3Canceled.IsCompleted);
570-
Assert.False(task3Canceled.IsCanceled);
571-
Assert.False(task3Canceled.IsFaulted);
568+
// Second task is not completed
569+
Assert.False(task2Canceled.IsCompleted);
570+
Assert.False(task2Canceled.IsCanceled);
571+
Assert.False(task2Canceled.IsFaulted);
572572

573573
abortedSource.Cancel();
574574

@@ -578,29 +578,29 @@ await Task.Run(async () =>
578578
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
579579
}
580580

581-
// First task is completed
581+
// First task is completed
582582
Assert.True(task1Waits.IsCompleted);
583583
Assert.False(task1Waits.IsCanceled);
584584
Assert.False(task1Waits.IsFaulted);
585585

586-
// A final write guarantees that the error is observed by OutputProducer,
587-
// but doesn't return a canceled/faulted task.
588-
var task4Success = outputProducer.WriteDataAsync(fullBuffer);
589-
Assert.True(task4Success.IsCompleted);
590-
Assert.False(task4Success.IsCanceled);
591-
Assert.False(task4Success.IsFaulted);
586+
// Second task is now canceled
587+
await Assert.ThrowsAsync<OperationCanceledException>(() => task2Canceled);
588+
Assert.True(task2Canceled.IsCanceled);
592589

593-
// Third task is now canceled
594-
await Assert.ThrowsAsync<OperationCanceledException>(() => task3Canceled);
595-
Assert.True(task3Canceled.IsCanceled);
590+
// A final write can still succeed.
591+
var task3Success = outputProducer.WriteDataAsync(fullBuffer);
596592

597593
await _mockLibuv.OnPostTask;
598594

599-
// Complete the 4th write
595+
// Complete the 3rd write
600596
while (completeQueue.TryDequeue(out var triggerNextCompleted))
601597
{
602598
await _libuvThread.PostAsync(cb => cb(0), triggerNextCompleted);
603599
}
600+
601+
Assert.True(task3Success.IsCompleted);
602+
Assert.False(task3Success.IsCanceled);
603+
Assert.False(task3Success.IsFaulted);;
604604
}
605605
});
606606
}

0 commit comments

Comments
 (0)