Skip to content

Commit 1a22e39

Browse files
Akka.Cluster.Sharding: harden event-sourced RememberEntities infrastructure against transient Akka.Persistence failures (#7401)
* Added reproduction for #7399 * fixed RE coordinator / shard failure close #7399 - wrapped the RememberEntities coordinator and shard-store inside
1 parent 4becbdd commit 1a22e39

File tree

3 files changed

+228
-3
lines changed

3 files changed

+228
-3
lines changed
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="Bugfix7399Specs.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
// -----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Collections.Generic;
10+
using System.Collections.Immutable;
11+
using System.Linq;
12+
using System.Threading.Tasks;
13+
using Akka.Actor;
14+
using Akka.Configuration;
15+
using Akka.Persistence;
16+
using Akka.Persistence.Journal;
17+
using Akka.Persistence.Snapshot;
18+
using Akka.TestKit;
19+
using FluentAssertions;
20+
using FluentAssertions.Extensions;
21+
using Xunit;
22+
using Xunit.Abstractions;
23+
24+
namespace Akka.Cluster.Sharding.Tests;
25+
26+
/// <summary>
27+
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/7399
28+
/// </summary>
29+
public class Bugfix7399Specs : AkkaSpec
30+
{
31+
public static readonly Config SpecConfig = @$"
32+
akka.loglevel = DEBUG
33+
akka.remote.dot-netty.tcp.port = 0
34+
akka.persistence.journal.plugin = ""akka.persistence.journal.failure""
35+
akka.persistence.journal.failure.class = ""{typeof(FailingJournal).AssemblyQualifiedName}""
36+
37+
akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.failure""
38+
akka.persistence.snapshot-store.failure.class = ""{typeof(FailingSnapshot).AssemblyQualifiedName}""
39+
40+
akka.cluster.sharding {{
41+
journal-plugin-id = akka.persistence.journal.failure
42+
snapshot-plugin-id = akka.persistence.snapshot-store.failure
43+
44+
remember-entities = on
45+
state-store-mode = ddata
46+
remember-entities-store = eventsourced
47+
distributed-data.durable.keys = []
48+
}}
49+
50+
# quick backoffs
51+
akka.cluster.sharding.entity-restart-backoff = 1s
52+
akka.cluster.sharding.shard-failure-backoff = 1s
53+
akka.cluster.sharding.coordinator-failure-backoff = 1s
54+
akka.cluster.sharding.updating-state-timeout = 1s
55+
akka.cluster.sharding.verbose-debug-logging = on
56+
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
57+
58+
akka.actor.provider = cluster";
59+
60+
public Bugfix7399Specs(ITestOutputHelper helper) : base(SpecConfig, helper)
61+
{
62+
}
63+
64+
protected override void AtStartup()
65+
{
66+
// Form a one node cluster
67+
var cluster = Cluster.Get(Sys);
68+
cluster.Join(cluster.SelfAddress);
69+
AwaitAssert(() =>
70+
{
71+
cluster.ReadView.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(1);
72+
});
73+
}
74+
75+
[Fact]
76+
public async Task RememberEntitiesShouldRecoverFromInitialFailure()
77+
{
78+
// set both journal and snapshot store in failing state
79+
FailingJournal.Working = false;
80+
FailingSnapshot.Working = false;
81+
82+
// start the shard: an error message is expected, since it tries to read which entity needs to be remembered
83+
var shard = await ClusterSharding.Get(Sys).StartAsync(
84+
typeName: "shard-test",
85+
entityProps: Props.Create(() => new ShardingActor()),
86+
messageExtractor: new MessageExtractor(100),
87+
settings: ClusterShardingSettings.Create(Sys));
88+
89+
// ping a message to the shard
90+
var probe = CreateTestProbe();
91+
shard.Tell(new ShardingActor.TestMessage("1", "hello"), probe.Ref);
92+
93+
// shouldn't get anything back due to R-E failures
94+
await probe.ExpectNoMsgAsync(500.Milliseconds());
95+
96+
// set both journal and snapshot store in working state
97+
FailingJournal.Working = true;
98+
FailingSnapshot.Working = true;
99+
100+
// ping a message to the shard
101+
await WithinAsync(TimeSpan.FromSeconds(10), async () =>
102+
{
103+
await AwaitAssertAsync(async () =>
104+
{
105+
shard.Tell(new ShardingActor.TestMessage("1", "hello"), probe.Ref);
106+
await probe.ExpectMsgAsync("hello");
107+
});
108+
});
109+
}
110+
111+
class MessageExtractor : HashCodeMessageExtractor
112+
{
113+
public MessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards)
114+
{
115+
}
116+
117+
public override string EntityId(object message)
118+
{
119+
return message switch
120+
{
121+
ShardingActor.TestMessage x => x.EntityId,
122+
_ => null
123+
};
124+
}
125+
}
126+
127+
class ShardingActor : ReceiveActor
128+
{
129+
public record TestMessage(string EntityId, string Content);
130+
131+
public ShardingActor()
132+
{
133+
Receive<TestMessage>(x => { Sender.Tell(x.Content); });
134+
}
135+
}
136+
137+
public class FailingJournal : SharedMemoryJournal
138+
{
139+
public static bool Working = false;
140+
141+
public override Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr,
142+
long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback)
143+
{
144+
if (!Working)
145+
{
146+
throw new ApplicationException("Failed");
147+
}
148+
149+
return base.ReplayMessagesAsync(context, persistenceId, fromSequenceNr, toSequenceNr, max,
150+
recoveryCallback);
151+
}
152+
153+
protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
154+
{
155+
if (!Working)
156+
{
157+
throw new ApplicationException("Failed");
158+
}
159+
160+
return base.WriteMessagesAsync(messages);
161+
}
162+
}
163+
164+
public class FailingSnapshot : SnapshotStore
165+
{
166+
public static bool Working = false;
167+
168+
protected override Task DeleteAsync(SnapshotMetadata metadata)
169+
{
170+
if (!Working)
171+
{
172+
throw new ApplicationException("Failed");
173+
}
174+
175+
return Task.CompletedTask;
176+
}
177+
178+
protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
179+
{
180+
if (!Working)
181+
{
182+
throw new ApplicationException("Failed");
183+
}
184+
185+
return Task.CompletedTask;
186+
}
187+
188+
protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId,
189+
SnapshotSelectionCriteria criteria)
190+
{
191+
if (!Working)
192+
{
193+
throw new ApplicationException("Failed");
194+
}
195+
196+
return null;
197+
}
198+
199+
protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot)
200+
{
201+
if (!Working)
202+
{
203+
throw new ApplicationException("Failed");
204+
}
205+
206+
return Task.CompletedTask;
207+
}
208+
}
209+
}

src/contrib/cluster/Akka.Cluster.Sharding/Internal/EventSourcedRememberEntitiesProvider.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//-----------------------------------------------------------------------
77

88
using Akka.Actor;
9+
using Akka.Pattern;
910

1011
namespace Akka.Cluster.Sharding.Internal
1112
{
@@ -29,7 +30,15 @@ public EventSourcedRememberEntitiesProvider(string typeName, ClusterShardingSett
2930
/// <returns></returns>
3031
public Props ShardStoreProps(string shardId)
3132
{
32-
return EventSourcedRememberEntitiesShardStore.Props(TypeName, shardId, Settings);
33+
var backoffOptions = Backoff.OnStop(
34+
EventSourcedRememberEntitiesShardStore.Props(TypeName, shardId, Settings),
35+
childName: "shardstore",
36+
minBackoff: Settings.TuningParameters.ShardFailureBackoff,
37+
maxBackoff: Settings.TuningParameters.ShardFailureBackoff,
38+
randomFactor: 0.2,
39+
maxNrOfRetries: -1);
40+
41+
return BackoffSupervisor.Props(backoffOptions);
3342
}
3443

3544
/// <summary>
@@ -39,7 +48,14 @@ public Props ShardStoreProps(string shardId)
3948
/// <returns></returns>
4049
public Props CoordinatorStoreProps()
4150
{
42-
return EventSourcedRememberEntitiesCoordinatorStore.Props(TypeName, Settings);
51+
var backoffOptions = Backoff.OnStop(
52+
EventSourcedRememberEntitiesCoordinatorStore.Props(TypeName, Settings),
53+
childName: "coordinator",
54+
minBackoff: Settings.TuningParameters.CoordinatorFailureBackoff,
55+
maxBackoff: Settings.TuningParameters.CoordinatorFailureBackoff,
56+
randomFactor: 0.2,
57+
maxNrOfRetries: -1);
58+
return BackoffSupervisor.Props(backoffOptions);
4359
}
4460
}
4561
}

src/contrib/cluster/Akka.Cluster.Sharding/Internal/RememberEntityStarter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public RememberEntityStarter(
9393
Timers.StartPeriodicTimer("retry", ResendUnAcked.Instance, settings.TuningParameters.RetryInterval);
9494
}
9595

96-
public ITimerScheduler Timers { get; set; }
96+
public ITimerScheduler Timers { get; set; } = null!;
9797

9898
public ILoggingAdapter Log { get; } = Context.GetLogger();
9999

0 commit comments

Comments
 (0)