Skip to content

Commit 7316ffb

Browse files
Akka.Persistence HealthChecks (#7842) (#7845)
* Akka.Persistence: add health check support to `AsyncWriteJournal` #7840 * added messaging protocol to support plugin health check * Added tests for basic Akka.Persistence health checks this is mostly a sanity test. I don't want to get sucked into testing the `CircuitBreaker` necessarily either * added structured output to health check results * fix compilation errors * added failure specs * implemented `SnapshotStore` health checks * renamed test class * SnapshotStoreHealthCheckSpecs * API approvals
1 parent ca06b39 commit 7316ffb

11 files changed

+609
-31
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.DotNet.verified.txt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,18 @@ namespace Akka.Persistence
113113
public override int GetHashCode() { }
114114
public override string ToString() { }
115115
}
116+
public sealed class CheckJournalHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalRequest, Akka.Persistence.IPersistenceMessage
117+
{
118+
public CheckJournalHealth(System.Threading.CancellationToken cancellationToken) { }
119+
public System.Threading.CancellationToken CancellationToken { get; }
120+
public override string ToString() { }
121+
}
122+
public sealed class CheckSnapshotStoreHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest
123+
{
124+
public CheckSnapshotStoreHealth(System.Threading.CancellationToken cancellationToken) { }
125+
public System.Threading.CancellationToken CancellationToken { get; }
126+
public override string ToString() { }
127+
}
116128
public sealed class DeleteMessagesFailure : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable<Akka.Persistence.DeleteMessagesFailure>
117129
{
118130
public DeleteMessagesFailure(System.Exception cause, long toSequenceNr) { }
@@ -319,6 +331,12 @@ namespace Akka.Persistence
319331
{
320332
Akka.Persistence.IStashOverflowStrategy Create(Akka.Configuration.Config config);
321333
}
334+
public sealed class JournalHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalResponse, Akka.Persistence.IPersistenceMessage
335+
{
336+
public JournalHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
337+
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
338+
public override string ToString() { }
339+
}
322340
public sealed class LoadSnapshot : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest, System.IEquatable<Akka.Persistence.LoadSnapshot>
323341
{
324342
public LoadSnapshot(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, long toSequenceNr) { }
@@ -378,12 +396,37 @@ namespace Akka.Persistence
378396
public Akka.Persistence.IStashOverflowStrategy DefaultInternalStashOverflowStrategy { get; }
379397
public Akka.Persistence.PersistenceSettings Settings { get; }
380398
public Akka.Persistence.Journal.EventAdapters AdaptersFor(string journalPluginId) { }
399+
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckJournalHealthAsync(string journalPluginId, System.Threading.CancellationToken cancellationToken = null) { }
400+
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, System.Threading.CancellationToken cancellationToken = null) { }
381401
[Akka.Annotations.InternalStableApiAttribute()]
382402
public Akka.Actor.IActorRef JournalFor(string journalPluginId) { }
383403
public string PersistenceId(Akka.Actor.IActorRef actor) { }
384404
[Akka.Annotations.InternalStableApiAttribute()]
385405
public Akka.Actor.IActorRef SnapshotStoreFor(string snapshotPluginId) { }
386406
}
407+
[System.Runtime.CompilerServices.IsReadOnlyAttribute()]
408+
[System.Runtime.CompilerServices.NullableAttribute(0)]
409+
public struct PersistenceHealthCheckResult : System.IEquatable<Akka.Persistence.PersistenceHealthCheckResult>
410+
{
411+
public PersistenceHealthCheckResult(Akka.Persistence.PersistenceHealthStatus Status, string Description = null, System.Exception Exception = null, [System.Runtime.CompilerServices.NullableAttribute(new byte[] {
412+
2,
413+
0,
414+
0})] System.Collections.Generic.IReadOnlyDictionary<string, object> Data = null) { }
415+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
416+
2,
417+
0,
418+
0})]
419+
public System.Collections.Generic.IReadOnlyDictionary<string, object> Data { get; set; }
420+
public string Description { get; set; }
421+
public System.Exception Exception { get; set; }
422+
public Akka.Persistence.PersistenceHealthStatus Status { get; set; }
423+
}
424+
public enum PersistenceHealthStatus
425+
{
426+
Healthy = 0,
427+
Degraded = 1,
428+
Unhealthy = 2,
429+
}
387430
public sealed class PersistenceSettings : Akka.Actor.Settings
388431
{
389432
public PersistenceSettings(Akka.Actor.ActorSystem system, Akka.Configuration.Config config) { }
@@ -645,6 +688,12 @@ namespace Akka.Persistence
645688
public override int GetHashCode() { }
646689
public override string ToString() { }
647690
}
691+
public sealed class SnapshotStoreHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotResponse
692+
{
693+
public SnapshotStoreHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
694+
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
695+
public override string ToString() { }
696+
}
648697
public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation
649698
{
650699
public StashingHandlerInvocation(object evt, System.Action<object> handler) { }
@@ -877,6 +926,7 @@ namespace Akka.Persistence.Journal
877926
{
878927
protected readonly bool CanPublish;
879928
protected AsyncWriteJournal() { }
929+
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
880930
protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken);
881931
public abstract System.Threading.Tasks.Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken);
882932
protected virtual bool Receive(object message) { }
@@ -1235,6 +1285,7 @@ namespace Akka.Persistence.Snapshot
12351285
public abstract class SnapshotStore : Akka.Actor.ActorBase
12361286
{
12371287
protected SnapshotStore() { }
1288+
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
12381289
protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken);
12391290
protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
12401291
protected abstract System.Threading.Tasks.Task<Akka.Persistence.SelectedSnapshot> LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);

src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Net.verified.txt

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,18 @@ namespace Akka.Persistence
113113
public override int GetHashCode() { }
114114
public override string ToString() { }
115115
}
116+
public sealed class CheckJournalHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalRequest, Akka.Persistence.IPersistenceMessage
117+
{
118+
public CheckJournalHealth(System.Threading.CancellationToken cancellationToken) { }
119+
public System.Threading.CancellationToken CancellationToken { get; }
120+
public override string ToString() { }
121+
}
122+
public sealed class CheckSnapshotStoreHealth : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest
123+
{
124+
public CheckSnapshotStoreHealth(System.Threading.CancellationToken cancellationToken) { }
125+
public System.Threading.CancellationToken CancellationToken { get; }
126+
public override string ToString() { }
127+
}
116128
public sealed class DeleteMessagesFailure : Akka.Actor.INoSerializationVerificationNeeded, System.IEquatable<Akka.Persistence.DeleteMessagesFailure>
117129
{
118130
public DeleteMessagesFailure(System.Exception cause, long toSequenceNr) { }
@@ -319,6 +331,12 @@ namespace Akka.Persistence
319331
{
320332
Akka.Persistence.IStashOverflowStrategy Create(Akka.Configuration.Config config);
321333
}
334+
public sealed class JournalHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IJournalMessage, Akka.Persistence.IJournalResponse, Akka.Persistence.IPersistenceMessage
335+
{
336+
public JournalHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
337+
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
338+
public override string ToString() { }
339+
}
322340
public sealed class LoadSnapshot : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotRequest, System.IEquatable<Akka.Persistence.LoadSnapshot>
323341
{
324342
public LoadSnapshot(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, long toSequenceNr) { }
@@ -378,12 +396,36 @@ namespace Akka.Persistence
378396
public Akka.Persistence.IStashOverflowStrategy DefaultInternalStashOverflowStrategy { get; }
379397
public Akka.Persistence.PersistenceSettings Settings { get; }
380398
public Akka.Persistence.Journal.EventAdapters AdaptersFor(string journalPluginId) { }
399+
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckJournalHealthAsync(string journalPluginId, System.Threading.CancellationToken cancellationToken = null) { }
400+
public System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckSnapshotStoreHealthAsync(string snapshotStorePluginId, System.Threading.CancellationToken cancellationToken = null) { }
381401
[Akka.Annotations.InternalStableApiAttribute()]
382402
public Akka.Actor.IActorRef JournalFor(string journalPluginId) { }
383403
public string PersistenceId(Akka.Actor.IActorRef actor) { }
384404
[Akka.Annotations.InternalStableApiAttribute()]
385405
public Akka.Actor.IActorRef SnapshotStoreFor(string snapshotPluginId) { }
386406
}
407+
[System.Runtime.CompilerServices.NullableAttribute(0)]
408+
public struct PersistenceHealthCheckResult : System.IEquatable<Akka.Persistence.PersistenceHealthCheckResult>
409+
{
410+
public PersistenceHealthCheckResult(Akka.Persistence.PersistenceHealthStatus Status, string Description = null, System.Exception Exception = null, [System.Runtime.CompilerServices.NullableAttribute(new byte[] {
411+
2,
412+
0,
413+
0})] System.Collections.Generic.IReadOnlyDictionary<string, object> Data = null) { }
414+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
415+
2,
416+
0,
417+
0})]
418+
public System.Collections.Generic.IReadOnlyDictionary<string, object> Data { get; set; }
419+
public string Description { get; set; }
420+
public System.Exception Exception { get; set; }
421+
public Akka.Persistence.PersistenceHealthStatus Status { get; set; }
422+
}
423+
public enum PersistenceHealthStatus
424+
{
425+
Healthy = 0,
426+
Degraded = 1,
427+
Unhealthy = 2,
428+
}
387429
public sealed class PersistenceSettings : Akka.Actor.Settings
388430
{
389431
public PersistenceSettings(Akka.Actor.ActorSystem system, Akka.Configuration.Config config) { }
@@ -645,6 +687,12 @@ namespace Akka.Persistence
645687
public override int GetHashCode() { }
646688
public override string ToString() { }
647689
}
690+
public sealed class SnapshotStoreHealthCheckResponse : Akka.Actor.INoSerializationVerificationNeeded, Akka.Persistence.IPersistenceMessage, Akka.Persistence.ISnapshotMessage, Akka.Persistence.ISnapshotResponse
691+
{
692+
public SnapshotStoreHealthCheckResponse(Akka.Persistence.PersistenceHealthCheckResult result) { }
693+
public Akka.Persistence.PersistenceHealthCheckResult Result { get; }
694+
public override string ToString() { }
695+
}
648696
public sealed class StashingHandlerInvocation : Akka.Persistence.IPendingHandlerInvocation
649697
{
650698
public StashingHandlerInvocation(object evt, System.Action<object> handler) { }
@@ -877,6 +925,7 @@ namespace Akka.Persistence.Journal
877925
{
878926
protected readonly bool CanPublish;
879927
protected AsyncWriteJournal() { }
928+
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
880929
protected abstract System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken);
881930
public abstract System.Threading.Tasks.Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, System.Threading.CancellationToken cancellationToken);
882931
protected virtual bool Receive(object message) { }
@@ -1233,6 +1282,7 @@ namespace Akka.Persistence.Snapshot
12331282
public abstract class SnapshotStore : Akka.Actor.ActorBase
12341283
{
12351284
protected SnapshotStore() { }
1285+
public virtual System.Threading.Tasks.Task<Akka.Persistence.PersistenceHealthCheckResult> CheckHealthAsync(System.Threading.CancellationToken cancellationToken = null) { }
12361286
protected abstract System.Threading.Tasks.Task DeleteAsync(Akka.Persistence.SnapshotMetadata metadata, System.Threading.CancellationToken cancellationToken);
12371287
protected abstract System.Threading.Tasks.Task DeleteAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
12381288
protected abstract System.Threading.Tasks.Task<Akka.Persistence.SelectedSnapshot> LoadAsync(string persistenceId, Akka.Persistence.SnapshotSelectionCriteria criteria, System.Threading.CancellationToken cancellationToken);
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="PersistenceHealthCheckSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
// -----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Collections.Generic;
10+
using System.Collections.Immutable;
11+
using System.Threading;
12+
using System.Threading.Tasks;
13+
using Akka.Configuration;
14+
using Akka.Persistence.Journal;
15+
using Akka.TestKit;
16+
using Akka.TestKit.Configs;
17+
using Xunit;
18+
using Xunit.Abstractions;
19+
20+
namespace Akka.Persistence.Tests;
21+
22+
public class JournalHealthCheckSpec : PersistenceSpec
23+
{
24+
private static Config HealthCheckConfig()
25+
{
26+
const string extraConfig = """
27+
28+
akka.persistence.journal.failing-open {
29+
class = "Akka.Persistence.Tests.FailingJournal, Akka.Persistence.Tests"
30+
circuit-breaker {
31+
max-failures = 1
32+
call-timeout = 1s
33+
reset-timeout = 10s
34+
}
35+
}
36+
akka.persistence.journal.failing-half-open {
37+
class = "Akka.Persistence.Tests.FailingJournal, Akka.Persistence.Tests"
38+
circuit-breaker {
39+
max-failures = 1
40+
call-timeout = 1s
41+
reset-timeout = 1s
42+
}
43+
}
44+
# Disable message serialization for circuit breaker tests to avoid serialization issues
45+
akka.actor.serialize-messages = off
46+
47+
""";
48+
return TestConfigs.TestSchedulerConfig
49+
.WithFallback(Configuration("PersistenceHealthCheckSpec", extraConfig: extraConfig));
50+
}
51+
52+
public JournalHealthCheckSpec(ITestOutputHelper output) : base(HealthCheckConfig(), output)
53+
{
54+
}
55+
56+
[Theory]
57+
[InlineData(null)] // default plugin
58+
[InlineData("akka.persistence.journal.inmem")]
59+
public async Task JournalHealthCheck_should_default_to_Healthy(string? pluginId)
60+
{
61+
using var cts = new CancellationTokenSource(RemainingOrDefault);
62+
var pluginHealth = await Extension.CheckJournalHealthAsync(pluginId, cts.Token);
63+
64+
Assert.Equal(PersistenceHealthStatus.Healthy, pluginHealth.Status);
65+
Assert.NotNull(pluginHealth.Description);
66+
}
67+
68+
[Fact]
69+
public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_is_Open()
70+
{
71+
// Get the journal actor reference
72+
var journal = Extension.JournalFor("akka.persistence.journal.failing-open");
73+
74+
// Trigger a failure to open the circuit breaker
75+
var writeMsg = new WriteMessages(new[] { new AtomicWrite(new Persistent("test", 1, "test-pid")) }.ToImmutableList(),
76+
TestActor, 1);
77+
journal.Tell(writeMsg, TestActor);
78+
79+
// Advance time to let the write fail and circuit breaker open
80+
var testScheduler = (TestScheduler)Sys.Scheduler;
81+
testScheduler.Advance(TimeSpan.FromSeconds(2));
82+
83+
using var cts = new CancellationTokenSource(RemainingOrDefault);
84+
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-open", cts.Token);
85+
86+
Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
87+
Assert.Contains("Circuit breaker is open", pluginHealth.Description);
88+
}
89+
90+
[Fact]
91+
public async Task JournalHealthCheck_should_return_Degraded_when_CircuitBreaker_is_HalfOpen()
92+
{
93+
// Get the journal actor reference
94+
var journal = Extension.JournalFor("akka.persistence.journal.failing-half-open");
95+
96+
// Trigger a failure to open the circuit breaker
97+
var writeMsg = new WriteMessages(new[] { new AtomicWrite(new Persistent("test", 1, "test-pid")) }.ToImmutableList(),
98+
TestActor, 1);
99+
journal.Tell(writeMsg, TestActor);
100+
101+
var testScheduler = (TestScheduler)Sys.Scheduler;
102+
103+
// Advance time past call-timeout to let the write fail and circuit breaker open
104+
testScheduler.Advance(TimeSpan.FromSeconds(1));
105+
106+
// Give the async operations time to complete
107+
await Task.Delay(100);
108+
109+
// Advance time past reset-timeout to transition to half-open
110+
testScheduler.Advance(TimeSpan.FromSeconds(1));
111+
112+
// Give the transition time to complete
113+
await Task.Delay(100);
114+
115+
using var cts = new CancellationTokenSource(RemainingOrDefault);
116+
var pluginHealth = await Extension.CheckJournalHealthAsync("akka.persistence.journal.failing-half-open", cts.Token);
117+
118+
Assert.Equal(PersistenceHealthStatus.Degraded, pluginHealth.Status);
119+
Assert.Contains("Circuit breaker is half-open", pluginHealth.Description);
120+
}
121+
}
122+
123+
/// <summary>
124+
/// Test journal that always fails writes to trigger circuit breaker
125+
/// </summary>
126+
public class FailingJournal : MemoryJournal
127+
{
128+
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
129+
{
130+
throw new InvalidOperationException("Simulated journal write failure");
131+
}
132+
}

0 commit comments

Comments
 (0)