Skip to content

Commit

Permalink
Akka.Event: expose the EventStream on BusLogging for extensibilit…
Browse files Browse the repository at this point in the history
…y purposes (#7210)

* Akka.Event: expose the `EventStream` on `BusLogging` for extensibility purposes

close #7209

* fixed bug in `FusingActorMaterializerSpec`
  • Loading branch information
Aaronontheweb authored May 31, 2024
1 parent 1f0d09c commit 5956b82
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3295,10 +3295,13 @@ namespace Akka.Event
public sealed class BusLogging : Akka.Event.LoggingAdapterBase
{
public BusLogging(Akka.Event.LoggingBus bus, string logSource, System.Type logClass, Akka.Event.ILogMessageFormatter logMessageFormatter) { }
public Akka.Event.LoggingBus Bus { get; }
public override bool IsDebugEnabled { get; }
public override bool IsErrorEnabled { get; }
public override bool IsInfoEnabled { get; }
public override bool IsWarningEnabled { get; }
public System.Type LogClass { get; }
public string LogSource { get; }
protected override void NotifyLog(Akka.Event.LogLevel logLevel, object message, System.Exception cause = null) { }
}
public sealed class DeadLetter : Akka.Event.AllDeadLetters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3287,10 +3287,13 @@ namespace Akka.Event
public sealed class BusLogging : Akka.Event.LoggingAdapterBase
{
public BusLogging(Akka.Event.LoggingBus bus, string logSource, System.Type logClass, Akka.Event.ILogMessageFormatter logMessageFormatter) { }
public Akka.Event.LoggingBus Bus { get; }
public override bool IsDebugEnabled { get; }
public override bool IsErrorEnabled { get; }
public override bool IsInfoEnabled { get; }
public override bool IsWarningEnabled { get; }
public System.Type LogClass { get; }
public string LogSource { get; }
protected override void NotifyLog(Akka.Event.LogLevel logLevel, object message, System.Exception cause = null) { }
}
public sealed class DeadLetter : Akka.Event.AllDeadLetters
Expand Down
15 changes: 8 additions & 7 deletions src/core/Akka.Streams.Tests/FusingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,6 @@ public async Task A_SubFusingActorMaterializer_must_work_with_asynchronous_bound
[Fact]
public async Task A_SubFusingActorMaterializer_must_use_multiple_actors_when_there_are_asynchronous_boundaries_in_the_subflows_manual ()
{
string RefFunc()
{
var bus = (BusLogging)GraphInterpreter.Current.Log;
return GetInstanceField(typeof(BusLogging), bus, "_logSource") as string;
}

var async = Flow.Create<int>().Select(x =>
{
TestActor.Tell(RefFunc());
Expand All @@ -84,6 +78,13 @@ string RefFunc()
var refs = await ReceiveNAsync(20).Distinct().ToListAsync();
// main flow + 10 sub-flows
refs.Count.Should().Be(11);
return;

string RefFunc()
{
var bus = (BusLogging)GraphInterpreter.Current.Log;
return bus.LogSource;
}
}

[Fact]
Expand All @@ -92,7 +93,7 @@ public async Task A_SubFusingActorMaterializer_must_use_multiple_actors_when_the
string RefFunc()
{
var bus = (BusLogging)GraphInterpreter.Current.Log;
return GetInstanceField(typeof(BusLogging), bus, "_logSource") as string;
return bus.LogSource;
}

var flow = Flow.Create<int>().Select(x =>
Expand Down
36 changes: 25 additions & 11 deletions src/core/Akka/Event/BusLogging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using Akka.Actor;

namespace Akka.Event
{
Expand All @@ -14,9 +15,22 @@ namespace Akka.Event
/// </summary>
public sealed class BusLogging : LoggingAdapterBase
{
private readonly LoggingBus _bus;
private readonly Type _logClass;
private readonly string _logSource;
/// <summary>
/// For convenience, this is the destination to which logs are written.
///
/// Typically powered by the <see cref="EventStream"/> on the <see cref="ActorSystem"/>.
/// </summary>
public LoggingBus Bus { get; }

/// <summary>
/// The type responsible for emitting these logs
/// </summary>
public Type LogClass { get; }

/// <summary>
/// The instance of the <see cref="LogClass"/> responsible for emitting these logs.
/// </summary>
public string LogSource { get; }

/// <summary>
/// Initializes a new instance of the <see cref="BusLogging" /> class.
Expand All @@ -28,9 +42,9 @@ public sealed class BusLogging : LoggingAdapterBase
public BusLogging(LoggingBus bus, string logSource, Type logClass, ILogMessageFormatter logMessageFormatter)
: base(logMessageFormatter)
{
_bus = bus;
_logSource = logSource;
_logClass = logClass;
Bus = bus;
LogSource = logSource;
LogClass = logClass;

IsErrorEnabled = bus.LogLevel <= LogLevel.ErrorLevel;
IsWarningEnabled = bus.LogLevel <= LogLevel.WarningLevel;
Expand Down Expand Up @@ -62,18 +76,18 @@ private LogEvent CreateLogEvent(LogLevel logLevel, object message, Exception cau
{
return logLevel switch
{
LogLevel.DebugLevel => new Debug(cause, _logSource, _logClass, message),
LogLevel.InfoLevel => new Info(cause, _logSource, _logClass, message),
LogLevel.WarningLevel => new Warning(cause, _logSource, _logClass, message),
LogLevel.ErrorLevel => new Error(cause, _logSource, _logClass, message),
LogLevel.DebugLevel => new Debug(cause, LogSource, LogClass, message),
LogLevel.InfoLevel => new Info(cause, LogSource, LogClass, message),
LogLevel.WarningLevel => new Warning(cause, LogSource, LogClass, message),
LogLevel.ErrorLevel => new Error(cause, LogSource, LogClass, message),
_ => throw new ArgumentOutOfRangeException(nameof(logLevel), logLevel, null)
};
}

protected override void NotifyLog(LogLevel logLevel, object message, Exception cause = null)
{
var logEvent = CreateLogEvent(logLevel, message, cause);
_bus.Publish(logEvent);
Bus.Publish(logEvent);
}
}
}

0 comments on commit 5956b82

Please sign in to comment.