Skip to content

Commit 82ca681

Browse files
authored
Add an option to do zero byte reads on StreamPipeReader (#49117)
- Added UseZeroByteReads to StreamPipeReaderOptions that allows not allocating a buffer by doing a zero byte read on the underlying Stream before the internal buffer is allocated.
1 parent 09d0d04 commit 82ca681

File tree

4 files changed

+57
-20
lines changed

4 files changed

+57
-20
lines changed

src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,13 @@ public static partial class StreamPipeExtensions
9494
}
9595
public partial class StreamPipeReaderOptions
9696
{
97-
public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false) { }
97+
public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) { }
98+
public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false) { }
9899
public int BufferSize { get { throw null; } }
99100
public bool LeaveOpen { get { throw null; } }
100101
public int MinimumReadSize { get { throw null; } }
101102
public System.Buffers.MemoryPool<byte> Pool { get { throw null; } }
103+
public bool UseZeroByteReads { get { throw null; } }
102104
}
103105
public partial class StreamPipeWriterOptions
104106
{

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@ internal class StreamPipeReader : PipeReader
1313
internal const int InitialSegmentPoolSize = 4; // 16K
1414
internal const int MaxSegmentPoolSize = 256; // 1MB
1515

16-
private readonly int _bufferSize;
17-
private readonly int _minimumReadThreshold;
18-
private readonly MemoryPool<byte>? _pool;
19-
2016
private CancellationTokenSource? _internalTokenSource;
2117
private bool _isReaderCompleted;
2218
private bool _isStreamCompleted;
@@ -31,7 +27,8 @@ internal class StreamPipeReader : PipeReader
3127

3228
// Mutable struct! Don't make this readonly
3329
private BufferSegmentStack _bufferSegmentPool;
34-
private readonly bool _leaveOpen;
30+
31+
private StreamPipeReaderOptions _options;
3532

3633
/// <summary>
3734
/// Creates a new StreamPipeReader.
@@ -47,13 +44,17 @@ public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
4744
throw new ArgumentNullException(nameof(options));
4845
}
4946

47+
_options = options;
5048
_bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
51-
_minimumReadThreshold = Math.Min(options.MinimumReadSize, options.BufferSize);
52-
_pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
53-
_bufferSize = _pool == null ? options.BufferSize : Math.Min(options.BufferSize, _pool.MaxBufferSize);
54-
_leaveOpen = options.LeaveOpen;
5549
}
5650

51+
// All derived from the options
52+
private bool LeaveOpen => _options.LeaveOpen;
53+
private bool UseZeroByteReads => _options.UseZeroByteReads;
54+
private int BufferSize => _options.BufferSize;
55+
private int MinimumReadThreshold => _options.MinimumReadSize;
56+
private MemoryPool<byte> Pool => _options.Pool;
57+
5758
/// <summary>
5859
/// Gets the inner stream that is being read from.
5960
/// </summary>
@@ -180,7 +181,7 @@ public override void Complete(Exception? exception = null)
180181
returnSegment.ResetMemory();
181182
}
182183

183-
if (!_leaveOpen)
184+
if (!LeaveOpen)
184185
{
185186
InnerStream.Dispose();
186187
}
@@ -215,6 +216,13 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
215216
var isCanceled = false;
216217
try
217218
{
219+
// This optimization only makes sense if we don't have anything buffered
220+
if (UseZeroByteReads && _bufferedBytes == 0)
221+
{
222+
// Wait for data by doing 0 byte read before
223+
await InnerStream.ReadAsync(Memory<byte>.Empty, cancellationToken).ConfigureAwait(false);
224+
}
225+
218226
AllocateReadTail();
219227

220228
Memory<byte> buffer = _readTail!.AvailableMemory.Slice(_readTail.End);
@@ -296,7 +304,7 @@ private bool TryReadInternal(CancellationTokenSource source, out ReadResult resu
296304

297305
private ReadOnlySequence<byte> GetCurrentReadOnlySequence()
298306
{
299-
Debug.Assert(_readHead != null &&_readTail != null);
307+
Debug.Assert(_readHead != null && _readTail != null);
300308
return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
301309
}
302310

@@ -311,7 +319,7 @@ private void AllocateReadTail()
311319
else
312320
{
313321
Debug.Assert(_readTail != null);
314-
if (_readTail.WritableBytes < _minimumReadThreshold)
322+
if (_readTail.WritableBytes < MinimumReadThreshold)
315323
{
316324
BufferSegment nextSegment = AllocateSegment();
317325
_readTail.SetNext(nextSegment);
@@ -324,13 +332,13 @@ private BufferSegment AllocateSegment()
324332
{
325333
BufferSegment nextSegment = CreateSegmentUnsynchronized();
326334

327-
if (_pool is null)
335+
if (_options.IsDefaultSharedMemoryPool)
328336
{
329-
nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(_bufferSize));
337+
nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(BufferSize));
330338
}
331339
else
332340
{
333-
nextSegment.SetOwnedMemory(_pool.Rent(_bufferSize));
341+
nextSegment.SetOwnedMemory(Pool.Rent(BufferSize));
334342
}
335343

336344
return nextSegment;

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReaderOptions.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,24 @@ public class StreamPipeReaderOptions
1818
/// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
1919
/// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
2020
/// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
21-
public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false)
21+
public StreamPipeReaderOptions(MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) :
22+
this(pool, bufferSize, minimumReadSize, leaveOpen, useZeroByteReads: false)
23+
{
24+
25+
}
26+
27+
/// <summary>Initializes a <see cref="System.IO.Pipelines.StreamPipeReaderOptions" /> instance, optionally specifying a memory pool, a minimum buffer size, a minimum read size, and whether the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
28+
/// <param name="pool">The memory pool to use when allocating memory. The default value is <see langword="null" />.</param>
29+
/// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
30+
/// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
31+
/// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
32+
/// <param name="useZeroByteReads"><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</param>
33+
public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false)
2234
{
2335
Pool = pool ?? MemoryPool<byte>.Shared;
2436

37+
IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared;
38+
2539
BufferSize =
2640
bufferSize == -1 ? DefaultBufferSize :
2741
bufferSize <= 0 ? throw new ArgumentOutOfRangeException(nameof(bufferSize)) :
@@ -33,6 +47,8 @@ public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -
3347
minimumReadSize;
3448

3549
LeaveOpen = leaveOpen;
50+
51+
UseZeroByteReads = useZeroByteReads;
3652
}
3753

3854
/// <summary>Gets the minimum buffer size to use when renting memory from the <see cref="System.IO.Pipelines.StreamPipeReaderOptions.Pool" />.</summary>
@@ -50,5 +66,14 @@ public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -
5066
/// <summary>Gets the value that indicates if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
5167
/// <value><see langword="true" /> if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; otherwise, <see langword="false" />.</value>
5268
public bool LeaveOpen { get; }
69+
70+
/// <summary>Gets the value that indicates if reads with an empty buffer should be issued to the underlying stream, in order to wait for data to arrive before allocating memory.</summary>
71+
/// <value><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</value>
72+
public bool UseZeroByteReads { get; }
73+
74+
/// <summary>
75+
/// Returns true if Pool is <see cref="MemoryPool{Byte}"/>.Shared
76+
/// </summary>
77+
internal bool IsDefaultSharedMemoryPool { get; }
5378
}
5479
}

src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ public async Task TryReadReturnsFalseIfBufferedBytesAndEverythingExamined()
6767
reader.Complete();
6868
}
6969

70-
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
71-
public async Task CanReadMultipleTimes()
70+
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
71+
[InlineData(false)]
72+
[InlineData(true)]
73+
public async Task CanReadMultipleTimes(bool useZeroByteReads)
7274
{
7375
// This needs to run inline to synchronize the reader and writer
7476
TaskCompletionSource<object> waitForRead = null;
@@ -109,7 +111,7 @@ async Task DoAsyncWrites(PipeWriter writer, int[] bufferSizes)
109111

110112
// We're using the pipe here as a way to pump bytes into the reader asynchronously
111113
var pipe = new Pipe();
112-
var options = new StreamPipeReaderOptions(bufferSize: 4096);
114+
var options = new StreamPipeReaderOptions(bufferSize: 4096, useZeroByteReads: useZeroByteReads);
113115
PipeReader reader = PipeReader.Create(pipe.Reader.AsStream(), options);
114116

115117
var writes = new[] { 4096, 1024, 123, 4096, 100 };

0 commit comments

Comments
 (0)