Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WaitForConnectionAsync when NamedPipeServerStream is disposed #52825

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public sealed partial class NamedPipeServerStream : PipeStream
private int _inBufferSize;
private int _outBufferSize;
private HandleInheritability _inheritability;
private CancellationTokenSource _internalTokenSource = new CancellationTokenSource();

private void Create(string pipeName, PipeDirection direction, int maxNumberOfServerInstances,
PipeTransmissionMode transmissionMode, PipeOptions options, int inBufferSize, int outBufferSize,
Expand Down Expand Up @@ -77,8 +78,25 @@ public Task WaitForConnectionAsync(CancellationToken cancellationToken)
Task.FromCanceled(cancellationToken) :
WaitForConnectionAsyncCore();

async Task WaitForConnectionAsyncCore() =>
HandleAcceptedSocket(await _instance!.ListeningSocket.AcceptAsync(cancellationToken).ConfigureAwait(false));
async Task WaitForConnectionAsyncCore()
{
Socket acceptedSocket;
CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_internalTokenSource.Token, cancellationToken);
try
{
acceptedSocket = await _instance!.ListeningSocket.AcceptAsync(linkedTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
throw new IOException(SR.IO_PipeBroken);
}
finally
{
linkedTokenSource.Dispose();
}

HandleAcceptedSocket(acceptedSocket);
}
}

private void HandleAcceptedSocket(Socket acceptedSocket)
Expand Down Expand Up @@ -116,9 +134,19 @@ private void HandleAcceptedSocket(Socket acceptedSocket)
State = PipeState.Connected;
}

internal override void DisposeCore(bool disposing) =>
internal override void DisposeCore(bool disposing)
{
Interlocked.Exchange(ref _instance, null)?.Dispose(disposing); // interlocked to avoid shared state problems from erroneous double/concurrent disposes

if (disposing)
{
if (State != PipeState.Closed)
{
_internalTokenSource.Cancel();
}
}
}

public void Disconnect()
{
CheckDisconnectOperations();
Expand Down
87 changes: 65 additions & 22 deletions src/libraries/System.IO.Pipes/tests/PipeStreamConformanceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,16 @@ public abstract class NamedPipeStreamConformanceTests : PipeStreamConformanceTes
{
protected override bool BrokenPipePropagatedImmediately => OperatingSystem.IsWindows(); // On Unix, implemented on Sockets, where it won't propagate immediate

protected abstract (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams();
protected abstract NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1);
protected abstract NamedPipeClientStream CreateClientStream(string pipeName);

protected (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams()
{
string pipeName = GetUniquePipeName();
NamedPipeServerStream server = CreateServerStream(pipeName);
NamedPipeClientStream client = CreateClientStream(pipeName);
return (server, client);
}

protected sealed override async Task<StreamPair> CreateConnectedStreamsAsync()
{
Expand All @@ -88,6 +97,15 @@ protected sealed override async Task<StreamPair> CreateConnectedStreamsAsync()
return ((NamedPipeServerStream)streams.Stream2, (NamedPipeClientStream)streams.Stream1);
}

protected async Task ValidateDisposedExceptionsAsync(NamedPipeServerStream server)
{
Assert.Throws<ObjectDisposedException>(() => server.Disconnect());
Assert.Throws<ObjectDisposedException>(() => server.GetImpersonationUserName());
Assert.Throws<ObjectDisposedException>(() => server.WaitForConnection());
await Assert.ThrowsAsync<ObjectDisposedException>(() => server.WaitForConnectionAsync());
await ValidateDisposedExceptionsAsync(server as Stream);
}

/// <summary>
/// Yields every combination of testing options for the OneWayReadWrites test
/// </summary>
Expand Down Expand Up @@ -629,6 +647,37 @@ public async Task CancelTokenOn_Client_ReadWriteCancelledToken_Throws_OperationC
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => clientWriteToken);
}
}

[Fact]
public async Task TwoServerInstances_OnceDisposed_Throws()
{
string pipeName = GetUniquePipeName();
NamedPipeServerStream server1 = CreateServerStream(pipeName, 2);
using NamedPipeServerStream server2 = CreateServerStream(pipeName, 2);

Task wait1 = server1.WaitForConnectionAsync();
Task wait2 = server2.WaitForConnectionAsync();
server1.Dispose();
await ValidateDisposedExceptionsAsync(server1);

using NamedPipeClientStream client = CreateClientStream(pipeName);
await client.ConnectAsync();

await Assert.ThrowsAsync<IOException>(() => wait1);

await wait2;
stephentoub marked this conversation as resolved.
Show resolved Hide resolved

foreach ((Stream writeable, Stream readable) in GetReadWritePairs((server2, client)))
{
byte[] sent = new byte[] { 123 };
byte[] received = new byte[] { 0 };

Task t = Task.Run(() => writeable.Write(sent, 0, sent.Length));
Assert.Equal(sent.Length, readable.Read(received, 0, sent.Length));
Assert.Equal(sent, received);
await t;
}
}
}

public sealed class AnonymousPipeTest_ServerIn_ClientOut : AnonymousPipeStreamConformanceTests
Expand All @@ -653,34 +702,28 @@ protected override (AnonymousPipeServerStream Server, AnonymousPipeClientStream

public sealed class NamedPipeTest_ServerOut_ClientIn : NamedPipeStreamConformanceTests
{
protected override (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams()
{
string pipeName = PipeStreamConformanceTests.GetUniquePipeName();
var server = new NamedPipeServerStream(pipeName, PipeDirection.Out, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
var client = new NamedPipeClientStream(".", pipeName, PipeDirection.In, PipeOptions.Asynchronous);
return (server, client);
}
protected override NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1) =>
new NamedPipeServerStream(pipeName, PipeDirection.Out, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);

protected override NamedPipeClientStream CreateClientStream(string pipeName) =>
new NamedPipeClientStream(".", pipeName, PipeDirection.In, PipeOptions.Asynchronous);
}

public sealed class NamedPipeTest_ServerIn_ClientOut : NamedPipeStreamConformanceTests
{
protected override (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams()
{
string pipeName = PipeStreamConformanceTests.GetUniquePipeName();
var server = new NamedPipeServerStream(pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
var client = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, PipeOptions.Asynchronous);
return (server, client);
}
protected override NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1) =>
new NamedPipeServerStream(pipeName, PipeDirection.In, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);

protected override NamedPipeClientStream CreateClientStream(string pipeName) =>
new NamedPipeClientStream(".", pipeName, PipeDirection.Out, PipeOptions.Asynchronous);
}

public sealed class NamedPipeTest_ServerInOut_ClientInOut : NamedPipeStreamConformanceTests
{
protected override (NamedPipeServerStream Server, NamedPipeClientStream Client) CreateServerAndClientStreams()
{
string pipeName = PipeStreamConformanceTests.GetUniquePipeName();
var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
var client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
return (server, client);
}
protected override NamedPipeServerStream CreateServerStream(string pipeName, int maxInstances = 1) =>
new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);

protected override NamedPipeClientStream CreateClientStream(string pipeName) =>
new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
}
}