9
9
using System . Collections . Immutable ;
10
10
using System . Linq ;
11
11
using System . Threading ;
12
+ using System . Threading . Tasks ;
12
13
using Akka . Actor ;
13
14
using Akka . Cluster . Sharding . Internal ;
14
15
using Akka . Cluster . Tools . Singleton ;
@@ -26,19 +27,19 @@ public class RememberEntitiesFailureSpec : AkkaSpec
26
27
{
27
28
internal class EntityActor : ActorBase
28
29
{
29
- private readonly ILoggingAdapter log = Context . GetLogger ( ) ;
30
+ private readonly ILoggingAdapter _log = Context . GetLogger ( ) ;
30
31
31
32
public EntityActor ( )
32
33
{
33
- log . Info ( "Entity actor [{0}] starting up" , Context . Self . Path . Name ) ;
34
+ _log . Info ( "Entity actor [{0}] starting up" , Context . Self . Path . Name ) ;
34
35
}
35
36
36
37
protected override bool Receive ( object message )
37
38
{
38
39
switch ( message )
39
40
{
40
41
case "stop" :
41
- log . Info ( "Stopping myself!" ) ;
42
+ _log . Info ( "Stopping myself!" ) ;
42
43
Context . Stop ( Self ) ;
43
44
return true ;
44
45
case "graceful-stop" :
@@ -122,8 +123,8 @@ public Delay(TimeSpan howLong)
122
123
}
123
124
124
125
// outside store since we need to be able to set them before sharding initializes
125
- private static ImmutableDictionary < string , IFail > failShardGetEntities = ImmutableDictionary < string , IFail > . Empty ;
126
- private static readonly IFail failCoordinatorGetShards = null ;
126
+ private static ImmutableDictionary < string , IFail > _failShardGetEntities = ImmutableDictionary < string , IFail > . Empty ;
127
+ private static readonly IFail FailCoordinatorGetShards = null ;
127
128
128
129
private class ShardStoreCreated
129
130
{
@@ -221,7 +222,7 @@ protected override bool Receive(object message)
221
222
switch ( message )
222
223
{
223
224
case RememberEntitiesShardStore . GetEntities _:
224
- switch ( failShardGetEntities . GetValueOrDefault ( shardId ) )
225
+ switch ( _failShardGetEntities . GetValueOrDefault ( shardId ) )
225
226
{
226
227
case null :
227
228
Sender . Tell ( new RememberEntitiesShardStore . RememberedEntities ( ImmutableHashSet < string > . Empty ) ) ;
@@ -317,7 +318,7 @@ protected override bool Receive(object message)
317
318
switch ( message )
318
319
{
319
320
case RememberEntitiesCoordinatorStore . GetShards _:
320
- switch ( failCoordinatorGetShards )
321
+ switch ( FailCoordinatorGetShards )
321
322
{
322
323
case null :
323
324
Sender . Tell ( new RememberEntitiesCoordinatorStore . RememberedShards ( ImmutableHashSet < string > . Empty ) ) ;
@@ -390,7 +391,7 @@ protected override bool Receive(object message)
390
391
akka.cluster.sharding.updating-state-timeout = 1s
391
392
akka.cluster.sharding.verbose-debug-logging = on
392
393
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on" )
393
- . WithFallback ( ClusterSingletonManager . DefaultConfig ( )
394
+ . WithFallback ( ClusterSingleton . DefaultConfig ( )
394
395
. WithFallback ( ClusterSharding . DefaultConfig ( ) ) ) ;
395
396
396
397
public RememberEntitiesFailureSpec ( ITestOutputHelper helper ) : base ( SpecConfig , helper )
@@ -441,7 +442,7 @@ public void Remember_entities_handling_in_sharding_must_recover_when_initial_rem
441
442
private void Remember_entities_handling_in_sharding_must_recover_when_initial_remember_entities_load_fails ( IFail wayToFail )
442
443
{
443
444
Log . Debug ( "Getting entities for shard 1 will fail" ) ;
444
- failShardGetEntities = ImmutableDictionary < string , IFail > . Empty . Add ( "1" , wayToFail ) ;
445
+ _failShardGetEntities = ImmutableDictionary < string , IFail > . Empty . Add ( "1" , wayToFail ) ;
445
446
446
447
try
447
448
{
@@ -456,7 +457,7 @@ private void Remember_entities_handling_in_sharding_must_recover_when_initial_re
456
457
probe . ExpectNoMsg ( ) ; // message is lost because shard crashes
457
458
458
459
Log . Debug ( "Resetting initial fail" ) ;
459
- failShardGetEntities = ImmutableDictionary < string , IFail > . Empty ;
460
+ _failShardGetEntities = ImmutableDictionary < string , IFail > . Empty ;
460
461
461
462
// shard should be restarted and eventually succeed
462
463
AwaitAssert ( ( ) =>
@@ -469,7 +470,7 @@ private void Remember_entities_handling_in_sharding_must_recover_when_initial_re
469
470
}
470
471
finally
471
472
{
472
- failShardGetEntities = ImmutableDictionary < string , IFail > . Empty ;
473
+ _failShardGetEntities = ImmutableDictionary < string , IFail > . Empty ;
473
474
}
474
475
}
475
476
@@ -694,41 +695,41 @@ private void Remember_entities_handling_in_sharding_must_recover_on_graceful_ent
694
695
}
695
696
696
697
[ Fact ]
697
- public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_NoResponse ( )
698
+ public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_NoResponse ( )
698
699
{
699
- Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new NoResponse ( ) ) ;
700
+ return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new NoResponse ( ) ) ;
700
701
}
701
702
702
703
[ Fact ]
703
- public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_CrashStore ( )
704
+ public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_CrashStore ( )
704
705
{
705
- Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new CrashStore ( ) ) ;
706
+ return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new CrashStore ( ) ) ;
706
707
}
707
708
708
709
[ Fact ]
709
- public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_StopStore ( )
710
+ public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_StopStore ( )
710
711
{
711
- Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new StopStore ( ) ) ;
712
+ return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new StopStore ( ) ) ;
712
713
}
713
714
714
715
[ Fact ]
715
- public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_500 ( )
716
+ public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_500 ( )
716
717
{
717
- Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new Delay ( TimeSpan . FromMilliseconds ( 500 ) ) ) ;
718
+ return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new Delay ( TimeSpan . FromMilliseconds ( 500 ) ) ) ;
718
719
}
719
720
720
721
[ Fact ]
721
- public void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_1000 ( )
722
+ public Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails_Delay_1000 ( )
722
723
{
723
- Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new Delay ( TimeSpan . FromSeconds ( 1 ) ) ) ;
724
+ return Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( new Delay ( TimeSpan . FromSeconds ( 1 ) ) ) ;
724
725
}
725
726
726
- private void Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( IFail wayToFail )
727
+ private async Task Remember_entities_handling_in_sharding_must_recover_when_coordinator_storing_shard_start_fails ( IFail wayToFail )
727
728
{
728
729
var storeProbe = CreateTestProbe ( ) ;
729
730
Sys . EventStream . Subscribe ( storeProbe . Ref , typeof ( CoordinatorStoreCreated ) ) ;
730
731
731
- var sharding = ClusterSharding . Get ( Sys ) . Start (
732
+ var sharding = await ClusterSharding . Get ( Sys ) . StartAsync (
732
733
$ "coordinatorStoreStopGraceful-{ wayToFail } ",
733
734
Props . Create ( ( ) => new EntityActor ( ) ) ,
734
735
ClusterShardingSettings . Create ( Sys ) . WithRememberEntities ( true ) ,
@@ -739,12 +740,12 @@ private void Remember_entities_handling_in_sharding_must_recover_when_coordinato
739
740
var probe = CreateTestProbe ( ) ;
740
741
741
742
// coordinator store is triggered by coordinator starting up
742
- var coordinatorStore = storeProbe . ExpectMsg < CoordinatorStoreCreated > ( ) . Store ;
743
+ var coordinatorStore = ( await storeProbe . ExpectMsgAsync < CoordinatorStoreCreated > ( ) ) . Store ;
743
744
coordinatorStore . Tell ( new FakeCoordinatorStoreActor . FailAddShard ( "1" , wayToFail ) , probe . Ref ) ;
744
- probe . ExpectMsg < Done > ( ) ;
745
+ await probe . ExpectMsgAsync < Done > ( ) ;
745
746
746
747
sharding . Tell ( new EntityEnvelope ( 1 , "hello-1" ) , probe . Ref ) ;
747
- probe . ExpectNoMsg ( TimeSpan . FromSeconds ( 1 ) ) ; // because shard cannot start while store failing
748
+ await probe . ExpectNoMsgAsync ( TimeSpan . FromSeconds ( 1 ) ) ; // because shard cannot start while store failing
748
749
749
750
if ( wayToFail is StopStore or CrashStore )
750
751
{
@@ -754,12 +755,12 @@ private void Remember_entities_handling_in_sharding_must_recover_when_coordinato
754
755
755
756
// fail it when stopping
756
757
coordinatorStore . Tell ( new FakeCoordinatorStoreActor . ClearFailShard ( "1" ) , storeProbe . Ref ) ;
757
- storeProbe . ExpectMsg < Done > ( ) ;
758
+ await storeProbe . ExpectMsgAsync < Done > ( ) ;
758
759
759
- probe . AwaitAssert ( ( ) =>
760
+ await probe . AwaitAssertAsync ( async ( ) =>
760
761
{
761
762
sharding . Tell ( new EntityEnvelope ( 1 , "hello-2" ) , probe . Ref ) ;
762
- probe . ExpectMsg ( "hello-2" ) ; // should now work again
763
+ await probe . ExpectMsgAsync ( "hello-2" ) ; // should now work again
763
764
} , TimeSpan . FromSeconds ( 5 ) ) ;
764
765
765
766
Sys . Stop ( sharding ) ;
0 commit comments