diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnection.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnection.cs index aace9360d054..872ce2da1660 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnection.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnection.cs @@ -13,15 +13,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal; internal sealed class NamedPipeConnection : TransportConnection, IConnectionNamedPipeFeature { + private static readonly ConnectionAbortedException SendGracefullyCompletedException = new ConnectionAbortedException("The named pipe transport's send loop completed gracefully."); private const int MinAllocBufferSize = 4096; - + private readonly NamedPipeConnectionListener _connectionListener; private readonly NamedPipeServerStream _stream; private readonly ILogger _log; private readonly IDuplexPipe _originalTransport; private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource(); private bool _connectionClosed; - private bool _connectionDisposed; + private bool _connectionShutdown; + private bool _streamDisconnected; private Exception? _shutdownReason; private readonly object _shutdownLock = new object(); @@ -30,6 +32,7 @@ internal sealed class NamedPipeConnection : TransportConnection, IConnectionName internal Task _sendingTask = Task.CompletedTask; public NamedPipeConnection( + NamedPipeConnectionListener connectionListener, NamedPipeServerStream stream, NamedPipeEndPoint endPoint, ILogger logger, @@ -37,6 +40,7 @@ public NamedPipeConnection( PipeOptions inputOptions, PipeOptions outputOptions) { + _connectionListener = connectionListener; _stream = stream; _log = logger; MemoryPool = memoryPool; @@ -120,7 +124,7 @@ private async Task DoReceiveAsync() // This exception should always be ignored because _shutdownReason should be set. error = ex; - if (!_connectionDisposed) + if (!_connectionShutdown) { // This is unexpected if the socket hasn't been disposed yet. NamedPipeLog.ConnectionError(_log, this, error); @@ -206,7 +210,7 @@ private void Shutdown(Exception? shutdownReason) { lock (_shutdownLock) { - if (_connectionDisposed) + if (_connectionShutdown) { return; } @@ -214,25 +218,24 @@ private void Shutdown(Exception? shutdownReason) // Make sure to close the connection only after the _aborted flag is set. // Without this, the RequestsCanBeAbortedMidRead test will sometimes fail when // a BadHttpRequestException is thrown instead of a TaskCanceledException. - _connectionDisposed = true; + _connectionShutdown = true; // shutdownReason should only be null if the output was completed gracefully, so no one should ever // ever observe the nondescript ConnectionAbortedException except for connection middleware attempting // to half close the connection which is currently unsupported. - _shutdownReason = shutdownReason ?? new ConnectionAbortedException("The Socket transport's send loop completed gracefully."); + _shutdownReason = shutdownReason ?? SendGracefullyCompletedException; NamedPipeLog.ConnectionDisconnect(_log, this, _shutdownReason.Message); try { // Try to gracefully close the socket even for aborts to match libuv behavior. _stream.Disconnect(); + _streamDisconnected = true; } catch { // Ignore any errors from NamedPipeStream.Disconnect() since we're tearing down the connection anyway. } - - _stream.Dispose(); } } @@ -287,8 +290,17 @@ public override async ValueTask DisposeAsync() catch (Exception ex) { _log.LogError(0, ex, $"Unexpected exception in {nameof(NamedPipeConnection)}.{nameof(Start)}."); + _stream.Dispose(); + return; + } + + if (!_streamDisconnected) + { + _stream.Dispose(); + } + else + { + _connectionListener.ReturnStream(_stream); } - - await _stream.DisposeAsync(); } } diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs index 928aeea09030..89eb5df6f68f 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs @@ -9,6 +9,7 @@ using System.Threading.Channels; using Microsoft.AspNetCore.Connections; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.ObjectPool; using NamedPipeOptions = System.IO.Pipes.PipeOptions; using PipeOptions = System.IO.Pipelines.PipeOptions; @@ -19,6 +20,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener private readonly ILogger _log; private readonly NamedPipeEndPoint _endpoint; private readonly NamedPipeTransportOptions _options; + private readonly ObjectPool _namedPipeServerStreamPool; private readonly CancellationTokenSource _listeningTokenSource = new CancellationTokenSource(); private readonly CancellationToken _listeningToken; private readonly Channel _acceptedQueue; @@ -38,6 +40,7 @@ public NamedPipeConnectionListener( _log = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes"); _endpoint = endpoint; _options = options; + _namedPipeServerStreamPool = new DefaultObjectPoolProvider().Create(new NamedPipeServerStreamPoolPolicy(this)); _mutex = mutex; _memoryPool = options.MemoryPoolFactory(); _listeningToken = _listeningTokenSource.Token; @@ -54,6 +57,12 @@ public NamedPipeConnectionListener( _outputOptions = new PipeOptions(_memoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false); } + internal void ReturnStream(NamedPipeServerStream namedPipeServerStream) + { + // The stream is automatically disposed if there isn't space in the pool. + _namedPipeServerStreamPool.Return(namedPipeServerStream); + } + public void Start() { Debug.Assert(_listeningTasks == null, "Already started"); @@ -63,7 +72,7 @@ public void Start() for (var i = 0; i < _listeningTasks.Length; i++) { // Start first stream inline to catch creation errors. - var initialStream = CreateServerStream(); + var initialStream = _namedPipeServerStreamPool.Get(); _listeningTasks[i] = Task.Run(() => StartAsync(initialStream)); } @@ -83,13 +92,13 @@ private async Task StartAsync(NamedPipeServerStream nextStream) await stream.WaitForConnectionAsync(_listeningToken); - var connection = new NamedPipeConnection(stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions); + var connection = new NamedPipeConnection(this, stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions); connection.Start(); // Create the next stream before writing connected stream to the channel. // This ensures there is always a created stream and another process can't // create a stream with the same name with different a access policy. - nextStream = CreateServerStream(); + nextStream = _namedPipeServerStreamPool.Get(); while (!_acceptedQueue.Writer.TryWrite(connection)) { @@ -106,7 +115,7 @@ private async Task StartAsync(NamedPipeServerStream nextStream) // Dispose existing pipe, create a new one and continue accepting. nextStream.Dispose(); - nextStream = CreateServerStream(); + nextStream = _namedPipeServerStreamPool.Get(); } catch (OperationCanceledException ex) when (_listeningToken.IsCancellationRequested) { @@ -125,41 +134,6 @@ private async Task StartAsync(NamedPipeServerStream nextStream) } } - private NamedPipeServerStream CreateServerStream() - { - NamedPipeServerStream stream; - var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough; - if (_options.CurrentUserOnly) - { - pipeOptions |= NamedPipeOptions.CurrentUserOnly; - } - - if (_options.PipeSecurity != null) - { - stream = NamedPipeServerStreamAcl.Create( - _endpoint.PipeName, - PipeDirection.InOut, - NamedPipeServerStream.MaxAllowedServerInstances, - PipeTransmissionMode.Byte, - pipeOptions, - inBufferSize: 0, // Buffer in System.IO.Pipelines - outBufferSize: 0, // Buffer in System.IO.Pipelines - _options.PipeSecurity); - } - else - { - stream = new NamedPipeServerStream( - _endpoint.PipeName, - PipeDirection.InOut, - NamedPipeServerStream.MaxAllowedServerInstances, - PipeTransmissionMode.Byte, - pipeOptions, - inBufferSize: 0, - outBufferSize: 0); - } - return stream; - } - public async ValueTask AcceptAsync(CancellationToken cancellationToken = default) { while (await _acceptedQueue.Reader.WaitToReadAsync(cancellationToken)) @@ -191,5 +165,56 @@ public async ValueTask DisposeAsync() { await Task.WhenAll(_listeningTasks); } + + // Dispose pool after listening tasks are complete so there is no chance a stream is fetched from the pool after the pool is disposed. + // Important to dispose because this empties and disposes streams in the pool. + ((IDisposable)_namedPipeServerStreamPool).Dispose(); + } + + private sealed class NamedPipeServerStreamPoolPolicy : IPooledObjectPolicy + { + public NamedPipeConnectionListener _listener; + + public NamedPipeServerStreamPoolPolicy(NamedPipeConnectionListener listener) + { + _listener = listener; + } + + public NamedPipeServerStream Create() + { + NamedPipeServerStream stream; + var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough; + if (_listener._options.CurrentUserOnly) + { + pipeOptions |= NamedPipeOptions.CurrentUserOnly; + } + + if (_listener._options.PipeSecurity != null) + { + stream = NamedPipeServerStreamAcl.Create( + _listener._endpoint.PipeName, + PipeDirection.InOut, + NamedPipeServerStream.MaxAllowedServerInstances, + PipeTransmissionMode.Byte, + pipeOptions, + inBufferSize: 0, // Buffer in System.IO.Pipelines + outBufferSize: 0, // Buffer in System.IO.Pipelines + _listener._options.PipeSecurity); + } + else + { + stream = new NamedPipeServerStream( + _listener._endpoint.PipeName, + PipeDirection.InOut, + NamedPipeServerStream.MaxAllowedServerInstances, + PipeTransmissionMode.Byte, + pipeOptions, + inBufferSize: 0, + outBufferSize: 0); + } + return stream; + } + + public bool Return(NamedPipeServerStream obj) => true; } } diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.csproj b/src/Servers/Kestrel/Transport.NamedPipes/src/Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.csproj index bbe5da6ad877..e7b0816ec136 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.csproj +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.csproj @@ -26,6 +26,7 @@ +