Skip to content

Revert the output pipe in the DuplexStreamPipeAdapter #11601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public void Complete()

_completed = true;
_connectionOutputFlowControl.Abort();
_outputWriter.Complete();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context)

if (_options.ClientCertificateMode == ClientCertificateMode.NoCertificate)
{
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions);
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions)
{
Log = _logger
};
certificateRequired = false;
}
else
Expand Down Expand Up @@ -140,7 +143,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context)
}

return true;
}));
}))
{
Log = _logger
};

certificateRequired = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
{
Expand All @@ -14,36 +15,114 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
/// <typeparam name="TStream"></typeparam>
internal class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
{
private readonly Pipe _output;
private Task _outputTask;

public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> createStream) :
this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
{
}

public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) : base(duplexPipe.Input, duplexPipe.Output)
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) :
base(duplexPipe.Input, duplexPipe.Output)
{
Stream = createStream(this);

var outputOptions = new PipeOptions(pool: writerOptions.Pool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably use the defaults here like we did before to reduce thrashing:

https://github.com/aspnet/AspNetCore/blob/release/3.0-preview6/src/Servers/Kestrel/Core/src/Internal/HttpConnection.cs#L67-L76

        internal PipeOptions AdaptedOutputPipeOptions => new PipeOptions
        (
            pool: MemoryPool,
            readerScheduler: PipeScheduler.Inline,
            writerScheduler: PipeScheduler.Inline,
            pauseWriterThreshold: _context.ServiceContext.ServerOptions.Limits.MaxResponseBufferSize ?? 0,
            resumeWriterThreshold: _context.ServiceContext.ServerOptions.Limits.MaxResponseBufferSize ?? 0,
            useSynchronizationContext: false,
            minimumSegmentSize: MemoryPool.GetMinimumSegmentSize()
        );

minimumSegmentSize: writerOptions.MinimumBufferSize,
useSynchronizationContext: false);

Input = PipeReader.Create(Stream, readerOptions);
Output = PipeWriter.Create(Stream, writerOptions);

// We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions
// about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once
// those patterns are fixed.
_output = new Pipe(outputOptions);
}

public ILogger Log { get; set; }

public TStream Stream { get; }

public PipeReader Input { get; }

public PipeWriter Output { get; }
public PipeWriter Output
{
get
{
if (_outputTask == null)
{
_outputTask = WriteOutputAsync();
}

protected override void Dispose(bool disposing)
return _output.Writer;
}
}

public override async ValueTask DisposeAsync()
{
Input.Complete();
Output.Complete();
base.Dispose(disposing);
_output.Writer.Complete();

if (_outputTask != null)
{
// Wait for the output task to complete, this ensures that we've copied
// the application data to the underlying stream
await _outputTask;
}
}

public override ValueTask DisposeAsync()
private async Task WriteOutputAsync()
{
Input.Complete();
Output.Complete();
return base.DisposeAsync();
try
{
while (true)
{
var result = await _output.Reader.ReadAsync();
var buffer = result.Buffer;

try
{
if (buffer.IsEmpty)
{
if (result.IsCompleted)
{
break;
}

await Stream.FlushAsync();
}
else if (buffer.IsSingleSegment)
{
await Stream.WriteAsync(buffer.First);
}
else
{
foreach (var memory in buffer)
{
await Stream.WriteAsync(memory);
}
}
}
finally
{
_output.Reader.AdvanceTo(buffer.End);
}
}
}
catch (Exception ex)
{
Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}");
}
finally
{
_output.Reader.Complete();
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ private class LoggingDuplexPipe : DuplexPipeStreamAdapter<LoggingStream>
public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) :
base(transport, stream => new LoggingStream(stream, logger))
{
Log = logger;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,22 @@ protected async Task InitializeConnectionAsync(RequestDelegate application, int
CreateConnection();
}

_connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application));
var connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application));

async Task CompletePipeOnTaskCompletion()
{
try
{
await connectionTask;
}
finally
{
_pair.Transport.Input.Complete();
_pair.Transport.Output.Complete();
}
}

_connectionTask = CompletePipeOnTaskCompletion();

await SendPreambleAsync().ConfigureAwait(false);
await SendSettingsAsync();
Expand Down