Skip to content

Commit

Permalink
Fix ClusterSingletonProxy fails to reacquire singleton actor (#7315)
Browse files Browse the repository at this point in the history
* MNTR Reproduction for akkadotnet/Akka.Management#2490

* Implement fix

* Add comment
  • Loading branch information
Arkatufus authored Aug 8, 2024
1 parent da3ded3 commit 5e7be3a
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using FluentAssertions.Extensions;

namespace Akka.Cluster.Tools.Tests.MultiNode.Singleton
{
Expand All @@ -43,7 +44,7 @@ public ClusterSingletonManagerLeaseSpecConfig()
akka.actor.provider = ""cluster""
akka.remote.log-remote-lifecycle-events = off
#akka.cluster.auto-down-unreachable-after = off
#akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
# akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.testkit.auto-down-unreachable-after = 0s
test-lease {
Expand Down Expand Up @@ -134,7 +135,9 @@ public class ClusterSingletonManagerLeaseSpec : MultiNodeClusterSpec
protected override int InitialParticipantsValueFactory => Roles.Count;

// used on the controller
private TestProbe leaseProbe;
private TestProbe _leaseProbe;

private IActorRef _proxy;

public ClusterSingletonManagerLeaseSpec()
: this(new ClusterSingletonManagerLeaseSpecConfig())
Expand All @@ -145,7 +148,7 @@ protected ClusterSingletonManagerLeaseSpec(ClusterSingletonManagerLeaseSpecConfi
{
_config = config;

leaseProbe = CreateTestProbe();
_leaseProbe = CreateTestProbe();
}

[MultiNodeFact]
Expand All @@ -156,6 +159,7 @@ public void ClusterSingletonManagerLeaseSpecs()
Cluster_singleton_manager_with_lease_should_find_the_lease_on_every_node();
Cluster_singleton_manager_with_lease_should_Start_singleton_and_ping_from_all_nodes();
Cluster_singleton_manager_with_lease_should_Move_singleton_when_oldest_node_downed();
Cluster_singleton_manager_with_lease_proxy_should_reacquire_singleton_actor_when_lease_lost();
}

public void Cluster_singleton_manager_with_lease_should_form_a_cluster()
Expand Down Expand Up @@ -216,19 +220,21 @@ public void Cluster_singleton_manager_with_lease_should_Start_singleton_and_ping
{
Sys.ActorOf(
ClusterSingletonManager.Props(
ClusterSingletonManagerLeaseSpecConfig.ImportantSingleton.Props, PoisonPill.Instance, ClusterSingletonManagerSettings.Create(Sys).WithRole("worker")),
ClusterSingletonManagerLeaseSpecConfig.ImportantSingleton.Props,
PoisonPill.Instance,
ClusterSingletonManagerSettings.Create(Sys).WithRole("worker")),
"important");
}, _config.First, _config.Second, _config.Third, _config.Fourth);
EnterBarrier("singleton-started");

var proxy = Sys.ActorOf(
_proxy = Sys.ActorOf(
ClusterSingletonProxy.Props(
singletonManagerPath: "/user/important",
settings: ClusterSingletonProxySettings.Create(Sys).WithRole("worker")));

RunOn(() =>
{
proxy.Tell("Ping");
_proxy.Tell("Ping");
// lease has not been granted so now allowed to come up
ExpectNoMsg(TimeSpan.FromSeconds(2));
}, _config.First, _config.Second, _config.Third, _config.Fourth);
Expand Down Expand Up @@ -286,32 +292,88 @@ public void Cluster_singleton_manager_with_lease_should_Move_singleton_when_olde

EnterBarrier("first node downed");

var proxy = Sys.ActorOf(
ClusterSingletonProxy.Props(
singletonManagerPath: "/user/important",
settings: ClusterSingletonProxySettings.Create(Sys).WithRole("worker")));

RunOn(() =>
{
proxy.Tell("Ping");
_proxy.Tell("Ping");
// lease has not been granted so now allowed to come up
ExpectNoMsg(TimeSpan.FromSeconds(2));
}, _config.Second, _config.Third, _config.Fourth);
EnterBarrier("singleton-not-migrated");

RunOn(() =>
{
TestLeaseActorClientExt.Get(Sys).GetLeaseActor().Tell(new TestLeaseActor.ActionRequest(new TestLeaseActor.Acquire(GetAddress(_config.Second).HostPort()), true));
var leaseActor = TestLeaseActorClientExt.Get(Sys).GetLeaseActor();
leaseActor.Tell(new TestLeaseActor.ActionRequest(new TestLeaseActor.Release(GetAddress(_config.First).HostPort()), true));
leaseActor.Tell(new TestLeaseActor.ActionRequest(new TestLeaseActor.Acquire(GetAddress(_config.Second).HostPort()), true));
}, _config.Controller);

EnterBarrier("singleton-moving-to-second");

RunOn(() =>
{
ExpectMsg(new ClusterSingletonManagerLeaseSpecConfig.ImportantSingleton.Response("Ping", GetAddress(_config.Second)), TimeSpan.FromSeconds(20));
}, _config.Second, _config.Third, _config.Fourth);

EnterBarrier("singleton-moved-to-second");
}

// Reproduction for https://github.com/akkadotnet/Akka.Management/issues/2490
public void Cluster_singleton_manager_with_lease_proxy_should_reacquire_singleton_actor_when_lease_lost()
{
RunOn(() =>
{
var singletonManager = new RootActorPath(GetAddress(_config.Second)) / "user" / "important";
var selection = Sys.ActorSelection(singletonManager);
var actorRef = selection.ResolveOne(3.Seconds()).GetAwaiter().GetResult();
actorRef.Tell(new LeaseLost(new Exception("Lease not found")), TestLeaseActorClientExt.Get(Sys).GetLeaseActor());
}, _config.Second);

EnterBarrier("lease-deleted");

RunOn(() =>
{
TestLeaseActor.LeaseRequests requests = null;
AwaitAssert(() =>
{
TestLeaseActorClientExt.Get(Sys).GetLeaseActor().Tell(TestLeaseActor.GetRequests.Instance);
var msg = ExpectMsg<TestLeaseActor.LeaseRequests>();
msg.Requests.Count.Should().Be(2);
requests = msg;
}, TimeSpan.FromSeconds(10));
requests.Requests[0].Should().Be(new TestLeaseActor.Acquire(GetAddress(_config.Second).HostPort()));
requests.Requests[1].Should().Be(new TestLeaseActor.Release(GetAddress(_config.Second).HostPort()));
TestLeaseActorClientExt.Get(Sys).GetLeaseActor().Tell(
new TestLeaseActor.ActionRequest(new TestLeaseActor.Release(GetAddress(_config.Second).HostPort()), false));
}, _config.Controller);

EnterBarrier("singleton-actor-downed");

RunOn(() =>
{
_proxy.Tell("Ping");
// lease was lost
ExpectNoMsg(TimeSpan.FromSeconds(2));
}, _config.Second, _config.Third, _config.Fourth);
EnterBarrier("lease-lost");

RunOn(() =>
{
TestLeaseActorClientExt.Get(Sys).GetLeaseActor().Tell(new TestLeaseActor.ActionRequest(new TestLeaseActor.Acquire(GetAddress(_config.Second).HostPort()), true));
}, _config.Controller);

EnterBarrier("singleton-actor-recreated");

// In the bug, even though second node manages to reacquire the lease and restarts the singleton actor,
// all the proxies failed to reacquire the new singleton actor ref
RunOn(() =>
{
proxy.Tell("Ping");
ExpectMsg(new ClusterSingletonManagerLeaseSpecConfig.ImportantSingleton.Response("Ping", GetAddress(_config.Second)), TimeSpan.FromSeconds(20));
}, _config.Second, _config.Third, _config.Fourth);
EnterBarrier("finished");

EnterBarrier("singleton-proxy-reacquire-singleton-actor");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
{
if (Equals(_singleton, terminated.ActorRef))
{
// buffering mode, identification of new will start when old node is removed
// buffering mode
_singleton = null;
// Bugfix: https://github.com/akkadotnet/Akka.Management/issues/2490
// try to re-acquire singleton in-case this is caused by a lost lease condition
IdentifySingleton();
}
});
ReceiveAny(msg =>
Expand Down

0 comments on commit 5e7be3a

Please sign in to comment.