Skip to content

Add an option to do zero byte reads on StreamPipeReader #49117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@ public static partial class StreamPipeExtensions
}
public partial class StreamPipeReaderOptions
{
public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false) { }
public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) { }
public StreamPipeReaderOptions(System.Buffers.MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false) { }
public int BufferSize { get { throw null; } }
public bool LeaveOpen { get { throw null; } }
public int MinimumReadSize { get { throw null; } }
public System.Buffers.MemoryPool<byte> Pool { get { throw null; } }
public bool UseZeroByteReads { get { throw null; } }
}
public partial class StreamPipeWriterOptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ internal class StreamPipeReader : PipeReader
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB

private readonly int _bufferSize;
private readonly int _minimumReadThreshold;
private readonly MemoryPool<byte>? _pool;

private CancellationTokenSource? _internalTokenSource;
private bool _isReaderCompleted;
private bool _isStreamCompleted;
Expand All @@ -31,7 +27,8 @@ internal class StreamPipeReader : PipeReader

// Mutable struct! Don't make this readonly
private BufferSegmentStack _bufferSegmentPool;
private readonly bool _leaveOpen;

private StreamPipeReaderOptions _options;

/// <summary>
/// Creates a new StreamPipeReader.
Expand All @@ -47,13 +44,17 @@ public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
throw new ArgumentNullException(nameof(options));
}

_options = options;
_bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
_minimumReadThreshold = Math.Min(options.MinimumReadSize, options.BufferSize);
_pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
_bufferSize = _pool == null ? options.BufferSize : Math.Min(options.BufferSize, _pool.MaxBufferSize);
_leaveOpen = options.LeaveOpen;
}

// All derived from the options
private bool LeaveOpen => _options.LeaveOpen;
private bool UseZeroByteReads => _options.UseZeroByteReads;
private int BufferSize => _options.BufferSize;
private int MinimumReadThreshold => _options.MinimumReadSize;
private MemoryPool<byte> Pool => _options.Pool;

/// <summary>
/// Gets the inner stream that is being read from.
/// </summary>
Expand Down Expand Up @@ -180,7 +181,7 @@ public override void Complete(Exception? exception = null)
returnSegment.ResetMemory();
}

if (!_leaveOpen)
if (!LeaveOpen)
{
InnerStream.Dispose();
}
Expand Down Expand Up @@ -215,6 +216,13 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
var isCanceled = false;
try
{
// This optimization only makes sense if we don't have anything buffered
if (UseZeroByteReads && _bufferedBytes == 0)
{
// Wait for data by doing 0 byte read before
await InnerStream.ReadAsync(Memory<byte>.Empty, cancellationToken).ConfigureAwait(false);
}

AllocateReadTail();

Memory<byte> buffer = _readTail!.AvailableMemory.Slice(_readTail.End);
Expand Down Expand Up @@ -296,7 +304,7 @@ private bool TryReadInternal(CancellationTokenSource source, out ReadResult resu

private ReadOnlySequence<byte> GetCurrentReadOnlySequence()
{
Debug.Assert(_readHead != null &&_readTail != null);
Debug.Assert(_readHead != null && _readTail != null);
return new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
}

Expand All @@ -311,7 +319,7 @@ private void AllocateReadTail()
else
{
Debug.Assert(_readTail != null);
if (_readTail.WritableBytes < _minimumReadThreshold)
if (_readTail.WritableBytes < MinimumReadThreshold)
{
BufferSegment nextSegment = AllocateSegment();
_readTail.SetNext(nextSegment);
Expand All @@ -324,13 +332,13 @@ private BufferSegment AllocateSegment()
{
BufferSegment nextSegment = CreateSegmentUnsynchronized();

if (_pool is null)
if (_options.IsDefaultSharedMemoryPool)
{
nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(_bufferSize));
nextSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(BufferSize));
}
else
{
nextSegment.SetOwnedMemory(_pool.Rent(_bufferSize));
nextSegment.SetOwnedMemory(Pool.Rent(BufferSize));
}

return nextSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,24 @@ public class StreamPipeReaderOptions
/// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
/// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
/// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false)
public StreamPipeReaderOptions(MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen) :
this(pool, bufferSize, minimumReadSize, leaveOpen, useZeroByteReads: false)
{

}

/// <summary>Initializes a <see cref="System.IO.Pipelines.StreamPipeReaderOptions" /> instance, optionally specifying a memory pool, a minimum buffer size, a minimum read size, and whether the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
/// <param name="pool">The memory pool to use when allocating memory. The default value is <see langword="null" />.</param>
/// <param name="bufferSize">The minimum buffer size to use when renting memory from the <paramref name="pool" />. The default value is 4096.</param>
/// <param name="minimumReadSize">The threshold of remaining bytes in the buffer before a new buffer is allocated. The default value is 1024.</param>
/// <param name="leaveOpen"><see langword="true" /> to leave the underlying stream open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; <see langword="false" /> to close it. The default is <see langword="false" />.</param>
/// <param name="useZeroByteReads"><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</param>
public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false)
{
Pool = pool ?? MemoryPool<byte>.Shared;

IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared;

BufferSize =
bufferSize == -1 ? DefaultBufferSize :
bufferSize <= 0 ? throw new ArgumentOutOfRangeException(nameof(bufferSize)) :
Expand All @@ -33,6 +47,8 @@ public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -
minimumReadSize;

LeaveOpen = leaveOpen;

UseZeroByteReads = useZeroByteReads;
}

/// <summary>Gets the minimum buffer size to use when renting memory from the <see cref="System.IO.Pipelines.StreamPipeReaderOptions.Pool" />.</summary>
Expand All @@ -50,5 +66,14 @@ public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -
/// <summary>Gets the value that indicates if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes.</summary>
/// <value><see langword="true" /> if the underlying stream should be left open after the <see cref="System.IO.Pipelines.PipeReader" /> completes; otherwise, <see langword="false" />.</value>
public bool LeaveOpen { get; }

/// <summary>Gets the value that indicates if reads with an empty buffer should be issued to the underlying stream, in order to wait for data to arrive before allocating memory.</summary>
/// <value><see langword="true" /> if reads with an empty buffer should be issued to the underlying stream before allocating memory; otherwise, <see langword="false" />.</value>
public bool UseZeroByteReads { get; }

/// <summary>
/// Returns true if Pool is <see cref="MemoryPool{Byte}"/>.Shared
/// </summary>
internal bool IsDefaultSharedMemoryPool { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ public async Task TryReadReturnsFalseIfBufferedBytesAndEverythingExamined()
reader.Complete();
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task CanReadMultipleTimes()
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(false)]
[InlineData(true)]
public async Task CanReadMultipleTimes(bool useZeroByteReads)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be another test with a mocked stream so we can check that it actually issues zero byte reads

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😢

{
// This needs to run inline to synchronize the reader and writer
TaskCompletionSource<object> waitForRead = null;
Expand Down Expand Up @@ -109,7 +111,7 @@ async Task DoAsyncWrites(PipeWriter writer, int[] bufferSizes)

// We're using the pipe here as a way to pump bytes into the reader asynchronously
var pipe = new Pipe();
var options = new StreamPipeReaderOptions(bufferSize: 4096);
var options = new StreamPipeReaderOptions(bufferSize: 4096, useZeroByteReads: useZeroByteReads);
PipeReader reader = PipeReader.Create(pipe.Reader.AsStream(), options);

var writes = new[] { 4096, 1024, 123, 4096, 100 };
Expand Down