Skip to content

Allow un-examining in PipeReader.AdvanceTo(...) #107360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,6 @@ internal void AdvanceReader(in SequencePosition consumed, in SequencePosition ex
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}

// TODO: Use new SequenceMarshal.TryGetReadOnlySequenceSegment to get the correct data
// directly casting only works because the type value in ReadOnlySequenceSegment is 0
AdvanceReader((BufferSegment?)consumed.GetObject(), consumed.GetInteger(), (BufferSegment?)examined.GetObject(), examined.GetInteger());
}

Expand All @@ -486,14 +484,11 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu

if (examinedSegment != null && _lastExaminedIndex >= 0)
{
// This can be negative resulting in _unconsumedBytes increasing, this should be safe because we've already checked that
// examined >= consumed above, so we can't get into a state where we un-examine too much
long examinedBytes = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex);
long oldLength = _unconsumedBytes;

if (examinedBytes < 0)
{
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedPosition();
}

_unconsumedBytes -= examinedBytes;

// Store the absolute position
Expand All @@ -505,6 +500,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
if (oldLength >= ResumeWriterThreshold &&
_unconsumedBytes < ResumeWriterThreshold)
{
// Should only release backpressure if we made forward progress
Debug.Assert(examinedBytes > 0);
_writerAwaitable.Complete(out completionData);
}
}
Expand Down Expand Up @@ -570,7 +567,7 @@ void MoveReturnEndToNextBlock()
// but only if writer is not completed yet
if (examinedEverything && !_writerCompletion.IsCompleted)
{
Debug.Assert(_writerAwaitable.IsCompleted, "PipeWriter.FlushAsync is isn't completed and will deadlock");
Debug.Assert(_writerAwaitable.IsCompleted, "PipeWriter.FlushAsync isn't completed and will deadlock");

_readerAwaitable.SetUncompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,15 @@ protected virtual async ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSi
/// The <see cref="System.IO.Pipelines.ReadResult.Buffer" /> previously returned from <see cref="System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)" /> must not be accessed after this call.
/// This is equivalent to calling <see cref="System.IO.Pipelines.PipeReader.AdvanceTo(System.SequencePosition,System.SequencePosition)" /> with identical examined and consumed positions.
/// The examined data communicates to the pipeline when it should signal more data is available.
/// Because the consumed parameter doubles as the examined parameter, the consumed parameter should be greater than or equal to the examined position in the previous call to `AdvanceTo`. Otherwise, an <see cref="System.InvalidOperationException" /> is thrown.</remarks>
/// </remarks>
public abstract void AdvanceTo(SequencePosition consumed);

/// <summary>Moves forward the pipeline's read cursor to after the consumed data, marking the data as processed, read and examined.</summary>
/// <param name="consumed">Marks the extent of the data that has been successfully processed.</param>
/// <param name="examined">Marks the extent of the data that has been read and examined.</param>
/// <remarks>The memory for the consumed data will be released and no longer available.
/// The <see cref="System.IO.Pipelines.ReadResult.Buffer" /> previously returned from <see cref="System.IO.Pipelines.PipeReader.ReadAsync(System.Threading.CancellationToken)" /> must not be accessed after this call.
/// The examined data communicates to the pipeline when it should signal more data is available.
/// The examined parameter should be greater than or equal to the examined position in the previous call to `AdvanceTo`. Otherwise, an <see cref="System.InvalidOperationException" /> is thrown.</remarks>
/// The examined data communicates to the pipeline when it should signal more data is available.</remarks>
public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);

/// <summary>Returns a <see cref="System.IO.Stream" /> representation of the <see cref="System.IO.Pipelines.PipeReader" />.</summary>
Expand Down
28 changes: 28 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,5 +241,33 @@ public async Task FlushAsyncThrowsIfReaderCompletedWithException()
invalidOperationException = await Assert.ThrowsAsync<InvalidOperationException>(async () => await writableBuffer.FlushAsync());
Assert.Equal("Reader failed", invalidOperationException.Message);
}

[Fact]
public void FlushAsyncAwaitableDoesNotCompleteWhenReaderUnexamines()
{
PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(PauseWriterThreshold);
ValueTask<FlushResult> flushAsync = writableBuffer.FlushAsync();

ReadResult result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult();
SequencePosition examined = result.Buffer.GetPosition(2);
// Examine 2, don't advance consumed
_pipe.Reader.AdvanceTo(result.Buffer.Start, examined);

Assert.False(flushAsync.IsCompleted);

result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult();
// Examine 1 which is less than the previous examined index of 2
examined = result.Buffer.GetPosition(1);
_pipe.Reader.AdvanceTo(result.Buffer.Start, examined);

Assert.False(flushAsync.IsCompleted);

// Just make sure we can still release backpressure
result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult();
examined = result.Buffer.GetPosition(ResumeWriterThreshold + 1);
_pipe.Reader.AdvanceTo(examined, examined);

Assert.True(flushAsync.IsCompleted);
}
}
}
24 changes: 3 additions & 21 deletions src/libraries/System.IO.Pipelines/tests/PipeLengthTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,24 +220,6 @@ public async Task ExaminedAtSecondLastBlockWorks()
Assert.Equal(0, _pipe.Length);
}

[Fact]
public async Task ExaminedLessThanBeforeThrows()
{
_pipe.Writer.WriteEmpty(10);
await _pipe.Writer.FlushAsync();

ReadResult result = await _pipe.Reader.ReadAsync();
_pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);

Assert.Equal(0, _pipe.Length);

_pipe.Writer.WriteEmpty(10);
await _pipe.Writer.FlushAsync();

result = await _pipe.Reader.ReadAsync();
Assert.Throws<InvalidOperationException>(() => _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.Start));
}

[Fact]
public async Task ConsumedGreaterThanExaminedThrows()
{
Expand Down Expand Up @@ -284,10 +266,10 @@ public async Task AdvanceFollowedByWriteAsyncTest()
Memory<byte> buffer = new byte[26];
Pipe pipe = new(new PipeOptions(minimumSegmentSize: 1));

var mem = pipe.Writer.GetMemory(14)[..14];
buffer[..14].CopyTo(mem);
var mem = pipe.Writer.GetMemory(14).Slice(0, 14);
buffer.Slice(0, 14).CopyTo(mem);
pipe.Writer.Advance(14);
await pipe.Writer.WriteAsync(buffer[14..]);
await pipe.Writer.WriteAsync(buffer.Slice(14));
ReadResult res = await pipe.Reader.ReadAsync();
Assert.Equal(res.Buffer.Length, buffer.Length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public async Task DisposingPipeReaderStreamCompletesPipeReader(bool dataInPipe)
for (int i = 0; i < 2; i++)
{
s.Dispose();
#if NET
await s.DisposeAsync();
#endif
}

// Make sure OnReaderCompleted was invoked.
Expand Down Expand Up @@ -296,6 +298,32 @@ public void AsStreamDoNotCompleteReader()
pipeReader.AsStream(leaveOpen: true).Dispose();
}

// Regression test: https://github.com/dotnet/runtime/issues/107213
[Fact]
public async Task ZeroByteReadWorksWhenExaminedDoesNotEqualConsumed()
{
Pipe pipe = new Pipe();
Stream stream = pipe.Reader.AsStream();

await pipe.Writer.WriteAsync(new byte[2]);

ReadResult readResult = await pipe.Reader.ReadAsync();
// Consume: 0, Advance: 2
pipe.Reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);

// Write more so that the next read will see unexamined bytes available and not block
await pipe.Writer.WriteAsync(new byte[2]);

// Zero-byte read to test that advancing (via PipeReader.AdvanceTo(consumed)) doesn't throw due to examined being less than
// the last examined position
int result = await stream.ReadAsync(Memory<byte>.Empty);
Assert.Equal(0, result);

// Real read to make sure data is immediately available
result = await stream.ReadAsync(new byte[100]);
Assert.Equal(4, result);
}

public class BuggyPipeReader : PipeReader
{
public override void AdvanceTo(SequencePosition consumed)
Expand Down Expand Up @@ -405,7 +433,7 @@ public static IEnumerable<object[]> ReadCalls

ReadAsyncDelegate readSpanSync = (stream, data) =>
{
return Task.FromResult(stream.Read(data));
return Task.FromResult(stream.Read(data, 0, data.Length));
};

yield return new object[] { readArrayAsync };
Expand Down
82 changes: 82 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,88 @@ public async Task ReadAsyncReturnsDataAfterCanceledRead()
pipe.Reader.AdvanceTo(readResult.Buffer.End);
}

// Regression test: https://github.com/dotnet/runtime/issues/107213
[Fact]
public async Task AdvanceToWithoutExaminedCanUnExamine()
{
PipeWriter buffer = _pipe.Writer;
buffer.Write("Hello Worl"u8.ToArray());
await buffer.FlushAsync();

bool gotData = _pipe.Reader.TryRead(out ReadResult result);
Assert.True(gotData);

Assert.Equal("Hello Worl", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Advance past 'Hello ' and examine everything else
_pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.End);

// Write so that the next ReadAsync will be unblocked
buffer.Write("d"u8.ToArray());
await buffer.FlushAsync();

result = await _pipe.Reader.ReadAsync();

Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Previous examine is at the end of 'Worl', calling AdvanceTo without passing examined will unexamine (not externally visible)
// But more importantly, it will work and not throw that you're unexamining
_pipe.Reader.AdvanceTo(result.Buffer.Start);

// Double check that ReadAsync is still unblocked and works
result = await _pipe.Reader.ReadAsync();
Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));
}

[Fact]
public async Task AdvanceToWithExaminedCanUnExamine()
{
PipeWriter buffer = _pipe.Writer;
buffer.Write("Hello Worl"u8.ToArray());
await buffer.FlushAsync();

bool gotData = _pipe.Reader.TryRead(out ReadResult result);
Assert.True(gotData);

Assert.Equal("Hello Worl", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Advance past 'Hello ' and examine everything else
_pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.End);

// Write so that the next ReadAsync will be unblocked
buffer.Write("d"u8.ToArray());
await buffer.FlushAsync();

result = await _pipe.Reader.ReadAsync();

Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));

// Previous examine is at the end of 'Worl', calling AdvanceTo without passing examined will unexamine (not externally visible)
// But more importantly, it will work and not throw that you're unexamining
_pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.GetPosition(1));

// Double check that ReadAsync is still unblocked and works
result = await _pipe.Reader.ReadAsync();
Assert.Equal("World", Encoding.ASCII.GetString(result.Buffer.ToArray()));
}

[Fact]
public async Task ExaminedCannotBeBeforeConsumed()
{
PipeWriter buffer = _pipe.Writer;
buffer.Write("Hello World"u8.ToArray());
await buffer.FlushAsync();

bool gotData = _pipe.Reader.TryRead(out ReadResult result);
Assert.True(gotData);

Assert.Equal("Hello World", Encoding.ASCII.GetString(result.Buffer.ToArray()));

InvalidOperationException ex = Assert.Throws<InvalidOperationException>(
() => _pipe.Reader.AdvanceTo(result.Buffer.GetPosition(6), result.Buffer.GetPosition(5)));
Assert.Equal("The examined position must be greater than or equal to the consumed position.", ex.Message);
}

private bool IsTaskWithResult<T>(ValueTask<T> task)
{
return task == new ValueTask<T>(task.Result);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFrameworks>$(NetCoreAppCurrent);$(NetFrameworkMinimum)</TargetFrameworks>
Expand All @@ -25,6 +25,7 @@
<Compile Include="PipeCompletionCallbacksTests.cs" />
<Compile Include="PipeOptionsTests.cs" />
<Compile Include="PipeReaderWriterFacts.cs" />
<Compile Include="PipeReaderStreamTests.cs" />
<Compile Include="PipePoolTests.cs" />
<Compile Include="PipeResetTests.cs" />
<Compile Include="PipeTest.cs" />
Expand All @@ -43,15 +44,14 @@
<Compile Include="StreamPipeWriterTests.cs" />
<Compile Include="Infrastructure\ThrowAfterNWritesStream.cs" />
<Compile Include="UnflushedBytesTests.cs" />
<Compile Include="PipeLengthTests.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETCoreApp'">
<Compile Include="PipeLengthTests.cs" />
<Compile Include="BufferSegmentPoolTest.cs" />
<Compile Include="PipeReaderWriterFacts.nonnetstandard.cs" />
<Compile Include="PipeResetTests.nonnetstandard.cs" />
<Compile Include="PipePoolTests.nonnetstandard.cs" />
<Compile Include="PipeWriterStreamTests.nonnetstandard.cs" />
<Compile Include="PipeReaderStreamTests.nonnetstandard.cs" />
<Compile Include="PipeReaderWriterStreamTests.nonnetstandard.cs" />
</ItemGroup>
<ItemGroup>
Expand Down
Loading