From 8fd8c62c60b2445183d3b2c0d19519d2696ab1a3 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 24 Jun 2024 22:49:31 +0700 Subject: [PATCH] Add ClusterClient initial contact discovery feature (#7261) * Add cluster client initial contact discovery feature * Add unit test * Fix timers, add verbose logging flag * Update API Approval list * Simplify logic * Simplify cluster client actor name * Remove cluster client name * Mark new Discovery.Config methods as InternalStableApi * Update API Approval list --- .../ClusterClientDiscoverySpec.cs | 297 ++++++++++++++++++ .../Akka.Cluster.Tools.csproj | 1 + .../Client/ClusterClient.cs | 4 +- .../Client/ClusterClientDiscovery.cs | 274 ++++++++++++++++ .../Client/ClusterClientDiscoverySettings.cs | 41 +++ .../Client/ClusterClientSettings.cs | 69 +++- .../Akka.Cluster.Tools/Client/reference.conf | 15 + ...ec.ApproveClusterTools.DotNet.verified.txt | 33 ++ ...ISpec.ApproveClusterTools.Net.verified.txt | 33 ++ ...ISpec.ApproveDiscovery.DotNet.verified.txt | 4 + ...eAPISpec.ApproveDiscovery.Net.verified.txt | 4 + .../Config/ConfigServiceDiscovery.cs | 66 +++- 12 files changed, 828 insertions(+), 13 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs create mode 100644 src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs create mode 100644 src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscoverySettings.cs diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs new file mode 100644 index 00000000000..819009d9458 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs @@ -0,0 +1,297 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using Akka.Actor; +using Akka.Cluster.TestKit; +using Akka.Cluster.Tools.Client; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.Configuration; +using Akka.Discovery; +using Akka.Discovery.Config; +using Akka.MultiNode.TestAdapter; +using Akka.Remote.TestKit; +using Akka.TestKit.TestActors; +using FluentAssertions; + +namespace Akka.Cluster.Tools.Tests.MultiNode.Client +{ + public sealed class ClusterClientDiscoverySpecConfig : MultiNodeConfig + { + public RoleName Client { get; } + public RoleName First { get; } + public RoleName Second { get; } + public RoleName Third { get; } + + public ClusterClientDiscoverySpecConfig() + { + Client = Role("client"); + First = Role("first"); + Second = Role("second"); + Third = Role("third"); + + CommonConfig = ConfigurationFactory.ParseString(""" +akka.loglevel = INFO + +akka.remote.dot-netty.tcp.hostname = localhost +akka.actor.provider = cluster +akka.remote.log-remote-lifecycle-events = off +akka.cluster.client { + heartbeat-interval = 1d + acceptable-heartbeat-pause = 1d + reconnect-timeout = 3s + refresh-contacts-interval = 1d +} +akka.test.filter-leeway = 10s +""") + .WithFallback(ClusterClientReceptionist.DefaultConfig()) + .WithFallback(DistributedPubSub.DefaultConfig()) + .WithFallback(MultiNodeClusterSpec.ClusterConfig()); + + NodeConfig(new[]{ Client }, + new []{ + ConfigurationFactory.ParseString(""" +akka { + cluster.client { + heartbeat-interval = 1s + acceptable-heartbeat-pause = 2s + use-initial-contacts-discovery = true + reconnect-timeout = 4s + verbose-logging = true + discovery + { + service-name = test-cluster + discovery-timeout = 10s + } + } + + + discovery + { + method = config + config.services.test-cluster.endpoints = [] + } +} +""")}); + TestTransport = true; + } + } + + public class ClusterClientDiscoverySpec : MultiNodeClusterSpec + { + private readonly ClusterClientDiscoverySpecConfig _config; + private ConfigServiceDiscovery _discoveryService; + + public ClusterClientDiscoverySpec() : this(new ClusterClientDiscoverySpecConfig()) { } + + protected ClusterClientDiscoverySpec(ClusterClientDiscoverySpecConfig config) + : base(config, typeof(ClusterClientDiscoverySpec)) + { + _config = config; + } + + protected override int InitialParticipantsValueFactory => 3; + + private void Join(RoleName from, RoleName to) + { + RunOn(() => + { + Cluster.Join(Node(to).Address); + ClusterClientReceptionist.Get(Sys); + }, from); + EnterBarrier(from.Name + "-joined"); + } + + private IActorRef _clusterClient = null; + + [MultiNodeFact] + public void ClusterClientDiscoverySpecs() + { + ClusterClient_must_startup_cluster_with_single_node(); + ClusterClient_must_establish_connection_to_first_node(); + ClusterClient_must_down_existing_cluster(); + ClusterClient_second_node_must_form_a_new_cluster(); + ClusterClient_must_re_establish_on_cluster_restart(); + ClusterClient_must_simulate_a_cluster_forced_shutdown(); + ClusterClient_third_node_formed_a_cluster(); + ClusterClient_must_re_establish_on_cluster_restart_after_hard_shutdown(); + } + + private void ClusterClient_must_startup_cluster_with_single_node() + { + Within(TimeSpan.FromSeconds(30), () => + { + Join(_config.First, _config.First); + RunOn(() => + { + var service = Sys.ActorOf(EchoActor.Props(this), "testService"); + ClusterClientReceptionist.Get(Sys).RegisterService(service); + AwaitMembersUp(1); + }, _config.First); + EnterBarrier("cluster-started"); + + RunOn(() => + { + _discoveryService = + (ConfigServiceDiscovery)Discovery.Discovery.Get(Sys).LoadServiceDiscovery("config"); + var address = GetAddress(_config.First); + _discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port)); + + var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result; + resolved.Addresses.Count.Should().Be(1); + }, _config.Client); + EnterBarrier("discovery-entry-added"); + }); + } + + private void ClusterClient_must_establish_connection_to_first_node() + { + RunOn(() => + { + _clusterClient = Sys.ActorOf(ClusterClient.Props(ClusterClientSettings.Create(Sys)), "client1"); + + Within(TimeSpan.FromSeconds(5), () => + { + AwaitAssert(() => + { + _clusterClient.Tell(GetContactPoints.Instance, TestActor); + var contacts = ExpectMsg(TimeSpan.FromSeconds(1)).ContactPointsList; + contacts.Count.Should().Be(1); + contacts.First().Address.Should().Be(Node(_config.First).Address); + }, RemainingOrDefault); + }); + + _clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity:true)); + ExpectMsg().Should().Be("hello"); + }, _config.Client); + EnterBarrier("established"); + } + + private void ClusterClient_must_down_existing_cluster() + { + RunOn(() => + { + Cluster.Get(Sys).Leave(Node(_config.First).Address); + }, _config.First); + + EnterBarrier("cluster-downed"); + + RunOn(() => + { + var address = GetAddress(_config.First); + _discoveryService.TryRemoveEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port)); + + var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result; + resolved.Addresses.Count.Should().Be(0); + }, _config.Client); + EnterBarrier("discovery-entry-removed"); + + } + + private void ClusterClient_second_node_must_form_a_new_cluster() + { + Join(_config.Second, _config.Second); + RunOn(() => + { + var service = Sys.ActorOf(EchoActor.Props(this), "testService"); + ClusterClientReceptionist.Get(Sys).RegisterService(service); + AwaitMembersUp(1); + }, _config.Second); + + EnterBarrier("cluster-restarted"); + + RunOn(() => + { + var address = GetAddress(_config.Second); + _discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port)); + + var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result; + resolved.Addresses.Count.Should().Be(1); + }, _config.Client); + EnterBarrier("discovery-entry-updated"); + } + + private void ClusterClient_must_re_establish_on_cluster_restart() + { + RunOn(() => + { + Within(TimeSpan.FromSeconds(5), () => + { + AwaitAssert(() => + { + _clusterClient.Tell(GetContactPoints.Instance, TestActor); + var contacts = ExpectMsg(TimeSpan.FromSeconds(1)).ContactPointsList; + contacts.Count.Should().Be(1); + contacts.First().Address.Should().Be(Node(_config.Second).Address); + }, RemainingOrDefault); + }); + + _clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity: true)); + ExpectMsg().Should().Be("hello"); + + }, _config.Client); + EnterBarrier("re-establish-successful"); + } + + private void ClusterClient_must_simulate_a_cluster_forced_shutdown() + { + RunOn(() => + { + // simulate a hard shutdown + TestConductor.Exit(_config.Second, 0).Wait(); + }, _config.Client); + EnterBarrier("hard-shutdown-and-discovery-entry-updated"); + } + + private void ClusterClient_third_node_formed_a_cluster() + { + Join(_config.Third, _config.Third); + RunOn(() => + { + var service = Sys.ActorOf(EchoActor.Props(this), "testService"); + ClusterClientReceptionist.Get(Sys).RegisterService(service); + AwaitMembersUp(1); + }, _config.Third); + + EnterBarrier("cluster-restarted"); + + RunOn(() => + { + var address = GetAddress(_config.Third); + _discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port)); + + var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result; + resolved.Addresses.Count.Should().Be(2); + }, _config.Client); + EnterBarrier("discovery-entry-updated"); + } + + private void ClusterClient_must_re_establish_on_cluster_restart_after_hard_shutdown() + { + RunOn(() => + { + Within(TimeSpan.FromSeconds(20), () => + { + AwaitAssert(() => + { + _clusterClient.Tell(GetContactPoints.Instance, TestActor); + var contacts = ExpectMsg(TimeSpan.FromSeconds(1)).ContactPointsList; + contacts.Count.Should().Be(1); + contacts.First().Address.Should().Be(Node(_config.Third).Address); + }, TimeSpan.FromSeconds(20)); + }); + + _clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity: true)); + ExpectMsg().Should().Be("hello"); + + }, _config.Client); + EnterBarrier("re-establish-successful"); + } + + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj index 2d9908dcc6c..83e10ee8fc6 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj +++ b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj @@ -15,6 +15,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs index 074ee6ddd6f..42d05eaf562 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs @@ -274,7 +274,9 @@ public static Props Props(ClusterClientSettings settings) if (settings == null) throw new ArgumentNullException(nameof(settings)); - return Actor.Props.Create(() => new ClusterClient(settings)).WithDeploy(Deploy.Local); + return settings.UseInitialContactDiscovery + ? Actor.Props.Create(() => new ClusterClientDiscovery(settings)).WithDeploy(Deploy.Local) + : Actor.Props.Create(() => new ClusterClient(settings)).WithDeploy(Deploy.Local); } private ILoggingAdapter _log = Context.GetLogger(); diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs new file mode 100644 index 00000000000..688f89b43d3 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs @@ -0,0 +1,274 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Discovery; +using Akka.Event; + +#nullable enable +namespace Akka.Cluster.Tools.Client; + +public class ClusterClientDiscovery: UntypedActor, IWithUnboundedStash, IWithTimers +{ + #region Discovery messages + + internal sealed class DiscoverTick: IEquatable + { + public static readonly DiscoverTick Instance = new(); + + private DiscoverTick() { } + public bool Equals(DiscoverTick? other) => other is not null; + public override bool Equals(object? obj) => ReferenceEquals(this, obj) || obj is DiscoverTick; + public override int GetHashCode() => 0; + } + + private sealed record Contact(ActorPath Path, ActorSelection Selection); + private sealed record DiscoveryFailure(Exception Cause); + private sealed record ResolveResult(Contact Contact, IActorRef? Subject); + + #endregion + + private readonly TimeSpan _defaultReconnectTimeout = TimeSpan.FromSeconds(10); + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly ClusterClientSettings _settings; + + private readonly ClusterClientDiscoverySettings _discoverySettings; + private readonly ServiceDiscovery? _serviceDiscovery; + private readonly Lookup? _lookup; + private readonly TimeSpan _discoveryTimeout; + private readonly TimeSpan _discoveryRetryInterval; + private readonly string _targetActorSystemName; + private readonly string _receptionistName; + private readonly string _transportProtocol; + + private readonly bool _verboseLogging; + + public ClusterClientDiscovery(ClusterClientSettings settings) + { + _settings = settings; + _discoverySettings = settings.DiscoverySettings; + + if(_settings.InitialContacts.Count > 0) + _log.Warning("Initial contacts is being ignored because ClusterClient contacts discovery is being used"); + + var discoveryMethod = _discoverySettings.DiscoveryMethod; + if(string.IsNullOrWhiteSpace(discoveryMethod) || discoveryMethod == "") + { + _log.Info( + "No default initial contacts discovery implementation configured in\n" + + "`akka.cluster.client.discovery.method`. Trying to Fall back to default\n" + + "discovery method declared in `akka.discovery.method`"); + discoveryMethod = Context.System.Settings.Config.GetString("akka.discovery.method"); + } + if (string.IsNullOrWhiteSpace(discoveryMethod) || discoveryMethod == "") + { + _log.Warning( + "No default initial contacts discovery implementation configured in both\n" + + "`akka.cluster.client.discovery.method` and `akka.discovery.method`.\n" + + "Make sure to configure this setting to your preferred implementation such as 'config'\n" + + "in your application.conf (from the akka-discovery module). Falling back to default config\n" + + "based discovery method"); + discoveryMethod = "config"; + } + + if (_settings.ReconnectTimeout is null) + { + _log.Warning( + "No reconnect timeout were configured in `akka.cluster.client.reconnect-timeout`,\n" + + "this setting is required when using cluster client initial contact discovery feature.\n" + + "Falling back to default value ({0}) instead.", _defaultReconnectTimeout); + _settings = _settings.WithReconnectTimeout(_defaultReconnectTimeout); + } + + if (string.IsNullOrWhiteSpace(_discoverySettings.ActorSystemName)) + { + _log.Warning( + "No target ActorSystem name configured in `akka.cluster.client.discovery.actor-system-name`,\n" + + "falling back to this ActorSystem name ({0}) instead.", Context.System.Name); + _targetActorSystemName = Context.System.Name; + } + else + { + _targetActorSystemName = _discoverySettings.ActorSystemName!; + } + + _transportProtocol = ((ExtendedActorSystem)Context.System).Provider.DefaultAddress.Protocol; + _receptionistName = settings.DiscoverySettings.ReceptionistName; + + _lookup = new Lookup(_discoverySettings.ServiceName, _discoverySettings.PortName); + _serviceDiscovery = Discovery.Discovery.Get(Context.System).LoadServiceDiscovery(discoveryMethod); + _discoveryRetryInterval = _settings.DiscoverySettings.DiscoveryRetryInterval; + _discoveryTimeout = _discoverySettings.DiscoveryTimeout; + + _verboseLogging = _settings.VerboseLogging; + + Become(Discovering); + } + + public IStash Stash { get; set; } = null!; + public ITimerScheduler Timers { get; set; } = null!; + + protected override void OnReceive(object message) + { + throw new NotImplementedException("Should never reach this code"); + } + + protected override void PreStart() + { + base.PreStart(); + + // Kickoff discovery lookup + Self.Tell(DiscoverTick.Instance); + } + + private ActorPath ResolvedTargetToReceptionistActorPath(ServiceDiscovery.ResolvedTarget target) + { + var networkAddress = string.IsNullOrWhiteSpace(target.Host) ? target.Address.ToString() : target.Host; + var address = new Address(_transportProtocol, _targetActorSystemName, networkAddress, target.Port); + return new RootActorPath(address) / "system" / _discoverySettings.ReceptionistName; + } + + private static async Task ResolveContact(Contact contact, TimeSpan timeout, CancellationToken ct) + { + try + { + var identity = await contact.Selection.Ask(new Identify(null), timeout, ct); + return new ResolveResult(contact, identity.Subject); + } + catch (Exception) + { + return new ResolveResult(contact, null); + } + } + + private bool Discovering(object message) + { + switch (message) + { + case DiscoverTick: + if(_verboseLogging && _log.IsDebugEnabled) + _log.Debug("Discovering initial contacts"); + + _serviceDiscovery!.Lookup(_lookup, _discoveryTimeout) + .PipeTo(Self, Self, failure: cause => new DiscoveryFailure(cause)); + return true; + + case ServiceDiscovery.Resolved resolved: + { + Timers.CancelAll(); + + if (resolved.Addresses.Count == 0) + { + if(_verboseLogging && _log.IsInfoEnabled) + _log.Info("No initial contact were discovered. Will try again."); + + // discovery didn't find any contacts, retry discovery + Timers.StartSingleTimer(DiscoverTick.Instance, DiscoverTick.Instance, _discoveryRetryInterval); + return true; + } + + var contacts = resolved.Addresses.Select(address => { + var path = ResolvedTargetToReceptionistActorPath(address); + return new Contact(path, Context.ActorSelection(path)); + }).ToImmutableHashSet(); + + if(_verboseLogging && _log.IsDebugEnabled) + _log.Debug("Initial contacts are discovered at [{0}], verifying existence.", string.Join(", ", contacts.Select(c => c.Path))); + + VerifyContacts().PipeTo(Self, Self); + + return true; + + async Task VerifyContacts() + { + var tasks = contacts.Select(c => ResolveContact(c, TimeSpan.FromSeconds(1), default)); + return await Task.WhenAll(tasks); + } + } + + case ResolveResult[] resolved: + { + var contacts = resolved.Where(r => r.Subject is not null).Select(r => r.Contact).ToArray(); + if (contacts.Length == 0) + { + if(_verboseLogging && _log.IsInfoEnabled) + _log.Info("Cluster client contact point resolution phase failed, will try again."); + + Timers.StartSingleTimer(DiscoverTick.Instance, DiscoverTick.Instance, _discoveryRetryInterval); + } + else + { + if(_log.IsInfoEnabled) + _log.Info("Cluster client initial contacts are verified at [{0}], starting cluster client actor.", string.Join(", ", contacts.Select(c => c.Path))); + + Become(Active(contacts)); + } + + return true; + } + + case DiscoveryFailure fail: + if(_verboseLogging && _log.IsInfoEnabled) + _log.Info(fail.Cause, "Cluster client contact point service discovery phase failed, will try again."); + + Timers.StartSingleTimer(DiscoverTick.Instance, DiscoverTick.Instance, _discoveryRetryInterval); + return true; + + default: + Stash.Stash(); + return true; + } + } + + private Receive Active(Contact[] contacts) + { + if(_verboseLogging && _log.IsDebugEnabled) + _log.Debug("Entering active state"); + + Timers.CancelAll(); + + // Setup cluster client initial contacts + var currentSettings = _settings.WithInitialContacts(contacts.Select(c => c.Path).ToImmutableHashSet()); + + var clusterClient = Context.System.ActorOf(Props.Create(() => new ClusterClient(currentSettings)).WithDeploy(Deploy.Local)); + Context.Watch(clusterClient); + Stash.UnstashAll(); + + return message => + { + switch (message) + { + case Terminated terminated: + if (terminated.ActorRef.Equals(clusterClient)) + { + if(_verboseLogging && _log.IsInfoEnabled) + _log.Info("Cluster client failed to reconnect to all receptionists, rediscovering."); + + // Kickoff discovery lookup + Self.Tell(DiscoverTick.Instance); + Become(Discovering); + } + else + { + clusterClient.Forward(message); + } + break; + + default: + clusterClient.Forward(message); + break; + } + + return true; + }; + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscoverySettings.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscoverySettings.cs new file mode 100644 index 00000000000..d26ccad9cb4 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscoverySettings.cs @@ -0,0 +1,41 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using Akka.Configuration; + +namespace Akka.Cluster.Tools.Client; + +#nullable enable +public sealed record ClusterClientDiscoverySettings( + string? DiscoveryMethod, + string? ActorSystemName, + string? ServiceName, + string ReceptionistName, + string? PortName, + TimeSpan DiscoveryRetryInterval, + TimeSpan DiscoveryTimeout) +{ + public static readonly ClusterClientDiscoverySettings Empty = new ("", null, null, "receptionist", null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(60)); + + public static ClusterClientDiscoverySettings Create(Config clusterClientConfig) + { + var config = clusterClientConfig.GetConfig("discovery"); + if (config is null) + return Empty; + + return new ClusterClientDiscoverySettings( + config.GetString("method"), + config.GetString("actor-system-name"), + config.GetString("service-name"), + config.GetString("receptionist-name", "receptionist"), + config.GetString("port-name"), + config.GetTimeSpan("discovery-retry-interval", TimeSpan.FromSeconds(1)), + config.GetTimeSpan("discovery-timeout", TimeSpan.FromSeconds(60)) + ); + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs index 6e74648b760..c1d70896446 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs @@ -14,6 +14,7 @@ using Akka.Configuration; using Akka.Remote; +#nullable enable namespace Akka.Cluster.Tools.Client { /// @@ -56,7 +57,7 @@ public static ClusterClientSettings Create(Config config) var initialContacts = config.GetStringList("initial-contacts", new string[] { }).Select(ActorPath.Parse).ToImmutableSortedSet(); var useReconnect = config.GetString("reconnect-timeout", "").ToLowerInvariant(); - TimeSpan? reconnectTimeout = + var reconnectTimeout = useReconnect.Equals("off") || useReconnect.Equals("false") || useReconnect.Equals("no") ? @@ -70,7 +71,10 @@ public static ClusterClientSettings Create(Config config) config.GetTimeSpan("acceptable-heartbeat-pause"), config.GetInt("buffer-size"), config.GetBoolean("use-legacy-serialization"), - reconnectTimeout); + config.GetBoolean("use-initial-contacts-discovery"), + ClusterClientDiscoverySettings.Create(config), + reconnectTimeout, + config.GetBoolean("verbose-logging")); } /// @@ -122,6 +126,12 @@ public static ClusterClientSettings Create(Config config) /// public bool UseLegacySerialization { get; } + public bool UseInitialContactDiscovery { get; } + + public ClusterClientDiscoverySettings DiscoverySettings { get; } + + public bool VerboseLogging { get; } + /// /// TBD /// @@ -166,6 +176,7 @@ public ClusterClientSettings( /// TBD /// TBD /// TBD + [Obsolete("Use constructor with useInitialContactsDiscovery and discoverySettings argument instead. Since 1.5.25")] public ClusterClientSettings( IImmutableSet initialContacts, TimeSpan establishingGetContactsInterval, @@ -175,6 +186,46 @@ public ClusterClientSettings( int bufferSize, bool useLegacySerialization, TimeSpan? reconnectTimeout = null) + : this( + initialContacts: initialContacts, + establishingGetContactsInterval: establishingGetContactsInterval, + refreshContactsInterval: refreshContactsInterval, + heartbeatInterval: heartbeatInterval, + acceptableHeartbeatPause: acceptableHeartbeatPause, + bufferSize: bufferSize, + useLegacySerialization: useLegacySerialization, + useInitialContactsDiscovery: false, + discoverySettings: null, + reconnectTimeout: reconnectTimeout) + { + } + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public ClusterClientSettings( + IImmutableSet initialContacts, + TimeSpan establishingGetContactsInterval, + TimeSpan refreshContactsInterval, + TimeSpan heartbeatInterval, + TimeSpan acceptableHeartbeatPause, + int bufferSize, + bool useLegacySerialization, + bool useInitialContactsDiscovery, + ClusterClientDiscoverySettings? discoverySettings = null, + TimeSpan? reconnectTimeout = null, + bool verboseLogging = false) { if (bufferSize is < 0 or > 10000) { @@ -189,6 +240,9 @@ public ClusterClientSettings( BufferSize = bufferSize; ReconnectTimeout = reconnectTimeout; UseLegacySerialization = useLegacySerialization; + UseInitialContactDiscovery = useInitialContactsDiscovery; + DiscoverySettings = discoverySettings ?? ClusterClientDiscoverySettings.Empty; + VerboseLogging = verboseLogging; } /// @@ -260,14 +314,21 @@ public ClusterClientSettings WithReconnectTimeout(TimeSpan? reconnectTimeout) public ClusterClientSettings WithUseLegacySerialization(bool useLegacySerialization) => Copy(useLegacySerialization: useLegacySerialization); + public ClusterClientSettings WithInitialContactsDiscovery( + bool useInitialContactsDiscovery, + ClusterClientDiscoverySettings? discoverySettings = null) + => Copy(useInitialContactsDiscovery: useInitialContactsDiscovery, discoverySettings: discoverySettings); + private ClusterClientSettings Copy( - IImmutableSet initialContacts = null, + IImmutableSet? initialContacts = null, TimeSpan? establishingGetContactsInterval = null, TimeSpan? refreshContactsInterval = null, TimeSpan? heartbeatInterval = null, TimeSpan? acceptableHeartbeatPause = null, int? bufferSize = null, bool? useLegacySerialization = null, + bool? useInitialContactsDiscovery = null, + ClusterClientDiscoverySettings? discoverySettings = null, TimeSpan? reconnectTimeout = null) { return new ClusterClientSettings( @@ -278,6 +339,8 @@ private ClusterClientSettings Copy( acceptableHeartbeatPause ?? AcceptableHeartbeatPause, bufferSize ?? BufferSize, useLegacySerialization ?? UseLegacySerialization, + useInitialContactsDiscovery ?? UseInitialContactDiscovery, + discoverySettings ?? DiscoverySettings, reconnectTimeout ?? ReconnectTimeout); } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf index bf6c91489fe..33af33d46db 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf @@ -94,6 +94,21 @@ akka.cluster.client { # Turning this setting to off or false will cause the ClusterClient messages # to be serialized using the ClusterClientMessageSerializer and not the default Object serializer. use-legacy-serialization = on + + use-initial-contacts-discovery = false + + discovery + { + method = + actor-system-name = null + receptionist-name = receptionist + service-name = null + port-name = null + discovery-retry-interval = 1s + discovery-timeout = 60s + } + + verbose-logging = false } # //#cluster-client-config diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt index e8b13f02aac..bfbf5aad29c 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt @@ -41,6 +41,31 @@ namespace Akka.Cluster.Tools.Client public override int GetHashCode() { } } } + [System.Runtime.CompilerServices.NullableAttribute(0)] + public class ClusterClientDiscovery : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + { + public ClusterClientDiscovery(Akka.Cluster.Tools.Client.ClusterClientSettings settings) { } + public Akka.Actor.IStash Stash { get; set; } + public Akka.Actor.ITimerScheduler Timers { get; set; } + protected override void OnReceive(object message) { } + protected override void PreStart() { } + } + [System.Runtime.CompilerServices.NullableAttribute(0)] + public sealed class ClusterClientDiscoverySettings : System.IEquatable + { + [System.Runtime.CompilerServices.NullableAttribute(1)] + public static readonly Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Empty; + public ClusterClientDiscoverySettings(string DiscoveryMethod, string ActorSystemName, string ServiceName, [System.Runtime.CompilerServices.NullableAttribute(1)] string ReceptionistName, string PortName, System.TimeSpan DiscoveryRetryInterval, System.TimeSpan DiscoveryTimeout) { } + public string ActorSystemName { get; set; } + public string DiscoveryMethod { get; set; } + public System.TimeSpan DiscoveryRetryInterval { get; set; } + public System.TimeSpan DiscoveryTimeout { get; set; } + public string PortName { get; set; } + [System.Runtime.CompilerServices.NullableAttribute(1)] + public string ReceptionistName { get; set; } + public string ServiceName { get; set; } + public static Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Create(Akka.Configuration.Config clusterClientConfig) { } + } public sealed class ClusterClientReceptionist : Akka.Actor.IExtension { public ClusterClientReceptionist(Akka.Actor.ExtendedActorSystem system) { } @@ -58,25 +83,33 @@ namespace Akka.Cluster.Tools.Client public ClusterClientReceptionistExtensionProvider() { } public override Akka.Cluster.Tools.Client.ClusterClientReceptionist CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ClusterClientSettings : Akka.Actor.INoSerializationVerificationNeeded { [System.ObsoleteAttribute("Use constructor with useLegacySerialization argument instead. Since 1.5.15")] public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, System.Nullable reconnectTimeout = null) { } + [System.ObsoleteAttribute("Use constructor with useInitialContactsDiscovery and discoverySettings argument i" + + "nstead. Since 1.5.25")] public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, bool useLegacySerialization, System.Nullable reconnectTimeout = null) { } + public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, bool useLegacySerialization, bool useInitialContactsDiscovery, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings discoverySettings = null, System.Nullable reconnectTimeout = null, bool verboseLogging = False) { } public System.TimeSpan AcceptableHeartbeatPause { get; } public int BufferSize { get; } + public Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings DiscoverySettings { get; } public System.TimeSpan EstablishingGetContactsInterval { get; } public System.TimeSpan HeartbeatInterval { get; } public System.Collections.Immutable.IImmutableSet InitialContacts { get; } public System.Nullable ReconnectTimeout { get; } public System.TimeSpan RefreshContactsInterval { get; } + public bool UseInitialContactDiscovery { get; } public bool UseLegacySerialization { get; } + public bool VerboseLogging { get; } public static Akka.Cluster.Tools.Client.ClusterClientSettings Create(Akka.Actor.ActorSystem system) { } public static Akka.Cluster.Tools.Client.ClusterClientSettings Create(Akka.Configuration.Config config) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithBufferSize(int bufferSize) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithEstablishingGetContactsInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithHeartbeatInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithInitialContacts(System.Collections.Immutable.IImmutableSet initialContacts) { } + public Akka.Cluster.Tools.Client.ClusterClientSettings WithInitialContactsDiscovery(bool useInitialContactsDiscovery, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings discoverySettings = null) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithReconnectTimeout(System.Nullable reconnectTimeout) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithRefreshContactsInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithUseLegacySerialization(bool useLegacySerialization) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt index a25e701e6aa..284c0d053f8 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt @@ -41,6 +41,31 @@ namespace Akka.Cluster.Tools.Client public override int GetHashCode() { } } } + [System.Runtime.CompilerServices.NullableAttribute(0)] + public class ClusterClientDiscovery : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + { + public ClusterClientDiscovery(Akka.Cluster.Tools.Client.ClusterClientSettings settings) { } + public Akka.Actor.IStash Stash { get; set; } + public Akka.Actor.ITimerScheduler Timers { get; set; } + protected override void OnReceive(object message) { } + protected override void PreStart() { } + } + [System.Runtime.CompilerServices.NullableAttribute(0)] + public sealed class ClusterClientDiscoverySettings : System.IEquatable + { + [System.Runtime.CompilerServices.NullableAttribute(1)] + public static readonly Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Empty; + public ClusterClientDiscoverySettings(string DiscoveryMethod, string ActorSystemName, string ServiceName, [System.Runtime.CompilerServices.NullableAttribute(1)] string ReceptionistName, string PortName, System.TimeSpan DiscoveryRetryInterval, System.TimeSpan DiscoveryTimeout) { } + public string ActorSystemName { get; set; } + public string DiscoveryMethod { get; set; } + public System.TimeSpan DiscoveryRetryInterval { get; set; } + public System.TimeSpan DiscoveryTimeout { get; set; } + public string PortName { get; set; } + [System.Runtime.CompilerServices.NullableAttribute(1)] + public string ReceptionistName { get; set; } + public string ServiceName { get; set; } + public static Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Create(Akka.Configuration.Config clusterClientConfig) { } + } public sealed class ClusterClientReceptionist : Akka.Actor.IExtension { public ClusterClientReceptionist(Akka.Actor.ExtendedActorSystem system) { } @@ -58,25 +83,33 @@ namespace Akka.Cluster.Tools.Client public ClusterClientReceptionistExtensionProvider() { } public override Akka.Cluster.Tools.Client.ClusterClientReceptionist CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ClusterClientSettings : Akka.Actor.INoSerializationVerificationNeeded { [System.ObsoleteAttribute("Use constructor with useLegacySerialization argument instead. Since 1.5.15")] public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, System.Nullable reconnectTimeout = null) { } + [System.ObsoleteAttribute("Use constructor with useInitialContactsDiscovery and discoverySettings argument i" + + "nstead. Since 1.5.25")] public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, bool useLegacySerialization, System.Nullable reconnectTimeout = null) { } + public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, bool useLegacySerialization, bool useInitialContactsDiscovery, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings discoverySettings = null, System.Nullable reconnectTimeout = null, bool verboseLogging = False) { } public System.TimeSpan AcceptableHeartbeatPause { get; } public int BufferSize { get; } + public Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings DiscoverySettings { get; } public System.TimeSpan EstablishingGetContactsInterval { get; } public System.TimeSpan HeartbeatInterval { get; } public System.Collections.Immutable.IImmutableSet InitialContacts { get; } public System.Nullable ReconnectTimeout { get; } public System.TimeSpan RefreshContactsInterval { get; } + public bool UseInitialContactDiscovery { get; } public bool UseLegacySerialization { get; } + public bool VerboseLogging { get; } public static Akka.Cluster.Tools.Client.ClusterClientSettings Create(Akka.Actor.ActorSystem system) { } public static Akka.Cluster.Tools.Client.ClusterClientSettings Create(Akka.Configuration.Config config) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithBufferSize(int bufferSize) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithEstablishingGetContactsInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithHeartbeatInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithInitialContacts(System.Collections.Immutable.IImmutableSet initialContacts) { } + public Akka.Cluster.Tools.Client.ClusterClientSettings WithInitialContactsDiscovery(bool useInitialContactsDiscovery, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings discoverySettings = null) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithReconnectTimeout(System.Nullable reconnectTimeout) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithRefreshContactsInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithUseLegacySerialization(bool useLegacySerialization) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt index db85958f822..c9fc1b4eaa4 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt @@ -22,6 +22,10 @@ namespace Akka.Discovery.Config { public ConfigServiceDiscovery(Akka.Actor.ExtendedActorSystem system) { } public override System.Threading.Tasks.Task Lookup(Akka.Discovery.Lookup lookup, System.TimeSpan resolveTimeout) { } + [Akka.Annotations.InternalStableApiAttribute()] + public bool TryAddEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } + [Akka.Annotations.InternalStableApiAttribute()] + public bool TryRemoveEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } } [Akka.Annotations.InternalApiAttribute()] public class static ConfigServicesParser diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt index 2811d821db1..830caa28560 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt @@ -22,6 +22,10 @@ namespace Akka.Discovery.Config { public ConfigServiceDiscovery(Akka.Actor.ExtendedActorSystem system) { } public override System.Threading.Tasks.Task Lookup(Akka.Discovery.Lookup lookup, System.TimeSpan resolveTimeout) { } + [Akka.Annotations.InternalStableApiAttribute()] + public bool TryAddEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } + [Akka.Annotations.InternalStableApiAttribute()] + public bool TryRemoveEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } } [Akka.Annotations.InternalApiAttribute()] public class static ConfigServicesParser diff --git a/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs b/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs index 86bba8d376c..22f3cf6e50f 100644 --- a/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs +++ b/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Threading.Tasks; using Akka.Actor; @@ -39,11 +40,12 @@ public static class ConfigServicesParser [InternalApi] public class ConfigServiceDiscovery : ServiceDiscovery { - private readonly Dictionary _resolvedServices; + private readonly ILoggingAdapter _log; + private ImmutableDictionary _resolvedServices; public ConfigServiceDiscovery(ExtendedActorSystem system) { - var log = Logging.GetLogger(system, nameof(ConfigServiceDiscovery)); + _log = Logging.GetLogger(system, nameof(ConfigServiceDiscovery)); var config = system.Settings.Config.GetConfig("akka.discovery.config") ?? throw new ArgumentException( @@ -52,35 +54,81 @@ public ConfigServiceDiscovery(ExtendedActorSystem system) var servicePath = config.GetString("services-path"); if (string.IsNullOrWhiteSpace(servicePath)) { - log.Warning( + _log.Warning( "The config path [akka.discovery.config] must contain field `service-path` that points to a " + "configuration path that contains an array of node services for Discovery to contact."); - _resolvedServices = new Dictionary(); + _resolvedServices = ImmutableDictionary.Empty; } else { var services = system.Settings.Config.GetConfig(servicePath); if (services == null) { - log.Warning( + _log.Warning( "You are trying to use config based discovery service and the settings path described in\n" + $"`akka.discovery.config.services-path` does not exists. Make sure that [{servicePath}] path \n" + "exists and to fill this setting with pre-defined node addresses to make sure that a cluster \n" + "can be formed"); - _resolvedServices = new Dictionary(); + _resolvedServices = ImmutableDictionary.Empty; } else { - _resolvedServices = ConfigServicesParser.Parse(services); + _resolvedServices = ConfigServicesParser.Parse(services).ToImmutableDictionary(); if(_resolvedServices.Count == 0) - log.Warning( + _log.Warning( $"You are trying to use config based discovery service and the settings path [{servicePath}]\n" + "described `akka.discovery.config.services-path` is empty. Make sure to fill this setting \n" + "with pre-defined node addresses to make sure that a cluster can be formed."); } } - log.Debug($"Config discovery serving: {string.Join(", ", _resolvedServices.Values)}"); + _log.Debug($"Config discovery serving: {string.Join(", ", _resolvedServices.Values)}"); + } + + [InternalStableApi] + public bool TryRemoveEndpoint(string serviceName, ResolvedTarget target) + { + if (!_resolvedServices.TryGetValue(serviceName, out var resolved)) + { + _log.Info($"Could not find service {serviceName}, adding a new service. Available services: {string.Join(", ", _resolvedServices.Keys)}"); + resolved = new Resolved(serviceName); + _resolvedServices = _resolvedServices.SetItem(serviceName, resolved); + } + + if (!resolved.Addresses.Contains(target)) + { + _log.Info($"ResolvedTarget was not in service {serviceName}, nothing to remove."); + return false; + } + + var newResolved = new Resolved(serviceName, resolved.Addresses.Remove(target)); + _resolvedServices = _resolvedServices.SetItem(serviceName, newResolved); + + _log.Debug($"ResolvedTarget {target} has been removed from service {serviceName}"); + return true; + } + + [InternalStableApi] + public bool TryAddEndpoint(string serviceName, ResolvedTarget target) + { + if (!_resolvedServices.TryGetValue(serviceName, out var resolved)) + { + _log.Info($"Could not find service {serviceName}, adding a new service. Available services: {string.Join(", ", _resolvedServices.Keys)}"); + resolved = new Resolved(serviceName); + _resolvedServices = _resolvedServices.SetItem(serviceName, resolved); + } + + if (resolved.Addresses.Contains(target)) + { + _log.Info($"ResolvedTarget is already in service {serviceName}, nothing to add."); + return false; + } + + var newResolved = new Resolved(serviceName, resolved.Addresses.Add(target)); + _resolvedServices = _resolvedServices.SetItem(serviceName, newResolved); + + _log.Debug($"ResolvedTarget {target} has been added to service {serviceName}"); + return true; } public override Task Lookup(Lookup lookup, TimeSpan resolveTimeout)