From 10b521e349456119c35721cd89b694bf1ab10c05 Mon Sep 17 00:00:00 2001 From: Paolo Possanzini Date: Thu, 11 Jul 2024 14:08:55 +0200 Subject: [PATCH 1/4] added logging during configurations --- Arbitrer/extensions/ArbitrerExtensions.cs | 124 +++++++++++++++++----- 1 file changed, 96 insertions(+), 28 deletions(-) diff --git a/Arbitrer/extensions/ArbitrerExtensions.cs b/Arbitrer/extensions/ArbitrerExtensions.cs index 0e17a61..4c45898 100644 --- a/Arbitrer/extensions/ArbitrerExtensions.cs +++ b/Arbitrer/extensions/ArbitrerExtensions.cs @@ -7,6 +7,7 @@ using MediatR; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Arbitrer @@ -57,8 +58,11 @@ public static IServiceCollection AddArbitrer(this ServiceCollection services, IE /// /// The existing arbitrer options. /// The assemblies to search for request handlers. + /// Prefix for Exchange and queues for requests + /// Logger instance to allow information during configuration /// The updated arbitrer options with the inferred local requests. - public static ArbitrerOptions InferLocalRequests(this ArbitrerOptions options, IEnumerable assemblies, string queuePrefix = null) + public static ArbitrerOptions InferLocalRequests(this ArbitrerOptions options, IEnumerable assemblies, string queuePrefix = null, + ILogger logger = null) { var localRequests = assemblies.SelectMany(a => a .GetTypes() @@ -66,7 +70,7 @@ public static ArbitrerOptions InferLocalRequests(this ArbitrerOptions options, I .Where(i => i.FullName != null && i.FullName.StartsWith("MediatR.IRequestHandler")) .Select(i => i.GetGenericArguments()[0]).ToArray() )); - options.SetAsLocalRequests(localRequests.ToArray, queuePrefix); + options.SetAsLocalRequests(localRequests.ToArray, queuePrefix, logger); return options; } @@ -76,10 +80,13 @@ public static ArbitrerOptions InferLocalRequests(this ArbitrerOptions options, I /// /// The object. /// The collection of objects to infer local notifications from. + /// Prefix for Exchange and queues for requests + /// Logger instance to allow information during configuration /// /// The object with inferred local notifications set. /// - public static ArbitrerOptions InferLocalNotifications(this ArbitrerOptions options, IEnumerable assemblies, string queuePrefix = null) + public static ArbitrerOptions InferLocalNotifications(this ArbitrerOptions options, IEnumerable assemblies, string queuePrefix = null, + ILogger logger = null) { var localNotifications = assemblies.SelectMany(a => a .GetTypes() @@ -88,7 +95,8 @@ public static ArbitrerOptions InferLocalNotifications(this ArbitrerOptions optio .Select(i => i.GetGenericArguments()[0]).ToArray() )); - options.SetAsLocalRequests(() => localNotifications, queuePrefix); + options.SetAsLocalRequests(() => localNotifications, queuePrefix, logger); + return options; } @@ -97,25 +105,39 @@ public static ArbitrerOptions InferLocalNotifications(this ArbitrerOptions optio /// /// The type of the request to set as local. /// The ArbitrerOptions object to modify. + /// Prefix for Exchange and queues for requests + /// Logger instance to allow information during configuration /// The modified ArbitrerOptions object. - public static ArbitrerOptions SetAsLocalRequest(this ArbitrerOptions options, string queuePrefix = null) where T : IBaseRequest + public static ArbitrerOptions SetAsLocalRequest(this ArbitrerOptions options, string queuePrefix = null, ILogger logger = null) where T : IBaseRequest { options.LocalRequests.Add(typeof(T)); - if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName)) - options.QueuePrefixes.Add(typeof(T).FullName, queuePrefix); + if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName!)) + { + options.QueuePrefixes.Add(typeof(T).FullName!, queuePrefix); + logger?.LogInformation($"Added prefix to request ${typeof(T).FullName}"); + } + return options; } /// - /// Listens for a notification and adds it to the local requests list in the ArbitrerOptions instance. The type of the notification to listen for. It must implement the INotification interface. The ArbitrerOptions instance to add the notification to. The updated ArbitrerOptions instance with the notification added to the local requests list. + /// Listens for a notification and adds it to the local requests list in the ArbitrerOptions instance. The type of the notification to listen for. It must implement the INotification interface. The ArbitrerOptions instance to add the notification to. + /// Prefix for Exchange and queues for requests + /// Logger instance to allow information during configuration + /// The updated ArbitrerOptions instance with the notification added to the local requests list. /// / - public static ArbitrerOptions ListenForNotification(this ArbitrerOptions options, string queuePrefix = null) where T : INotification + public static ArbitrerOptions ListenForNotification(this ArbitrerOptions options, string queuePrefix = null, ILogger logger = null) + where T : INotification { options.LocalRequests.Add(typeof(T)); - if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName)) - options.QueuePrefixes.Add(typeof(T).FullName, queuePrefix); + if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName!)) + { + options.QueuePrefixes.Add(typeof(T).FullName!, queuePrefix); + logger?.LogInformation($"Added prefix to request ${typeof(T).FullName}"); + } + return options; } @@ -124,13 +146,19 @@ public static ArbitrerOptions ListenForNotification(this ArbitrerOptions opti /// /// The type of the remote request. It must implement the interface. /// The instance to modify. + /// Prefix for Exchange and queues for requests + /// Logger instance to allow information during configuration /// The modified instance. - public static ArbitrerOptions SetAsRemoteRequest(this ArbitrerOptions options, string queuePrefix = null) where T : IBaseRequest + public static ArbitrerOptions SetAsRemoteRequest(this ArbitrerOptions options, string queuePrefix = null, ILogger logger = null) where T : IBaseRequest { options.RemoteRequests.Add(typeof(T)); - if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName)) - options.QueuePrefixes.Add(typeof(T).FullName, queuePrefix); + if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName!)) + { + options.QueuePrefixes.Add(typeof(T).FullName!, queuePrefix); + logger?.LogInformation($"Added prefix to request ${typeof(T).FullName}"); + } + return options; } @@ -140,8 +168,11 @@ public static ArbitrerOptions SetAsRemoteRequest(this ArbitrerOptions options /// /// The to modify. /// A function that selects the assemblies to retrieve types from. + /// Prefix for Exchange and queues for requests + /// Logger instance to allow information during configuration /// The modified . - public static ArbitrerOptions SetAsLocalRequests(this ArbitrerOptions options, Func> assemblySelect, string queuePrefix = null) + public static ArbitrerOptions SetAsLocalRequests(this ArbitrerOptions options, Func> assemblySelect, string queuePrefix = null, + ILogger logger = null) { var types = (from a in assemblySelect() from t in a.GetTypes() @@ -153,8 +184,11 @@ where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssign if (!string.IsNullOrWhiteSpace(queuePrefix)) foreach (var t in types) - if (!options.QueuePrefixes.ContainsKey(t.FullName)) + if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + { options.QueuePrefixes.Add(t.FullName, queuePrefix); + logger?.LogInformation($"Added prefix to request ${t.FullName}"); + } return options; } @@ -164,16 +198,22 @@ where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssign /// /// The object. /// A function that returns an enumerable collection of types to be set as local requests. + /// Prefix for Exchange and queues for requests + /// Logger instance to allow information during configuration /// The updated object. - public static ArbitrerOptions SetAsLocalRequests(this ArbitrerOptions options, Func> typesSelect, string queuePrefix = null) + public static ArbitrerOptions SetAsLocalRequests(this ArbitrerOptions options, Func> typesSelect, string queuePrefix = null, + ILogger logger = null) { foreach (var t in typesSelect()) options.LocalRequests.Add(t); if (!string.IsNullOrWhiteSpace(queuePrefix)) foreach (var t in typesSelect()) - if (!options.QueuePrefixes.ContainsKey(t.FullName)) + if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + { options.QueuePrefixes.Add(t.FullName, queuePrefix); + logger?.LogInformation($"Added prefix to request ${t.FullName}"); + } return options; } @@ -183,8 +223,11 @@ public static ArbitrerOptions SetAsLocalRequests(this ArbitrerOptions options, F /// /// The to set as remote requests. /// The function to select the assemblies. + /// Prefix for Exchange and queues for requests + /// Logger instance to allow information during configuration /// The updated with remote requests set. - public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options, Func> assemblySelect, string queuePrefix = null) + public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options, Func> assemblySelect, string queuePrefix = null, + ILogger logger = null) { var types = (from a in assemblySelect() from t in a.GetTypes() @@ -195,8 +238,12 @@ where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssign if (!string.IsNullOrWhiteSpace(queuePrefix)) foreach (var t in types) - if (!options.QueuePrefixes.ContainsKey(t.FullName)) + if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + { options.QueuePrefixes.Add(t.FullName, queuePrefix); + logger?.LogInformation($"Added prefix to request ${t.FullName}"); + } + return options; } @@ -205,16 +252,23 @@ where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssign /// /// The ArbitrerOptions object. /// The function that returns IEnumerable of Type objects. + /// Prefix for Exchange and queues for requests + /// Logger instance to allow information during configuration /// The modified ArbitrerOptions object. - public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options, Func> typesSelect, string queuePrefix = null) + public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options, Func> typesSelect, string queuePrefix = null, + ILogger logger = null) { foreach (var t in typesSelect()) options.RemoteRequests.Add(t); if (!string.IsNullOrWhiteSpace(queuePrefix)) foreach (var t in typesSelect()) - if (!options.QueuePrefixes.ContainsKey(t.FullName)) + if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + { options.QueuePrefixes.Add(t.FullName, queuePrefix); + logger?.LogInformation($"Added prefix to request ${t.FullName}"); + } + return options; } @@ -223,13 +277,20 @@ public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options, /// /// The ArbitrerOptions object. /// The function that returns IEnumerable of Type objects. + /// Prefix for Exchange and queues for notification + /// Logger instance to allow information during configuration /// The modified ArbitrerOptions object. - public static ArbitrerOptions SetNotificationPrefix(this ArbitrerOptions options, Func> typesSelect, string queuePrefix) + public static ArbitrerOptions SetNotificationPrefix(this ArbitrerOptions options, Func> typesSelect, string queuePrefix, + ILogger logger = null) { if (!string.IsNullOrWhiteSpace(queuePrefix)) foreach (var t in typesSelect().Where(t => typeof(INotification).IsAssignableFrom(t))) - if (!options.QueuePrefixes.ContainsKey(t.FullName)) + if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + { options.QueuePrefixes.Add(t.FullName, queuePrefix); + logger?.LogInformation($"Added prefix to notification ${t.FullName}"); + } + return options; } @@ -238,8 +299,11 @@ public static ArbitrerOptions SetNotificationPrefix(this ArbitrerOptions options /// /// The ArbitrerOptions object. /// The function to select the assemblies. + /// Prefix for Exchange and queues for notification + /// Logger instance to allow information during configuration /// The modified ArbitrerOptions object. - public static ArbitrerOptions SetNotificationPrefix(this ArbitrerOptions options, Func> assemblySelect, string queuePrefix) + public static ArbitrerOptions SetNotificationPrefix(this ArbitrerOptions options, Func> assemblySelect, string queuePrefix, + ILogger logger = null) { var types = (from a in assemblySelect() from t in a.GetTypes() @@ -247,8 +311,12 @@ where typeof(INotification).IsAssignableFrom(t) select t).AsEnumerable(); foreach (var t in types) - if (!options.QueuePrefixes.ContainsKey(t.FullName)) + if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + { options.QueuePrefixes.Add(t.FullName, queuePrefix); + logger?.LogInformation($"Added prefix to notification ${t.FullName}"); + } + return options; } @@ -267,7 +335,7 @@ public static string TypeQueueName(this Type t, ArbitrerOptions options, StringB } // var prefix = options.DefaultQueuePrefix; - options.QueuePrefixes.TryGetValue(t.FullName, out var prefix); + options.QueuePrefixes.TryGetValue(t.FullName!, out var prefix); prefix = prefix ?? options.DefaultQueuePrefix; sb = sb ?? new StringBuilder(); @@ -275,7 +343,7 @@ public static string TypeQueueName(this Type t, ArbitrerOptions options, StringB if (!string.IsNullOrWhiteSpace(prefix)) sb.Append($"{prefix}."); sb.Append($"{t.Namespace}.{t.Name}"); - if (t.GenericTypeArguments != null && t.GenericTypeArguments.Length > 0) + if (t.GenericTypeArguments is { Length: > 0 }) { sb.Append("["); foreach (var ta in t.GenericTypeArguments) From 5bd79de6a3b4f731cca892ab0b6f2ac4b4bcac4b Mon Sep 17 00:00:00 2001 From: Paolo Possanzini Date: Mon, 15 Jul 2024 09:43:11 +0200 Subject: [PATCH 2/4] typo fix --- Arbitrer.RabbitMQ/MessageDispatcherOptions.cs | 2 +- Arbitrer/Arbitrer.csproj | 1 - Arbitrer/extensions/ArbitrerExtensions.cs | 47 ++++++++++++------- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/Arbitrer.RabbitMQ/MessageDispatcherOptions.cs b/Arbitrer.RabbitMQ/MessageDispatcherOptions.cs index e55266b..b1215e1 100644 --- a/Arbitrer.RabbitMQ/MessageDispatcherOptions.cs +++ b/Arbitrer.RabbitMQ/MessageDispatcherOptions.cs @@ -60,7 +60,7 @@ public class MessageDispatcherOptions /// If Durable is set to true, it means the object is durable, otherwise, it is not. /// /// - /// Durability is a characteristic that specifies whether an object is able to withstand wear, decay, or damage over time. + /// Durability is a Capability that specifies whether an object is able to withstand wear, decay, or damage over time. /// Setting Durable to true indicates that the object is designed to be long-lasting and can resist various forms of deterioration. /// Conversely, setting Durable to false suggests that the object is not intended to have a long lifespan or may be susceptible to damage. /// diff --git a/Arbitrer/Arbitrer.csproj b/Arbitrer/Arbitrer.csproj index fdad447..b080a35 100644 --- a/Arbitrer/Arbitrer.csproj +++ b/Arbitrer/Arbitrer.csproj @@ -17,7 +17,6 @@ true Arbitrer netstandard2.0;net6.0 - https://github.com/teamdev-it/arbitrer diff --git a/Arbitrer/extensions/ArbitrerExtensions.cs b/Arbitrer/extensions/ArbitrerExtensions.cs index 4c45898..11c3f49 100644 --- a/Arbitrer/extensions/ArbitrerExtensions.cs +++ b/Arbitrer/extensions/ArbitrerExtensions.cs @@ -10,6 +10,8 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +// ReSharper disable AssignNullToNotNullAttribute + namespace Arbitrer { /// @@ -112,9 +114,9 @@ public static ArbitrerOptions SetAsLocalRequest(this ArbitrerOptions options, { options.LocalRequests.Add(typeof(T)); - if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName!)) + if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName)) { - options.QueuePrefixes.Add(typeof(T).FullName!, queuePrefix); + options.QueuePrefixes.Add(typeof(T).FullName, queuePrefix); logger?.LogInformation($"Added prefix to request ${typeof(T).FullName}"); } @@ -132,9 +134,9 @@ public static ArbitrerOptions ListenForNotification(this ArbitrerOptions opti { options.LocalRequests.Add(typeof(T)); - if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName!)) + if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName)) { - options.QueuePrefixes.Add(typeof(T).FullName!, queuePrefix); + options.QueuePrefixes.Add(typeof(T).FullName, queuePrefix); logger?.LogInformation($"Added prefix to request ${typeof(T).FullName}"); } @@ -153,9 +155,9 @@ public static ArbitrerOptions SetAsRemoteRequest(this ArbitrerOptions options { options.RemoteRequests.Add(typeof(T)); - if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName!)) + if (!string.IsNullOrWhiteSpace(queuePrefix) && !options.QueuePrefixes.ContainsKey(typeof(T).FullName)) { - options.QueuePrefixes.Add(typeof(T).FullName!, queuePrefix); + options.QueuePrefixes.Add(typeof(T).FullName, queuePrefix); logger?.LogInformation($"Added prefix to request ${typeof(T).FullName}"); } @@ -184,7 +186,7 @@ where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssign if (!string.IsNullOrWhiteSpace(queuePrefix)) foreach (var t in types) - if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + if (!options.QueuePrefixes.ContainsKey(t.FullName)) { options.QueuePrefixes.Add(t.FullName, queuePrefix); logger?.LogInformation($"Added prefix to request ${t.FullName}"); @@ -209,7 +211,7 @@ public static ArbitrerOptions SetAsLocalRequests(this ArbitrerOptions options, F if (!string.IsNullOrWhiteSpace(queuePrefix)) foreach (var t in typesSelect()) - if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + if (!options.QueuePrefixes.ContainsKey(t.FullName)) { options.QueuePrefixes.Add(t.FullName, queuePrefix); logger?.LogInformation($"Added prefix to request ${t.FullName}"); @@ -238,7 +240,7 @@ where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssign if (!string.IsNullOrWhiteSpace(queuePrefix)) foreach (var t in types) - if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + if (!options.QueuePrefixes.ContainsKey(t.FullName)) { options.QueuePrefixes.Add(t.FullName, queuePrefix); logger?.LogInformation($"Added prefix to request ${t.FullName}"); @@ -258,12 +260,16 @@ where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssign public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options, Func> typesSelect, string queuePrefix = null, ILogger logger = null) { - foreach (var t in typesSelect()) + var types = typesSelect(); + if (!types.Any()) + logger?.LogWarning("SetAsRemoteRequests : No Requests classes found in assemblies"); + + foreach (var t in types) options.RemoteRequests.Add(t); if (!string.IsNullOrWhiteSpace(queuePrefix)) - foreach (var t in typesSelect()) - if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + foreach (var t in types) + if (!options.QueuePrefixes.ContainsKey(t.FullName)) { options.QueuePrefixes.Add(t.FullName, queuePrefix); logger?.LogInformation($"Added prefix to request ${t.FullName}"); @@ -283,9 +289,13 @@ public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options, public static ArbitrerOptions SetNotificationPrefix(this ArbitrerOptions options, Func> typesSelect, string queuePrefix, ILogger logger = null) { + var types = typesSelect().Where(t => typeof(INotification).IsAssignableFrom(t)); + if (!types.Any()) + logger?.LogWarning("SetNotificationPrefix : No Notification classes found in assemblies"); + if (!string.IsNullOrWhiteSpace(queuePrefix)) - foreach (var t in typesSelect().Where(t => typeof(INotification).IsAssignableFrom(t))) - if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + foreach (var t in types) + if (!options.QueuePrefixes.ContainsKey(t.FullName)) { options.QueuePrefixes.Add(t.FullName, queuePrefix); logger?.LogInformation($"Added prefix to notification ${t.FullName}"); @@ -310,8 +320,11 @@ from t in a.GetTypes() where typeof(INotification).IsAssignableFrom(t) select t).AsEnumerable(); + if (!types.Any()) + logger?.LogWarning("SetNotificationPrefix : No Notification classes found in assemblies"); + foreach (var t in types) - if (!options.QueuePrefixes.ContainsKey(t.FullName!)) + if (!options.QueuePrefixes.ContainsKey(t.FullName)) { options.QueuePrefixes.Add(t.FullName, queuePrefix); logger?.LogInformation($"Added prefix to notification ${t.FullName}"); @@ -335,7 +348,7 @@ public static string TypeQueueName(this Type t, ArbitrerOptions options, StringB } // var prefix = options.DefaultQueuePrefix; - options.QueuePrefixes.TryGetValue(t.FullName!, out var prefix); + options.QueuePrefixes.TryGetValue(t.FullName, out var prefix); prefix = prefix ?? options.DefaultQueuePrefix; sb = sb ?? new StringBuilder(); @@ -343,7 +356,7 @@ public static string TypeQueueName(this Type t, ArbitrerOptions options, StringB if (!string.IsNullOrWhiteSpace(prefix)) sb.Append($"{prefix}."); sb.Append($"{t.Namespace}.{t.Name}"); - if (t.GenericTypeArguments is { Length: > 0 }) + if (t.GenericTypeArguments != null && t.GenericTypeArguments.Length > 0) { sb.Append("["); foreach (var ta in t.GenericTypeArguments) From 6ba2376c7cdc53b814025c3525fd432feb584868 Mon Sep 17 00:00:00 2001 From: Paolo Possanzini Date: Mon, 15 Jul 2024 09:52:30 +0200 Subject: [PATCH 3/4] removed legacy project --- .../Arbitrer.9.Autofac.csproj | 37 ---- .../Arbitrer.9.Autofac/AutofacExtensions.cs | 32 --- .../Arbitrer.9.DependencyInjection.csproj | 31 --- .../DependecyInjectionExtension.cs | 21 -- .../Arbitrer.9.Kafka/Arbitrer.9.Kafka.csproj | 57 ----- .../Arbitrer.9.Kafka/MessageDispatcher.cs | 149 ------------- .../Arbitrer.9.RabbitMQ.Autofac.csproj | 46 ---- .../Extensions/Extensions.cs | 58 ----- .../MessageDispatcher.cs | 151 ------------- .../RequestsManager.cs | 209 ------------------ .../Arbitrer.9.RabbitMQ.csproj | 48 ---- .../Arbitrer.9.RabbitMQ/MessageDispatcher.cs | 150 ------------- Arbitrer.Legacy/Arbitrer.9/ArbitredMediatr.cs | 55 ----- Arbitrer.Legacy/Arbitrer.9/Arbitrer.9.csproj | 59 ----- Arbitrer.Legacy/Arbitrer.9/Arbitrer.cs | 77 ------- Arbitrer.Legacy/Arbitrer.9/IArbitrer.cs | 23 -- .../Arbitrer.9/IExternalMessageDispatcher.cs | 14 -- .../extensions/ArbitrerExtensions.cs | 157 ------------- .../Arbitrer.9/pipelines/ArbitrerPipeline.cs | 60 ----- arbitrer.sln | 46 ---- 20 files changed, 1480 deletions(-) delete mode 100644 Arbitrer.Legacy/Arbitrer.9.Autofac/Arbitrer.9.Autofac.csproj delete mode 100644 Arbitrer.Legacy/Arbitrer.9.Autofac/AutofacExtensions.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9.DependencyInjection/Arbitrer.9.DependencyInjection.csproj delete mode 100644 Arbitrer.Legacy/Arbitrer.9.DependencyInjection/DependecyInjectionExtension.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9.Kafka/Arbitrer.9.Kafka.csproj delete mode 100644 Arbitrer.Legacy/Arbitrer.9.Kafka/MessageDispatcher.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/Arbitrer.9.RabbitMQ.Autofac.csproj delete mode 100644 Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/Extensions/Extensions.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/MessageDispatcher.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/RequestsManager.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9.RabbitMQ/Arbitrer.9.RabbitMQ.csproj delete mode 100644 Arbitrer.Legacy/Arbitrer.9.RabbitMQ/MessageDispatcher.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9/ArbitredMediatr.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9/Arbitrer.9.csproj delete mode 100644 Arbitrer.Legacy/Arbitrer.9/Arbitrer.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9/IArbitrer.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9/IExternalMessageDispatcher.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9/extensions/ArbitrerExtensions.cs delete mode 100644 Arbitrer.Legacy/Arbitrer.9/pipelines/ArbitrerPipeline.cs diff --git a/Arbitrer.Legacy/Arbitrer.9.Autofac/Arbitrer.9.Autofac.csproj b/Arbitrer.Legacy/Arbitrer.9.Autofac/Arbitrer.9.Autofac.csproj deleted file mode 100644 index 3adaccd..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.Autofac/Arbitrer.9.Autofac.csproj +++ /dev/null @@ -1,37 +0,0 @@ - - - - Paolo Possanzini,Teamdev s.r.l. - RPC extensions for mediatr via pipeline - Copyright Paolo Possanzini - strict - mediator;request;response;queries;commands;notifications;rpc - - false - v - Apache-2.0 - true - true - snupkg - true - true - Arbitrer - netstandard2.0 - Arbitrer.Autofac - - https://github.com/teamdev-it/arbitrer - - - - - - - - - - - - - - diff --git a/Arbitrer.Legacy/Arbitrer.9.Autofac/AutofacExtensions.cs b/Arbitrer.Legacy/Arbitrer.9.Autofac/AutofacExtensions.cs deleted file mode 100644 index 3359ea1..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.Autofac/AutofacExtensions.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System; -using Autofac; -using MediatR; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace Arbitrer -{ - public static class AutofacExtensions - { - public static ContainerBuilder AddArbitrer(this ContainerBuilder builder, Action configure, ILoggerFactory loggerFactory) - { - if (configure != null) - { - var options = new ArbitrerOptions(); - configure(options); - var opt = Options.Create(options); - - builder.RegisterInstance(opt).SingleInstance(); - } - - builder.RegisterType().As().SingleInstance(); - builder.RegisterType().As(); - builder.RegisterGeneric(typeof(Pipelines.ArbitrerPipeline<,>)).As(typeof(IPipelineBehavior<,>)); - - builder.RegisterInstance(LoggerFactoryExtensions.CreateLogger(loggerFactory)).As>(); - builder.RegisterInstance(LoggerFactoryExtensions.CreateLogger(loggerFactory)).As>(); - - return builder; - } - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9.DependencyInjection/Arbitrer.9.DependencyInjection.csproj b/Arbitrer.Legacy/Arbitrer.9.DependencyInjection/Arbitrer.9.DependencyInjection.csproj deleted file mode 100644 index 0c67ba3..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.DependencyInjection/Arbitrer.9.DependencyInjection.csproj +++ /dev/null @@ -1,31 +0,0 @@ - - - - Paolo Possanzini,Teamdev s.r.l. - RPC extensions for mediatr via pipeline - Copyright Paolo Possanzini - strict - mediator;request;response;queries;commands;notifications;rpc - - false - v - Apache-2.0 - true - true - snupkg - true - true - Arbitrer - netstandard2.0 - Arbitrer.DependencyInjection - - https://github.com/teamdev-it/arbitrer - - - - - - - - diff --git a/Arbitrer.Legacy/Arbitrer.9.DependencyInjection/DependecyInjectionExtension.cs b/Arbitrer.Legacy/Arbitrer.9.DependencyInjection/DependecyInjectionExtension.cs deleted file mode 100644 index f1cfc94..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.DependencyInjection/DependecyInjectionExtension.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using Arbitrer; -using MediatR; -using Microsoft.Extensions.DependencyInjection; - -namespace Arbitrer -{ - public static class DependecyInjectionExtension - { - public static IServiceCollection AddArbitrer(this IServiceCollection services, Action configure = null) - { - if (configure != null) - services.Configure(configure); - services.AddScoped(typeof(IPipelineBehavior<,>), typeof(Pipelines.ArbitrerPipeline<,>)); - services.AddSingleton(); - - services.AddTransient(); - return services; - } - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9.Kafka/Arbitrer.9.Kafka.csproj b/Arbitrer.Legacy/Arbitrer.9.Kafka/Arbitrer.9.Kafka.csproj deleted file mode 100644 index 6ee869d..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.Kafka/Arbitrer.9.Kafka.csproj +++ /dev/null @@ -1,57 +0,0 @@ - - - - Paolo Possanzini, Alessandro Esposito, Teamdev s.r.l. - RPC extensions for mediatr via pipeline using Apache Kafka - Copyright Paolo Possanzini - netstandard2.0 - strict - mediator;request;response;queries;commands;notifications;rpc;kafka - - false - v - Apache-2.0 - true - true - snupkg - true - true - Arbitrer.Kafka - Arbitrer.Kafka - - https://github.com/teamdev-it/arbitrer - - - - - - - - - - - - - - consts.cs - - - Extensions\Extensions.cs - - - KafkaMessage.cs - - - MessageDispatcherOptions.cs - - - RequestsManager.cs - - - - - - - - diff --git a/Arbitrer.Legacy/Arbitrer.9.Kafka/MessageDispatcher.cs b/Arbitrer.Legacy/Arbitrer.9.Kafka/MessageDispatcher.cs deleted file mode 100644 index 31f480e..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.Kafka/MessageDispatcher.cs +++ /dev/null @@ -1,149 +0,0 @@ -using MediatR; -using Microsoft.Extensions.Options; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using System.Text; -using System.Collections.Concurrent; -using System.Diagnostics; -using Confluent.Kafka; -using admin = Confluent.Kafka.Admin; -using System.Collections.Generic; -using System.Threading.Tasks; -using System; -using Arbitrer.Messages; -using System.Threading; - -namespace Arbitrer.Kafka -{ - public class MessageDispatcher : IExternalMessageDispatcher, IDisposable - { - private Thread _consumerThread; - private readonly MessageDispatcherOptions _options; - private readonly ILogger _logger; - private readonly IServiceProvider _provider; - private readonly ArbitrerOptions _arbitrerOptions; - private IProducer _producer; - private IConsumer _consumer; - private string _replyTopicName; - private readonly ConcurrentDictionary> _callbackMapper = new ConcurrentDictionary>(); - - public MessageDispatcher(IOptions options, ILogger logger, IServiceProvider provider, - IOptions arbitrerOptions) - { - this._options = options.Value; - this._logger = logger; - _provider = provider; - _arbitrerOptions = arbitrerOptions.Value; - this.InitConnection(); - } - - - public void InitConnection() - { - _logger.LogInformation($"Creating Kafka Connection to '{_options.BootstrapServers}'..."); - - // Ensuring we have a connection object - - _replyTopicName = $"{Process.GetCurrentProcess().Id}.{DateTime.Now.Ticks}"; - var config = this._options.GetConsumerConfig(); - config.GroupId = _replyTopicName; - - _producer = new ProducerBuilder(this._options.GetProducerConfig()).Build(); - _consumer = new ConsumerBuilder(config).Build(); - - _provider.CreateTopicAsync(_options, _replyTopicName); - - _consumer.Subscribe(_replyTopicName); - _consumerThread = new Thread(() => - { - while (true) - { - var consumeResult = _consumer.Consume(); - if (consumeResult != null) - { - - _logger.LogDebug("Response Message: {Msg}", consumeResult.Message.Value); - var reply = JsonConvert.DeserializeObject(consumeResult.Message.Value, this._options.SerializerSettings); - - if (reply != null) - if (_callbackMapper.TryRemove(reply.CorrelationId, out var tcs)) - tcs?.TrySetResult(consumeResult.Message.Value); - } - } - } - ); - _consumerThread.IsBackground = true; - _consumerThread.Start(); - } - - public async Task> Dispatch(TRequest request, CancellationToken cancellationToken = default) - where TRequest: IRequest - { - var correlationId = Guid.NewGuid().ToString(); - var message = JsonConvert.SerializeObject(new KafkaMessage - { - Message = request, - CorrelationId = correlationId, - ReplyTo = _replyTopicName - }, _options.SerializerSettings); - - - var tcs = new TaskCompletionSource(); - _callbackMapper.TryAdd(correlationId, tcs); - - await _producer.ProduceAsync( - topic: typeof(TRequest).TypeQueueName(_arbitrerOptions), - message: new Message {Value = message}, cancellationToken); - - cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out var tmp)); - var result = await tcs.Task; - - var response = JsonConvert.DeserializeObject>>(result, this._options.SerializerSettings); - return response.Reply; - } - - public async Task Notify(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification - { - var message = JsonConvert.SerializeObject(request, _options.SerializerSettings); - - _logger.LogInformation($"Sending message to: {Consts.ArbitrerExchangeName}/{request.GetType().TypeQueueName(_arbitrerOptions)}"); - - await _producer.ProduceAsync( - topic: typeof(TRequest).TypeQueueName(_arbitrerOptions), - message: new Message {Value = message}, cancellationToken); - } - - public void Dispose() - { - try - { - } - finally - { - try - { - _producer.Dispose(); - } - catch - { - } - - try - { - DisposeConsumer(); - } - catch - { - } - } - } - - private void DisposeConsumer() - { - _provider.DeleteTopicAsync(this._options, this._replyTopicName); - _consumer.Unsubscribe(); - _consumer.Close(); - _consumer.Dispose(); - } - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/Arbitrer.9.RabbitMQ.Autofac.csproj b/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/Arbitrer.9.RabbitMQ.Autofac.csproj deleted file mode 100644 index dc3d0a5..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/Arbitrer.9.RabbitMQ.Autofac.csproj +++ /dev/null @@ -1,46 +0,0 @@ - - - Paolo Possanzini,Teamdev s.r.l. - RPC extensions for mediatr via pipeline using RabbitMQ - Copyright Paolo Possanzini - strict - mediator;request;response;queries;commands;notifications;rpc;rabbitmq - - false - v - Apache-2.0 - true - true - snupkg - true - true - Arbitrer.RabbitMQ.Autofac - netstandard2.0 - Arbitrer.RabbitMQ.Autofac - - https://github.com/teamdev-it/arbitrer - - - - - - - - - - - - consts.cs - - - Extensions\Extensions.cs - - - MessageDispatcherOptions.cs - - - - - - \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/Extensions/Extensions.cs b/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/Extensions/Extensions.cs deleted file mode 100644 index ed40590..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/Extensions/Extensions.cs +++ /dev/null @@ -1,58 +0,0 @@ -using System; -using System.Security.Cryptography; -using System.Text; -using Arbitrer.RabbitMQ; -using Autofac; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace Arbitrer -{ - public static class Extensions - { - public static ContainerBuilder AddArbitrerRabbitMQMessageDispatcher(this ContainerBuilder services, Action config, - ILoggerFactory loggerFactory) - { - var options = new MessageDispatcherOptions(); - config(options); - services.RegisterInstance(Options.Create(options)).SingleInstance(); - services.RegisterType().As().SingleInstance(); - - services.RegisterInstance(LoggerFactoryExtensions.CreateLogger(loggerFactory)).As>(); - services.RegisterInstance(LoggerFactoryExtensions.CreateLogger(loggerFactory)).As>(); - return services; - } - - public static ContainerBuilder ResolveArbitrerCalls(this ContainerBuilder services) - { - services.RegisterType().AsSelf().As().SingleInstance(); - return services; - } - - public static string GetHash(this string input, HashAlgorithm hashAlgorithm) - { - byte[] data = hashAlgorithm.ComputeHash(Encoding.UTF8.GetBytes(input)); - var sBuilder = new StringBuilder(); - for (int i = 0; i < data.Length; i++) - { - sBuilder.Append(data[i].ToString("x2")); - } - - return sBuilder.ToString(); - } - - public static string GetHash(this byte[] input, HashAlgorithm hashAlgorithm) - { - byte[] data = hashAlgorithm.ComputeHash(input); - var sBuilder = new StringBuilder(); - for (int i = 0; i < data.Length; i++) - { - sBuilder.Append(data[i].ToString("x2")); - } - - return sBuilder.ToString(); - } - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/MessageDispatcher.cs b/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/MessageDispatcher.cs deleted file mode 100644 index 1b90f53..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/MessageDispatcher.cs +++ /dev/null @@ -1,151 +0,0 @@ -using System.Threading.Tasks; -using MediatR; -using Microsoft.Extensions.Options; -using Microsoft.Extensions.Logging; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using Newtonsoft.Json; -using System.Threading; -using System.Text; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; -using System.Security.Cryptography; -using Arbitrer.Messages; -using Newtonsoft.Json.Linq; -using Newtonsoft.Json.Serialization; - -namespace Arbitrer.RabbitMQ -{ - public class MessageDispatcher : IExternalMessageDispatcher, IDisposable - { - private readonly MessageDispatcherOptions options; - private readonly ILogger logger; - private readonly ArbitrerOptions _arbitrerOptions; - private IConnection _connection = null; - private IModel _sendChannel = null; - private string _replyQueueName = null; - private AsyncEventingBasicConsumer _sendConsumer = null; - private string _consumerId = null; - private readonly ConcurrentDictionary> _callbackMapper = new ConcurrentDictionary>(); - - - public MessageDispatcher(IOptions options, ILogger logger, IOptions arbitrerOptions) - { - this.options = options.Value; - this.logger = logger; - _arbitrerOptions = arbitrerOptions.Value; - - this.InitConnection(); - } - - private void InitConnection() - { - // Ensuring we have a connetion object - if (_connection == null) - { - logger.LogInformation($"Creating RabbitMQ Connection to '{options.HostName}'..."); - var factory = new ConnectionFactory - { - HostName = options.HostName, - UserName = options.UserName, - Password = options.Password, - VirtualHost = options.VirtualHost, - Port = options.Port, - DispatchConsumersAsync = true, - ClientProvidedName = options.ClientName - }; - - _connection = factory.CreateConnection(); - } - - _sendChannel = _connection.CreateModel(); - _sendChannel.ExchangeDeclare(Constants.ArbitrerExchangeName, ExchangeType.Topic); - // _channel.ConfirmSelect(); - - var queueName = $"{options.QueueName}.{Process.GetCurrentProcess().Id}.{DateTime.Now.Ticks}"; - _replyQueueName = _sendChannel.QueueDeclare(queue: queueName).QueueName; - _sendConsumer = new AsyncEventingBasicConsumer(_sendChannel); - _sendConsumer.Received += (s, ea) => - { - if (!_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs)) - return Task.CompletedTask; - var body = ea.Body.ToArray(); - var response = Encoding.UTF8.GetString(body); - tcs.TrySetResult(response); - return Task.CompletedTask; - }; - - _sendChannel.BasicReturn += (s, ea) => - { - if (!_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs)) return; - tcs.TrySetException(new Exception($"Unable to deliver required action: {ea.RoutingKey}")); - }; - - this._consumerId = _sendChannel.BasicConsume(queue: _replyQueueName, autoAck: true, consumer: _sendConsumer); - } - - - public async Task> Dispatch(TRequest request, CancellationToken cancellationToken = default) - where TRequest : IRequest - { - var message = JsonConvert.SerializeObject(request, options.SerializerSettings); - - var correlationId = Guid.NewGuid().ToString(); - - var tcs = new TaskCompletionSource(); - _callbackMapper.TryAdd(correlationId, tcs); - - _sendChannel.BasicPublish( - exchange: Constants.ArbitrerExchangeName, - routingKey: typeof(TRequest).TypeQueueName(_arbitrerOptions), - mandatory: true, - body: Encoding.UTF8.GetBytes(message), - basicProperties: GetBasicProperties(correlationId)); - - cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out var tmp)); - var result = await tcs.Task; - - return JsonConvert.DeserializeObject>(result, options.SerializerSettings); - } - - public Task Notify(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification - { - var message = JsonConvert.SerializeObject(request, options.SerializerSettings); - - logger.LogInformation($"Sending message to: {Constants.ArbitrerExchangeName}/{request.GetType().TypeQueueName(_arbitrerOptions)}"); - - _sendChannel.BasicPublish( - exchange: Constants.ArbitrerExchangeName, - routingKey: request.GetType().TypeQueueName(_arbitrerOptions), - mandatory: false, - body: Encoding.UTF8.GetBytes(message) - ); - - return Task.CompletedTask; - } - - - private IBasicProperties GetBasicProperties(string correlationId) - { - var props = _sendChannel.CreateBasicProperties(); - props.CorrelationId = correlationId; - props.ReplyTo = _replyQueueName; - return props; - } - - public void Dispose() - { - try - { - _sendChannel?.BasicCancel(_consumerId); - _sendChannel?.Close(); - _connection.Close(); - } - catch - { - } - } - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/RequestsManager.cs b/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/RequestsManager.cs deleted file mode 100644 index 9f10900..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ.Autofac/RequestsManager.cs +++ /dev/null @@ -1,209 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Security.Cryptography; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Autofac; -using Autofac.Core.Lifetime; -using MediatR; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Newtonsoft.Json; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; - -namespace Arbitrer.RabbitMQ -{ - public class RequestsManager : IHostedService - { - private readonly ILogger _logger; - private readonly IArbitrer _arbitrer; - private readonly ILifetimeScope _provider; - private readonly ArbitrerOptions _arbitrerOptions; - - private IConnection _connection = null; - private IModel _channel = null; - - private readonly HashSet _deDuplicationCache = new HashSet(); - private readonly SHA256 _hasher = SHA256.Create(); - - private readonly MessageDispatcherOptions _options; - - public RequestsManager(IOptions options, ILogger logger, IArbitrer arbitrer, ILifetimeScope provider, - IOptions arbitrerOptions) - { - this._options = options.Value; - this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); - this._arbitrer = arbitrer; - this._provider = provider; - this._arbitrerOptions = arbitrerOptions.Value; - } - - public Task StartAsync(CancellationToken cancellationToken) - { - if (_connection == null) - { - _logger.LogInformation($"ARBITRER: Creating RabbitMQ Connection to '{_options.HostName}'..."); - var factory = new ConnectionFactory - { - HostName = _options.HostName, - UserName = _options.UserName, - Password = _options.Password, - VirtualHost = _options.VirtualHost, - Port = _options.Port, - DispatchConsumersAsync = true, - ClientProvidedName = _options.ClientName - }; - - _connection = factory.CreateConnection(); - _channel = _connection.CreateModel(); - _channel.ExchangeDeclare(Constants.ArbitrerExchangeName, ExchangeType.Topic); - - _logger.LogInformation("ARBITRER: ready !"); - } - - - foreach (var t in _arbitrer.GetLocalRequestsTypes()) - { - var isNotification = typeof(INotification).IsAssignableFrom(t); - var queuename = $"{t.TypeQueueName(_arbitrerOptions)}${(isNotification ? Guid.NewGuid().ToString() : "")}"; - - _channel.QueueDeclare(queue: queuename, durable: _options.Durable, exclusive: isNotification, autoDelete: _options.AutoDelete, arguments: null); - _channel.QueueBind(queuename, Constants.ArbitrerExchangeName, t.TypeQueueName(_arbitrerOptions)); - - - var consumer = new AsyncEventingBasicConsumer(_channel); - - var consumermethod = typeof(RequestsManager) - .GetMethod(isNotification ? "ConsumeChannelNotification" : "ConsumeChannelMessage", BindingFlags.Instance | BindingFlags.NonPublic) - .MakeGenericMethod(t); - - consumer.Received += async (s, ea) => - { - try - { - await (Task)consumermethod.Invoke(this, new object[] { s, ea }); - } - catch (Exception e) - { - _logger.LogError(e, e.Message); - throw; - } - }; - _channel.BasicConsume(queue: queuename, autoAck: isNotification, consumer: consumer); - } - - _channel.BasicQos(0, 1, false); - return Task.CompletedTask; - } - - private async Task ConsumeChannelNotification(object sender, BasicDeliverEventArgs ea) - { - var msg = ea.Body.ToArray(); - - if (_options.DeDuplicationEnabled) - { - var hash = msg.GetHash(_hasher); - lock (_deDuplicationCache) - if (_deDuplicationCache.Contains(hash)) - { - _logger.LogDebug($"duplicated message received : {ea.Exchange}/{ea.RoutingKey}"); - return; - } - - lock (_deDuplicationCache) - _deDuplicationCache.Add(hash); - - // Do not await this task -#pragma warning disable CS4014 - Task.Run(async () => - { - await Task.Delay(_options.DeDuplicationTTL); - lock (_deDuplicationCache) - _deDuplicationCache.Remove(hash); - }); -#pragma warning restore CS4014 - } - - _logger.LogDebug("Elaborating notification : {0}", Encoding.UTF8.GetString(msg)); - var message = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(msg), _options.SerializerSettings); - - var replyProps = _channel.CreateBasicProperties(); - replyProps.CorrelationId = ea.BasicProperties.CorrelationId; - try - { - if (!_provider.BeginLifetimeScope(MatchingScopeLifetimeTags.RequestLifetimeScopeTag).TryResolve(out var mediator)) - mediator = _provider.BeginLifetimeScope().Resolve(); - - var arbitrer = mediator as ArbitredMediatr; - arbitrer?.StopPropagating(); - await mediator.Publish(message); - arbitrer?.ResetPropagating(); - } - catch (Exception ex) - { - _logger.LogError(ex, $"Error executing message of type {typeof(T)} from external service"); - } - } - - private async Task ConsumeChannelMessage(object sender, BasicDeliverEventArgs ea) - { - var msg = ea.Body.ToArray(); - _logger.LogDebug("Elaborating message : {0}", Encoding.UTF8.GetString(msg)); - var message = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(msg), _options.SerializerSettings); - - var replyProps = _channel.CreateBasicProperties(); - replyProps.CorrelationId = ea.BasicProperties.CorrelationId; - string responseMsg = null; - try - { - if (!_provider.BeginLifetimeScope(MatchingScopeLifetimeTags.RequestLifetimeScopeTag).TryResolve(out var mediator)) - mediator = _provider.BeginLifetimeScope().Resolve(); - - var response = await mediator.Send(message); - responseMsg = JsonConvert.SerializeObject(new Messages.ResponseMessage { Content = response, Status = Messages.StatusEnum.Ok }, - _options.SerializerSettings); - _logger.LogDebug("Elaborating sending response : {0}", responseMsg); - } - catch (Exception ex) - { - responseMsg = JsonConvert.SerializeObject(new Messages.ResponseMessage { Exception = ex, Status = Messages.StatusEnum.Exception }, - _options.SerializerSettings); - _logger.LogError(ex, $"Error executing message of type {typeof(T)} from external service"); - } - finally - { - _channel.BasicPublish(exchange: "", routingKey: ea.BasicProperties.ReplyTo, basicProperties: replyProps, - body: Encoding.UTF8.GetBytes(responseMsg ?? "")); - _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); - } - } - - public Task StopAsync(CancellationToken cancellationToken) - { - try - { - _channel?.Close(); - } - catch - { - // ignored - } - - try - { - _connection?.Close(); - } - catch - { - // ignored - } - - return Task.CompletedTask; - } - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ/Arbitrer.9.RabbitMQ.csproj b/Arbitrer.Legacy/Arbitrer.9.RabbitMQ/Arbitrer.9.RabbitMQ.csproj deleted file mode 100644 index ecc8b53..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ/Arbitrer.9.RabbitMQ.csproj +++ /dev/null @@ -1,48 +0,0 @@ - - - Paolo Possanzini,Teamdev s.r.l. - RPC extensions for mediatr via pipeline using RabbitMQ - Copyright Paolo Possanzini - strict - mediator;request;response;queries;commands;notifications;rpc;rabbitmq - - false - v - Apache-2.0 - true - true - snupkg - true - true - Arbitrer.RabbitMQ - netstandard2.0 - Arbitrer.RabbitMQ - - https://github.com/teamdev-it/arbitrer - - - - - - - - - - - consts.cs - - - Extensions\Extensions.cs - - - MessageDispatcherOptions.cs - - - RequestsManager.cs - - - - - - \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ/MessageDispatcher.cs b/Arbitrer.Legacy/Arbitrer.9.RabbitMQ/MessageDispatcher.cs deleted file mode 100644 index 1034f15..0000000 --- a/Arbitrer.Legacy/Arbitrer.9.RabbitMQ/MessageDispatcher.cs +++ /dev/null @@ -1,150 +0,0 @@ -using System.Threading.Tasks; -using MediatR; -using Microsoft.Extensions.Options; -using Microsoft.Extensions.Logging; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using Newtonsoft.Json; -using System.Threading; -using System.Text; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; -using System.Security.Cryptography; -using Arbitrer.Messages; -using Newtonsoft.Json.Linq; -using Newtonsoft.Json.Serialization; - -namespace Arbitrer.RabbitMQ -{ - public class MessageDispatcher : IExternalMessageDispatcher, IDisposable - { - private readonly MessageDispatcherOptions options; - private readonly ILogger logger; - private readonly ArbitrerOptions _arbitrerOptions; - private IConnection _connection = null; - private IModel _sendChannel = null; - private string _replyQueueName = null; - private AsyncEventingBasicConsumer _sendConsumer = null; - private string _consumerId = null; - private readonly ConcurrentDictionary> _callbackMapper = new ConcurrentDictionary>(); - - - public MessageDispatcher(IOptions options, ILogger logger, IOptions arbitrerOptions) - { - this.options = options.Value; - this.logger = logger; - _arbitrerOptions = arbitrerOptions.Value; - - this.InitConnection(); - } - - private void InitConnection() - { - // Ensuring we have a connetion object - if (_connection == null) - { - logger.LogInformation($"Creating RabbitMQ Connection to '{options.HostName}'..."); - var factory = new ConnectionFactory - { - HostName = options.HostName, - UserName = options.UserName, - Password = options.Password, - VirtualHost = options.VirtualHost, - Port = options.Port, - DispatchConsumersAsync = true, - }; - - _connection = factory.CreateConnection(); - } - - _sendChannel = _connection.CreateModel(); - _sendChannel.ExchangeDeclare(Constants.ArbitrerExchangeName, ExchangeType.Topic); - // _channel.ConfirmSelect(); - - var queueName = $"{options.QueueName}.{Process.GetCurrentProcess().Id}.{DateTime.Now.Ticks}"; - _replyQueueName = _sendChannel.QueueDeclare(queue: queueName).QueueName; - _sendConsumer = new AsyncEventingBasicConsumer(_sendChannel); - _sendConsumer.Received += (s, ea) => - { - if (!_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs)) - return Task.CompletedTask; - var body = ea.Body.ToArray(); - var response = Encoding.UTF8.GetString(body); - tcs.TrySetResult(response); - return Task.CompletedTask; - }; - - _sendChannel.BasicReturn += (s, ea) => - { - if (!_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs)) return; - tcs.TrySetException(new Exception($"Unable to deliver required action: {ea.RoutingKey}")); - }; - - this._consumerId = _sendChannel.BasicConsume(queue: _replyQueueName, autoAck: true, consumer: _sendConsumer); - } - - - public async Task> Dispatch(TRequest request, CancellationToken cancellationToken = default) - where TRequest : IRequest - { - var message = JsonConvert.SerializeObject(request, options.SerializerSettings); - - var correlationId = Guid.NewGuid().ToString(); - - var tcs = new TaskCompletionSource(); - _callbackMapper.TryAdd(correlationId, tcs); - - _sendChannel.BasicPublish( - exchange: Constants.ArbitrerExchangeName, - routingKey: typeof(TRequest).TypeQueueName(_arbitrerOptions), - mandatory: true, - body: Encoding.UTF8.GetBytes(message), - basicProperties: GetBasicProperties(correlationId)); - - cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out var tmp)); - var result = await tcs.Task; - - return JsonConvert.DeserializeObject>(result, options.SerializerSettings); - } - - public Task Notify(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification - { - var message = JsonConvert.SerializeObject(request, options.SerializerSettings); - - logger.LogInformation($"Sending message to: {Constants.ArbitrerExchangeName}/{request.GetType().TypeQueueName(_arbitrerOptions)}"); - - _sendChannel.BasicPublish( - exchange: Constants.ArbitrerExchangeName, - routingKey: request.GetType().TypeQueueName(_arbitrerOptions), - mandatory: false, - body: Encoding.UTF8.GetBytes(message) - ); - - return Task.CompletedTask; - } - - - private IBasicProperties GetBasicProperties(string correlationId) - { - var props = _sendChannel.CreateBasicProperties(); - props.CorrelationId = correlationId; - props.ReplyTo = _replyQueueName; - return props; - } - - public void Dispose() - { - try - { - _sendChannel?.BasicCancel(_consumerId); - _sendChannel?.Close(); - _connection.Close(); - } - catch - { - } - } - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9/ArbitredMediatr.cs b/Arbitrer.Legacy/Arbitrer.9/ArbitredMediatr.cs deleted file mode 100644 index 2379892..0000000 --- a/Arbitrer.Legacy/Arbitrer.9/ArbitredMediatr.cs +++ /dev/null @@ -1,55 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Net; -using System.Threading; -using System.Threading.Tasks; -using MediatR; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json; - -namespace Arbitrer -{ - public class ArbitredMediatr : Mediator - { - private readonly IArbitrer arbitrer; - private readonly ILogger logger; - private bool allowRemoteRequest = true; - - public ArbitredMediatr(ServiceFactory serviceFactory, IArbitrer arbitrer, ILogger logger) : base( - serviceFactory) - { - this.arbitrer = arbitrer; - this.logger = logger; - } - - public void StopPropagating() - { - allowRemoteRequest = false; - } - - public void ResetPropagating() - { - allowRemoteRequest = true; - } - - protected override async Task PublishCore(IEnumerable> allHandlers, INotification notification, - CancellationToken cancellationToken) - { - try - { - if (allowRemoteRequest) - { - logger.LogDebug("Propagating: {Json}", JsonConvert.SerializeObject(notification)); - await arbitrer.SendRemoteNotification(notification); - } - else - await base.PublishCore(allHandlers, notification, cancellationToken); - } - catch (Exception ex) - { - logger.LogError(ex, ex.Message); - throw; - } - } - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9/Arbitrer.9.csproj b/Arbitrer.Legacy/Arbitrer.9/Arbitrer.9.csproj deleted file mode 100644 index 4811957..0000000 --- a/Arbitrer.Legacy/Arbitrer.9/Arbitrer.9.csproj +++ /dev/null @@ -1,59 +0,0 @@ - - - Paolo Possanzini,Teamdev s.r.l. - RPC extensions for mediatr via pipeline - Copyright Paolo Possanzini - strict - mediator;request;response;queries;commands;notifications;rpc - - false - v - Apache-2.0 - true - true - snupkg - true - true - Arbitrer - netstandard2.0 - Arbitrer - - https://github.com/teamdev-it/arbitrer - - - - <_Parameter1>arbitrer.rabbitmq - - - - - - - - - - - - - ArbitrerOptions.cs - - - Attributes\ArbitrerQueueNameAttribute.cs - - - Attributes\ArbitrerQueueTimeoutAttribute.cs - - - InvalidHandlerException.cs - - - messages\ResponseMessage.cs - - - - - ..\..\..\..\.nuget\packages\microsoft.extensions.logging\6.0.0\lib\netstandard2.0\Microsoft.Extensions.Logging.dll - - - \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9/Arbitrer.cs b/Arbitrer.Legacy/Arbitrer.9/Arbitrer.cs deleted file mode 100644 index d617be1..0000000 --- a/Arbitrer.Legacy/Arbitrer.9/Arbitrer.cs +++ /dev/null @@ -1,77 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Threading.Tasks; -using MediatR; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Newtonsoft.Json.Linq; - -namespace Arbitrer -{ - public class Arbitrer : IArbitrer - { - private readonly ArbitrerOptions options; - private readonly IExternalMessageDispatcher messageDispatcher; - private readonly ILogger logger; - - public Arbitrer(IOptions options, IExternalMessageDispatcher messageDispatcher, ILogger logger) - { - this.options = options.Value; - this.messageDispatcher = messageDispatcher; - this.logger = logger; - } - - public bool HasLocalHandler() where T : IBaseRequest => this.HasLocalHandler(typeof(T)); - public bool HasLocalHandler(Type t) => this.options.LocalRequests.Any(i => i == t); - - public bool HasRemoteHandler() where T : IBaseRequest => this.HasRemoteHandler(typeof(T)); - public bool HasRemoteHandler(Type t) => this.options.RemoteRequests.Any(i => i == t); - - - public HandlerLocation GetLocation() where T : IBaseRequest => this.GetLocation(typeof(T)); - - public HandlerLocation GetLocation(Type t) - { - switch (options.Behaviour) - { - case ArbitrerBehaviourEnum.ImplicitLocal: return this.HasRemoteHandler(t) ? HandlerLocation.Remote : HandlerLocation.Local; - case ArbitrerBehaviourEnum.ImplicitRemote: return this.HasLocalHandler(t) ? HandlerLocation.Local : HandlerLocation.Remote; - default: return this.HasLocalHandler(t) ? HandlerLocation.Local : this.HasRemoteHandler(t) ? HandlerLocation.Remote : HandlerLocation.NotFound; - } - } - - public async Task InvokeRemoteHandler(TRequest request) where TRequest : IRequest - { - logger.LogDebug($"Invoking remote handler for: {typeof(TRequest).TypeQueueName(options)}"); - var result = await messageDispatcher.Dispatch(request); - logger.LogDebug($"Remote request for {typeof(TRequest).TypeQueueName(options)} completed!"); - - if (result.Status == Messages.StatusEnum.Exception) - { - throw (result.Exception ?? new Exception("Error executing remote command")) as Exception; - } - - return (TResponse) result.Content; - } - - public async Task SendRemoteNotification(TRequest request) where TRequest : INotification - { - logger.LogDebug($"Invoking remote handler for: {typeof(TRequest).TypeQueueName(options)}"); - await messageDispatcher.Notify(request); - logger.LogDebug($"Remote request for {typeof(TRequest).TypeQueueName(options)} completed!"); - } - - public IEnumerable GetLocalRequestsTypes() => options.LocalRequests; - - public IEnumerable GetRemoteRequestsTypes() => options.RemoteRequests; - } - - public enum HandlerLocation - { - NotFound, - Local, - Remote, - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9/IArbitrer.cs b/Arbitrer.Legacy/Arbitrer.9/IArbitrer.cs deleted file mode 100644 index 99fc795..0000000 --- a/Arbitrer.Legacy/Arbitrer.9/IArbitrer.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Threading.Tasks; -using MediatR; - -namespace Arbitrer -{ - public interface IArbitrer - { - bool HasLocalHandler() where T : IBaseRequest; - bool HasLocalHandler(Type t); - bool HasRemoteHandler() where T : IBaseRequest; - bool HasRemoteHandler(Type t); - HandlerLocation GetLocation(Type t); - HandlerLocation GetLocation() where T : IBaseRequest; - Task InvokeRemoteHandler(TRequest request) where TRequest : IRequest; - Task SendRemoteNotification(TRequest request) where TRequest : INotification; - - IEnumerable GetLocalRequestsTypes(); - IEnumerable GetRemoteRequestsTypes(); - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9/IExternalMessageDispatcher.cs b/Arbitrer.Legacy/Arbitrer.9/IExternalMessageDispatcher.cs deleted file mode 100644 index b7a54d6..0000000 --- a/Arbitrer.Legacy/Arbitrer.9/IExternalMessageDispatcher.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; -using MediatR; - -namespace Arbitrer -{ - public interface IExternalMessageDispatcher - { - Task> Dispatch(TRequest request, CancellationToken cancellationToken = default) - where TRequest : IRequest; - - Task Notify(TRequest request, CancellationToken cancellationToken = default) where TRequest : INotification; - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9/extensions/ArbitrerExtensions.cs b/Arbitrer.Legacy/Arbitrer.9/extensions/ArbitrerExtensions.cs deleted file mode 100644 index 4f87ebc..0000000 --- a/Arbitrer.Legacy/Arbitrer.9/extensions/ArbitrerExtensions.cs +++ /dev/null @@ -1,157 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Text; -using MediatR; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; - -namespace Arbitrer -{ - public static class ArbitrerExtensions - { - public static ArbitrerOptions InferLocalRequests(this ArbitrerOptions options, IEnumerable assemblies) - { - var localRequests = assemblies.SelectMany(a => a - .GetTypes() - .SelectMany(t => t.GetInterfaces() - .Where(i => i.FullName != null && i.FullName.StartsWith("MediatR.IRequestHandler")) - .Select(i => i.GetGenericArguments()[0]).ToArray() - )); - options.SetAsLocalRequests(() => localRequests); - return options; - } - - // [Obsolete("This registration is no longer needed", false)] - // public static ArbitrerOptions InferPublishedNotifications(this ArbitrerOptions options, IEnumerable assemblies) - // { - // var localNotifications = assemblies.SelectMany(a => a - // .GetTypes() - // .SelectMany(t => t.GetInterfaces() - // .Where(i => i.FullName != null && i.FullName.StartsWith("MediatR.INotification") && !i.FullName.StartsWith("MediatR.INotificationHandler")) - // .ToArray() - // )); - // - // options.SetAsRemoteRequests(() => localNotifications); - // return options; - // } - - public static ArbitrerOptions InferLocalNotifications(this ArbitrerOptions options, IEnumerable assemblies) - { - var localNotifications = assemblies.SelectMany(a => a - .GetTypes() - .SelectMany(t => t.GetInterfaces() - .Where(i => i.FullName != null && i.FullName.StartsWith("MediatR.INotificationHandler")) - .Select(i => i.GetGenericArguments()[0]).ToArray() - )); - - options.SetAsLocalRequests(() => localNotifications); - return options; - } - - public static ArbitrerOptions SetAsLocalRequest(this ArbitrerOptions options) where T : IBaseRequest - { - options.LocalRequests.Add(typeof(T)); - return options; - } - - public static ArbitrerOptions ListenForNotification(this ArbitrerOptions options) where T : INotification - { - options.LocalRequests.Add(typeof(T)); - return options; - } - - - public static ArbitrerOptions SetAsRemoteRequest(this ArbitrerOptions options) where T : IBaseRequest - { - options.RemoteRequests.Add(typeof(T)); - return options; - } - - // [Obsolete("This registration is no longer needed", false)] - // public static ArbitrerOptions PropagateNotification(this ArbitrerOptions options) where T : INotification - // { - // options.RemoteRequests.Add(typeof(T)); - // return options; - // } - - public static ArbitrerOptions SetAsLocalRequests(this ArbitrerOptions options, Func> assemblySelect) - { - var types = (from a in assemblySelect() - from t in a.GetTypes() - where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssignableFrom(t) - select t).AsEnumerable(); - foreach (var t in types) - options.LocalRequests.Add(t); - return options; - } - - public static ArbitrerOptions SetAsLocalRequests(this ArbitrerOptions options, Func> typesSelect) - { - foreach (var t in typesSelect()) - options.LocalRequests.Add(t); - return options; - } - - public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options, Func> assemblySelect) - { - var types = (from a in assemblySelect() - from t in a.GetTypes() - where typeof(IBaseRequest).IsAssignableFrom(t) || typeof(INotification).IsAssignableFrom(t) - select t).AsEnumerable(); - foreach (var t in types) - options.RemoteRequests.Add(t); - return options; - } - - public static ArbitrerOptions SetAsRemoteRequests(this ArbitrerOptions options, Func> typesSelect) - { - foreach (var t in typesSelect()) - options.RemoteRequests.Add(t); - return options; - } - - public static string TypeQueueName(this Type t, ArbitrerOptions options, StringBuilder sb = null) - { - if (t.CustomAttributes.Any()) - { - var attr = t.GetCustomAttribute(); - if (attr != null) return $"{t.Namespace}.{attr.Name}".Replace(".", "_"); - } - - options.QueuePrefixes.TryGetValue(t.FullName, out var prefix); - prefix = prefix ?? options.DefaultQueuePrefix; - - sb = sb ?? new StringBuilder(); - - if (!string.IsNullOrWhiteSpace(prefix)) sb.Append($"{prefix}."); - sb.Append($"{t.Namespace}.{t.Name}"); - - if (t.GenericTypeArguments?.Length > 0) - { - sb.Append("["); - foreach (var ta in t.GenericTypeArguments) - { - ta.TypeQueueName(options, sb); - sb.Append(","); - } - - sb.Append("]"); - } - - return sb.ToString().Replace(",]", "]").Replace(".", "_"); - } - - public static int? QueueTimeout(this Type t) - { - if (t.CustomAttributes.Any()) - { - var attr = t.GetCustomAttribute(); - if (attr != null) return attr.ConsumerTimeout; - } - - return null; - } - } -} \ No newline at end of file diff --git a/Arbitrer.Legacy/Arbitrer.9/pipelines/ArbitrerPipeline.cs b/Arbitrer.Legacy/Arbitrer.9/pipelines/ArbitrerPipeline.cs deleted file mode 100644 index f2f528c..0000000 --- a/Arbitrer.Legacy/Arbitrer.9/pipelines/ArbitrerPipeline.cs +++ /dev/null @@ -1,60 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using MediatR; -using Microsoft.Extensions.Logging; - -namespace Arbitrer.Pipelines -{ - public class ArbitrerPipeline : IPipelineBehavior where TRequest : IRequest - { - private readonly IArbitrer _arbitrer; - private readonly ILogger _logger; - - public ArbitrerPipeline(IArbitrer arbitrer, ILogger logger) - { - this._arbitrer = arbitrer; - _logger = logger; - } - - // Implementation for legacy version for .netstandard 2.0 compatibility - public async Task Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate next) - { - switch (_arbitrer.GetLocation()) - { - case HandlerLocation.Local: return await next().ConfigureAwait(false); - case HandlerLocation.Remote: - try - { - return await _arbitrer.InvokeRemoteHandler(request); - } - catch (Exception e) - { - _logger.LogError(e, e.Message); - throw; - } - default: throw new InvalidHandlerException(); - } - } - - // Implementation for version > 11 - public async Task Handle(TRequest request, RequestHandlerDelegate next, CancellationToken cancellationToken) - { - switch (_arbitrer.GetLocation()) - { - case HandlerLocation.Local: return await next().ConfigureAwait(false); - case HandlerLocation.Remote: - try - { - return await _arbitrer.InvokeRemoteHandler(request); - } - catch (Exception ex) - { - _logger.LogError(ex, ex.Message); - throw; - } - default: throw new InvalidHandlerException(); - } - } - } -} \ No newline at end of file diff --git a/arbitrer.sln b/arbitrer.sln index 5dda5b0..74c802c 100644 --- a/arbitrer.sln +++ b/arbitrer.sln @@ -17,26 +17,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "sender", "tests\sender\send EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Arbitrer.Kafka", "Arbitrer.Kafka\Arbitrer.Kafka.csproj", "{52E67D98-DE21-4434-AB34-5FF4FB70C5E7}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{CC9A79DB-205E-4CA0-B8C8-FFDDACF0FB80}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arbitrer.Autofac", "Arbitrer.Autofac\Arbitrer.Autofac.csproj", "{87C76B91-D332-4CFF-A304-E287F2657515}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arbitrer.RabbitMQ.Autofac", "Arbitrer.RabbitMQ.Autofac\Arbitrer.RabbitMQ.Autofac.csproj", "{F1DFD692-10B7-4D6C-88C1-E716B1C065B8}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Legacy", "Legacy", "{C46C78FD-AFE1-4696-848D-A770C04BAB8A}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arbitrer.9", "Arbitrer.Legacy\Arbitrer.9\Arbitrer.9.csproj", "{2924F360-093F-4F6B-8506-632062FB2506}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arbitrer.9.Autofac", "Arbitrer.Legacy\Arbitrer.9.Autofac\Arbitrer.9.Autofac.csproj", "{06E4F3AB-7642-4ED3-8992-BF23B15BBDCC}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arbitrer.9.DependencyInjection", "Arbitrer.Legacy\Arbitrer.9.DependencyInjection\Arbitrer.9.DependencyInjection.csproj", "{60C9135C-111D-416D-BA46-04DECB8E96BA}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arbitrer.9.Kafka", "Arbitrer.Legacy\Arbitrer.9.Kafka\Arbitrer.9.Kafka.csproj", "{B82D673B-D921-45FC-9200-18C0A2010DE1}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arbitrer.9.RabbitMQ", "Arbitrer.Legacy\Arbitrer.9.RabbitMQ\Arbitrer.9.RabbitMQ.csproj", "{B40710BC-0775-43A7-82BE-A6545C338823}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arbitrer.9.RabbitMQ.Autofac", "Arbitrer.Legacy\Arbitrer.9.RabbitMQ.Autofac\Arbitrer.9.RabbitMQ.Autofac.csproj", "{321A1107-CE4A-4C8E-9C92-09F085115108}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Arbitrer.GRPC", "Arbitrer.GRPC\Arbitrer.GRPC.csproj", "{26449499-FBBC-4A2A-AE93-EB97A354BAA6}" EndProject Global @@ -77,30 +61,6 @@ Global {F1DFD692-10B7-4D6C-88C1-E716B1C065B8}.Debug|Any CPU.Build.0 = Debug|Any CPU {F1DFD692-10B7-4D6C-88C1-E716B1C065B8}.Release|Any CPU.ActiveCfg = Release|Any CPU {F1DFD692-10B7-4D6C-88C1-E716B1C065B8}.Release|Any CPU.Build.0 = Release|Any CPU - {2924F360-093F-4F6B-8506-632062FB2506}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {2924F360-093F-4F6B-8506-632062FB2506}.Debug|Any CPU.Build.0 = Debug|Any CPU - {2924F360-093F-4F6B-8506-632062FB2506}.Release|Any CPU.ActiveCfg = Release|Any CPU - {2924F360-093F-4F6B-8506-632062FB2506}.Release|Any CPU.Build.0 = Release|Any CPU - {06E4F3AB-7642-4ED3-8992-BF23B15BBDCC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {06E4F3AB-7642-4ED3-8992-BF23B15BBDCC}.Debug|Any CPU.Build.0 = Debug|Any CPU - {06E4F3AB-7642-4ED3-8992-BF23B15BBDCC}.Release|Any CPU.ActiveCfg = Release|Any CPU - {06E4F3AB-7642-4ED3-8992-BF23B15BBDCC}.Release|Any CPU.Build.0 = Release|Any CPU - {60C9135C-111D-416D-BA46-04DECB8E96BA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {60C9135C-111D-416D-BA46-04DECB8E96BA}.Debug|Any CPU.Build.0 = Debug|Any CPU - {60C9135C-111D-416D-BA46-04DECB8E96BA}.Release|Any CPU.ActiveCfg = Release|Any CPU - {60C9135C-111D-416D-BA46-04DECB8E96BA}.Release|Any CPU.Build.0 = Release|Any CPU - {B82D673B-D921-45FC-9200-18C0A2010DE1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {B82D673B-D921-45FC-9200-18C0A2010DE1}.Debug|Any CPU.Build.0 = Debug|Any CPU - {B82D673B-D921-45FC-9200-18C0A2010DE1}.Release|Any CPU.ActiveCfg = Release|Any CPU - {B82D673B-D921-45FC-9200-18C0A2010DE1}.Release|Any CPU.Build.0 = Release|Any CPU - {B40710BC-0775-43A7-82BE-A6545C338823}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {B40710BC-0775-43A7-82BE-A6545C338823}.Debug|Any CPU.Build.0 = Debug|Any CPU - {B40710BC-0775-43A7-82BE-A6545C338823}.Release|Any CPU.ActiveCfg = Release|Any CPU - {B40710BC-0775-43A7-82BE-A6545C338823}.Release|Any CPU.Build.0 = Release|Any CPU - {321A1107-CE4A-4C8E-9C92-09F085115108}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {321A1107-CE4A-4C8E-9C92-09F085115108}.Debug|Any CPU.Build.0 = Debug|Any CPU - {321A1107-CE4A-4C8E-9C92-09F085115108}.Release|Any CPU.ActiveCfg = Release|Any CPU - {321A1107-CE4A-4C8E-9C92-09F085115108}.Release|Any CPU.Build.0 = Release|Any CPU {26449499-FBBC-4A2A-AE93-EB97A354BAA6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {26449499-FBBC-4A2A-AE93-EB97A354BAA6}.Debug|Any CPU.Build.0 = Debug|Any CPU {26449499-FBBC-4A2A-AE93-EB97A354BAA6}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -113,12 +73,6 @@ Global {A3CB6BBD-11F3-4F4F-A2F6-812BF6C62858} = {3AA78ADA-0BD6-43C1-A215-E2608D090B27} {ABCBBD73-93A4-41B6-88AA-66B7EC20B9EA} = {3AA78ADA-0BD6-43C1-A215-E2608D090B27} {D5052EFA-40F7-4F73-BE0D-F0BDA3130DCB} = {3AA78ADA-0BD6-43C1-A215-E2608D090B27} - {2924F360-093F-4F6B-8506-632062FB2506} = {C46C78FD-AFE1-4696-848D-A770C04BAB8A} - {06E4F3AB-7642-4ED3-8992-BF23B15BBDCC} = {C46C78FD-AFE1-4696-848D-A770C04BAB8A} - {60C9135C-111D-416D-BA46-04DECB8E96BA} = {C46C78FD-AFE1-4696-848D-A770C04BAB8A} - {B82D673B-D921-45FC-9200-18C0A2010DE1} = {C46C78FD-AFE1-4696-848D-A770C04BAB8A} - {B40710BC-0775-43A7-82BE-A6545C338823} = {C46C78FD-AFE1-4696-848D-A770C04BAB8A} - {321A1107-CE4A-4C8E-9C92-09F085115108} = {C46C78FD-AFE1-4696-848D-A770C04BAB8A} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {EA2AB956-E48F-42AE-B669-D12BB549726A} From 4dcad3825f3fc1112824596ed984705ba486bc92 Mon Sep 17 00:00:00 2001 From: Paolo Possanzini Date: Tue, 23 Jul 2024 14:52:23 +0200 Subject: [PATCH 4/4] RoundRobin Notification Distribution Mode --- Arbitrer.RabbitMQ/MessageDispatcherOptions.cs | 2 ++ Arbitrer.RabbitMQ/RequestsManager.cs | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Arbitrer.RabbitMQ/MessageDispatcherOptions.cs b/Arbitrer.RabbitMQ/MessageDispatcherOptions.cs index b1215e1..7d1d857 100644 --- a/Arbitrer.RabbitMQ/MessageDispatcherOptions.cs +++ b/Arbitrer.RabbitMQ/MessageDispatcherOptions.cs @@ -95,6 +95,8 @@ public class MessageDispatcherOptions /// public JsonSerializerSettings SerializerSettings { get; set; } + public bool UseRoundRobinNotificationDistribution { get; set; } = false; + /// Represents the options for message dispatcher. /// / public MessageDispatcherOptions() diff --git a/Arbitrer.RabbitMQ/RequestsManager.cs b/Arbitrer.RabbitMQ/RequestsManager.cs index 72d6678..31d9b50 100644 --- a/Arbitrer.RabbitMQ/RequestsManager.cs +++ b/Arbitrer.RabbitMQ/RequestsManager.cs @@ -115,7 +115,9 @@ public Task StartAsync(CancellationToken cancellationToken) { if (t is null) continue; var isNotification = typeof(INotification).IsAssignableFrom(t); - var queueName = $"{t.TypeQueueName(_arbitrerOptions)}${(isNotification ? Guid.NewGuid().ToString() : "")}"; + var isExclusive = isNotification && !_options.UseRoundRobinNotificationDistribution; + var queueName = $"{t.TypeQueueName(_arbitrerOptions)}$" + + $"{(isNotification ? (_options.UseRoundRobinNotificationDistribution ? Assembly.GetEntryAssembly()?.FullName : Guid.NewGuid().ToString()) : "")}"; var arguments = new Dictionary(); var timeout = t.QueueTimeout(); @@ -125,7 +127,9 @@ public Task StartAsync(CancellationToken cancellationToken) } - _channel.QueueDeclare(queue: queueName, durable: _options.Durable, exclusive: isNotification, autoDelete: _options.AutoDelete, arguments: arguments); + _channel.QueueDeclare(queue: queueName, durable: _options.Durable, + exclusive: isExclusive, + autoDelete: _options.AutoDelete, arguments: arguments); _channel.QueueBind(queueName, Constants.ArbitrerExchangeName, t.TypeQueueName(_arbitrerOptions));