Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,6 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
if (connection is not null)
{
Log.EstablishedConnection(_logger);

// Allow the reads to be canceled
connection.Cancellation ??= new CancellationTokenSource();
}
}
else
Expand All @@ -198,7 +195,7 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti

if (connection.TransportType != HttpTransportType.WebSockets || connection.UseStatefulReconnect)
{
if (!await connection.CancelPreviousPoll(context))
if (connection.ApplicationTask is not null && !await connection.CancelPreviousPoll(context))
{
// Connection closed. It's already set the response status code.
return;
Expand All @@ -215,6 +212,9 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
case HttpTransportType.None:
break;
case HttpTransportType.WebSockets:
// Allow the reads to be canceled
connection.Cancellation ??= new CancellationTokenSource();

var isReconnect = connection.ApplicationTask is not null;
var ws = new WebSocketsServerTransport(options.WebSockets, connection.Application, connection, _loggerFactory);
if (!connection.TryActivatePersistentConnection(connectionDelegate, ws, currentRequestTcs.Task, context, _logger))
Expand Down Expand Up @@ -376,6 +376,11 @@ private async Task ProcessNegotiate(HttpContext context, HttpConnectionDispatche
if (error == null)
{
connection = CreateConnection(options, clientProtocolVersion, useStatefulReconnect);
if (connection.Status == HttpConnectionStatus.Disposed)
{
// Happens if the server is shutting down when a new negotiate request comes in
error = "The connection was closed before negotiation completed.";
}
}

// Set the Connection ID on the logging scope so that logs from now on will have the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ internal sealed partial class HttpConnectionManager
private readonly ILogger<HttpConnectionContext> _connectionLogger;
private readonly TimeSpan _disconnectTimeout;
private readonly HttpConnectionsMetrics _metrics;
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly Lock _closeLock = new();
private bool _closed;
Comment on lines +29 to +31
Copy link

Copilot AI Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field _closed reads like a verb; renaming to _isClosed (and updating references) improves clarity by conveying a boolean state.

Copilot generated this review using guidance from repository custom instructions.


public HttpConnectionManager(ILoggerFactory loggerFactory, IHostApplicationLifetime appLifetime, IOptions<ConnectionOptions> connectionOptions, HttpConnectionsMetrics metrics)
{
Expand All @@ -34,6 +37,7 @@ public HttpConnectionManager(ILoggerFactory loggerFactory, IHostApplicationLifet
_nextHeartbeat = new PeriodicTimer(_heartbeatTickRate);
_disconnectTimeout = connectionOptions.Value.DisconnectTimeout ?? ConnectionOptionsSetup.DefaultDisconectTimeout;
_metrics = metrics;
_applicationLifetime = appLifetime;

// Register these last as the callbacks could run immediately
appLifetime.ApplicationStarted.Register(Start);
Expand Down Expand Up @@ -82,6 +86,12 @@ internal HttpConnectionContext CreateConnection(HttpConnectionDispatcherOptions

_connections.TryAdd(connectionToken, connection);

// If the application is stopping don't allow new connections to be created
if (_applicationLifetime.ApplicationStopping.IsCancellationRequested || _closed)
{
CloseConnections();
}

return connection;
}

Expand Down Expand Up @@ -184,20 +194,28 @@ public void Scan()

public void CloseConnections()
{
// Stop firing the timer
_nextHeartbeat.Dispose();
lock (_closeLock)
{
if (!_closed)
{
// Stop firing the timer
_nextHeartbeat.Dispose();

var tasks = new List<Task>(_connections.Count);
_closed = true;
}

// REVIEW: In the future we can consider a hybrid where we first try to wait for shutdown
// for a certain time frame then after some grace period we shutdown more aggressively
foreach (var c in _connections)
{
// We're shutting down so don't wait for closing the application
tasks.Add(DisposeAndRemoveAsync(c.Value, closeGracefully: false, HttpConnectionStopStatus.AppShutdown));
}
var tasks = new List<Task>(_connections.Count);

// REVIEW: In the future we can consider a hybrid where we first try to wait for shutdown
// for a certain time frame then after some grace period we shutdown more aggressively
foreach (var c in _connections)
{
// We're shutting down so don't wait for closing the application
tasks.Add(DisposeAndRemoveAsync(c.Value, closeGracefully: false, HttpConnectionStopStatus.AppShutdown));
}

Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(5));
Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(5));
}
}

internal async Task DisposeAndRemoveAsync(HttpConnectionContext connection, bool closeGracefully, HttpConnectionStopStatus status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,38 @@ public async Task NoNegotiateVersionInQueryStringThrowsWhenMinProtocolVersionIsS
}
}

[Fact]
public async Task NegotiateAfterApplicationStoppingReturnsError()
{
using (StartVerifiableLog())
{
var appLifetime = new TestApplicationLifetime();
var manager = CreateConnectionManager(LoggerFactory, appLifetime);
appLifetime.Start();

appLifetime.StopApplication();

var dispatcher = CreateDispatcher(manager, LoggerFactory);
var context = new DefaultHttpContext();
var services = new ServiceCollection();
services.AddSingleton<TestConnectionHandler>();
services.AddOptions();
var ms = new MemoryStream();
context.Request.Path = "/foo";
context.Request.Method = "POST";
context.Response.Body = ms;
context.Request.QueryString = new QueryString("");
await dispatcher.ExecuteNegotiateAsync(context, new HttpConnectionDispatcherOptions());
var negotiateResponse = JsonConvert.DeserializeObject<JObject>(Encoding.UTF8.GetString(ms.ToArray()));

var error = negotiateResponse.Value<string>("error");
Assert.Equal("The connection was closed before negotiation completed.", error);

var connectionId = negotiateResponse.Value<string>("connectionId");
Assert.Null(connectionId);
}
}

[Theory]
[InlineData(HttpTransportType.LongPolling)]
[InlineData(HttpTransportType.ServerSentEvents)]
Expand Down Expand Up @@ -2517,6 +2549,120 @@ public async Task DisableReconnectDisallowsReplacementConnection()
}
}

[Fact]
public async Task StatefulReconnectionConnectionClosesOnApplicationStopping()
{
// ReconnectConnectionHandler can throw OperationCanceledException during Pipe.ReadAsync
using (StartVerifiableLog(wc => wc.EventId.Name == "FailedDispose"))
{
var appLifetime = new TestApplicationLifetime();
var manager = CreateConnectionManager(LoggerFactory, appLifetime);
var options = new HttpConnectionDispatcherOptions() { AllowStatefulReconnects = true };
options.WebSockets.CloseTimeout = TimeSpan.FromMilliseconds(1);
// pretend negotiate occurred
var connection = manager.CreateConnection(options, negotiateVersion: 1, useStatefulReconnect: true);
connection.TransportType = HttpTransportType.WebSockets;

var dispatcher = CreateDispatcher(manager, LoggerFactory);
var services = new ServiceCollection();

var context = MakeRequest("/foo", connection, services);
SetTransport(context, HttpTransportType.WebSockets);

var builder = new ConnectionBuilder(services.BuildServiceProvider());
builder.UseConnectionHandler<ReconnectConnectionHandler>();
var app = builder.Build();

var initialWebSocketTask = dispatcher.ExecuteAsync(context, options, app);

#pragma warning disable CA2252 // This API requires opting into preview features
var reconnectFeature = connection.Features.Get<IStatefulReconnectFeature>();
#pragma warning restore CA2252 // This API requires opting into preview features
Assert.NotNull(reconnectFeature);

var websocketFeature = (TestWebSocketConnectionFeature)context.Features.Get<IHttpWebSocketFeature>();
await websocketFeature.Accepted.DefaultTimeout();

// Stop application should cause the connection to close and new connection attempts to fail
appLifetime.StopApplication();
var webSocketMessage = await websocketFeature.Client.GetNextMessageAsync().DefaultTimeout();

Assert.Equal(WebSocketCloseStatus.NormalClosure, webSocketMessage.CloseStatus);

await initialWebSocketTask.DefaultTimeout();

// New websocket connection with previous connection token
context = MakeRequest("/foo", connection, services);
SetTransport(context, HttpTransportType.WebSockets);

await dispatcher.ExecuteAsync(context, options, app).DefaultTimeout();

// Should complete immediately with 404 as the connection is closed
Assert.Equal(404, context.Response.StatusCode);
}
}

[Fact]
public async Task StatefulReconnectionConnectionThatReconnectedClosesOnApplicationStopping()
{
// ReconnectConnectionHandler can throw OperationCanceledException during Pipe.ReadAsync
using (StartVerifiableLog(wc => wc.EventId.Name == "FailedDispose"))
{
var appLifetime = new TestApplicationLifetime();
var manager = CreateConnectionManager(LoggerFactory, appLifetime);
var options = new HttpConnectionDispatcherOptions() { AllowStatefulReconnects = true };
options.WebSockets.CloseTimeout = TimeSpan.FromMilliseconds(1);
// pretend negotiate occurred
var connection = manager.CreateConnection(options, negotiateVersion: 1, useStatefulReconnect: true);
connection.TransportType = HttpTransportType.WebSockets;

var dispatcher = CreateDispatcher(manager, LoggerFactory);
var services = new ServiceCollection();

var context = MakeRequest("/foo", connection, services);
SetTransport(context, HttpTransportType.WebSockets);

var builder = new ConnectionBuilder(services.BuildServiceProvider());
builder.UseConnectionHandler<ReconnectConnectionHandler>();
var app = builder.Build();

var initialWebSocketTask = dispatcher.ExecuteAsync(context, options, app);

#pragma warning disable CA2252 // This API requires opting into preview features
var reconnectFeature = connection.Features.Get<IStatefulReconnectFeature>();
#pragma warning restore CA2252 // This API requires opting into preview features
Assert.NotNull(reconnectFeature);

var websocketFeature = (TestWebSocketConnectionFeature)context.Features.Get<IHttpWebSocketFeature>();
await websocketFeature.Accepted.DefaultTimeout();

// New websocket connection with previous connection token
context = MakeRequest("/foo", connection, services);
SetTransport(context, HttpTransportType.WebSockets);

var secondWebSocketTask = dispatcher.ExecuteAsync(context, options, app).DefaultTimeout();

await initialWebSocketTask.DefaultTimeout();

// Stop application should cause the connection to close and new connection attempts to fail
appLifetime.StopApplication();
var webSocketMessage = await websocketFeature.Client.GetNextMessageAsync().DefaultTimeout();

Assert.Equal(WebSocketCloseStatus.NormalClosure, webSocketMessage.CloseStatus);

await secondWebSocketTask.DefaultTimeout();

// New websocket connection with previous connection token
context = MakeRequest("/foo", connection, services);
SetTransport(context, HttpTransportType.WebSockets);

await dispatcher.ExecuteAsync(context, options, app).DefaultTimeout();

// Should complete immediately with 404 as the connection is closed
Assert.Equal(404, context.Response.StatusCode);
}
}

private class ControllableMemoryStream : MemoryStream
{
private readonly SyncPoint _syncPoint;
Expand Down Expand Up @@ -3766,18 +3912,24 @@ private static void SetTransport(HttpContext context, HttpTransportType transpor
}
}

private static HttpConnectionManager CreateConnectionManager(ILoggerFactory loggerFactory, IHostApplicationLifetime hostApplicationLifetime)
{
return CreateConnectionManager(loggerFactory, null, null, hostApplicationLifetime);
}

private static HttpConnectionManager CreateConnectionManager(ILoggerFactory loggerFactory, HttpConnectionsMetrics metrics = null)
{
return CreateConnectionManager(loggerFactory, null, metrics);
}

private static HttpConnectionManager CreateConnectionManager(ILoggerFactory loggerFactory, TimeSpan? disconnectTimeout, HttpConnectionsMetrics metrics = null)
private static HttpConnectionManager CreateConnectionManager(ILoggerFactory loggerFactory, TimeSpan? disconnectTimeout,
HttpConnectionsMetrics metrics = null, IHostApplicationLifetime hostApplicationLifetime = null)
{
var connectionOptions = new ConnectionOptions();
connectionOptions.DisconnectTimeout = disconnectTimeout;
return new HttpConnectionManager(
loggerFactory ?? new LoggerFactory(),
new EmptyApplicationLifetime(),
hostApplicationLifetime ?? new EmptyApplicationLifetime(),
Options.Create(connectionOptions),
metrics ?? new HttpConnectionsMetrics(new TestMeterFactory()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,26 @@ public async Task ApplicationLifetimeCanStartBeforeHttpConnectionManagerInitiali
}
}

[Fact]
public async Task ApplicationLifetimeStoppingApplicationStopsNewIncomingConnections()
{
using (StartVerifiableLog())
{
var appLifetime = new TestApplicationLifetime();
var connectionManager = CreateConnectionManager(LoggerFactory, appLifetime);

appLifetime.Start();

appLifetime.StopApplication();

var connection = connectionManager.CreateConnection();

Assert.Equal(HttpConnectionStatus.Disposed, connection.Status);
var result = await connection.Application.Output.FlushAsync();
Assert.True(result.IsCompleted);
}
}

private static HttpConnectionManager CreateConnectionManager(ILoggerFactory loggerFactory, IHostApplicationLifetime lifetime = null, HttpConnectionsMetrics metrics = null)
{
lifetime ??= new EmptyApplicationLifetime();
Expand Down
Loading