@Aaronontheweb Aaronontheweb commented Sep 24, 2025

Changes

Implements #7840

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Latest dev Benchmarks

Include data from the relevant benchmark prior to this change here.

This PR's Benchmarks

Include data from after this change here.

Member Author

@Aaronontheweb Aaronontheweb left a comment

Detailed my changes, so far. Looking for feedback before I move on with implementing the SnapshotStore checks.

/// <summary>
/// Set to <c>true</c> when a fatal error has occurred, i.e., the Akka.Persistence configuration is illegal
/// </summary>
private bool _hasFatalError;
Member Author

If we have fatal errors, such as configuration errors, in the CTOR we should try to capture those.

Member

Would it be worth it to have an enum to distinguish the types of known errors?

Thinking with a devops hat, a fatal error due to config vs a fatal error due to a condition that forces a journal shutdown/restart (Is that a thing or am I misremembering? 😇) would be useful to track... but maybe I'm forgetting things here (probably?).

As an additional thought... IDK if it's a requirement for all plugins buuuut a simple check like getting the max ordering ID would be nice.

Member Author

I could just capture the Exception itself in a nullable field - that's probably easiest.

There's only a handful of "fatal" exceptions - we treat the rest of the exceptions as "eventually recoverable"
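
A minimal sketch of that idea, assuming a nullable `Exception` field takes the place of the `_hasFatalError` flag. `FatalErrorState` and its members are hypothetical names for illustration, not types from this PR; keeping only the first error is a choice made for the sketch.

```csharp
#nullable enable
using System;

// Hypothetical holder; inside the journal this would simply be a private field.
public sealed class FatalErrorState
{
    private Exception? _fatalError;

    // Equivalent to the original boolean flag.
    public bool HasFatalError => _fatalError != null;

    // Sketch-level choice: keep the first fatal error and ignore later ones.
    public void Record(Exception error) => _fatalError ??= error;

    // The captured exception lets the health check say what went wrong.
    public string Describe() =>
        _fatalError is null
            ? "No fatal error recorded."
            : $"Fatal error has occurred: {_fatalError.GetType().Name}: {_fatalError.Message}";
}
```

Compared with a `bool`, the field costs nothing extra and carries the exception type and message along for whoever reads the health check output.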

Member Author

> As an additional thought... IDK if it's a requirement for all plugins buuuut a simple check like getting the max ordering ID would be nice.

That might be a stretch, just given some of the limitations across plugins.

case DeleteMessagesTo deleteMessagesTo:
HandleDeleteMessagesTo(deleteMessagesTo);
return true;
case CheckHealth checkHealth:
Member Author

Messaging handler for the health check inside the journal.

{
var i = 0;
- var enumerator = results?.GetEnumerator();
+ using var enumerator = results?.GetEnumerator();
Member Author

Fixed a memory leak here.
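
For readers wondering why a one-word change matters: `IEnumerator<T>` is `IDisposable`, and an enumerator that is driven by hand is only disposed if the caller does so. The sketch below is not the journal's code; `TrackingEnumerator` and `EnumeratorDemo` are hypothetical names that make the disposal observable.

```csharp
#nullable enable
using System.Collections;
using System.Collections.Generic;

// Hypothetical enumerator that records whether Dispose() was called.
public sealed class TrackingEnumerator : IEnumerator<int>
{
    private int _current;
    public bool Disposed { get; private set; }
    public int Current => _current;
    object IEnumerator.Current => _current;
    public bool MoveNext() => ++_current <= 3; // yields 1, 2, 3
    public void Reset() => _current = 0;
    public void Dispose() => Disposed = true;
}

public static class EnumeratorDemo
{
    // Old shape: the enumerator is advanced manually and never disposed.
    public static bool DrainWithoutUsing(TrackingEnumerator source)
    {
        var enumerator = source;
        while (enumerator.MoveNext()) { }
        return source.Disposed;
    }

    // New shape: the `using var` declaration disposes when the scope exits.
    public static bool DrainWithUsing(TrackingEnumerator? source)
    {
        Drain(source);
        return source?.Disposed ?? false;
    }

    private static void Drain(IEnumerator<int>? source)
    {
        using var enumerator = source; // a null value is allowed; nothing is disposed then
        while (enumerator != null && enumerator.MoveNext()) { }
    }
}
```

The null case mirrors the `results?.GetEnumerator()` call in the diff: a `using` declaration over a null reference is legal and simply skips the `Dispose` call.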

{
public CheckHealth(CancellationToken cancellationToken)
{
CancellationToken = cancellationToken;
Member Author

Health check messaging protocol - these messages are all tagged with INoSerializationVerificationNeeded so it's safe to pass the CancellationToken around via them.
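
A rough sketch of the pattern described here, assuming the Akka.NET package is referenced. `Probe`, `ProbeReply`, and `HealthProbeActor` are hypothetical stand-ins, not the message types added by this PR. The marker interface opts a message out of the local serialization round-trip that `akka.actor.serialize-messages = on` would otherwise apply, which is what makes carrying a `CancellationToken` (not a serializable type) workable for in-process messages.

```csharp
using System.Threading;
using Akka.Actor;

// Hypothetical request: carries a CancellationToken, so it must stay in-process.
public sealed class Probe : INoSerializationVerificationNeeded
{
    public Probe(CancellationToken cancellationToken)
    {
        CancellationToken = cancellationToken;
    }

    public CancellationToken CancellationToken { get; }
}

// Hypothetical reply: reports whether the caller had already cancelled.
public sealed class ProbeReply : INoSerializationVerificationNeeded
{
    public ProbeReply(bool wasCancelled)
    {
        WasCancelled = wasCancelled;
    }

    public bool WasCancelled { get; }
}

// Hypothetical actor that reads the token straight off the message.
public sealed class HealthProbeActor : ReceiveActor
{
    public HealthProbeActor()
    {
        Receive<Probe>(msg =>
            Sender.Tell(new ProbeReply(msg.CancellationToken.IsCancellationRequested)));
    }
}
```

A caller would send `new Probe(cts.Token)` with `Ask<ProbeReply>` and a timeout; the token only has meaning inside the same process, so messages like this should never be sent across a remoting boundary.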

/// <param name="journalPluginId">The HOCON id of the Akka.Persistence plugin.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>A <see cref="PersistenceHealthCheckResult"/> with health status and possibly a descriptive message.</returns>
public async Task<PersistenceHealthCheckResult> CheckJournalHealthAsync(string journalPluginId,
Member Author

Convenience method for invoking the health check - if the journalPluginId is not found this method will err.

Member

@to11mtm to11mtm left a comment

Left some thoughts. Might poke at it more but these were the hard hitters.

{
if(_breaker.IsHalfOpen)
return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded,
$"Circuit breaker is half-open, some operations may be failing intermittently with error: {_breaker.LastCaughtException?.Message ?? "N/A"}"));
Member

Two thoughts looking at this:

First, IMO it would be nice if we had some way to track 'last success' (and maybe even have auto-polling as a config option to track it).

Second is a question: could we have a more 'structured' output instead of just a formatted string? It at least makes it easier to handle such events from a parsing standpoint (especially if we are able to add time or possibly other things; need to keep looking at this to know what's up).

Member Author

> Second is a question, could we have a more 'structured' output instead of just a formatted string? It at least makes it easier to handle such events from a parsing standpoint (esp if we are able to add time or possibly other things, need to keep looking at this to know what's up)

As long as it can fit into a HealthCheckResult:

public HealthCheckResult(HealthStatus status, string? description = null, Exception? exception = null, IReadOnlyDictionary<string, object>? data = null)
    {
        Status = status;
        Description = description;
        Exception = exception;
        Data = data ?? _emptyReadOnlyDictionary;
    }

Then that should be fine.
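
To make the "fits into a `HealthCheckResult`" point concrete, here is a sketch of structured output carried in the `data` dictionary rather than in the description string. It assumes the `Microsoft.Extensions.Diagnostics.HealthChecks` package is referenced; the `StructuredHealthOutput` class, the dictionary keys, and the `lastSuccess` parameter are hypothetical and only echo the 'last success' idea raised above.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Diagnostics.HealthChecks;

public static class StructuredHealthOutput
{
    // Builds a Degraded result whose machine-readable details live in Data.
    public static HealthCheckResult BreakerHalfOpen(
        string pluginId, Exception? lastError, DateTimeOffset? lastSuccess)
    {
        // Key names are illustrative assumptions, not part of any agreed schema.
        var data = new Dictionary<string, object>
        {
            ["pluginId"] = pluginId,
            ["breakerState"] = "half-open"
        };

        if (lastSuccess is { } timestamp)
            data["lastSuccessUtc"] = timestamp;

        return new HealthCheckResult(
            HealthStatus.Degraded,
            "Circuit breaker is half-open, some operations may be failing intermittently",
            lastError,
            data);
    }
}
```

With this shape a consumer can read `result.Data["breakerState"]` or `result.Exception` directly instead of parsing the description.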

}

- private void ProcessResults(IImmutableList<Exception> results, int atomicWriteCount, WriteMessages writeMessage, IActorRef resequencer,
+ private static void ProcessResults(IImmutableList<Exception> results, int atomicWriteCount, WriteMessages writeMessage, IActorRef resequencer,
Member

Soooo this is safe and I get why it may have happened. I would, however, suggest double-checking this against Persistence SQL (or another plugin of choice that uses AsyncJournal with some extra logic behind it) just in case the jumps between method tables cause an issue.

(To be clear I'm probably overthinking this BUUUUUT It would still be good to know the difference just in case 😇🤷‍♂️)

Member Author

This method was always private so it can't have side-effects in other plugins.

Member

oh wow I'm dumb I meant to say the benchmarks.

Main side effect I'm thinking of is code locality.

But, again, probably overthinking it.

if(_breaker.IsOpen)
return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded,
$"Circuit breaker is open, some operations may be failing intermittently with error: {_breaker.LastCaughtException?.Message ?? "N/A"}"));
return Task.FromResult(_hasFatalError ? new PersistenceHealthCheckResult(PersistenceHealthStatus.Unhealthy, "Fatal error has occurred. The ActorSystem must be restarted.")
Member

nitpick: Formatting here makes this a PITA to read, probably best to split the ternary into more lines.

Member Author

That's how I originally had it (the way you suggested) until Rider nitpicked me. Never doubt your vibe I guess 🤷
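
For reference, a sketch of the same decision order with the ternary split across lines. The types here are stand-ins (`SketchStatus`, `SketchResult`, `BreakerHealth` are hypothetical); the PR's real types are `PersistenceHealthStatus` and `PersistenceHealthCheckResult`. The `Healthy` fallback is an assumption, since the quoted line is cut off before the second branch of the ternary.

```csharp
#nullable enable
using System;

// Stand-in types for illustration only.
public enum SketchStatus { Healthy, Degraded, Unhealthy }

public sealed record SketchResult(
    SketchStatus Status, string? Description = null, Exception? Exception = null);

public static class BreakerHealth
{
    // Same order as the quoted code: half-open, then open, then fatal error.
    public static SketchResult Evaluate(
        bool isOpen, bool isHalfOpen, Exception? lastCaught, bool hasFatalError)
    {
        if (isHalfOpen)
            return new SketchResult(SketchStatus.Degraded,
                "Circuit breaker is half-open, some operations may be failing intermittently",
                lastCaught);

        if (isOpen)
            return new SketchResult(SketchStatus.Degraded,
                "Circuit breaker is open, some operations may be failing intermittently",
                lastCaught);

        // One branch per line keeps the two outcomes easy to scan.
        return hasFatalError
            ? new SketchResult(SketchStatus.Unhealthy,
                "Fatal error has occurred. The ActorSystem must be restarted.")
            : new SketchResult(SketchStatus.Healthy); // assumed fallback
    }
}
```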

@Aaronontheweb Aaronontheweb changed the title from "[WIP] Akka.Persistence HealthChecks" to "Akka.Persistence HealthChecks" Sep 30, 2025
@Aaronontheweb Aaronontheweb marked this pull request as ready for review September 30, 2025 17:58
Member Author

@Aaronontheweb Aaronontheweb left a comment

Detailed my changes

Member Author

These are all of the new public message and enum types we added to support health checks.

akka.actor.serialize-messages = off
""";
return TestConfigs.TestSchedulerConfig
Member Author

We use the TestScheduler here to drive the CircuitBreaker resets.

var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token);

Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
Assert.Contains("Circuit breaker is open", pluginHealth.Description);
Member Author

Probably not necessary to be this specific, but wanted to illustrate that leveraging the CircuitBreaker is a good default health heuristic.

testScheduler.Advance(TimeSpan.FromSeconds(1));

// Give the transition time to complete
await Task.Delay(100);
Member Author

Even though we can use the TestScheduler to advance time, the circuit breaker still has to asynchronously perform its update.
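
One possible alternative to a fixed `Task.Delay(100)`, offered only as a sketch: poll for the expected state with an upper bound, so the test returns as soon as the asynchronous transition lands and fails clearly if it never does. `Eventually` is a hypothetical helper, not something in this PR or in Akka.TestKit.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class Eventually
{
    // Polls `condition` every `interval` until it returns true or `timeout` elapses.
    // Returns true if the condition was observed, false on timeout.
    public static async Task<bool> BecomesTrueAsync(
        Func<bool> condition, TimeSpan timeout, TimeSpan interval)
    {
        var clock = Stopwatch.StartNew();
        while (true)
        {
            if (condition()) return true;
            if (clock.Elapsed >= timeout) return false;
            await Task.Delay(interval);
        }
    }
}
```

In a spec this would wrap whatever observable signals the transition, for example a repeated health check call whose status is expected to change; the timeout then bounds the wait instead of guessing a delay that works on a slow build agent.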

{
if(_breaker.IsHalfOpen)
return Task.FromResult(new PersistenceHealthCheckResult(PersistenceHealthStatus.Degraded,
$"Circuit breaker is half-open, some operations may be failing intermittently", _breaker.LastCaughtException, _defaultHealthCheckTags));
Member Author

We record the PersistenceHealthStatus, a description, the last exception caught by the CircuitBreaker, and our default tags - which just include the plugin id.

return true;
case CheckJournalHealth checkHealth:
var sender = Sender;
CheckHealthAsync(checkHealth.CancellationToken)
Member Author

Actual handling of the health check invocation


async Task ExecuteHighestSequenceNr()
{
void CompleteHighSeqNo(long highSeqNo)
Member Author

Moved this below the return

/// <param name="snapshotStorePluginId">The HOCON id of the Akka.Persistence plugin.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>A <see cref="PersistenceHealthCheckResult"/> with health status and possibly a descriptive message.</returns>
public async Task<PersistenceHealthCheckResult> CheckSnapshotStoreHealthAsync(string snapshotStorePluginId,
Member Author

Convenience method for checking the snapshot store plugin health.

@Aaronontheweb
Member Author

All of the failing unit tests are racy - not related to these changes.

Contributor

@Arkatufus Arkatufus left a comment

LGTM

@Aaronontheweb Aaronontheweb merged commit ba5e5f8 into akkadotnet:v1.5 Sep 30, 2025
6 of 11 checks passed
@Aaronontheweb
Member Author

I'll need to port this to dev

@Aaronontheweb Aaronontheweb deleted the akka-persistence-healthchecks branch September 30, 2025 23:42
@Arkatufus Arkatufus added this to the 1.5.51 milestone Oct 1, 2025
Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this pull request Oct 1, 2025
* Akka.Persistence: add health check support to `AsyncWriteJournal`

akkadotnet#7840

* added messaging protocol to support plugin health check

* Added tests for basic Akka.Persistence health checks

this is mostly a sanity test. I don't want to get sucked into testing the `CircuitBreaker` necessarily either

* added structured output to health check results

* fix compilation errors

* added failure specs

* implemented `SnapshotStore` health checks

* renamed test class

* SnapshotStoreHealthCheckSpecs

* API approvals
Aaronontheweb added a commit that referenced this pull request Oct 1, 2025
This was referenced Oct 2, 2025