Skip to content

Commit

Permalink
Cache and reuse NamedPipeServerStream instances (dotnet#46473)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Feb 7, 2023
1 parent 20c7ae1 commit 7b5b67e
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal;

internal sealed class NamedPipeConnection : TransportConnection, IConnectionNamedPipeFeature
{
private static readonly ConnectionAbortedException SendGracefullyCompletedException = new ConnectionAbortedException("The named pipe transport's send loop completed gracefully.");
private const int MinAllocBufferSize = 4096;

private readonly NamedPipeConnectionListener _connectionListener;
private readonly NamedPipeServerStream _stream;
private readonly ILogger _log;
private readonly IDuplexPipe _originalTransport;

private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource();
private bool _connectionClosed;
private bool _connectionDisposed;
private bool _connectionShutdown;
private bool _streamDisconnected;
private Exception? _shutdownReason;
private readonly object _shutdownLock = new object();

Expand All @@ -30,13 +32,15 @@ internal sealed class NamedPipeConnection : TransportConnection, IConnectionName
internal Task _sendingTask = Task.CompletedTask;

public NamedPipeConnection(
NamedPipeConnectionListener connectionListener,
NamedPipeServerStream stream,
NamedPipeEndPoint endPoint,
ILogger logger,
MemoryPool<byte> memoryPool,
PipeOptions inputOptions,
PipeOptions outputOptions)
{
_connectionListener = connectionListener;
_stream = stream;
_log = logger;
MemoryPool = memoryPool;
Expand Down Expand Up @@ -120,7 +124,7 @@ private async Task DoReceiveAsync()
// This exception should always be ignored because _shutdownReason should be set.
error = ex;

if (!_connectionDisposed)
if (!_connectionShutdown)
{
// This is unexpected if the socket hasn't been disposed yet.
NamedPipeLog.ConnectionError(_log, this, error);
Expand Down Expand Up @@ -206,33 +210,32 @@ private void Shutdown(Exception? shutdownReason)
{
lock (_shutdownLock)
{
if (_connectionDisposed)
if (_connectionShutdown)
{
return;
}

// Make sure to close the connection only after the _aborted flag is set.
// Without this, the RequestsCanBeAbortedMidRead test will sometimes fail when
// a BadHttpRequestException is thrown instead of a TaskCanceledException.
_connectionDisposed = true;
_connectionShutdown = true;

// shutdownReason should only be null if the output was completed gracefully, so no one should ever
// ever observe the nondescript ConnectionAbortedException except for connection middleware attempting
// to half close the connection which is currently unsupported.
_shutdownReason = shutdownReason ?? new ConnectionAbortedException("The Socket transport's send loop completed gracefully.");
_shutdownReason = shutdownReason ?? SendGracefullyCompletedException;
NamedPipeLog.ConnectionDisconnect(_log, this, _shutdownReason.Message);

try
{
// Try to gracefully close the socket even for aborts to match libuv behavior.
_stream.Disconnect();
_streamDisconnected = true;
}
catch
{
// Ignore any errors from NamedPipeStream.Disconnect() since we're tearing down the connection anyway.
}

_stream.Dispose();
}
}

Expand Down Expand Up @@ -287,8 +290,17 @@ public override async ValueTask DisposeAsync()
catch (Exception ex)
{
_log.LogError(0, ex, $"Unexpected exception in {nameof(NamedPipeConnection)}.{nameof(Start)}.");
_stream.Dispose();
return;
}

if (!_streamDisconnected)
{
_stream.Dispose();
}
else
{
_connectionListener.ReturnStream(_stream);
}

await _stream.DisposeAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Channels;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using NamedPipeOptions = System.IO.Pipes.PipeOptions;
using PipeOptions = System.IO.Pipelines.PipeOptions;

Expand All @@ -19,6 +20,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
private readonly ILogger _log;
private readonly NamedPipeEndPoint _endpoint;
private readonly NamedPipeTransportOptions _options;
private readonly ObjectPool<NamedPipeServerStream> _namedPipeServerStreamPool;
private readonly CancellationTokenSource _listeningTokenSource = new CancellationTokenSource();
private readonly CancellationToken _listeningToken;
private readonly Channel<ConnectionContext> _acceptedQueue;
Expand All @@ -38,6 +40,7 @@ public NamedPipeConnectionListener(
_log = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes");
_endpoint = endpoint;
_options = options;
_namedPipeServerStreamPool = new DefaultObjectPoolProvider().Create(new NamedPipeServerStreamPoolPolicy(this));
_mutex = mutex;
_memoryPool = options.MemoryPoolFactory();
_listeningToken = _listeningTokenSource.Token;
Expand All @@ -54,6 +57,12 @@ public NamedPipeConnectionListener(
_outputOptions = new PipeOptions(_memoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
}

internal void ReturnStream(NamedPipeServerStream namedPipeServerStream)
{
// The stream is automatically disposed if there isn't space in the pool.
_namedPipeServerStreamPool.Return(namedPipeServerStream);
}

public void Start()
{
Debug.Assert(_listeningTasks == null, "Already started");
Expand All @@ -63,7 +72,7 @@ public void Start()
for (var i = 0; i < _listeningTasks.Length; i++)
{
// Start first stream inline to catch creation errors.
var initialStream = CreateServerStream();
var initialStream = _namedPipeServerStreamPool.Get();

_listeningTasks[i] = Task.Run(() => StartAsync(initialStream));
}
Expand All @@ -83,13 +92,13 @@ private async Task StartAsync(NamedPipeServerStream nextStream)

await stream.WaitForConnectionAsync(_listeningToken);

var connection = new NamedPipeConnection(stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions);
var connection = new NamedPipeConnection(this, stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions);
connection.Start();

// Create the next stream before writing connected stream to the channel.
// This ensures there is always a created stream and another process can't
// create a stream with the same name with different a access policy.
nextStream = CreateServerStream();
nextStream = _namedPipeServerStreamPool.Get();

while (!_acceptedQueue.Writer.TryWrite(connection))
{
Expand All @@ -106,7 +115,7 @@ private async Task StartAsync(NamedPipeServerStream nextStream)

// Dispose existing pipe, create a new one and continue accepting.
nextStream.Dispose();
nextStream = CreateServerStream();
nextStream = _namedPipeServerStreamPool.Get();
}
catch (OperationCanceledException ex) when (_listeningToken.IsCancellationRequested)
{
Expand All @@ -125,41 +134,6 @@ private async Task StartAsync(NamedPipeServerStream nextStream)
}
}

private NamedPipeServerStream CreateServerStream()
{
NamedPipeServerStream stream;
var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough;
if (_options.CurrentUserOnly)
{
pipeOptions |= NamedPipeOptions.CurrentUserOnly;
}

if (_options.PipeSecurity != null)
{
stream = NamedPipeServerStreamAcl.Create(
_endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0, // Buffer in System.IO.Pipelines
outBufferSize: 0, // Buffer in System.IO.Pipelines
_options.PipeSecurity);
}
else
{
stream = new NamedPipeServerStream(
_endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0,
outBufferSize: 0);
}
return stream;
}

public async ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default)
{
while (await _acceptedQueue.Reader.WaitToReadAsync(cancellationToken))
Expand Down Expand Up @@ -191,5 +165,56 @@ public async ValueTask DisposeAsync()
{
await Task.WhenAll(_listeningTasks);
}

// Dispose pool after listening tasks are complete so there is no chance a stream is fetched from the pool after the pool is disposed.
// Important to dispose because this empties and disposes streams in the pool.
((IDisposable)_namedPipeServerStreamPool).Dispose();
}

private sealed class NamedPipeServerStreamPoolPolicy : IPooledObjectPolicy<NamedPipeServerStream>
{
public NamedPipeConnectionListener _listener;

public NamedPipeServerStreamPoolPolicy(NamedPipeConnectionListener listener)
{
_listener = listener;
}

public NamedPipeServerStream Create()
{
NamedPipeServerStream stream;
var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough;
if (_listener._options.CurrentUserOnly)
{
pipeOptions |= NamedPipeOptions.CurrentUserOnly;
}

if (_listener._options.PipeSecurity != null)
{
stream = NamedPipeServerStreamAcl.Create(
_listener._endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0, // Buffer in System.IO.Pipelines
outBufferSize: 0, // Buffer in System.IO.Pipelines
_listener._options.PipeSecurity);
}
else
{
stream = new NamedPipeServerStream(
_listener._endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0,
outBufferSize: 0);
}
return stream;
}

public bool Return(NamedPipeServerStream obj) => true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<ItemGroup>
<Reference Include="Microsoft.AspNetCore.Hosting.Abstractions" />
<Reference Include="Microsoft.AspNetCore.Connections.Abstractions" />
<Reference Include="Microsoft.Extensions.ObjectPool" />
<Reference Include="Microsoft.Extensions.Options" />
</ItemGroup>

Expand Down

0 comments on commit 7b5b67e

Please sign in to comment.