Skip to content

Revert the input pipe in the DuplexStreamPipeAdapter #12204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 16, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
Expand All @@ -15,21 +16,32 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
/// <typeparam name="TStream"></typeparam>
internal class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
{
private readonly Pipe _input;
private readonly Pipe _output;
private Task _inputTask;
private Task _outputTask;
private bool _disposed;
private readonly object _disposeLock = new object();
private readonly int _minAllocBufferSize;

public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> createStream) :
this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
{
}

public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) :
base(duplexPipe.Input, duplexPipe.Output)
base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true)
{
Stream = createStream(this);

var inputOptions = new PipeOptions(pool: readerOptions.Pool,
readerScheduler: PipeScheduler.ThreadPool,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(),
useSynchronizationContext: false);

var outputOptions = new PipeOptions(pool: writerOptions.Pool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
Expand All @@ -38,7 +50,9 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions r
minimumSegmentSize: writerOptions.MinimumBufferSize,
useSynchronizationContext: false);

Input = PipeReader.Create(Stream, readerOptions);
_minAllocBufferSize = writerOptions.MinimumBufferSize;

_input = new Pipe(inputOptions);

// We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions
// about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once
Expand All @@ -50,43 +64,112 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions r

public TStream Stream { get; }

public PipeReader Input { get; }
public PipeReader Input
{
get
{
if (_inputTask == null)
{
RunAsync();
}

return _input.Reader;
}
}

public PipeWriter Output
{
get
{
if (_outputTask == null)
{
_outputTask = WriteOutputAsync();
RunAsync();
}

return _output.Writer;
}
}

public override ValueTask DisposeAsync()
public void RunAsync()
{
_inputTask = ReadInputAsync();
_outputTask = WriteOutputAsync();
}

public override async ValueTask DisposeAsync()
{
lock (_disposeLock)
{
if (_disposed)
{
return default;
return;
}
_disposed = true;
}

Input.Complete();
_output.Writer.Complete();
_input.Reader.Complete();

if (_outputTask == null || _outputTask.IsCompletedSuccessfully)
if (_outputTask == null)
{
// Wait for the output task to complete, this ensures that we've copied
// the application data to the underlying stream
return default;
return;
}

return new ValueTask(_outputTask);
if (_outputTask != null)
{
await _outputTask;
}

CancelPendingRead();

if (_inputTask != null)
{
await _inputTask;
}
}

private async Task ReadInputAsync()
{
Exception error = null;
try
{
while (true)
{
var outputBuffer = _input.Writer.GetMemory(_minAllocBufferSize);

var bytesRead = await Stream.ReadAsync(outputBuffer);
_input.Writer.Advance(bytesRead);

if (bytesRead == 0)
{
// FIN
break;
}

var result = await _input.Writer.FlushAsync();

if (result.IsCompleted)
{
// flushResult should not be canceled.
break;
}
}

}
catch (OperationCanceledException ex)
{
// Propagate the exception if it's ConnectionAbortedException
error = ex as ConnectionAbortedException;
}
catch (Exception ex)
{
// Don't rethrow the exception. It should be handled by the Pipeline consumer.
error = ex;
}
finally
{
_input.Writer.Complete(error);
}
}

private async Task WriteOutputAsync()
Expand Down