Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix NRE inside RemotingTerminator #4686

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -713,8 +713,7 @@ private void InitFSM()
{
When(TerminatorState.Uninitialized, @event =>
{
var internals = @event.FsmEvent as Internals;
if (internals != null)
if (@event.FsmEvent is Internals internals)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's always lovely to see how new language features make old code pretty 👍

{
_systemGuardian.Tell(RegisterTerminationHook.Instance);
return GoTo(TerminatorState.Idle).Using(internals);
Expand Down
110 changes: 45 additions & 65 deletions src/core/Akka/Actor/FSM.cs
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,7 @@ public Timer(string name, object message, bool repeat, int generation, ActorBase
/// <param name="timeout">TBD</param>
public void Schedule(IActorRef actor, TimeSpan timeout)
{
object timerMsg;
if (Message is IAutoReceivedMessage)
timerMsg = Message;
else
timerMsg = this;
var timerMsg = Message is IAutoReceivedMessage ? Message : this;

_ref = Repeat
? _scheduler.ScheduleTellRepeatedlyCancelable(timeout, timeout, actor, timerMsg, Context.Self)
Expand Down Expand Up @@ -671,7 +667,7 @@ public override string ToString()
/// <typeparam name="TData">The state data type</typeparam>
public abstract class FSM<TState, TData> : FSMBase, IListeners, IInternalSupportsTestFSMRef<TState,TData>
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly ILoggingAdapter _log = Context.GetLoggerStartup();
IgorFedchenko marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Initializes a new instance of the FSM class.
Expand Down Expand Up @@ -813,8 +809,8 @@ public TransformHelper(StateFunction func)
/// <returns>TBD</returns>
public StateFunction Using(Func<State<TState, TData>, State<TState, TData>> andThen)
{
StateFunction continuedDelegate = @event => andThen.Invoke(Func.Invoke(@event));
return continuedDelegate;
State<TState, TData> ContinuedDelegate(Event<TData> @event) => andThen.Invoke(Func.Invoke(@event));
return ContinuedDelegate;
}
}

Expand Down Expand Up @@ -1083,14 +1079,14 @@ private void HandleTransition(TState previous, TState next)
/// <returns>A <see cref="StateFunction"/> which combines both the results of <paramref name="original"/> and <paramref name="fallback"/></returns>
private static StateFunction OrElse(StateFunction original, StateFunction fallback)
{
StateFunction chained = delegate(Event<TData> @event)
State<TState, TData> Chained(Event<TData> @event)
{
var originalResult = original.Invoke(@event);
if (originalResult == null) return fallback.Invoke(@event);
return originalResult;
};
}

return chained;
return Chained;
}

#endregion
Expand All @@ -1100,20 +1096,24 @@ private static StateFunction OrElse(StateFunction original, StateFunction fallba
/// <inheritdoc/>
protected override bool Receive(object message)
{
var timeoutMarker = message as TimeoutMarker;
if (timeoutMarker != null)
switch (message)
{
if (_generation == timeoutMarker.Generation)
case TimeoutMarker timeoutMarker:
{
ProcessMsg(StateTimeout.Instance, "state timeout");
if (_generation == timeoutMarker.Generation)
{
ProcessMsg(StateTimeout.Instance, "state timeout");
}
return true;
}
return true;
}

if (message is Timer timer)
{
if (ReferenceEquals(timer.Owner, this) && _timers.TryGetValue(timer.Name, out var oldTimer) && oldTimer.Generation == timer.Generation)
case Timer timer:
{
// if we don't own the timer, don't have a reference to it, or
// if it's an older reference - ignore it.
if (!ReferenceEquals(timer.Owner, this)
|| !_timers.TryGetValue(timer.Name, out var oldTimer)
|| oldTimer.Generation != timer.Generation)
return true;
if (_timeoutFuture != null)
{
_timeoutFuture.Cancel(false);
Expand All @@ -1125,43 +1125,27 @@ protected override bool Receive(object message)
_timers.Remove(timer.Name);
}
ProcessMsg(timer.Message, timer);
return true;
}
return true;
}

var subscribeTransitionCallBack = message as SubscribeTransitionCallBack;
if (subscribeTransitionCallBack != null)
{
Context.Watch(subscribeTransitionCallBack.ActorRef);
Listeners.Add(subscribeTransitionCallBack.ActorRef);
//send the current state back as a reference point
subscribeTransitionCallBack.ActorRef.Tell(new CurrentState<TState>(Self, _currentState.StateName));
return true;
}

var listen = message as Listen;
if (listen != null)
{
Context.Watch(listen.Listener);
Listeners.Add(listen.Listener);
listen.Listener.Tell(new CurrentState<TState>(Self, _currentState.StateName));
return true;
}

var unsubscribeTransitionCallBack = message as UnsubscribeTransitionCallBack;
if (unsubscribeTransitionCallBack != null)
{
Context.Unwatch(unsubscribeTransitionCallBack.ActorRef);
Listeners.Remove(unsubscribeTransitionCallBack.ActorRef);
return true;
}

var deafen = message as Deafen;
if (deafen != null)
{
Context.Unwatch(deafen.Listener);
Listeners.Remove(deafen.Listener);
return true;
case SubscribeTransitionCallBack subscribeTransitionCallBack:
Context.Watch(subscribeTransitionCallBack.ActorRef);
Listeners.Add(subscribeTransitionCallBack.ActorRef);
//send the current state back as a reference point
subscribeTransitionCallBack.ActorRef.Tell(new CurrentState<TState>(Self, _currentState.StateName));
return true;
case Listen listen:
Context.Watch(listen.Listener);
Listeners.Add(listen.Listener);
listen.Listener.Tell(new CurrentState<TState>(Self, _currentState.StateName));
return true;
case UnsubscribeTransitionCallBack unsubscribeTransitionCallBack:
Context.Unwatch(unsubscribeTransitionCallBack.ActorRef);
Listeners.Remove(unsubscribeTransitionCallBack.ActorRef);
return true;
case Deafen deafen:
Context.Unwatch(deafen.Listener);
Listeners.Remove(deafen.Listener);
return true;
}

if (_timeoutFuture != null)
Expand Down Expand Up @@ -1213,16 +1197,13 @@ private void ProcessEvent(Event<TData> fsmEvent, object source)

private string GetSourceString(object source)
{
var s = source as string;
if (s != null)
if (source is string s)
return s;

var timer = source as Timer;
if (timer != null)
if (source is Timer timer)
return "timer '" + timer.Name + "'";

var actorRef = source as IActorRef;
if (actorRef != null)
if (source is IActorRef actorRef)
return actorRef.ToString();

return "unknown";
Expand Down Expand Up @@ -1329,8 +1310,7 @@ protected override void PostStop()
/// <param name="reason">TBD</param>
protected virtual void LogTermination(Reason reason)
{
var failure = reason as Failure;
if (failure != null)
if (reason is Failure failure)
{
if (failure.Cause is Exception)
{
Expand Down
22 changes: 22 additions & 0 deletions src/core/Akka/Event/Logging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,28 @@ public static string StringFor(this LogLevel logLevel)
}
}

/// <summary>
/// INTERNAL API.
///
/// Used by actors / infrastructure that are starting up around the same time as the RemoteTransport
/// is being booted, and therefore can cause problems similar to https://github.com/akkadotnet/akka.net/issues/4677 at startup.
/// </summary>
/// <param name="context">The context used to configure the logging adapter.</param>
/// <param name="logMessageFormatter">The formatter used to format log messages.</param>
/// <returns>The newly created logging adapter.</returns>
internal static ILoggingAdapter GetLoggerStartup(this IActorContext context, ILogMessageFormatter logMessageFormatter = null)
{
try
{
return context.GetLogger(logMessageFormatter);
}
catch // had a failure, don't want to propagate it. Just start the logger without remote context
{
var logSource = LogSource.Create(context);
return new BusLogging(context.System.EventStream, logSource.Source, logSource.Type, logMessageFormatter ?? new DefaultLogMessageFormatter());
}
}

/// <summary>
/// Creates a new logging adapter using the specified context's event stream.
/// </summary>
Expand Down