Skip to content

Commit

Permalink
Add pool test
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK committed Feb 10, 2023
1 parent a9249f4 commit 0140321
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ public NamedPipeConnectionListener(
NamedPipeEndPoint endpoint,
NamedPipeTransportOptions options,
ILoggerFactory loggerFactory,
ObjectPoolProvider objectPoolProvider,
Mutex mutex)
{
_log = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes");
_endpoint = endpoint;
_options = options;
_namedPipeServerStreamPool = new DefaultObjectPoolProvider().Create(new NamedPipeServerStreamPoolPolicy(this));
_mutex = mutex;
_memoryPool = options.MemoryPoolFactory();
_listeningToken = _listeningTokenSource.Token;
// Have to create the pool here (instead of DI) because the pool is specific to an endpoint.
_namedPipeServerStreamPool = objectPoolProvider.Create(new NamedPipeServerStreamPoolPolicy(endpoint, options));

// The OS maintains a backlog of clients that are waiting to connect, so the app queue only stores a single connection.
// We want to have a queue plus a background task that populates the queue, rather than creating NamedPipeServerStream
Expand Down Expand Up @@ -176,43 +178,45 @@ public async ValueTask DisposeAsync()

// Dispose pool after listening tasks are complete so there is no chance a stream is fetched from the pool after the pool is disposed.
// Important to dispose because this empties and disposes streams in the pool.
((IDisposable)_namedPipeServerStreamPool).Dispose();
(_namedPipeServerStreamPool as IDisposable)?.Dispose();
}

private sealed class NamedPipeServerStreamPoolPolicy : IPooledObjectPolicy<NamedPipeServerStream>
{
public NamedPipeConnectionListener _listener;
private readonly NamedPipeEndPoint _endpoint;
private readonly NamedPipeTransportOptions _options;

public NamedPipeServerStreamPoolPolicy(NamedPipeConnectionListener listener)
public NamedPipeServerStreamPoolPolicy(NamedPipeEndPoint endpoint, NamedPipeTransportOptions options)
{
_listener = listener;
_endpoint = endpoint;
_options = options;
}

public NamedPipeServerStream Create()
{
NamedPipeServerStream stream;
var pipeOptions = NamedPipeOptions.Asynchronous | NamedPipeOptions.WriteThrough;
if (_listener._options.CurrentUserOnly)
if (_options.CurrentUserOnly)
{
pipeOptions |= NamedPipeOptions.CurrentUserOnly;
}

if (_listener._options.PipeSecurity != null)
if (_options.PipeSecurity != null)
{
stream = NamedPipeServerStreamAcl.Create(
_listener._endpoint.PipeName,
_endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
pipeOptions,
inBufferSize: 0, // Buffer in System.IO.Pipelines
outBufferSize: 0, // Buffer in System.IO.Pipelines
_listener._options.PipeSecurity);
_options.PipeSecurity);
}
else
{
stream = new NamedPipeServerStream(
_listener._endpoint.PipeName,
_endpoint.PipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Byte,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Net;
using Microsoft.AspNetCore.Connections;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Microsoft.Extensions.Options;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal;
Expand All @@ -14,17 +15,20 @@ internal sealed class NamedPipeTransportFactory : IConnectionListenerFactory, IC
private const string LocalComputerServerName = ".";

private readonly ILoggerFactory _loggerFactory;
private readonly ObjectPoolProvider _objectPoolProvider;
private readonly NamedPipeTransportOptions _options;

public NamedPipeTransportFactory(
ILoggerFactory loggerFactory,
IOptions<NamedPipeTransportOptions> options)
IOptions<NamedPipeTransportOptions> options,
ObjectPoolProvider objectPoolProvider)
{
ArgumentNullException.ThrowIfNull(loggerFactory);

Debug.Assert(OperatingSystem.IsWindows(), "Named pipes transport requires a Windows operating system.");

_loggerFactory = loggerFactory;
_objectPoolProvider = objectPoolProvider;
_options = options.Value;
}

Expand Down Expand Up @@ -52,7 +56,7 @@ public ValueTask<IConnectionListener> BindAsync(EndPoint endpoint, CancellationT
throw new AddressInUseException($"Named pipe '{namedPipeEndPoint.PipeName}' is already in use by Kestrel.");
}

var listener = new NamedPipeConnectionListener(namedPipeEndPoint, _options, _loggerFactory, mutex);
var listener = new NamedPipeConnectionListener(namedPipeEndPoint, _options, _loggerFactory, _objectPoolProvider, mutex);
listener.Start();

return new ValueTask<IConnectionListener>(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes;
using Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.ObjectPool;

namespace Microsoft.AspNetCore.Hosting;

Expand All @@ -29,6 +31,7 @@ public static IWebHostBuilder UseNamedPipes(this IWebHostBuilder hostBuilder)

hostBuilder.ConfigureServices(services =>
{
services.TryAddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();
services.AddSingleton<IConnectionListenerFactory, NamedPipeTransportFactory>();
});
return hostBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Tests;

Expand All @@ -24,11 +25,70 @@ public async Task AcceptAsync_AfterUnbind_ReturnNull()
Assert.Null(await connectionListener.AcceptAsync().DefaultTimeout());
}

private class TestObjectPoolProvider : ObjectPoolProvider
{
public List<ITestObjectPool> Pools { get; } = new List<ITestObjectPool>();

public override ObjectPool<T> Create<T>(IPooledObjectPolicy<T> policy)
{
var pool = new TestObjectPool<T>(policy);
Pools.Add(pool);

return pool;
}

private class TestObjectPool<T> : ObjectPool<T>, ITestObjectPool where T : class
{
private readonly IPooledObjectPolicy<T> _policy;

public TestObjectPool(IPooledObjectPolicy<T> policy)
{
_policy = policy;
}

public int GetCount { get; private set; }
public int ReturnSuccessCount { get; private set; }
public int ReturnFailureCount { get; private set; }

public override T Get()
{
GetCount++;
return _policy.Create();
}

public override void Return(T obj)
{
if (_policy.Return(obj))
{
ReturnSuccessCount++;
}
else
{
ReturnFailureCount++;
}
}
}
}

private interface ITestObjectPool
{
int GetCount { get; }
int ReturnSuccessCount { get; }
int ReturnFailureCount { get; }
}

[ConditionalFact]
public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts()
{
// Arrange
await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory);
var testObjectPoolProvider = new TestObjectPoolProvider();
var options = new NamedPipeTransportOptions
{
ListenerQueueCount = 2
};
await using var connectionListener = await NamedPipeTestHelpers.CreateConnectionListenerFactory(LoggerFactory, options: options, objectPoolProvider: testObjectPoolProvider);
var pool = Assert.Single(testObjectPoolProvider.Pools);
Assert.Equal(options.ListenerQueueCount, pool.GetCount);

// Stream 1
var acceptTask1 = connectionListener.AcceptAsync();
Expand All @@ -40,6 +100,10 @@ public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts()
await serverConnection1.DisposeAsync().AsTask().DefaultTimeout();
Assert.True(serverConnection1.ConnectionClosed.IsCancellationRequested, "Connection 1 should be closed");

Assert.Equal(options.ListenerQueueCount + 1, pool.GetCount);
Assert.Equal(1, pool.ReturnSuccessCount);
Assert.Equal(0, pool.ReturnFailureCount);

// Stream 2
var acceptTask2 = connectionListener.AcceptAsync();
await using var clientStream2 = NamedPipeTestHelpers.CreateClientStream(connectionListener.EndPoint);
Expand All @@ -49,6 +113,10 @@ public async Task AcceptAsync_ClientCreatesConnection_ServerAccepts()
Assert.False(serverConnection2.ConnectionClosed.IsCancellationRequested, "Connection 2 should be open");
await serverConnection2.DisposeAsync().AsTask().DefaultTimeout();
Assert.True(serverConnection2.ConnectionClosed.IsCancellationRequested, "Connection 2 should be closed");

Assert.Equal(options.ListenerQueueCount + 2, pool.GetCount);
Assert.Equal(2, pool.ReturnSuccessCount);
Assert.Equal(0, pool.ReturnFailureCount);
}

[ConditionalFact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.ObjectPool;
using Microsoft.Extensions.Options;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.NamedPipes.Tests;
Expand All @@ -22,18 +23,20 @@ internal static class NamedPipeTestHelpers

public static NamedPipeTransportFactory CreateTransportFactory(
ILoggerFactory loggerFactory = null,
NamedPipeTransportOptions options = null)
NamedPipeTransportOptions options = null,
ObjectPoolProvider objectPoolProvider = null)
{
options ??= new NamedPipeTransportOptions();
return new NamedPipeTransportFactory(loggerFactory ?? NullLoggerFactory.Instance, Options.Create(options));
return new NamedPipeTransportFactory(loggerFactory ?? NullLoggerFactory.Instance, Options.Create(options), objectPoolProvider ?? new DefaultObjectPoolProvider());
}

public static async Task<NamedPipeConnectionListener> CreateConnectionListenerFactory(
ILoggerFactory loggerFactory = null,
string pipeName = null,
NamedPipeTransportOptions options = null)
NamedPipeTransportOptions options = null,
ObjectPoolProvider objectPoolProvider = null)
{
var transportFactory = CreateTransportFactory(loggerFactory, options);
var transportFactory = CreateTransportFactory(loggerFactory, options, objectPoolProvider);

var endpoint = new NamedPipeEndPoint(pipeName ?? GetUniquePipeName());

Expand Down

0 comments on commit 0140321

Please sign in to comment.