Skip to content

Dispatch connection execution #12265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 3 additions & 51 deletions src/Servers/Kestrel/Core/src/Internal/ConnectionDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ async Task AcceptConnectionsAsync()
break;
}

_ = Execute(new KestrelConnection(connection, _serviceContext.Log));
var id = Interlocked.Increment(ref _lastConnectionId);
var kestrelConnection = new KestrelConnection(id, _serviceContext, _connectionDelegate, connection, _serviceContext.Log);
ThreadPool.UnsafeQueueUserWorkItem(kestrelConnection, preferLocal: false);
}
}
catch (Exception ex)
Expand All @@ -65,55 +67,5 @@ async Task AcceptConnectionsAsync()
}
}
}

internal async Task Execute(KestrelConnection connection)
{
var id = Interlocked.Increment(ref _lastConnectionId);
var connectionContext = connection.TransportConnection;

try
{
_serviceContext.ConnectionManager.AddConnection(id, connection);

Log.ConnectionStart(connectionContext.ConnectionId);
KestrelEventSource.Log.ConnectionStart(connectionContext);

using (BeginConnectionScope(connectionContext))
{
try
{
await _connectionDelegate(connectionContext);
}
catch (Exception ex)
{
Log.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId);
}
}
}
finally
{
await connection.FireOnCompletedAsync();

Log.ConnectionStop(connectionContext.ConnectionId);
KestrelEventSource.Log.ConnectionStop(connectionContext);

// Dispose the transport connection, this needs to happen before removing it from the
// connection manager so that we only signal completion of this connection after the transport
// is properly torn down.
await connection.TransportConnection.DisposeAsync();

_serviceContext.ConnectionManager.RemoveConnection(id);
}
}

private IDisposable BeginConnectionScope(ConnectionContext connectionContext)
{
if (Log.IsEnabled(LogLevel.Critical))
{
return Log.BeginScope(new ConnectionLogScope(connectionContext.ConnectionId));
}

return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure
{
internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompleteFeature, IConnectionLifetimeNotificationFeature
internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompleteFeature, IConnectionLifetimeNotificationFeature, IThreadPoolWorkItem
{
private List<(Action<object> handler, object state)> _heartbeatHandlers;
private readonly object _heartbeatLock = new object();
Expand All @@ -21,9 +21,19 @@ internal class KestrelConnection : IConnectionHeartbeatFeature, IConnectionCompl

private readonly CancellationTokenSource _connectionClosingCts = new CancellationTokenSource();
private readonly TaskCompletionSource<object> _completionTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

public KestrelConnection(ConnectionContext connectionContext, ILogger logger)
private readonly long _id;
private readonly ServiceContext _serviceContext;
private readonly ConnectionDelegate _connectionDelegate;

public KestrelConnection(long id,
ServiceContext serviceContext,
ConnectionDelegate connectionDelegate,
ConnectionContext connectionContext,
IKestrelTrace logger)
{
_id = id;
_serviceContext = serviceContext;
_connectionDelegate = connectionDelegate;
Logger = logger;
TransportConnection = connectionContext;

Expand All @@ -33,7 +43,7 @@ public KestrelConnection(ConnectionContext connectionContext, ILogger logger)
ConnectionClosedRequested = _connectionClosingCts.Token;
}

private ILogger Logger { get; }
private IKestrelTrace Logger { get; }

public ConnectionContext TransportConnection { get; set; }
public CancellationToken ConnectionClosedRequested { get; set; }
Expand Down Expand Up @@ -164,5 +174,59 @@ public void Complete()

_connectionClosingCts.Dispose();
}

void IThreadPoolWorkItem.Execute()
{
_ = ExecuteAsync();
}

internal async Task ExecuteAsync()
{
var connectionContext = TransportConnection;

try
{
_serviceContext.ConnectionManager.AddConnection(_id, this);

Logger.ConnectionStart(connectionContext.ConnectionId);
KestrelEventSource.Log.ConnectionStart(connectionContext);

using (BeginConnectionScope(connectionContext))
{
try
{
await _connectionDelegate(connectionContext);
}
catch (Exception ex)
{
Logger.LogError(0, ex, "Unhandled exception while processing {ConnectionId}.", connectionContext.ConnectionId);
}
}
}
finally
{
await FireOnCompletedAsync();

Logger.ConnectionStop(connectionContext.ConnectionId);
KestrelEventSource.Log.ConnectionStop(connectionContext);

// Dispose the transport connection, this needs to happen before removing it from the
// connection manager so that we only signal completion of this connection after the transport
// is properly torn down.
await TransportConnection.DisposeAsync();

_serviceContext.ConnectionManager.RemoveConnection(_id);
}
}

private IDisposable BeginConnectionScope(ConnectionContext connectionContext)
{
if (Logger.IsEnabled(LogLevel.Critical))
{
return Logger.BeginScope(new ConnectionLogScope(connectionContext.ConnectionId));
}

return null;
}
}
}
27 changes: 13 additions & 14 deletions src/Servers/Kestrel/Core/test/ConnectionDispatcherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests
public class ConnectionDispatcherTests
{
[Fact]
public void OnConnectionCreatesLogScopeWithConnectionId()
public async Task OnConnectionCreatesLogScopeWithConnectionId()
{
var serviceContext = new TestServiceContext();
// This needs to run inline
var tcs = new TaskCompletionSource<object>();
var dispatcher = new ConnectionDispatcher(serviceContext, _ => tcs.Task);

var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
connection.ConnectionClosed = new CancellationToken(canceled: true);
var kestrelConnection = new KestrelConnection(0, serviceContext, _ => tcs.Task, connection, serviceContext.Log);

_ = dispatcher.Execute(new KestrelConnection(connection, Mock.Of<ILogger>()));
var task = kestrelConnection.ExecuteAsync();

// The scope should be created
var scopeObjects = ((TestKestrelTrace)serviceContext.Log)
Expand All @@ -47,6 +47,8 @@ public void OnConnectionCreatesLogScopeWithConnectionId()

tcs.TrySetResult(null);

await task;

// Verify the scope was disposed after request processing completed
Assert.True(((TestKestrelTrace)serviceContext.Log).Logger.Scopes.IsEmpty);
}
Expand All @@ -73,19 +75,18 @@ public async Task StartAcceptingConnectionsAsyncLogsIfAcceptAsyncThrows()
public async Task OnConnectionFiresOnCompleted()
{
var serviceContext = new TestServiceContext();
var dispatcher = new ConnectionDispatcher(serviceContext, _ => Task.CompletedTask);

var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
connection.ConnectionClosed = new CancellationToken(canceled: true);
var kestrelConnection = new KestrelConnection(connection, Mock.Of<ILogger>());
var kestrelConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, connection, serviceContext.Log);
var completeFeature = kestrelConnection.TransportConnection.Features.Get<IConnectionCompleteFeature>();

Assert.NotNull(completeFeature);
object stateObject = new object();
object callbackState = null;
completeFeature.OnCompleted(state => { callbackState = state; return Task.CompletedTask; }, stateObject);

await dispatcher.Execute(kestrelConnection);
await kestrelConnection.ExecuteAsync();

Assert.Equal(stateObject, callbackState);
}
Expand All @@ -94,25 +95,23 @@ public async Task OnConnectionFiresOnCompleted()
public async Task OnConnectionOnCompletedExceptionCaught()
{
var serviceContext = new TestServiceContext();
var dispatcher = new ConnectionDispatcher(serviceContext, _ => Task.CompletedTask);

var logger = ((TestKestrelTrace)serviceContext.Log).Logger;
var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
connection.ConnectionClosed = new CancellationToken(canceled: true);
var mockLogger = new Mock<ILogger>();
var kestrelConnection = new KestrelConnection(connection, mockLogger.Object);
var kestrelConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, connection, serviceContext.Log);
var completeFeature = kestrelConnection.TransportConnection.Features.Get<IConnectionCompleteFeature>();

Assert.NotNull(completeFeature);
object stateObject = new object();
object callbackState = null;
completeFeature.OnCompleted(state => { callbackState = state; throw new InvalidTimeZoneException(); }, stateObject);

await dispatcher.Execute(kestrelConnection);
await kestrelConnection.ExecuteAsync();

Assert.Equal(stateObject, callbackState);
var log = mockLogger.Invocations.First();
Assert.Equal("An error occured running an IConnectionCompleteFeature.OnCompleted callback.", log.Arguments[2].ToString());
Assert.IsType<InvalidTimeZoneException>(log.Arguments[3]);
var errors = logger.Messages.Where(e => e.LogLevel >= LogLevel.Error).ToArray();
Assert.Single(errors);
Assert.Equal("An error occured running an IConnectionCompleteFeature.OnCompleted callback.", errors[0].Message);
}

private class ThrowingListener : IConnectionListener
Expand Down
5 changes: 4 additions & 1 deletion src/Servers/Kestrel/Core/test/HttpConnectionManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Testing;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;
Expand Down Expand Up @@ -39,9 +41,10 @@ private void UnrootedConnectionsGetRemovedFromHeartbeatInnerScope(
ConnectionManager httpConnectionManager,
Mock<IKestrelTrace> trace)
{
var serviceContext = new TestServiceContext();
var mock = new Mock<DefaultConnectionContext>() { CallBase = true };
mock.Setup(m => m.ConnectionId).Returns(connectionId);
var httpConnection = new KestrelConnection(mock.Object, Mock.Of<ILogger>());
var httpConnection = new KestrelConnection(0, serviceContext, _ => Task.CompletedTask, mock.Object, Mock.Of<IKestrelTrace>());

httpConnectionManager.AddConnection(0, httpConnection);

Expand Down