-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Akka.Persistence HealthChecks #7842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Akka.Persistence HealthChecks #7842
Conversation
this is mostly a sanity test. I don't want to get sucked into testing the `CircuitBreaker` necessarily either
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Detailed my changes, so far. Looking for feedback before I move on with implementing the SnapshotStore
checks.
/// <summary> | ||
/// Set to <c>true</c> when a fatal error has occurred, i.e., the Akka.Persistence configuration is illegal | ||
/// </summary> | ||
private bool _hasFatalError; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have fatal errors, such as configuration errors, in the CTOR we should try to capture those.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be worth it to have an enum to distinguish the types of known errors?
Thinking with a devops hat, a fatal error due to config vs a fatal error due to a condition that forces a journal shutdown/restart (Is that a thing or am I misremembering? 😇) would be useful to track... but maybe I'm forgetting things here (probably?).
As an additional thought... IDK if it's a requirement for all plugins buuuut a simple check like getting the max ordering ID would be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could just capture the Exception
itself in a nullable field - that's probably easiest.
There's only a handful of "fatal" exceptions - we treat the rest of the exceptions as "eventually recoverable"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As an additional thought... IDK if it's a requirement for all plugins buuuut a simple check like getting the max ordering ID would be nice.
that might be a stretch just given some of the limitations across plugins
case DeleteMessagesTo deleteMessagesTo: | ||
HandleDeleteMessagesTo(deleteMessagesTo); | ||
return true; | ||
case CheckHealth checkHealth: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Messaging handler for the health check inside the journal.
{ | ||
var i = 0; | ||
var enumerator = results?.GetEnumerator(); | ||
using var enumerator = results?.GetEnumerator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed a memory leak here.
{ | ||
public CheckHealth(CancellationToken cancellationToken) | ||
{ | ||
CancellationToken = cancellationToken; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Health check messaging protocol - these messages are all tagged with INoSerializationVerificationNeeded
so it's safe to pass the CancellationToken
around via them.
/// <param name="journalPluginId">The HOCON id of the Akka.Persistence plugin./</param> | ||
/// <param name="cancellationToken">An optional cancellation token.</param> | ||
/// <returns>A <see cref="PersistenceHealthCheckResult"/> with health status and possibly a descriptive message.</returns> | ||
public async Task<PersistenceHealthCheckResult> CheckJournalHealthAsync(string journalPluginId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Convenience method for invoking the health check - if the journalPluginId
is not found this method will err.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some thoughts. Might poke at it more but these were the hard hitters.
{ | ||
if(_breaker.IsHalfOpen) | ||
return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded, | ||
$"Circuit breaker is half-open, some operations may be failing intermittently with error: {_breaker.LastCaughtException?.Message ?? "N/A"}")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two thoughts looking at this;
First, IMO It would be nice if we had some way to track 'last success' (And maybe even have auto-polling as a config option to track)
Second is a question, could we have a more 'structured' output instead of just a formatted string? It at least makes it easier to handle such events from a parsing standpoint (esp if we are able to add time or possibly other things, need to keep looking at this to know what's up)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Second is a question, could we have a more 'structured' output instead of just a formatted string? It at least makes it easier to handle such events from a parsing standpoint (esp if we are able to add time or possibly other things, need to keep looking at this to know what's up)
As long as it can fit into a HealthCheckResult
:
public HealthCheckResult(HealthStatus status, string? description = null, Exception? exception = null, IReadOnlyDictionary<string, object>? data = null)
{
Status = status;
Description = description;
Exception = exception;
Data = data ?? _emptyReadOnlyDictionary;
}
Then that should be fine.
} | ||
|
||
private void ProcessResults(IImmutableList<Exception> results, int atomicWriteCount, WriteMessages writeMessage, IActorRef resequencer, | ||
private static void ProcessResults(IImmutableList<Exception> results, int atomicWriteCount, WriteMessages writeMessage, IActorRef resequencer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Soooo this is safe and I get why it may have happened, I would however suggest double-checking this against Persistence SQL (or another plugin of choice that uses AsyncJournal with some extra logic behind it) just in case the jumps bt methodtables cause an issue.
(To be clear I'm probably overthinking this BUUUUUT It would still be good to know the difference just in case 😇🤷♂️)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method was always private so it can't have side-effects in other plugins.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh wow I'm dumb I meant to say the benchmarks.
Main side effect I'm thinking of is code locality.
But, again, probably overthinking it.
/// <summary> | ||
/// Set to <c>true</c> when a fatal error has occurred, i.e., the Akka.Persistence configuration is illegal | ||
/// </summary> | ||
private bool _hasFatalError; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be worth it to have an enum to distinguish the types of known errors?
Thinking with a devops hat, a fatal error due to config vs a fatal error due to a condition that forces a journal shutdown/restart (Is that a thing or am I misremembering? 😇) would be useful to track... but maybe I'm forgetting things here (probably?).
As an additional thought... IDK if it's a requirement for all plugins buuuut a simple check like getting the max ordering ID would be nice.
if(_breaker.IsOpen) | ||
return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded, | ||
$"Circuit breaker is open, some operations may be failing intermittently with error: with error: {_breaker.LastCaughtException?.Message ?? "N/A"}")); | ||
return Task.FromResult(_hasFatalError ? new PersistenceHealthCheckResult(PersistenceHealthStatus.Unhealthy, "Fatal error has occurred. The ActorSystem must be restarted.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: Formatting here makes this a PITA to read, probably best to split the ternary into more lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's how I originally had it (the way you suggested) until Rider nitpicked me. Never doubt your vibe I guess 🤷
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Detailed my changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are all of the new public message and enum types we added to support health checks.
akka.actor.serialize-messages = off | ||
"""; | ||
return TestConfigs.TestSchedulerConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use the TestScheduler
here to drive the CircuitBreaker
resets.
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token); | ||
|
||
Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status); | ||
Assert.Contains("Circuit breaker is open", pluginHealth.Description); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not necessary to be this specific, but wanted to illustrate that leveraging the CircuitBreaker
is a good default health heuristic.
testScheduler.Advance(TimeSpan.FromSeconds(1)); | ||
|
||
// Give the transition time to complete | ||
await Task.Delay(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though we can use the TestScheduler
to advance time, the circuit breaker still has to asynchronously perform its update.
{ | ||
if(_breaker.IsHalfOpen) | ||
return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded, | ||
$"Circuit breaker is half-open, some operations may be failing intermittently", _breaker.LastCaughtException, _defaultHealthCheckTags)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We record the PersistenceHealthStatus
, a description, the last exception caught by the CircuitBreaker
, and our default tags - which just include the plugin id.
return true; | ||
case CheckJournalHealth checkHealth: | ||
var sender = Sender; | ||
CheckHealthAsync(checkHealth.CancellationToken) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actual handling of the health check invocation
|
||
async Task ExecuteHighestSequenceNr() | ||
{ | ||
void CompleteHighSeqNo(long highSeqNo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this below the return
/// <param name="snapshotStorePluginId">The HOCON id of the Akka.Persistence plugin.</param> | ||
/// <param name="cancellationToken">An optional cancellation token.</param> | ||
/// <returns>A <see cref="PersistenceHealthCheckResult"/> with health status and possibly a descriptive message.</returns> | ||
public async Task<PersistenceHealthCheckResult> CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Convenience method for checking the snapshot store plugin health.
All of the failing unit tests are racy - not relation to these changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I'll need to port this to |
* Akka.Persistence: add health check support to `AsyncWriteJournal` akkadotnet#7840 * added messaging protocol to support plugin health check * Added tests for basic Akka.Persistence health checks this is mostly a sanity test. I don't want to get sucked into testing the `CircuitBreaker` necessarily either * added structured output to health check results * fix compilation errors * added failure specs * implemented `SnapshotStore` health checks * renamed test class * SnapshotStoreHealthCheckSpecs * API approvals
* Akka.Persistence: add health check support to `AsyncWriteJournal` #7840 * added messaging protocol to support plugin health check * Added tests for basic Akka.Persistence health checks this is mostly a sanity test. I don't want to get sucked into testing the `CircuitBreaker` necessarily either * added structured output to health check results * fix compilation errors * added failure specs * implemented `SnapshotStore` health checks * renamed test class * SnapshotStoreHealthCheckSpecs * API approvals
Changes
Implements #7840
Checklist
For significant changes, please ensure that the following have been completed (delete if not relevant):
Latest
dev
BenchmarksInclude data from the relevant benchmark prior to this change here.
This PR's Benchmarks
Include data from after this change here.