Skip to content

Commit

Permalink
Use reset event in Kestrel heartbeat to quickly stop thread (dotnet#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Apr 13, 2023
1 parent 656f091 commit 063f21e
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 38 deletions.
33 changes: 21 additions & 12 deletions src/Servers/Kestrel/Core/src/Internal/Infrastructure/Heartbeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;

internal sealed class Heartbeat : IDisposable
{
// Interval used by Kestrel server.
public static readonly TimeSpan Interval = TimeSpan.FromSeconds(1);

private readonly IHeartbeatHandler[] _callbacks;
Expand All @@ -15,15 +16,17 @@ internal sealed class Heartbeat : IDisposable
private readonly KestrelTrace _trace;
private readonly TimeSpan _interval;
private readonly Thread _timerThread;
private volatile bool _stopped;
private readonly ManualResetEventSlim _stopEvent;

public Heartbeat(IHeartbeatHandler[] callbacks, ISystemClock systemClock, IDebugger debugger, KestrelTrace trace)
public Heartbeat(IHeartbeatHandler[] callbacks, ISystemClock systemClock, IDebugger debugger, KestrelTrace trace, TimeSpan interval)
{
_callbacks = callbacks;
_systemClock = systemClock;
_debugger = debugger;
_trace = trace;
_interval = Interval;
_interval = interval;
// Wait time is long, so don't try to spin to exit early. Spinning would waste CPU time.
_stopEvent = new ManualResetEventSlim(false, spinCount: 0);
_timerThread = new Thread(state => ((Heartbeat)state!).TimerLoop())
{
Name = "Kestrel Timer",
Expand Down Expand Up @@ -62,26 +65,32 @@ internal void OnHeartbeat()
}
catch (Exception ex)
{
if (!_stopped)
{
_trace.LogError(0, ex, $"{nameof(Heartbeat)}.{nameof(OnHeartbeat)}");
}
_trace.LogError(0, ex, $"{nameof(Heartbeat)}.{nameof(OnHeartbeat)}");
}
}

private void TimerLoop()
{
Thread.Sleep(_interval);
while (!_stopped)
// Starting the heartbeat immediately triggers OnHeartbeat.
// Initial delay to avoid running heartbeat again from timer thread.
while (!_stopEvent.Wait(_interval))
{
OnHeartbeat();
Thread.Sleep(_interval);
}
}

public void Dispose()
{
_stopped = true;
// Don't block waiting for the thread to exit
// Stop heart beat and immediately exit wait interval.
_stopEvent.Set();

// Wait for heartbeat thread to finish.
// Should either be immediate or a short delay while heartbeat callbacks complete.
if (_timerThread.IsAlive)
{
_timerThread.Join();
}

_stopEvent.Dispose();
}
}
8 changes: 6 additions & 2 deletions src/Servers/Kestrel/Core/src/Internal/KestrelServerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ private static ServiceContext CreateServiceContext(IOptions<KestrelServerOptions
new IHeartbeatHandler[] { dateHeaderValueManager, heartbeatManager },
new SystemClock(),
DebuggerWrapper.Singleton,
trace);
trace,
Heartbeat.Interval);

return new ServiceContext
{
Expand Down Expand Up @@ -240,6 +241,10 @@ public async Task StopAsync(CancellationToken cancellationToken)
return;
}

// Stop background heartbeat first.
// The heartbeat running as the server is shutting down could produce unexpected behavior.
ServiceContext.Heartbeat?.Dispose();

_stopCts.Cancel();

#pragma warning disable CA2016 // Don't use cancellationToken when acquiring the semaphore. Dispose calls this with a pre-canceled token.
Expand All @@ -257,7 +262,6 @@ public async Task StopAsync(CancellationToken cancellationToken)
}
finally
{
ServiceContext.Heartbeat?.Dispose();
_configChangedRegistration?.Dispose();
_stopCts.Dispose();
_bindSemaphore.Release();
Expand Down
6 changes: 3 additions & 3 deletions src/Servers/Kestrel/Core/test/DateHeaderValueManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void GetDateHeaderValue_ReturnsCachedValueBetweenTimerTicks()

var testKestrelTrace = new KestrelTrace(NullLoggerFactory.Instance);

using (var heartbeat = new Heartbeat(new IHeartbeatHandler[] { dateHeaderValueManager }, systemClock, DebuggerWrapper.Singleton, testKestrelTrace))
using (var heartbeat = new Heartbeat(new IHeartbeatHandler[] { dateHeaderValueManager }, systemClock, DebuggerWrapper.Singleton, testKestrelTrace, Heartbeat.Interval))
{
Assert.Equal(now.ToString(Rfc1123DateFormat), dateHeaderValueManager.GetDateHeaderValues().String);
systemClock.UtcNow = future;
Expand All @@ -74,7 +74,7 @@ public void GetDateHeaderValue_ReturnsUpdatedValueAfterHeartbeat()

var mockHeartbeatHandler = new Mock<IHeartbeatHandler>();

using (var heartbeat = new Heartbeat(new[] { dateHeaderValueManager, mockHeartbeatHandler.Object }, systemClock, DebuggerWrapper.Singleton, testKestrelTrace))
using (var heartbeat = new Heartbeat(new[] { dateHeaderValueManager, mockHeartbeatHandler.Object }, systemClock, DebuggerWrapper.Singleton, testKestrelTrace, Heartbeat.Interval))
{
heartbeat.OnHeartbeat();

Expand Down Expand Up @@ -105,7 +105,7 @@ public void GetDateHeaderValue_ReturnsLastDateValueAfterHeartbeatDisposed()

var testKestrelTrace = new KestrelTrace(NullLoggerFactory.Instance);

using (var heartbeat = new Heartbeat(new IHeartbeatHandler[] { dateHeaderValueManager }, systemClock, DebuggerWrapper.Singleton, testKestrelTrace))
using (var heartbeat = new Heartbeat(new IHeartbeatHandler[] { dateHeaderValueManager }, systemClock, DebuggerWrapper.Singleton, testKestrelTrace, Heartbeat.Interval))
{
heartbeat.OnHeartbeat();
Assert.Equal(now.ToString(Rfc1123DateFormat), dateHeaderValueManager.GetDateHeaderValues().String);
Expand Down
92 changes: 73 additions & 19 deletions src/Servers/Kestrel/Core/test/HeartbeatTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading;
Expand All @@ -22,6 +23,75 @@ public void HeartbeatIntervalIsOneSecond()
Assert.Equal(TimeSpan.FromSeconds(1), Heartbeat.Interval);
}

[Fact]
public async void HeartbeatLoopRunsWithSpecifiedInterval()
{
var heartbeatCallCount = 0;
var tcs = new TaskCompletionSource();
var systemClock = new MockSystemClock();
var heartbeatHandler = new Mock<IHeartbeatHandler>();
var debugger = new Mock<IDebugger>();
var kestrelTrace = new KestrelTrace(LoggerFactory);
var now = systemClock.UtcNow;

var splits = new List<TimeSpan>();
Stopwatch sw = null;
heartbeatHandler.Setup(h => h.OnHeartbeat(now)).Callback(() =>
{
heartbeatCallCount++;
if (sw == null)
{
sw = Stopwatch.StartNew();
}
else
{
var split = sw.Elapsed;
splits.Add(split);

Logger.LogInformation($"Heartbeat split: {split.TotalMilliseconds}ms");

sw.Restart();
}

if (heartbeatCallCount == 5)
{
Logger.LogInformation($"Heartbeat run {heartbeatCallCount} times. Notifying test.");
tcs.SetResult();
}
});

var intervalMs = 300;

using (var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, debugger.Object, kestrelTrace, TimeSpan.FromMilliseconds(intervalMs)))
{
heartbeat.Start();

await tcs.Task.DefaultTimeout();
Logger.LogInformation($"Starting heartbeat dispose.");
}

// Interval timing isn't exact. For example, interval of 300ms results in split of 312.67ms.
// Under load the server might take a long time to resume. Provide tolerance for late resume.
Assert.Collection(splits,
ts => AssertApproxEqual(intervalMs, ts.TotalMilliseconds),
ts => AssertApproxEqual(intervalMs, ts.TotalMilliseconds),
ts => AssertApproxEqual(intervalMs, ts.TotalMilliseconds),
ts => AssertApproxEqual(intervalMs, ts.TotalMilliseconds));

static void AssertApproxEqual(double expectedValue, double value)
{
if (value < expectedValue)
{
Assert.Fail($"{value} is smaller than wait time of {expectedValue}.");
}
// Be tolerant of a much larger value. Heartbeat might not immediately resume if the server is under load.
if (value > expectedValue * 4)
{
Assert.Fail($"{value} is much larger than wait time of {expectedValue}.");
}
}
}

[Fact]
public async Task HeartbeatTakingLongerThanIntervalIsLoggedAsWarning()
{
Expand All @@ -43,7 +113,7 @@ public async Task HeartbeatTakingLongerThanIntervalIsLoggedAsWarning()

Task blockedHeartbeatTask;

using (var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, debugger.Object, kestrelTrace))
using (var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, debugger.Object, kestrelTrace, Heartbeat.Interval))
{
blockedHeartbeatTask = Task.Run(() => heartbeat.OnHeartbeat());

Expand Down Expand Up @@ -87,7 +157,7 @@ public async Task HeartbeatTakingLongerThanIntervalIsNotLoggedIfDebuggerAttached

Task blockedHeartbeatTask;

using (var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, debugger.Object, kestrelTrace))
using (var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, debugger.Object, kestrelTrace, Heartbeat.Interval))
{
blockedHeartbeatTask = Task.Run(() => heartbeat.OnHeartbeat());

Expand Down Expand Up @@ -116,27 +186,11 @@ public void ExceptionFromHeartbeatHandlerIsLoggedAsError()

heartbeatHandler.Setup(h => h.OnHeartbeat(systemClock.UtcNow)).Throws(ex);

using (var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, DebuggerWrapper.Singleton, kestrelTrace))
using (var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, DebuggerWrapper.Singleton, kestrelTrace, Heartbeat.Interval))
{
heartbeat.OnHeartbeat();
}

Assert.Equal(ex, TestSink.Writes.Single(message => message.LogLevel == LogLevel.Error).Exception);
}

[Fact]
public void ExceptionFromHeartbeatHandlerIsNotLoggedIfDisposed()
{
var systemClock = new MockSystemClock();
var heartbeatHandler = new Mock<IHeartbeatHandler>();
var debugger = new Mock<IDebugger>();
var kestrelTrace = new KestrelTrace(LoggerFactory);
var ex = new Exception();
heartbeatHandler.Setup(h => h.OnHeartbeat(systemClock.UtcNow)).Throws(ex);
debugger.Setup(d => d.IsAttached).Returns(true);
var heartbeat = new Heartbeat(new[] { heartbeatHandler.Object }, systemClock, debugger.Object, kestrelTrace);
heartbeat.Dispose();
heartbeat.OnHeartbeat();
Assert.Empty(TestSink.Writes);
}
}
3 changes: 2 additions & 1 deletion src/Servers/Kestrel/Core/test/KestrelServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ public void StartingServerInitializesHeartbeat()
new IHeartbeatHandler[] { testContext.DateHeaderValueManager },
testContext.MockSystemClock,
DebuggerWrapper.Singleton,
testContext.Log);
testContext.Log,
Heartbeat.Interval);

using (var server = new KestrelServerImpl(new[] { new MockTransportFactory() }, Array.Empty<IMultiplexedConnectionListenerFactory>(), testContext))
{
Expand Down
3 changes: 2 additions & 1 deletion src/Servers/Kestrel/shared/test/TestServiceContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public void InitializeHeartbeat()
new IHeartbeatHandler[] { DateHeaderValueManager, heartbeatManager },
new SystemClock(),
DebuggerWrapper.Singleton,
Log);
Log,
Heartbeat.Interval);

MockSystemClock = null;
SystemClock = heartbeatManager;
Expand Down

0 comments on commit 063f21e

Please sign in to comment.