diff --git a/src/Orleans.Core/Networking/Connection.cs b/src/Orleans.Core/Networking/Connection.cs index ae37c06933..9892458d3c 100644 --- a/src/Orleans.Core/Networking/Connection.cs +++ b/src/Orleans.Core/Networking/Connection.cs @@ -353,7 +353,7 @@ private async Task ProcessOutgoing() Exception error = default; var serializer = this.shared.ServiceProvider.GetRequiredService(); - var messageStatisticsSink = this.shared.MessageStatisticsSink; + var messageObserver = this.shared.MessageStatisticsSink.GetMessageObserver(); try { var output = this._transport.Output; @@ -375,7 +375,7 @@ private async Task ProcessOutgoing() inflight.Add(message); var (headerLength, bodyLength) = serializer.Write(output, message); RecordMessageSend(message, headerLength + bodyLength, headerLength); - messageStatisticsSink.RecordMessage(message); + messageObserver?.Invoke(message); message = null; } } diff --git a/src/Orleans.Core/Placement/Repartitioning/IMessageStatisticsSink.cs b/src/Orleans.Core/Placement/Repartitioning/IMessageStatisticsSink.cs index 1973ea9fdf..7e39589632 100644 --- a/src/Orleans.Core/Placement/Repartitioning/IMessageStatisticsSink.cs +++ b/src/Orleans.Core/Placement/Repartitioning/IMessageStatisticsSink.cs @@ -1,13 +1,15 @@ +#nullable enable +using System; using Orleans.Runtime; namespace Orleans.Placement.Repartitioning; internal interface IMessageStatisticsSink { - void RecordMessage(Message message); + Action? GetMessageObserver(); } internal sealed class NoOpMessageStatisticsSink : IMessageStatisticsSink { - public void RecordMessage(Message message) { } + public Action? GetMessageObserver() => null; } \ No newline at end of file diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs index 2985a61904..614eb90430 100644 --- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs +++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs @@ -139,7 +139,7 @@ internal static void AddDefaultServices(ISiloBuilder builder) services.TryAddSingleton(); services.TryAddFromExisting(); services.TryAddSingleton(FactoryUtility.Create); - services.TryAddSingleton(sp => (IConnectedClientCollection)sp.GetRequiredService().Gateway ?? new EmptyConnectedClientCollection()); + services.TryAddSingleton(sp => sp.GetRequiredService().Gateway as IConnectedClientCollection ?? new EmptyConnectedClientCollection()); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddFromExisting(); diff --git a/src/Orleans.Runtime/Messaging/MessageCenter.cs b/src/Orleans.Runtime/Messaging/MessageCenter.cs index 24eb05e86e..ddf8539414 100644 --- a/src/Orleans.Runtime/Messaging/MessageCenter.cs +++ b/src/Orleans.Runtime/Messaging/MessageCenter.cs @@ -1,3 +1,4 @@ +#nullable enable using System; using System.Collections.Generic; using System.Diagnostics; @@ -22,12 +23,12 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable private readonly SiloMessagingOptions messagingOptions; private readonly PlacementService placementService; private readonly GrainLocator _grainLocator; - private readonly IMessageStatisticsSink _messageStatisticsSink; + private readonly Action? _messageObserver; private readonly ILogger log; private readonly Catalog catalog; private bool stopped; - private HostedClient hostedClient; - private Action sniffIncomingMessageHandler; + private HostedClient? hostedClient; + private Action? sniffIncomingMessageHandler; public MessageCenter( ILocalSiloDetails siloDetails, @@ -50,7 +51,7 @@ public MessageCenter( this.messagingTrace = messagingTrace; this.placementService = placementService; _grainLocator = grainLocator; - _messageStatisticsSink = messageStatisticsSink; + _messageObserver = messageStatisticsSink.GetMessageObserver(); this.log = logger; this.messageFactory = messageFactory; this._siloAddress = siloDetails.SiloAddress; @@ -61,11 +62,11 @@ public MessageCenter( } } - public Gateway Gateway { get; } + public Gateway? Gateway { get; } internal bool IsBlockingApplicationMessages { get; private set; } - public void SetHostedClient(HostedClient client) => this.hostedClient = client; + public void SetHostedClient(HostedClient? client) => this.hostedClient = client; public bool TryDeliverToProxy(Message msg) { @@ -73,7 +74,7 @@ public bool TryDeliverToProxy(Message msg) if (this.Gateway is Gateway gateway && gateway.TryDeliverToProxy(msg) || this.hostedClient is HostedClient client && client.TryDispatchToClient(msg)) { - _messageStatisticsSink.RecordMessage(msg); + _messageObserver?.Invoke(msg); return true; } @@ -125,7 +126,7 @@ public async Task StopAcceptingClientMessages() } } - public Action SniffIncomingMessage + public Action? SniffIncomingMessage { set { @@ -251,8 +252,8 @@ static async Task SendAsync(MessageCenter messageCenter, ValueTask c public void RejectMessage( Message message, Message.RejectionTypes rejectionType, - Exception exc, - string rejectInfo = null) + Exception? exc, + string? rejectInfo = null) { if (message.Direction == Message.Directions.Request || (message.Direction == Message.Directions.OneWay && message.HasCacheInvalidationHeader)) @@ -271,20 +272,20 @@ public void RejectMessage( internal void ProcessRequestsToInvalidActivation( List messages, - GrainAddress oldAddress, - SiloAddress forwardingAddress, - string failedOperation = null, - Exception exc = null, + GrainAddress? oldAddress, + SiloAddress? forwardingAddress, + string? failedOperation = null, + Exception? exc = null, bool rejectMessages = false) { if (rejectMessages) { - GrainAddress validAddress = forwardingAddress switch + GrainAddress? validAddress = forwardingAddress switch { null => null, _ => new() { - GrainId = oldAddress.GrainId, + GrainId = oldAddress?.GrainId ?? default, SiloAddress = forwardingAddress, } }; @@ -304,12 +305,12 @@ internal void ProcessRequestsToInvalidActivation( else { this.messagingTrace.OnDispatcherForwardingMultiple(messages.Count, oldAddress, forwardingAddress, failedOperation, exc); - GrainAddress destination = forwardingAddress switch + GrainAddress? destination = forwardingAddress switch { null => null, _ => new() { - GrainId = oldAddress.GrainId, + GrainId = oldAddress?.GrainId ?? default, SiloAddress = forwardingAddress, } }; @@ -323,10 +324,10 @@ internal void ProcessRequestsToInvalidActivation( private void ProcessRequestToInvalidActivation( Message message, - GrainAddress oldAddress, - SiloAddress forwardingAddress, + GrainAddress? oldAddress, + SiloAddress? forwardingAddress, string failedOperation, - Exception exc = null, + Exception? exc = null, bool rejectMessages = false) { Debug.Assert(!message.IsLocalOnly); @@ -344,12 +345,12 @@ private void ProcessRequestToInvalidActivation( } else { - GrainAddress destination = forwardingAddress switch + GrainAddress? destination = forwardingAddress switch { null => null, _ => new() { - GrainId = oldAddress.GrainId, + GrainId = oldAddress?.GrainId ?? default, SiloAddress = forwardingAddress, } }; @@ -357,7 +358,7 @@ private void ProcessRequestToInvalidActivation( } } - private void TryForwardRequest(Message message, GrainAddress oldAddress, GrainAddress destination, string failedOperation = null, Exception exc = null) + private void TryForwardRequest(Message message, GrainAddress? oldAddress, GrainAddress? destination, string? failedOperation = null, Exception? exc = null) { Debug.Assert(!message.IsLocalOnly); @@ -415,7 +416,7 @@ internal void RerouteMessage(Message message) ResendMessageImpl(message); } - private bool TryForwardMessage(Message message, SiloAddress forwardingAddress) + private bool TryForwardMessage(Message message, SiloAddress? forwardingAddress) { if (!MayForward(message, this.messagingOptions)) return false; @@ -425,7 +426,7 @@ private bool TryForwardMessage(Message message, SiloAddress forwardingAddress) return true; } - private void ResendMessageImpl(Message message, SiloAddress forwardingAddress = null) + private void ResendMessageImpl(Message message, SiloAddress? forwardingAddress = null) { if (log.IsEnabled(LogLevel.Debug)) log.LogDebug("Resend {Message}", message); @@ -543,7 +544,7 @@ public void ReceiveMessage(Message msg) } targetActivation.ReceiveMessage(msg); - _messageStatisticsSink.RecordMessage(msg); + _messageObserver?.Invoke(msg); } } catch (Exception ex) diff --git a/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs b/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs index ba7a1ac288..0eb254bf76 100644 --- a/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs +++ b/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs @@ -1,6 +1,7 @@ #nullable enable using System; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -109,7 +110,9 @@ private async Task ProcessPendingEdges(CancellationToken cancellationToken) } } - public void RecordMessage(Message message) + public Action? GetMessageObserver() => RecordMessage; + + private void RecordMessage(Message message) { if (!_enableMessageSampling || message.IsSystemMessage) { diff --git a/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs b/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs index e5fbcb3d94..c7c73af7b9 100644 --- a/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs +++ b/src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs @@ -1,5 +1,4 @@ #nullable enable -using Orleans; using Orleans.Metadata; using System; using System.Collections.Concurrent;