Skip to content

Commit

Permalink
Refactor named pipes connection listener completion (#46521)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Feb 10, 2023
1 parent 65c2f0d commit 8e0155c
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal sealed class NamedPipeConnectionListener : IConnectionListener
private readonly PipeOptions _inputOptions;
private readonly PipeOptions _outputOptions;
private readonly Mutex _mutex;
private Task[]? _listeningTasks;
private Task? _completeListeningTask;
private int _disposed;

public NamedPipeConnectionListener(
Expand Down Expand Up @@ -65,73 +65,79 @@ internal void ReturnStream(NamedPipeServerStream namedPipeServerStream)

public void Start()
{
Debug.Assert(_listeningTasks == null, "Already started");
Debug.Assert(_completeListeningTask == null, "Already started");

_listeningTasks = new Task[_options.ListenerQueueCount];
var listeningTasks = new Task[_options.ListenerQueueCount];

for (var i = 0; i < _listeningTasks.Length; i++)
for (var i = 0; i < listeningTasks.Length; i++)
{
// Start first stream inline to catch creation errors.
var initialStream = _namedPipeServerStreamPool.Get();

_listeningTasks[i] = Task.Run(() => StartAsync(initialStream));
listeningTasks[i] = Task.Run(() => StartAsync(initialStream));
}

_completeListeningTask = Task.Run(async () =>
{
try
{
await Task.WhenAll(listeningTasks);
_acceptedQueue.Writer.TryComplete();
}
catch (Exception ex)
{
_acceptedQueue.Writer.TryComplete(ex);
NamedPipeLog.ConnectionListenerAborted(_log, ex);
}
});
}

public EndPoint EndPoint => _endpoint;

private async Task StartAsync(NamedPipeServerStream nextStream)
{
try
while (true)
{
while (true)
try
{
try
{
var stream = nextStream;
var stream = nextStream;

await stream.WaitForConnectionAsync(_listeningToken);
await stream.WaitForConnectionAsync(_listeningToken);

var connection = new NamedPipeConnection(this, stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions);
connection.Start();
var connection = new NamedPipeConnection(this, stream, _endpoint, _log, _memoryPool, _inputOptions, _outputOptions);
connection.Start();

// Create the next stream before writing connected stream to the channel.
// This ensures there is always a created stream and another process can't
// create a stream with the same name with different a access policy.
nextStream = _namedPipeServerStreamPool.Get();
// Create the next stream before writing connected stream to the channel.
// This ensures there is always a created stream and another process can't
// create a stream with the same name with different a access policy.
nextStream = _namedPipeServerStreamPool.Get();

while (!_acceptedQueue.Writer.TryWrite(connection))
while (!_acceptedQueue.Writer.TryWrite(connection))
{
if (!await _acceptedQueue.Writer.WaitToWriteAsync(_listeningToken))
{
if (!await _acceptedQueue.Writer.WaitToWriteAsync(_listeningToken))
{
throw new InvalidOperationException("Accept queue writer was unexpectedly closed.");
}
throw new InvalidOperationException("Accept queue writer was unexpectedly closed.");
}
}
catch (IOException ex) when (!_listeningToken.IsCancellationRequested)
{
// WaitForConnectionAsync can throw IOException when the pipe is broken.
NamedPipeLog.ConnectionListenerBrokenPipe(_log, ex);

// Dispose existing pipe, create a new one and continue accepting.
nextStream.Dispose();
nextStream = _namedPipeServerStreamPool.Get();
}
catch (OperationCanceledException ex) when (_listeningToken.IsCancellationRequested)
{
// Cancelled the current token
NamedPipeLog.ConnectionListenerAborted(_log, ex);
break;
}
}
catch (IOException ex) when (!_listeningToken.IsCancellationRequested)
{
// WaitForConnectionAsync can throw IOException when the pipe is broken.
NamedPipeLog.ConnectionListenerBrokenPipe(_log, ex);

nextStream.Dispose();
_acceptedQueue.Writer.TryComplete();
}
catch (Exception ex)
{
_acceptedQueue.Writer.TryComplete(ex);
// Dispose existing pipe, create a new one and continue accepting.
nextStream.Dispose();
nextStream = _namedPipeServerStreamPool.Get();
}
catch (OperationCanceledException) when (_listeningToken.IsCancellationRequested)
{
// Token was canceled. The listener is shutting down.
break;
}
}

NamedPipeLog.ConnectionListenerQueueExited(_log);
nextStream.Dispose();
}

public async ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -161,9 +167,9 @@ public async ValueTask DisposeAsync()

_listeningTokenSource.Dispose();
_mutex.Dispose();
if (_listeningTasks != null)
if (_completeListeningTask != null)
{
await Task.WhenAll(_listeningTasks);
await _completeListeningTask;
}

// Dispose pool after listening tasks are complete so there is no chance a stream is fetched from the pool after the pool is disposed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static void ConnectionError(ILogger logger, BaseConnectionContext connect
}
}

[LoggerMessage(3, LogLevel.Debug, "Named pipe listener aborted.", EventName = "ConnectionListenerAborted")]
[LoggerMessage(3, LogLevel.Error, "Named pipe listener aborted.", EventName = "ConnectionListenerAborted")]
public static partial void ConnectionListenerAborted(ILogger logger, Exception exception);

[LoggerMessage(4, LogLevel.Debug, @"Connection id ""{ConnectionId}"" paused.", EventName = "ConnectionPause", SkipEnabledCheck = true)]
Expand Down Expand Up @@ -79,4 +79,7 @@ public static void ConnectionDisconnect(ILogger logger, NamedPipeConnection conn

[LoggerMessage(8, LogLevel.Debug, "Named pipe listener received broken pipe while waiting for a connection.", EventName = "ConnectionListenerBrokenPipe")]
public static partial void ConnectionListenerBrokenPipe(ILogger logger, Exception ex);

[LoggerMessage(9, LogLevel.Trace, "Named pipe listener queue exited.", EventName = "ConnectionListenerQueueExited")]
public static partial void ConnectionListenerQueueExited(ILogger logger);
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts()
}

[ConditionalFact]
public async Task AcceptAsync_UnbindAfterCall_CleanExitAndLog()
public async Task AcceptAsync_UnbindAfterCall_CleanExit()
{
// Arrange
await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory);
Expand All @@ -65,7 +65,7 @@ public async Task AcceptAsync_UnbindAfterCall_CleanExitAndLog()
// Assert
Assert.Null(await acceptTask.AsTask().DefaultTimeout());

Assert.Contains(LogMessages, m => m.EventId.Name == "ConnectionListenerAborted");
Assert.DoesNotContain(LogMessages, m => m.LogLevel >= LogLevel.Error);
}

[Theory]
Expand Down Expand Up @@ -192,7 +192,7 @@ private record ClientStreamContext(NamedPipeClientStream ClientStream, Task Conn
}

[ConditionalFact]
public async Task AcceptAsync_DisposeAfterCall_CleanExitAndLog()
public async Task AcceptAsync_DisposeAfterCall_CleanExit()
{
// Arrange
await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory);
Expand All @@ -205,7 +205,7 @@ public async Task AcceptAsync_DisposeAfterCall_CleanExitAndLog()
// Assert
Assert.Null(await acceptTask.AsTask().DefaultTimeout());

Assert.Contains(LogMessages, m => m.EventId.Name == "ConnectionListenerAborted");
Assert.DoesNotContain(LogMessages, m => m.LogLevel >= LogLevel.Error);
}

[ConditionalFact]
Expand Down

0 comments on commit 8e0155c

Please sign in to comment.