From b6214ef81ab5e798ecf4e52e6b70ea9e602feb8c Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 15 Mar 2022 16:14:09 -0500 Subject: [PATCH] Fixed `IActorRef` leak inside `EventStream` (#5720) * reproduced #5717 Reproduced `IActorRef` leak inside the `EventStream` * cleaned up the `EventBusUnsubscriber` * close #5719 - cleaned up `EventStream` subscription management * added API approval For `Obsolete` attribute. * need to capture more data on why failures happen * harden bugfix5717specs --- .../CoreAPISpec.ApproveCore.approved.txt | 1 + src/core/Akka.Tests/Event/Bugfix5717Specs.cs | 66 ++++++++ src/core/Akka/Event/EventBusUnsubscriber.cs | 142 +++++------------- src/core/Akka/Event/EventStream.cs | 112 ++++++-------- 4 files changed, 158 insertions(+), 163 deletions(-) create mode 100644 src/core/Akka.Tests/Event/Bugfix5717Specs.cs diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 08805ed01dd..3e3a0702e48 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -3044,6 +3044,7 @@ namespace Akka.Event public class EventStream : Akka.Event.LoggingBus { public EventStream(bool debug) { } + [System.ObsoleteAttribute("Should be removed in 1.5")] public bool InitUnsubscriber(Akka.Actor.IActorRef unsubscriber, Akka.Actor.ActorSystem system) { } public void StartUnsubscriber(Akka.Actor.Internal.ActorSystemImpl system) { } public override bool Subscribe(Akka.Actor.IActorRef subscriber, System.Type channel) { } diff --git a/src/core/Akka.Tests/Event/Bugfix5717Specs.cs b/src/core/Akka.Tests/Event/Bugfix5717Specs.cs new file mode 100644 index 00000000000..ee2480f0c89 --- /dev/null +++ b/src/core/Akka.Tests/Event/Bugfix5717Specs.cs @@ -0,0 +1,66 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2022 Lightbend Inc. +// // Copyright (C) 2013-2022 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Configuration; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Tests.Event +{ + public class Bugfix5717Specs : AkkaSpec + { + public Bugfix5717Specs(ITestOutputHelper output) : base(Config.Empty, output){} + + /// + /// Reproduction for https://github.com/akkadotnet/akka.net/issues/5717 + /// + [Fact] + public async Task Should_unsubscribe_from_all_topics_on_Terminate() + { + var es = Sys.EventStream; + var tm1 = 1; + var tm2 = "FOO"; + var a1 = CreateTestProbe(); + var a2 = CreateTestProbe(); + + es.Subscribe(a1.Ref, typeof(int)); + es.Subscribe(a2.Ref, typeof(int)); + es.Subscribe(a2.Ref, typeof(string)); + es.Publish(tm1); + es.Publish(tm2); + a1.ExpectMsg(tm1); + a2.ExpectMsg(tm1); + a2.ExpectMsg(tm2); + + // kill second test probe + Watch(a2); + Sys.Stop(a2); + ExpectTerminated(a2); + + /* + * It's possible that the `Terminate` message may not have been processed by the + * Unsubscriber yet, so we want to try this operation more than once to see if it + * eventually executes the unsubscribe on the EventStream. + * + * If it still fails after multiple attempts, the issue is that the unsub was never + * executed in the first place. + */ + await AwaitAssertAsync(async () => + { + await EventFilter.DeadLetter().ExpectAsync(0, () => + { + es.Publish(tm1); + es.Publish(tm2); + a1.ExpectMsg(tm1); + }); + }, interval:TimeSpan.FromSeconds(250)); + } + } +} \ No newline at end of file diff --git a/src/core/Akka/Event/EventBusUnsubscriber.cs b/src/core/Akka/Event/EventBusUnsubscriber.cs index 3bd9b3fd7e5..bf865c879bd 100644 --- a/src/core/Akka/Event/EventBusUnsubscriber.cs +++ b/src/core/Akka/Event/EventBusUnsubscriber.cs @@ -6,10 +6,7 @@ //----------------------------------------------------------------------- using Akka.Actor; -using Akka.Actor.Internal; using Akka.Annotations; -using Akka.Dispatch; -using Akka.Util.Internal; namespace Akka.Event { @@ -26,7 +23,7 @@ namespace Akka.Event /// watching a few actors too much - we opt for the 2nd choice here. /// [InternalApi] - class EventStreamUnsubscriber : ActorBase + internal class EventStreamUnsubscriber : ActorBase { private readonly EventStream _eventStream; private readonly bool _debug; @@ -45,140 +42,83 @@ public EventStreamUnsubscriber(EventStream eventStream, ActorSystem system, bool _debug = debug; } - - /// - /// TBD - /// - /// TBD - /// TBD + protected override bool Receive(object message) { - return message.Match().With(register => + switch (message) { - if (_debug) - _eventStream.Publish(new Debug(this.GetType().Name, GetType(), - string.Format("watching {0} in order to unsubscribe from EventStream when it terminates", register.Actor))); - Context.Watch(register.Actor); - }).With(unregister => - { - if (_debug) - _eventStream.Publish(new Debug(this.GetType().Name, GetType(), - string.Format("unwatching {0} since has no subscriptions", unregister.Actor))); - Context.Unwatch(unregister.Actor); - }).With(terminated => - { - if (_debug) - _eventStream.Publish(new Debug(this.GetType().Name, GetType(), - string.Format("unsubscribe {0} from {1}, because it was terminated", terminated.Actor , _eventStream ))); - _eventStream.Unsubscribe(terminated.Actor); - }) - .WasHandled; + case Register register: + { + if (_debug) + _eventStream.Publish(new Debug(GetType().Name, GetType(), + $"watching {register.Actor} in order to unsubscribe from EventStream when it terminates")); + Context.Watch(register.Actor); + break; + } + case UnregisterIfNoMoreSubscribedChannels unregister: + { + if (_debug) + _eventStream.Publish(new Debug(GetType().Name, GetType(), + $"unwatching {unregister.Actor} since has no subscriptions")); + Context.Unwatch(unregister.Actor); + break; + } + case Terminated terminated: + { + if (_debug) + _eventStream.Publish(new Debug(GetType().Name, GetType(), + $"unsubscribe {terminated.ActorRef} from {_eventStream}, because it was terminated")); + _eventStream.Unsubscribe(terminated.ActorRef); + break; + } + default: + return false; + } + + return true; } - /// - /// TBD - /// protected override void PreStart() { if (_debug) - _eventStream.Publish(new Debug(this.GetType().Name, GetType(), + _eventStream.Publish(new Debug(GetType().Name, GetType(), string.Format("registering unsubscriber with {0}", _eventStream))); - _eventStream.InitUnsubscriber(Self, _system); } /// - /// TBD + /// INTERNAL API + /// + /// Registers a new subscriber to be death-watched and automatically unsubscribed. /// internal class Register { - /// - /// TBD - /// - /// TBD public Register(IActorRef actor) { Actor = actor; } /// - /// TBD - /// - public IActorRef Actor { get; private set; } - } - - - /// - /// TBD - /// - internal class Terminated - { - /// - /// TBD - /// - /// TBD - public Terminated(IActorRef actor) - { - Actor = actor; - } - - /// - /// TBD + /// The actor we're going to deathwatch and automatically unsubscribe /// public IActorRef Actor { get; private set; } } /// - /// TBD + /// INTERNAL API + /// + /// Unsubscribes an actor that is no longer subscribed and does not need to be death-watched any longer. /// internal class UnregisterIfNoMoreSubscribedChannels { - /// - /// TBD - /// - /// TBD public UnregisterIfNoMoreSubscribedChannels(IActorRef actor) { Actor = actor; } /// - /// TBD + /// The actor we're no longer going to death watch. /// public IActorRef Actor { get; private set; } } } - - - - /// - /// Provides factory for Akka.Event.EventStreamUnsubscriber actors with unique names. - /// This is needed if someone spins up more EventStreams using the same ActorSystem, - /// each stream gets it's own unsubscriber. - /// - class EventStreamUnsubscribersProvider - { - private readonly AtomicCounter _unsubscribersCounter = new AtomicCounter(0); - private static readonly EventStreamUnsubscribersProvider _instance = new EventStreamUnsubscribersProvider(); - - - /// - /// TBD - /// - public static EventStreamUnsubscribersProvider Instance - { - get { return _instance; } - } - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - public void Start(ActorSystemImpl system, EventStream eventStream, bool debug) - { - system.SystemActorOf(Props.Create(eventStream, system, debug).WithDispatcher(Dispatchers.InternalDispatcherId), - string.Format("EventStreamUnsubscriber-{0}", _unsubscribersCounter.IncrementAndGet())); - } - } } diff --git a/src/core/Akka/Event/EventStream.cs b/src/core/Akka/Event/EventStream.cs index 13c37d21224..0c35364f12b 100644 --- a/src/core/Akka/Event/EventStream.cs +++ b/src/core/Akka/Event/EventStream.cs @@ -11,6 +11,7 @@ using System.Linq; using Akka.Actor; using Akka.Actor.Internal; +using Akka.Dispatch; using Akka.Util; using Akka.Util.Internal; @@ -29,8 +30,13 @@ public class EventStream : LoggingBus { private readonly bool _debug; - private readonly AtomicReference, IActorRef>> _initiallySubscribedOrUnsubscriber = - new AtomicReference, IActorRef>>(); + // used to uniquely name unsubscribers instances, should there be more than one ActorSystem / EventStream + private static readonly AtomicCounter UnsubscribersCounter = new AtomicCounter(0); + private readonly AtomicReference _unsubscriber = new AtomicReference(ActorRefs.NoSender); + + // in the event that an actor subscribers to the EventStream prior to ActorSystemImpl.Init is called + // we register them here and then move them all + private readonly ConcurrentSet _pendingUnsubscribers = new ConcurrentSet(); /// /// Initializes a new instance of the class. @@ -108,87 +114,69 @@ public override bool Unsubscribe(IActorRef subscriber) } /// - /// TBD + /// Used to start the Unsubscriber actor, responsible for garabage-collecting + /// all expired subscriptions when the subscribed actor terminates. /// /// TBD public void StartUnsubscriber(ActorSystemImpl system) { - EventStreamUnsubscribersProvider.Instance.Start(system, this, _debug); - } - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - public bool InitUnsubscriber(IActorRef unsubscriber, ActorSystem system) - { - if (system == null) - { - return false; - } - return _initiallySubscribedOrUnsubscriber.Match().With>>(v => + if (_unsubscriber.Value.IsNobody()) { - if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v, Either.Right(unsubscriber))) + lock (this) { - if (_debug) - { - Publish(new Debug(SimpleName(this), GetType(), - string.Format("initialized unsubscriber to: {0} registering {1} initial subscribers with it", unsubscriber, v.Value.Count))); + // not started + var currentValue = _unsubscriber.Value; + var unsubscriber= system.SystemActorOf(Props.Create(this, system, _debug).WithDispatcher(Dispatchers.InternalDispatcherId), + $"EventStreamUnsubscriber-{UnsubscribersCounter.IncrementAndGet()}"); + if (_unsubscriber.CompareAndSet(currentValue, unsubscriber)) + { + // backfill all pending unsubscribers + foreach (var s in _pendingUnsubscribers) + { + unsubscriber.Tell(new EventStreamUnsubscriber.Register(s)); + } + _pendingUnsubscribers.Clear(); + } + else + { + // somehow, despite being locked, we managed to lose the compare and swap + if (_unsubscriber.Value.IsNobody()) + throw new IllegalActorStateException("EventStream is corrupted"); } - v.Value.ForEach(RegisterWithUnsubscriber); - - - } - else - { - InitUnsubscriber(unsubscriber, system); } + } + } - - }).With>(presentUnsubscriber => - { - if (_debug) - { - Publish(new Debug(SimpleName(this), GetType(), - string.Format("not using unsubscriber {0}, because already initialized with {1}", unsubscriber, presentUnsubscriber))); - - } - }).WasHandled; + [Obsolete("Should be removed in 1.5")] + public bool InitUnsubscriber(IActorRef unsubscriber, ActorSystem system) + { + StartUnsubscriber((ActorSystemImpl)system); + return true; } private void RegisterWithUnsubscriber(IActorRef subscriber) { - _initiallySubscribedOrUnsubscriber.Match().With>>(v => + if (_unsubscriber.Value.IsNobody()) { - if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v, - Either.Left>(v.Value.Add(subscriber)))) - { - RegisterWithUnsubscriber(subscriber); - } - - }).With>(unsubscriber => + // pending + _pendingUnsubscribers.TryAdd(subscriber); + } + else { - unsubscriber.Value.Tell( new EventStreamUnsubscriber.Register(subscriber)); - }); + _unsubscriber.Value.Tell( new EventStreamUnsubscriber.Register(subscriber)); + } + } private void UnregisterIfNoMoreSubscribedChannels(IActorRef subscriber) { - _initiallySubscribedOrUnsubscriber.Match().With>>(v => + // not an important operation. If we fail to process this message due to a race condition, then the + // death watch subscription is a no-op anyway. + if (!_unsubscriber.Value.IsNobody()) { - if (_initiallySubscribedOrUnsubscriber.CompareAndSet(v, - Either.Left>(v.Value.Remove(subscriber)))) - { - UnregisterIfNoMoreSubscribedChannels(subscriber); - } - - }).With>(unsubscriber => - { - unsubscriber.Value.Tell(new EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber)); - }); + _unsubscriber.Value.Tell(new EventStreamUnsubscriber.UnregisterIfNoMoreSubscribedChannels(subscriber)); + } } } }