diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs index 89eb5df6f68f..c908eebfe816 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeConnectionListener.cs @@ -28,7 +28,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener private readonly PipeOptions _inputOptions; private readonly PipeOptions _outputOptions; private readonly Mutex _mutex; - private Task[]? _listeningTasks; + private Task? _completeListeningTask; private int _disposed; public NamedPipeConnectionListener( @@ -65,73 +65,79 @@ internal void ReturnStream(NamedPipeServerStream namedPipeServerStream) public void Start() { - Debug.Assert(_listeningTasks == null, "Already started"); + Debug.Assert(_completeListeningTask == null, "Already started"); - _listeningTasks = new Task[_options.ListenerQueueCount]; + var listeningTasks = new Task[_options.ListenerQueueCount]; - for (var i = 0; i < _listeningTasks.Length; i++) + for (var i = 0; i < listeningTasks.Length; i++) { // Start first stream inline to catch creation errors. var initialStream = _namedPipeServerStreamPool.Get(); - _listeningTasks[i] = Task.Run(() => StartAsync(initialStream)); + listeningTasks[i] = Task.Run(() => StartAsync(initialStream)); } + + _completeListeningTask = Task.Run(async () => + { + try + { + await Task.WhenAll(listeningTasks); + _acceptedQueue.Writer.TryComplete(); + } + catch (Exception ex) + { + _acceptedQueue.Writer.TryComplete(ex); + NamedPipeLog.ConnectionListenerAborted(_log, ex); + } + }); } public EndPoint EndPoint => _endpoint; private async Task StartAsync(NamedPipeServerStream nextStream) { - try + while (true) { - while (true) + try { - try - { - var stream = nextStream; + var stream = nextStream; - await stream.WaitForConnectionAsync(_listeningToken); + await stream.WaitForConnectionAsync(_listeningToken); - var connection = new NamedPipeConnection(this, stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions); - connection.Start(); + var connection = new NamedPipeConnection(this, stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions); + connection.Start(); - // Create the next stream before writing connected stream to the channel. - // This ensures there is always a created stream and another process can't - // create a stream with the same name with different a access policy. - nextStream = _namedPipeServerStreamPool.Get(); + // Create the next stream before writing connected stream to the channel. + // This ensures there is always a created stream and another process can't + // create a stream with the same name with different a access policy. + nextStream = _namedPipeServerStreamPool.Get(); - while (!_acceptedQueue.Writer.TryWrite(connection)) + while (!_acceptedQueue.Writer.TryWrite(connection)) + { + if (!await _acceptedQueue.Writer.WaitToWriteAsync(_listeningToken)) { - if (!await _acceptedQueue.Writer.WaitToWriteAsync(_listeningToken)) - { - throw new InvalidOperationException("Accept queue writer was unexpectedly closed."); - } + throw new InvalidOperationException("Accept queue writer was unexpectedly closed."); } } - catch (IOException ex) when (!_listeningToken.IsCancellationRequested) - { - // WaitForConnectionAsync can throw IOException when the pipe is broken. - NamedPipeLog.ConnectionListenerBrokenPipe(_log, ex); - - // Dispose existing pipe, create a new one and continue accepting. - nextStream.Dispose(); - nextStream = _namedPipeServerStreamPool.Get(); - } - catch (OperationCanceledException ex) when (_listeningToken.IsCancellationRequested) - { - // Cancelled the current token - NamedPipeLog.ConnectionListenerAborted(_log, ex); - break; - } } + catch (IOException ex) when (!_listeningToken.IsCancellationRequested) + { + // WaitForConnectionAsync can throw IOException when the pipe is broken. + NamedPipeLog.ConnectionListenerBrokenPipe(_log, ex); - nextStream.Dispose(); - _acceptedQueue.Writer.TryComplete(); - } - catch (Exception ex) - { - _acceptedQueue.Writer.TryComplete(ex); + // Dispose existing pipe, create a new one and continue accepting. + nextStream.Dispose(); + nextStream = _namedPipeServerStreamPool.Get(); + } + catch (OperationCanceledException) when (_listeningToken.IsCancellationRequested) + { + // Token was canceled. The listener is shutting down. + break; + } } + + NamedPipeLog.ConnectionListenerQueueExited(_log); + nextStream.Dispose(); } public async ValueTask AcceptAsync(CancellationToken cancellationToken = default) @@ -161,9 +167,9 @@ public async ValueTask DisposeAsync() _listeningTokenSource.Dispose(); _mutex.Dispose(); - if (_listeningTasks != null) + if (_completeListeningTask != null) { - await Task.WhenAll(_listeningTasks); + await _completeListeningTask; } // Dispose pool after listening tasks are complete so there is no chance a stream is fetched from the pool after the pool is disposed. diff --git a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeLog.cs b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeLog.cs index e02ad8bb9a6d..1e50f7eccdf6 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeLog.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/src/Internal/NamedPipeLog.cs @@ -30,7 +30,7 @@ public static void ConnectionError(ILogger logger, BaseConnectionContext connect } } - [LoggerMessage(3, LogLevel.Debug, "Named pipe listener aborted.", EventName = "ConnectionListenerAborted")] + [LoggerMessage(3, LogLevel.Error, "Named pipe listener aborted.", EventName = "ConnectionListenerAborted")] public static partial void ConnectionListenerAborted(ILogger logger, Exception exception); [LoggerMessage(4, LogLevel.Debug, @"Connection id ""{ConnectionId}"" paused.", EventName = "ConnectionPause", SkipEnabledCheck = true)] @@ -79,4 +79,7 @@ public static void ConnectionDisconnect(ILogger logger, NamedPipeConnection conn [LoggerMessage(8, LogLevel.Debug, "Named pipe listener received broken pipe while waiting for a connection.", EventName = "ConnectionListenerBrokenPipe")] public static partial void ConnectionListenerBrokenPipe(ILogger logger, Exception ex); + + [LoggerMessage(9, LogLevel.Trace, "Named pipe listener queue exited.", EventName = "ConnectionListenerQueueExited")] + public static partial void ConnectionListenerQueueExited(ILogger logger); } diff --git a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs index 4c2540f81f13..c98994df5c92 100644 --- a/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs +++ b/src/Servers/Kestrel/Transport.NamedPipes/test/NamedPipeConnectionListenerTests.cs @@ -52,7 +52,7 @@ public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts() } [ConditionalFact] - public async Task AcceptAsync_UnbindAfterCall_CleanExitAndLog() + public async Task AcceptAsync_UnbindAfterCall_CleanExit() { // Arrange await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory); @@ -65,7 +65,7 @@ public async Task AcceptAsync_UnbindAfterCall_CleanExitAndLog() // Assert Assert.Null(await acceptTask.AsTask().DefaultTimeout()); - Assert.Contains(LogMessages, m => m.EventId.Name == "ConnectionListenerAborted"); + Assert.DoesNotContain(LogMessages, m => m.LogLevel >= LogLevel.Error); } [Theory] @@ -192,7 +192,7 @@ private record ClientStreamContext(NamedPipeClientStream ClientStream, Task Conn } [ConditionalFact] - public async Task AcceptAsync_DisposeAfterCall_CleanExitAndLog() + public async Task AcceptAsync_DisposeAfterCall_CleanExit() { // Arrange await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory); @@ -205,7 +205,7 @@ public async Task AcceptAsync_DisposeAfterCall_CleanExitAndLog() // Assert Assert.Null(await acceptTask.AsTask().DefaultTimeout()); - Assert.Contains(LogMessages, m => m.EventId.Name == "ConnectionListenerAborted"); + Assert.DoesNotContain(LogMessages, m => m.LogLevel >= LogLevel.Error); } [ConditionalFact]