Skip to content

Commit

Permalink
Activation repartitioner: use null for no-op message observer to av…
Browse files Browse the repository at this point in the history
…oid interface call (#9056)
  • Loading branch information
ReubenBond authored Jul 10, 2024
1 parent a44d4a3 commit a85e337
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 34 deletions.
4 changes: 2 additions & 2 deletions src/Orleans.Core/Networking/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ private async Task ProcessOutgoing()

Exception error = default;
var serializer = this.shared.ServiceProvider.GetRequiredService<MessageSerializer>();
var messageStatisticsSink = this.shared.MessageStatisticsSink;
var messageObserver = this.shared.MessageStatisticsSink.GetMessageObserver();
try
{
var output = this._transport.Output;
Expand All @@ -375,7 +375,7 @@ private async Task ProcessOutgoing()
inflight.Add(message);
var (headerLength, bodyLength) = serializer.Write(output, message);
RecordMessageSend(message, headerLength + bodyLength, headerLength);
messageStatisticsSink.RecordMessage(message);
messageObserver?.Invoke(message);
message = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#nullable enable
using System;
using Orleans.Runtime;

namespace Orleans.Placement.Repartitioning;

internal interface IMessageStatisticsSink
{
void RecordMessage(Message message);
Action<Message>? GetMessageObserver();
}

internal sealed class NoOpMessageStatisticsSink : IMessageStatisticsSink
{
public void RecordMessage(Message message) { }
public Action<Message>? GetMessageObserver() => null;
}
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ internal static void AddDefaultServices(ISiloBuilder builder)
services.TryAddSingleton<MessageCenter>();
services.TryAddFromExisting<IMessageCenter, MessageCenter>();
services.TryAddSingleton(FactoryUtility.Create<MessageCenter, Gateway>);
services.TryAddSingleton<IConnectedClientCollection>(sp => (IConnectedClientCollection)sp.GetRequiredService<MessageCenter>().Gateway ?? new EmptyConnectedClientCollection());
services.TryAddSingleton<IConnectedClientCollection>(sp => sp.GetRequiredService<MessageCenter>().Gateway as IConnectedClientCollection ?? new EmptyConnectedClientCollection());
services.TryAddSingleton<InternalGrainRuntime>();
services.TryAddSingleton<InsideRuntimeClient>();
services.TryAddFromExisting<IRuntimeClient, InsideRuntimeClient>();
Expand Down
55 changes: 28 additions & 27 deletions src/Orleans.Runtime/Messaging/MessageCenter.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand All @@ -22,12 +23,12 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable
private readonly SiloMessagingOptions messagingOptions;
private readonly PlacementService placementService;
private readonly GrainLocator _grainLocator;
private readonly IMessageStatisticsSink _messageStatisticsSink;
private readonly Action<Message>? _messageObserver;
private readonly ILogger log;
private readonly Catalog catalog;
private bool stopped;
private HostedClient hostedClient;
private Action<Message> sniffIncomingMessageHandler;
private HostedClient? hostedClient;
private Action<Message>? sniffIncomingMessageHandler;

public MessageCenter(
ILocalSiloDetails siloDetails,
Expand All @@ -50,7 +51,7 @@ public MessageCenter(
this.messagingTrace = messagingTrace;
this.placementService = placementService;
_grainLocator = grainLocator;
_messageStatisticsSink = messageStatisticsSink;
_messageObserver = messageStatisticsSink.GetMessageObserver();
this.log = logger;
this.messageFactory = messageFactory;
this._siloAddress = siloDetails.SiloAddress;
Expand All @@ -61,19 +62,19 @@ public MessageCenter(
}
}

public Gateway Gateway { get; }
public Gateway? Gateway { get; }

internal bool IsBlockingApplicationMessages { get; private set; }

public void SetHostedClient(HostedClient client) => this.hostedClient = client;
public void SetHostedClient(HostedClient? client) => this.hostedClient = client;

public bool TryDeliverToProxy(Message msg)
{
if (!msg.TargetGrain.IsClient()) return false;
if (this.Gateway is Gateway gateway && gateway.TryDeliverToProxy(msg)
|| this.hostedClient is HostedClient client && client.TryDispatchToClient(msg))
{
_messageStatisticsSink.RecordMessage(msg);
_messageObserver?.Invoke(msg);
return true;
}

Expand Down Expand Up @@ -125,7 +126,7 @@ public async Task StopAcceptingClientMessages()
}
}

public Action<Message> SniffIncomingMessage
public Action<Message>? SniffIncomingMessage
{
set
{
Expand Down Expand Up @@ -251,8 +252,8 @@ static async Task SendAsync(MessageCenter messageCenter, ValueTask<Connection> c
public void RejectMessage(
Message message,
Message.RejectionTypes rejectionType,
Exception exc,
string rejectInfo = null)
Exception? exc,
string? rejectInfo = null)
{
if (message.Direction == Message.Directions.Request
|| (message.Direction == Message.Directions.OneWay && message.HasCacheInvalidationHeader))
Expand All @@ -271,20 +272,20 @@ public void RejectMessage(

internal void ProcessRequestsToInvalidActivation(
List<Message> messages,
GrainAddress oldAddress,
SiloAddress forwardingAddress,
string failedOperation = null,
Exception exc = null,
GrainAddress? oldAddress,
SiloAddress? forwardingAddress,
string? failedOperation = null,
Exception? exc = null,
bool rejectMessages = false)
{
if (rejectMessages)
{
GrainAddress validAddress = forwardingAddress switch
GrainAddress? validAddress = forwardingAddress switch
{
null => null,
_ => new()
{
GrainId = oldAddress.GrainId,
GrainId = oldAddress?.GrainId ?? default,
SiloAddress = forwardingAddress,
}
};
Expand All @@ -304,12 +305,12 @@ internal void ProcessRequestsToInvalidActivation(
else
{
this.messagingTrace.OnDispatcherForwardingMultiple(messages.Count, oldAddress, forwardingAddress, failedOperation, exc);
GrainAddress destination = forwardingAddress switch
GrainAddress? destination = forwardingAddress switch
{
null => null,
_ => new()
{
GrainId = oldAddress.GrainId,
GrainId = oldAddress?.GrainId ?? default,
SiloAddress = forwardingAddress,
}
};
Expand All @@ -323,10 +324,10 @@ internal void ProcessRequestsToInvalidActivation(

private void ProcessRequestToInvalidActivation(
Message message,
GrainAddress oldAddress,
SiloAddress forwardingAddress,
GrainAddress? oldAddress,
SiloAddress? forwardingAddress,
string failedOperation,
Exception exc = null,
Exception? exc = null,
bool rejectMessages = false)
{
Debug.Assert(!message.IsLocalOnly);
Expand All @@ -344,20 +345,20 @@ private void ProcessRequestToInvalidActivation(
}
else
{
GrainAddress destination = forwardingAddress switch
GrainAddress? destination = forwardingAddress switch
{
null => null,
_ => new()
{
GrainId = oldAddress.GrainId,
GrainId = oldAddress?.GrainId ?? default,
SiloAddress = forwardingAddress,
}
};
this.TryForwardRequest(message, oldAddress, destination, failedOperation, exc);
}
}

private void TryForwardRequest(Message message, GrainAddress oldAddress, GrainAddress destination, string failedOperation = null, Exception exc = null)
private void TryForwardRequest(Message message, GrainAddress? oldAddress, GrainAddress? destination, string? failedOperation = null, Exception? exc = null)
{
Debug.Assert(!message.IsLocalOnly);

Expand Down Expand Up @@ -415,7 +416,7 @@ internal void RerouteMessage(Message message)
ResendMessageImpl(message);
}

private bool TryForwardMessage(Message message, SiloAddress forwardingAddress)
private bool TryForwardMessage(Message message, SiloAddress? forwardingAddress)
{
if (!MayForward(message, this.messagingOptions)) return false;

Expand All @@ -425,7 +426,7 @@ private bool TryForwardMessage(Message message, SiloAddress forwardingAddress)
return true;
}

private void ResendMessageImpl(Message message, SiloAddress forwardingAddress = null)
private void ResendMessageImpl(Message message, SiloAddress? forwardingAddress = null)
{
if (log.IsEnabled(LogLevel.Debug)) log.LogDebug("Resend {Message}", message);

Expand Down Expand Up @@ -543,7 +544,7 @@ public void ReceiveMessage(Message msg)
}

targetActivation.ReceiveMessage(msg);
_messageStatisticsSink.RecordMessage(msg);
_messageObserver?.Invoke(msg);
}
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#nullable enable
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -109,7 +110,9 @@ private async Task ProcessPendingEdges(CancellationToken cancellationToken)
}
}

public void RecordMessage(Message message)
public Action<Message>? GetMessageObserver() => RecordMessage;

private void RecordMessage(Message message)
{
if (!_enableMessageSampling || message.IsSystemMessage)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#nullable enable
using Orleans;
using Orleans.Metadata;
using System;
using System.Collections.Concurrent;
Expand Down

0 comments on commit a85e337

Please sign in to comment.