Skip to content

Commit

Permalink
add simple actor telemetry (#6294)
Browse files Browse the repository at this point in the history
* added initial actor telemetry for #6293

* added basic telemetry tests for local actors

* added spec to validate that `RemoteActorRef` doesn't influence counters

* updated `SpawnActorBenchmarks` to include telemetry impact

* converted telemetry events into `sealed class`es with `internal` constructors

* removed `Reason`
  • Loading branch information
Aaronontheweb authored Dec 8, 2022
1 parent 3156272 commit 7f68c48
Show file tree
Hide file tree
Showing 9 changed files with 598 additions and 7 deletions.
16 changes: 14 additions & 2 deletions src/benchmark/Akka.Benchmarks/Actor/SpawnActorBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@ public class SpawnActorBenchmarks
{
[Params(100_000)]
public int ActorCount { get;set; }

[Params(true, false)]
public bool EnableTelemetry { get; set; }

private ActorSystem system;

[IterationSetup]
public void Setup()
{
system = ActorSystem.Create("system");
if(EnableTelemetry) // need to measure the impact of publishing actor start / stop events
system = ActorSystem.Create("system", "akka.actor.telemetry.enabled = true");
else
system = ActorSystem.Create("system");
}

[IterationCleanup]
Expand All @@ -38,7 +45,12 @@ public void Cleanup()
public async Task Actor_spawn()
{
var parent = system.ActorOf(Parent.Props);
await parent.Ask<TestDone>(new StartTest(ActorCount), TimeSpan.FromMinutes(2));

// spawn a bunch of actors
await parent.Ask<TestDone>(new StartTest(ActorCount), TimeSpan.FromMinutes(2)).ConfigureAwait(false);

// terminate the hierarchy
await parent.GracefulStop(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
}

#region actors
Expand Down
22 changes: 22 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ namespace Akka.Actor
public static readonly Akka.Actor.IActorRef NoSender;
public static readonly Akka.Actor.Nobody Nobody;
}
public sealed class ActorRestarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout
{
public System.Type ActorType { get; }
public System.Exception Reason { get; }
public Akka.Actor.IActorRef Subject { get; }
}
public class ActorSelection : Akka.Actor.ICanTell
{
public ActorSelection() { }
Expand All @@ -340,13 +346,23 @@ namespace Akka.Actor
public Akka.Actor.ActorSelectionMessage Copy(object message = null, Akka.Actor.SelectionPathElement[] elements = null, System.Nullable<bool> wildCardFanOut = null) { }
public override string ToString() { }
}
public sealed class ActorStarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout
{
public System.Type ActorType { get; }
public Akka.Actor.IActorRef Subject { get; }
}
public class ActorStashPlugin : Akka.Actor.ActorProducerPluginBase
{
public ActorStashPlugin() { }
public override void AfterIncarnated(Akka.Actor.ActorBase actor, Akka.Actor.IActorContext context) { }
public override void BeforeIncarnated(Akka.Actor.ActorBase actor, Akka.Actor.IActorContext context) { }
public override bool CanBeAppliedTo(System.Type actorType) { }
}
public sealed class ActorStopped : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout
{
public System.Type ActorType { get; }
public Akka.Actor.IActorRef Subject { get; }
}
public abstract class ActorSystem : Akka.Actor.IActorRefFactory, System.IDisposable
{
protected ActorSystem() { }
Expand Down Expand Up @@ -988,6 +1004,11 @@ namespace Akka.Actor
{
Akka.Actor.IStash Stash { get; set; }
}
public interface IActorTelemetryEvent : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout
{
System.Type ActorType { get; }
Akka.Actor.IActorRef Subject { get; }
}
public interface IAdvancedScheduler : Akka.Actor.IActionScheduler, Akka.Actor.IRunnableScheduler { }
public interface IAutoReceivedMessage { }
public interface ICanTell
Expand Down Expand Up @@ -1682,6 +1703,7 @@ namespace Akka.Actor
public bool DebugRouterMisconfiguration { get; }
public bool DebugUnhandledMessage { get; }
public int DefaultVirtualNodesFactor { get; }
public bool EmitActorTelemetry { get; }
public bool FsmDebugEvent { get; }
public bool HasCluster { get; }
public string Home { get; }
Expand Down
150 changes: 150 additions & 0 deletions src/core/Akka.Remote.Tests/RemoteActorTelemetrySpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//-----------------------------------------------------------------------
// <copyright file="RemoteActorTelemetrySpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.TestActors;
using Akka.Util.Internal;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Remote.Tests
{
public class RemoteActorTelemetrySpecs : AkkaSpec
{
// create HOCON configuraiton that enables telemetry and Akka.Remote
private static readonly string Config = @"
akka {
actor {
provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
telemetry.enabled = true
}
remote {
log-remote-lifecycle-events = on
dot-netty.tcp {
port = 0
hostname = localhost
}
}
}";

public RemoteActorTelemetrySpecs(ITestOutputHelper outputHelper) : base(Config, outputHelper)
{

}

private class TelemetrySubscriber : ReceiveActor
{
// keep track of integer counters for each event type
private int _actorCreated;
private int _actorStopped;
private int _actorRestarted;

// create a message type that will send the current values of all counters
public sealed class GetTelemetry
{
public int ActorCreated { get; }
public int ActorStopped { get; }
public int ActorRestarted { get; }

public GetTelemetry(int actorCreated, int actorStopped, int actorRestarted)
{
ActorCreated = actorCreated;
ActorStopped = actorStopped;
ActorRestarted = actorRestarted;
}
}

public class GetTelemetryRequest
{
// make singleton
public static readonly GetTelemetryRequest Instance = new GetTelemetryRequest();

private GetTelemetryRequest()
{
}
}

public TelemetrySubscriber()
{
// Receive each type of IActorTelemetryEvent
Receive<ActorStarted>(e => { _actorCreated++; });
Receive<ActorStopped>(e => { _actorStopped++; });
Receive<ActorRestarted>(e => { _actorRestarted++; });
// receive a request for current counter values and return a GetTelemetry result
Receive<GetTelemetryRequest>(e =>
Sender.Tell(new GetTelemetry(_actorCreated, _actorStopped, _actorRestarted)));
}

protected override void PreStart()
{
Context.System.EventStream.Subscribe(Self, typeof(IActorTelemetryEvent));
}
}

// create a unit test where a second ActorSystem connects to Sys and receives an IActorRef from Sys and subscribes to Telemetry events
[Fact]
public async Task RemoteActorRefs_should_not_produce_telemetry()
{
// create a second ActorSystem that connects to Sys
var system2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
try
{
// create a subscriber to receive telemetry events
var subscriber = system2.ActorOf(Props.Create<TelemetrySubscriber>());

// send a request for the current telemetry counters
var telemetry = await subscriber
.Ask<TelemetrySubscriber.GetTelemetry>(TelemetrySubscriber.GetTelemetryRequest.Instance);

// verify that the counters are all correct
Assert.Equal(0, telemetry.ActorCreated);
Assert.Equal(0, telemetry.ActorStopped);
Assert.Equal(0, telemetry.ActorRestarted);

// create an actor in Sys
var actor1 = Sys.ActorOf(BlackHoleActor.Props, "actor1");

// resolve the currently bound Akka.Remote address for Sys
var address = Sys.AsInstanceOf<ExtendedActorSystem>().Provider.DefaultAddress;

// create a RootActorPath for actor1 that uses the previous address value
var actor1Path = new RootActorPath(address) / "user" / "actor1";

// have system2 send a request to actor1 via Akka.Remote
var actor2 = await system2.ActorSelection(actor1Path).ResolveOne(RemainingOrDefault);

// send a request for the current telemetry counters
telemetry = await subscriber
.Ask<TelemetrySubscriber.GetTelemetry>(TelemetrySubscriber.GetTelemetryRequest.Instance);

// verify that created actors is greater than 1
var previouslyCreated = telemetry.ActorCreated;
Assert.True(previouslyCreated > 1); // should have had some /system actors started as well
Assert.Equal(0, telemetry.ActorStopped);
Assert.Equal(0, telemetry.ActorRestarted);

// stop the actor in Sys
Sys.Stop(actor1);

// send a request for the current telemetry counters
telemetry = await subscriber
.Ask<TelemetrySubscriber.GetTelemetry>(TelemetrySubscriber.GetTelemetryRequest.Instance);
// verify that the counters are all zero
Assert.Equal(previouslyCreated, telemetry.ActorCreated); // should not have changed
Assert.Equal(0, telemetry.ActorStopped);
Assert.Equal(0, telemetry.ActorRestarted);
}
finally
{
Shutdown(system2);
}
}
}
}
Loading

0 comments on commit 7f68c48

Please sign in to comment.