Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Sharding.Internal;
using Akka.Cluster.Tools.Singleton;
Expand All @@ -26,19 +27,19 @@ public class RememberEntitiesFailureSpec : AkkaSpec
{
internal class EntityActor : ActorBase
{
private readonly ILoggingAdapter log = Context.GetLogger();
private readonly ILoggingAdapter _log = Context.GetLogger();

public EntityActor()
{
log.Info("Entity actor [{0}] starting up", Context.Self.Path.Name);
_log.Info("Entity actor [{0}] starting up", Context.Self.Path.Name);
}

protected override bool Receive(object message)
{
switch (message)
{
case "stop":
log.Info("Stopping myself!");
_log.Info("Stopping myself!");
Context.Stop(Self);
return true;
case "graceful-stop":
Expand Down Expand Up @@ -122,8 +123,8 @@ public Delay(TimeSpan howLong)
}

// outside store since we need to be able to set them before sharding initializes
private static ImmutableDictionary<string, IFail> failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
private static readonly IFail failCoordinatorGetShards = null;
private static ImmutableDictionary<string, IFail> _failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
private static readonly IFail FailCoordinatorGetShards = null;

private class ShardStoreCreated
{
Expand Down Expand Up @@ -221,7 +222,7 @@ protected override bool Receive(object message)
switch (message)
{
case RememberEntitiesShardStore.GetEntities _:
switch (failShardGetEntities.GetValueOrDefault(shardId))
switch (_failShardGetEntities.GetValueOrDefault(shardId))
{
case null:
Sender.Tell(new RememberEntitiesShardStore.RememberedEntities(ImmutableHashSet<string>.Empty));
Expand Down Expand Up @@ -317,7 +318,7 @@ protected override bool Receive(object message)
switch (message)
{
case RememberEntitiesCoordinatorStore.GetShards _:
switch (failCoordinatorGetShards)
switch (FailCoordinatorGetShards)
{
case null:
Sender.Tell(new RememberEntitiesCoordinatorStore.RememberedShards(ImmutableHashSet<string>.Empty));
Expand Down Expand Up @@ -390,7 +391,7 @@ protected override bool Receive(object message)
akka.cluster.sharding.updating-state-timeout = 1s
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on")
.WithFallback(ClusterSingletonManager.DefaultConfig()
.WithFallback(ClusterSingleton.DefaultConfig()
.WithFallback(ClusterSharding.DefaultConfig()));

public RememberEntitiesFailureSpec(ITestOutputHelper helper) : base(SpecConfig, helper)
Expand Down Expand Up @@ -441,7 +442,7 @@ public void Remember_entities_handling_in_sharding_must_recover_when_initial_rem
private void Remember_entities_handling_in_sharding_must_recover_when_initial_remember_entities_load_fails(IFail wayToFail)
{
Log.Debug("Getting entities for shard 1 will fail");
failShardGetEntities = ImmutableDictionary<string, IFail>.Empty.Add("1", wayToFail);
_failShardGetEntities = ImmutableDictionary<string, IFail>.Empty.Add("1", wayToFail);

try
{
Expand All @@ -456,7 +457,7 @@ private void Remember_entities_handling_in_sharding_must_recover_when_initial_re
probe.ExpectNoMsg(); // message is lost because shard crashes

Log.Debug("Resetting initial fail");
failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
_failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;

// shard should be restarted and eventually succeed
AwaitAssert(() =>
Expand All @@ -469,7 +470,7 @@ private void Remember_entities_handling_in_sharding_must_recover_when_initial_re
}
finally
{
failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
_failShardGetEntities = ImmutableDictionary<string, IFail>.Empty;
}
}

Expand Down Expand Up @@ -694,41 +695,41 @@ private void Remember_entities_handling_in_sharding_must_recover_on_graceful_ent
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_NoResponse()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_NoResponse()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new NoResponse());
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new NoResponse());
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_CrashStore()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_CrashStore()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new CrashStore());
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new CrashStore());
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_StopStore()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_StopStore()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new StopStore());
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new StopStore());
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_500()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_500()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new Delay(TimeSpan.FromMilliseconds(500)));
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new Delay(TimeSpan.FromMilliseconds(500)));
}

[Fact]
public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_1000()
public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_1000()
{
Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new Delay(TimeSpan.FromSeconds(1)));
return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(new Delay(TimeSpan.FromSeconds(1)));
}

private void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(IFail wayToFail)
private async Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails(IFail wayToFail)
{
var storeProbe = CreateTestProbe();
Sys.EventStream.Subscribe(storeProbe.Ref, typeof(CoordinatorStoreCreated));

var sharding = ClusterSharding.Get(Sys).Start(
var sharding = await ClusterSharding.Get(Sys).StartAsync(
$"coordinatorStoreStopGraceful-{wayToFail}",
Props.Create(() => new EntityActor()),
ClusterShardingSettings.Create(Sys).WithRememberEntities(true),
Expand All @@ -739,12 +740,12 @@ private void Remember_entities_handling_in_sharding_must_recover_when_coordinato
var probe = CreateTestProbe();

// coordinator store is triggered by coordinator starting up
var coordinatorStore = storeProbe.ExpectMsg<CoordinatorStoreCreated>().Store;
var coordinatorStore = (await storeProbe.ExpectMsgAsync<CoordinatorStoreCreated>()).Store;
coordinatorStore.Tell(new FakeCoordinatorStoreActor.FailAddShard("1", wayToFail), probe.Ref);
probe.ExpectMsg<Done>();
await probe.ExpectMsgAsync<Done>();

sharding.Tell(new EntityEnvelope(1, "hello-1"), probe.Ref);
probe.ExpectNoMsg(TimeSpan.FromSeconds(1)); // because shard cannot start while store failing
await probe.ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); // because shard cannot start while store failing

if (wayToFail is StopStore or CrashStore)
{
Expand All @@ -754,12 +755,12 @@ private void Remember_entities_handling_in_sharding_must_recover_when_coordinato

// fail it when stopping
coordinatorStore.Tell(new FakeCoordinatorStoreActor.ClearFailShard("1"), storeProbe.Ref);
storeProbe.ExpectMsg<Done>();
await storeProbe.ExpectMsgAsync<Done>();

probe.AwaitAssert(() =>
await probe.AwaitAssertAsync(async () =>
{
sharding.Tell(new EntityEnvelope(1, "hello-2"), probe.Ref);
probe.ExpectMsg("hello-2"); // should now work again
await probe.ExpectMsgAsync("hello-2"); // should now work again
}, TimeSpan.FromSeconds(5));

Sys.Stop(sharding);
Expand Down