diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 42d56172e35..1cbe87146a8 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,3 +1,89 @@
+#### 1.4.41 August 31 2022 ####
+Akka.NET v1.4.41 is a minor release that contains some minor bug fix and throughput performance improvement for Akka.Remote
+
+* [Akka: Fix AddLogger in LoggingBus](https://github.com/akkadotnet/akka.net/issues/6028)
+
+ Akka loggers are now loaded asynchronously by default. The `ActorSystem` will wait at most `akka.logger-startup-timeout` period long (5 seconds by default) for all loggers to report that they are ready before continuing the start-up process.
+
+ A warning will be logged on each loggers that did not report within this grace period. These loggers will still be awaited upon inside a detached Task until either it is ready or the `ActorSystem` is shut down.
+
+ These late loggers will not capture all log events until they are ready. If your logs are missing portion of the start-up events, check that the logger were loaded within this grace period.
+
+* [Akka: Log Exception cause inside Directive.Resume SupervisorStrategy warning log](https://github.com/akkadotnet/akka.net/issues/6070)
+* [DData: Add "verbose-debug-logging" setting to suppress debug message spam](https://github.com/akkadotnet/akka.net/issues/6080)
+* [Akka: Regenerate protobuf codes](https://github.com/akkadotnet/akka.net/issues/6087)
+
+ All protobuf codes were re-generated, causing a significant improvement in message deserialization, increasing `Akka.Remote` throughput.
+
+__Before__
+``` ini
+BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19041.1415 (2004/May2020Update/20H1)
+AMD Ryzen 9 3900X, 1 CPU, 24 logical and 12 physical cores
+.NET SDK=6.0.200
+ [Host] : .NET 6.0.2 (6.0.222.6406), X64 RyuJIT
+ DefaultJob : .NET 6.0.2 (6.0.222.6406), X64 RyuJIT
+```
+| Method | Mean | Error | StdDev | Gen 0 | Gen 1 | Allocated |
+|----------------------- |-----------:|---------:|---------:|-------:|-------:|----------:|
+| WritePayloadPdu | 1,669.6 ns | 21.10 ns | 19.74 ns | 0.2156 | - | 1,808 B |
+| DecodePayloadPdu | 2,039.7 ns | 12.52 ns | 11.71 ns | 0.2156 | 0.0031 | 1,816 B |
+| DecodePduOnly | 131.3 ns | 1.32 ns | 1.11 ns | 0.0563 | 0.0002 | 472 B |
+| DecodeMessageOnly | 1,665.0 ns | 15.03 ns | 14.05 ns | 0.1406 | - | 1,184 B |
+| DeserializePayloadOnly | 151.2 ns | 1.88 ns | 1.76 ns | 0.0199 | - | 168 B |
+
+__After__
+``` ini
+BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19041.1415 (2004/May2020Update/20H1)
+AMD Ryzen 9 3900X, 1 CPU, 24 logical and 12 physical cores
+.NET SDK=6.0.200
+ [Host] : .NET 6.0.2 (6.0.222.6406), X64 RyuJIT
+ DefaultJob : .NET 6.0.2 (6.0.222.6406), X64 RyuJIT
+```
+| Method | Mean | Error | StdDev | Gen 0 | Gen 1 | Allocated |
+|----------------------- |-----------:|---------:|---------:|-------:|-------:|----------:|
+| WritePayloadPdu | 1,623.4 ns | 19.95 ns | 18.66 ns | 0.2219 | 0.0031 | 1,880 B |
+| DecodePayloadPdu | 1,738.6 ns | 22.79 ns | 21.31 ns | 0.2250 | - | 1,888 B |
+| DecodePduOnly | 175.1 ns | 2.31 ns | 1.93 ns | 0.0572 | - | 480 B |
+| DecodeMessageOnly | 1,296.8 ns | 11.89 ns | 10.54 ns | 0.1469 | 0.0016 | 1,232 B |
+| DeserializePayloadOnly | 143.6 ns | 1.59 ns | 1.33 ns | 0.0199 | 0.0002 | 168 B |
+
+
+If you want to see the [full set of changes made in Akka.NET v1.4.41, click here](https://github.com/akkadotnet/akka.net/milestone/72).
+
+| COMMITS | LOC+ | LOC- | AUTHOR |
+|---------|-------|------|---------------------|
+| 4 | 13003 | 1150 | Gregorius Soedharmo |
+| 1 | 3 | 4 | Aaron Stannard |
+
+#### 1.4.40 July 19 2022 ####
+Akka.NET v1.4.40 is a minor release that contains a bug fix for DotNetty SSL support.
+
+* [Akka.Remote: SSL Configuration Fails even EnbleSsl property is set to false](https://github.com/akkadotnet/akka.net/issues/6043)
+* [Akka.Streams: Add IAsyncEnumerable Source](https://github.com/akkadotnet/akka.net/issues/6047)
+
+If you want to see the [full set of changes made in Akka.NET v1.4.40, click here](https://github.com/akkadotnet/akka.net/milestone/71).
+
+| COMMITS | LOC+ | LOC- | AUTHOR |
+|---------|-------|------|---------------------|
+| 8 | 544 | 64 | Gregorius Soedharmo |
+| 1 | 669 | 3 | Aaron Stannard |
+| 1 | 123 | 26 | Ebere Abanonu |
+| 1 | 101 | 3 | aminchenkov |
+
+#### 1.4.39 June 1 2022 ####
+Akka.NET v1.4.39 is a minor release that contains some very important bug fixes for Akka.Remote and Akka.Cluster users.
+
+* [Akka.Cluster: Error in `SplitBrainResolver.PreStart` when using `ChannelTaskScheduler` for internal-dispatcher](https://github.com/akkadotnet/akka.net/issues/5962)
+* [Akka.Cluster.Sharding: make PersistentShardCoordinator a tolerant reader](https://github.com/akkadotnet/akka.net/issues/5604) - Akka.Persistence-backed sharding is more lenient when recovering state.
+* [Akka.Remote: Trap all `Exception`s thrown while trying to dispatch messages in `Akka.Remote.EndpointReader`](https://github.com/akkadotnet/akka.net/pull/5971) - any kind of exception thrown during deserialization can no longer force a disassociation to occur in Akka.Remote.
+
+If you want to see the [full set of changes made in Akka.NET v1.4.39, click here](https://github.com/akkadotnet/akka.net/milestone/70).
+
+| COMMITS | LOC+ | LOC- | AUTHOR |
+| --- | --- | --- | --- |
+| 3 | 204 | 99 | Aaron Stannard |
+| 1 | 1 | 13 | Gregorius Soedharmo |
+
#### 1.4.38 May 6 2022 ####
Akka.NET v1.4.38 is a minor release that contains some minor bug fixes.
diff --git a/build.ps1 b/build.ps1
index d5a61e2dcec..a0c44742283 100644
--- a/build.ps1
+++ b/build.ps1
@@ -32,7 +32,7 @@ Param(
$FakeVersion = "4.63.0"
$NugetVersion = "5.8.0";
$NugetUrl = "https://dist.nuget.org/win-x86-commandline/v$NugetVersion/nuget.exe"
-$ProtobufVersion = "3.13.0"
+$ProtobufVersion = "3.21.5"
$DocfxVersion = "2.58.9"
$IncrementalistVersion = "0.6.0";
diff --git a/src/Akka.sln b/src/Akka.sln
index a06864509d8..92c600e4c62 100644
--- a/src/Akka.sln
+++ b/src/Akka.sln
@@ -86,6 +86,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
..\.gitignore = ..\.gitignore
Akka.sln.DotSettings = Akka.sln.DotSettings
NuGet.Config = NuGet.Config
+ ..\RELEASE_NOTES.md = ..\RELEASE_NOTES.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Remote.Tests.MultiNode", "core\Akka.Remote.Tests.MultiNode\Akka.Remote.Tests.MultiNode.csproj", "{C9105C76-B084-4DA1-9348-1C74A8F22F6B}"
diff --git a/src/benchmark/Akka.Benchmarks/Configurations/Configs.cs b/src/benchmark/Akka.Benchmarks/Configurations/Configs.cs
index 646a42cfc1f..31ee17a20d0 100644
--- a/src/benchmark/Akka.Benchmarks/Configurations/Configs.cs
+++ b/src/benchmark/Akka.Benchmarks/Configurations/Configs.cs
@@ -8,6 +8,7 @@
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Exporters;
+using BenchmarkDotNet.Loggers;
namespace Akka.Benchmarks.Configurations
{
@@ -20,6 +21,7 @@ public MicroBenchmarkConfig()
{
this.Add(MemoryDiagnoser.Default);
this.Add(MarkdownExporter.GitHub);
+ AddLogger(ConsoleLogger.Default);
}
}
diff --git a/src/benchmark/Akka.Benchmarks/Remoting/AkkaPduCodecBenchmark.cs b/src/benchmark/Akka.Benchmarks/Remoting/AkkaPduCodecBenchmark.cs
new file mode 100644
index 00000000000..058fe1d1551
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/Remoting/AkkaPduCodecBenchmark.cs
@@ -0,0 +1,194 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2022 Lightbend Inc.
+// // Copyright (C) 2013-2022 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Actor.Dsl;
+using Akka.Benchmarks.Configurations;
+using Akka.Configuration;
+using Akka.Remote;
+using Akka.Remote.Serialization;
+using Akka.Remote.Transport;
+using BenchmarkDotNet.Attributes;
+using BenchmarkDotNet.Loggers;
+using Google.Protobuf;
+
+namespace Akka.Benchmarks.Remoting
+{
+ [Config(typeof(MicroBenchmarkConfig))]
+ public class AkkaPduCodecBenchmark
+ {
+ public const int Operations = 10_000;
+
+ private ExtendedActorSystem _sys1;
+ private ExtendedActorSystem _sys2;
+ private IRemoteActorRefProvider _rarp;
+
+ private Config _config = @"akka.actor.provider = remote
+ akka.remote.dot-netty.tcp.port = 0";
+
+ private IActorRef _senderActorRef;
+ private IActorRef _localReceiveRef;
+ private RemoteActorRef _remoteReceiveRef;
+ private RemoteActorRef _remoteSenderRef;
+
+ private Address _addr1;
+ private Address _addr2;
+ private AkkaPduProtobuffCodec _recvCodec;
+ private AkkaPduProtobuffCodec _sendCodec;
+
+ ///
+ /// The message we're going to serialize
+ ///
+ private readonly object _message = "foobar";
+
+ private readonly Ack _lastAck = new Ack(-1);
+
+ private ByteString _fullDecode;
+ private ByteString _pduDecoded;
+ private Akka.Remote.Serialization.Proto.Msg.Payload _payloadDecoded;
+
+ [GlobalSetup]
+ public async Task Setup()
+ {
+ _sys1 = (ExtendedActorSystem)ActorSystem.Create("BenchSys", _config);
+ _sys2 = (ExtendedActorSystem)ActorSystem.Create("BenchSys", _config);
+ _rarp = RARP.For(_sys1).Provider;
+ _addr1 = _rarp.DefaultAddress;
+ _addr2 = RARP.For(_sys2).Provider.DefaultAddress;
+
+ _senderActorRef =
+ _sys2.ActorOf(act => { act.ReceiveAny((o, context) => context.Sender.Tell(context.Sender)); },
+ "sender1");
+
+ _localReceiveRef = _sys1.ActorOf(act => { act.ReceiveAny((o, context) => context.Sender.Tell(context.Sender)); },
+ "recv1");
+
+ // create an association
+ _remoteReceiveRef = (RemoteActorRef)(await _sys2.ActorSelection(new RootActorPath(RARP.For(_sys1).Provider.DefaultAddress) / "user" /
+ _localReceiveRef.Path.Name).ResolveOne(TimeSpan.FromSeconds(3)));
+
+ _remoteSenderRef = (RemoteActorRef)(await _sys1.ActorSelection(new RootActorPath(RARP.For(_sys2).Provider.DefaultAddress) / "user" /
+ _senderActorRef.Path.Name).ResolveOne(TimeSpan.FromSeconds(3)));
+
+ _recvCodec = new AkkaPduProtobuffCodec(_sys1);
+ _sendCodec = new AkkaPduProtobuffCodec(_sys2);
+ _fullDecode = CreatePayloadPdu();
+ _pduDecoded = ((Payload)_recvCodec.DecodePdu(_fullDecode)).Bytes;
+ _payloadDecoded = _recvCodec.DecodeMessage(_pduDecoded, _rarp, _addr1).MessageOption.SerializedMessage;
+ }
+
+ [GlobalCleanup]
+ public async Task Cleanup()
+ {
+
+ void PrintCacheStats(string prefix, ActorSystem sys)
+ {
+ var resolveCache = ActorRefResolveThreadLocalCache.For(sys);
+ var pathCache = ActorPathThreadLocalCache.For(sys);
+ var addressCache = AddressThreadLocalCache.For(sys);
+
+ ConsoleLogger.Default.WriteLine(LogKind.Result,
+ $"[{prefix}] ResolveCache entries: [{resolveCache.Cache.Stats.Entries}]");
+ ConsoleLogger.Default.WriteLine(LogKind.Result,
+ $"[{prefix}] PathCache entries: [{pathCache.Cache.Stats.Entries}]");
+ ConsoleLogger.Default.WriteLine(LogKind.Result,
+ $"[{prefix}] AddressCache entries: [{addressCache.Cache.Stats.Entries}]");
+ }
+
+ PrintCacheStats("Addr1", _sys1);
+ PrintCacheStats("Addr2", _sys2);
+
+ var resolveCache = ActorRefResolveThreadLocalCache.For(_sys1);
+ var pathCache = ActorPathThreadLocalCache.For(_sys1);
+ var addressCache = AddressThreadLocalCache.For(_sys1);
+
+ var senderResolveCache = ActorRefResolveThreadLocalCache.For(_sys2);
+ var senderPathCache = ActorPathThreadLocalCache.For(_sys2);
+ var senderAddressCache = AddressThreadLocalCache.For(_sys2);
+
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr1] Used ResolveCache for recipient? {resolveCache.Cache.TryGet(_remoteReceiveRef.Path.ToSerializationFormat(), out _)}");
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr1] Used PathCache for recipient? {pathCache.Cache.TryGet(_remoteReceiveRef.Path.ToSerializationFormat(), out _)}");
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr1] Used ResolveCache for sender? {resolveCache.Cache.TryGet(_remoteSenderRef.Path.ToSerializationFormat(), out _)}");
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr1] Used PathCache for sender? {pathCache.Cache.TryGet(_remoteSenderRef.Path.ToSerializationFormat(), out _)}");
+
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr2] Used ResolveCache for recipient? {senderResolveCache.Cache.TryGet(_remoteReceiveRef.Path.ToSerializationFormat(), out _)}");
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr2] Used PathCache for recipient? {senderPathCache.Cache.TryGet(_remoteReceiveRef.Path.ToSerializationFormat(), out _)}");
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr2] Used ResolveCache for sender? {senderResolveCache.Cache.TryGet(_senderActorRef.Path.ToSerializationFormat(), out _)}");
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr2] Used PathCache for sender? {senderPathCache.Cache.TryGet(_senderActorRef.Path.ToSerializationFormat(), out _)}");
+
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr1] Used AddressCache for sys1? {addressCache.Cache.TryGet(_addr2.ToString(), out _)}");
+ ConsoleLogger.Default.WriteLine(LogKind.Result, $"[Addr2] Used AddressCache for sys2? {senderAddressCache.Cache.TryGet(_addr1.ToString(), out _)}");
+
+ await Task.WhenAll(_sys1.Terminate(), _sys2.Terminate());
+ }
+
+ ///
+ /// Simulates the write-side of the wire
+ ///
+ [Benchmark(OperationsPerInvoke = Operations)]
+ public void WritePayloadPdu()
+ {
+ for (var i = 0; i < Operations; i++)
+ {
+ CreatePayloadPdu();
+ }
+ }
+
+ ///
+ /// Simulates the read-side of the wire
+ ///
+ [Benchmark(OperationsPerInvoke = Operations)]
+ public void DecodePayloadPdu()
+ {
+ for (var i = 0; i < Operations; i++)
+ {
+ var pdu = _recvCodec.DecodePdu(_fullDecode);
+ if (pdu is Payload p)
+ {
+ var msg = _recvCodec.DecodeMessage(p.Bytes, _rarp, _addr1);
+ var deserialize = MessageSerializer.Deserialize(_sys1, msg.MessageOption.SerializedMessage);
+ }
+ }
+ }
+
+ [Benchmark(OperationsPerInvoke = Operations)]
+ public void DecodePduOnly()
+ {
+ for (var i = 0; i < Operations; i++)
+ {
+ var pdu = _recvCodec.DecodePdu(_fullDecode);
+ }
+ }
+
+ [Benchmark(OperationsPerInvoke = Operations)]
+ public void DecodeMessageOnly()
+ {
+ for (var i = 0; i < Operations; i++)
+ {
+ var msg = _recvCodec.DecodeMessage(_pduDecoded, _rarp, _addr1);
+ }
+ }
+
+ [Benchmark(OperationsPerInvoke = Operations)]
+ public void DeserializePayloadOnly()
+ {
+ for (var i = 0; i < Operations; i++)
+ {
+ var deserialize = MessageSerializer.Deserialize(_sys1, _payloadDecoded);
+ }
+ }
+
+ private ByteString CreatePayloadPdu()
+ {
+ return _sendCodec.ConstructPayload(_sendCodec.ConstructMessage(_remoteReceiveRef.LocalAddressToUse, _remoteReceiveRef,
+ MessageSerializer.Serialize(_sys2, _addr2, _message), _senderActorRef, null, _lastAck));
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/common.props b/src/common.props
index 9a757dcb3cc..c3f21b38243 100644
--- a/src/common.props
+++ b/src/common.props
@@ -1,8 +1,8 @@
- Copyright © 2013-2021 Akka.NET Team
+ Copyright © 2013-2022 Akka.NET Team
Akka.NET Team
- 1.4.38
+ 1.4.39
akkalogo.png
https://github.com/akkadotnet/akka.net
https://github.com/akkadotnet/akka.net/blob/master/LICENSE
@@ -34,7 +34,15 @@
true
- Placeholder for nightlies**
+ Akka.NET v1.4.39 is a minor release that contains some very important bug fixes for Akka.Remote and Akka.Cluster users.
+[Akka.Cluster: Error in `SplitBrainResolver.PreStart` when using `ChannelTaskScheduler` for internal-dispatcher](https://github.com/akkadotnet/akka.net/issues/5962)
+[Akka.Cluster.Sharding: make PersistentShardCoordinator a tolerant reader](https://github.com/akkadotnet/akka.net/issues/5604) - Akka.Persistence-backed sharding is more lenient when recovering state.
+[Akka.Remote: Trap all `Exception`s thrown while trying to dispatch messages in `Akka.Remote.EndpointReader`](https://github.com/akkadotnet/akka.net/pull/5971) - any kind of exception thrown during deserialization can no longer force a disassociation to occur in Akka.Remote.
+If you want to see the [full set of changes made in Akka.NET v1.4.39, click here](https://github.com/akkadotnet/akka.net/milestone/70).
+| COMMITS | LOC+ | LOC- | AUTHOR |
+| --- | --- | --- | --- |
+| 3 | 204 | 99 | Aaron Stannard |
+| 1 | 1 | 13 | Gregorius Soedharmo |
diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj
index 09af113497e..fd43d3604de 100644
--- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj
+++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Akka.Cluster.Sharding.Tests.csproj
@@ -37,4 +37,9 @@
Always
+
+
+ ClusterGenerators.cs
+
+
\ No newline at end of file
diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs
index 55406a48924..8dfc5bc2195 100644
--- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs
+++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs
@@ -6,14 +6,17 @@
//-----------------------------------------------------------------------
using System;
+using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
+using Akka.Event;
using Akka.TestKit;
-using Akka.Util.Extensions;
+using Akka.Util;
using FluentAssertions;
+using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
@@ -36,6 +39,52 @@ protected override void OnReceive(object message)
{
}
}
+
+
+ internal class ConstructorFailActor : ActorBase
+ {
+ private static bool _thrown;
+ private readonly ILoggingAdapter _log = Context.GetLogger();
+
+ public ConstructorFailActor()
+ {
+ if (!_thrown)
+ {
+ _thrown = true;
+ throw new Exception("EXPLODING CONSTRUCTOR!");
+ }
+ }
+
+ protected override bool Receive(object message)
+ {
+ _log.Info("Msg {0}", message);
+ Sender.Tell($"ack {message}");
+ return true;
+ }
+ }
+
+ internal class PreStartFailActor : ActorBase
+ {
+ private static bool _thrown;
+ private readonly ILoggingAdapter _log = Context.GetLogger();
+
+ protected override void PreStart()
+ {
+ base.PreStart();
+ if (!_thrown)
+ {
+ _thrown = true;
+ throw new Exception("EXPLODING PRE-START!");
+ }
+ }
+
+ protected override bool Receive(object message)
+ {
+ _log.Info("Msg {0}", message);
+ Sender.Tell($"ack {message}");
+ return true;
+ }
+ }
static PersistentShardSpec()
{
@@ -87,5 +136,98 @@ public void Persistent_Shard_must_remember_entities_started_with_StartEntity()
ExpectMsgAllOf(new Shard.ShardStats("shard-1", 1));
});
}
+
+ [Theory(DisplayName = "Persistent shard must recover from transient failures inside sharding entity constructor and PreStart method")]
+ [MemberData(nameof(PropsFactory))]
+ public async Task Persistent_Shard_must_recover_from_failing_entity(Props entityProp)
+ {
+ ExtractEntityId extractEntityId = message =>
+ {
+ switch (message)
+ {
+ case ShardSpec.EntityEnvelope env:
+ return (env.Id.ToString(), env.Payload);
+ }
+ return Option<(string, object)>.None;
+ };
+
+ ExtractShardId extractShardId = message =>
+ {
+ switch (message)
+ {
+ case ShardSpec.EntityEnvelope msg:
+ return msg.Id.ToString();
+ }
+ return null;
+ };
+
+ var settings = ClusterShardingSettings.Create(Sys);
+ var tuning = settings.TuningParameters;
+ settings = settings.WithTuningParameters(new TuningParameters
+ (
+ coordinatorFailureBackoff: tuning.CoordinatorFailureBackoff,
+ retryInterval: tuning.RetryInterval,
+ bufferSize: tuning.BufferSize,
+ handOffTimeout: tuning.HandOffTimeout,
+ shardStartTimeout: tuning.ShardStartTimeout,
+ shardFailureBackoff: tuning.ShardFailureBackoff,
+ entityRestartBackoff: 1.Seconds(),
+ rebalanceInterval: tuning.RebalanceInterval,
+ snapshotAfter: tuning.SnapshotAfter,
+ keepNrOfBatches: tuning.KeepNrOfBatches,
+ leastShardAllocationRebalanceThreshold: tuning.LeastShardAllocationRebalanceThreshold,
+ leastShardAllocationMaxSimultaneousRebalance: tuning.LeastShardAllocationMaxSimultaneousRebalance,
+ waitingForStateTimeout: tuning.WaitingForStateTimeout,
+ updatingStateTimeout: tuning.UpdatingStateTimeout,
+ entityRecoveryStrategy: tuning.EntityRecoveryStrategy,
+ entityRecoveryConstantRateStrategyFrequency: tuning.EntityRecoveryConstantRateStrategyFrequency,
+ entityRecoveryConstantRateStrategyNumberOfEntities: tuning.EntityRecoveryConstantRateStrategyNumberOfEntities,
+ leastShardAllocationAbsoluteLimit: tuning.LeastShardAllocationAbsoluteLimit,
+ leastShardAllocationRelativeLimit: tuning.LeastShardAllocationRelativeLimit
+ ));
+
+ var props = Props.Create(() => new PersistentShard(
+ "cats",
+ "shard-1",
+ _ => entityProp,
+ settings,
+ extractEntityId,
+ extractShardId,
+ PoisonPill.Instance
+ ));
+
+ Sys.EventStream.Subscribe(TestActor);
+
+ var persistentShard = Sys.ActorOf(props);
+
+ persistentShard.Tell(new ShardRegion.StartEntity("1"));
+ ExpectMsg(new ShardRegion.StartEntityAck("1", "shard-1"));
+
+ // entity died here
+ var err = ExpectMsg();
+ err.Cause.Should().BeOfType();
+
+ await AwaitConditionAsync(() =>
+ {
+ persistentShard.Tell(Shard.GetCurrentShardState.Instance);
+ var failedState = ExpectMsg();
+ return failedState.EntityIds.Count == 0;
+ });
+
+ // entity should be restarted when it received this message
+ persistentShard.Tell(new ShardSpec.EntityEnvelope(1, "Restarted"));
+ ExpectMsg("ack Restarted");
+
+ persistentShard.Tell(Shard.GetCurrentShardState.Instance);
+ var state = ExpectMsg();
+ state.EntityIds.Count.Should().Be(1);
+ state.EntityIds.First().Should().Be("1");
+ }
+
+ public static IEnumerable