Skip to content

Commit

Permalink
queue managements
Browse files Browse the repository at this point in the history
  • Loading branch information
ppossanzini committed Nov 15, 2024
1 parent 1f8a3e9 commit 122d162
Show file tree
Hide file tree
Showing 39 changed files with 74 additions and 588 deletions.
6 changes: 3 additions & 3 deletions Arbitrer.GRPC/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public bool CanDispatch<TRequest>()
var result = await grpcClient.ManageArbitrerMessageAsync(new RequestMessage
{
Body = message,
ArbitrerType = queueName ?? typeof(TRequest).TypeQueueName(arbitrerOptions)
ArbitrerType = queueName ?? typeof(TRequest).ArbitrerTypeName(arbitrerOptions)
});
return JsonConvert.DeserializeObject<Messages.ResponseMessage<TResponse>>(result.Body, options.SerializerSettings);
}
Expand All @@ -109,15 +109,15 @@ public Task Notify<TRequest>(TRequest request, string queueName = null, Cancella
{
var message = JsonConvert.SerializeObject(request, options.SerializerSettings);

logger.LogInformation($"Sending notifications of: {typeof(TRequest).Name}/{queueName ?? request.GetType().TypeQueueName(arbitrerOptions)}");
logger.LogInformation($"Sending notifications of: {typeof(TRequest).Name}/{queueName ?? request.GetType().ArbitrerTypeName(arbitrerOptions)}");

foreach (var channel in DestinationChannels)
{
var grpcClient = GetClientFor<TRequest>();
grpcClient.ManageArbitrerNotificationAsync(new NotifyMessage()
{
Body = message,
ArbitrerType = queueName ?? typeof(TRequest).TypeQueueName(arbitrerOptions)
ArbitrerType = queueName ?? typeof(TRequest).ArbitrerTypeName(arbitrerOptions)
});
}

Expand Down
2 changes: 1 addition & 1 deletion Arbitrer.GRPC/RequestsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public RequestsManager(IOptions<MessageDispatcherOptions> options, ILogger<Reque
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
this._provider = provider;

_typeMappings = requestsManagerOptions.Value.AcceptMessageTypes.ToDictionary(k => k.TypeQueueName(arbitrerOptions.Value), v => v);
_typeMappings = requestsManagerOptions.Value.AcceptMessageTypes.ToDictionary(k => k.ArbitrerTypeName(arbitrerOptions.Value), v => v);
}


Expand Down
4 changes: 2 additions & 2 deletions Arbitrer.Kafka/Arbitrer.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.*" />
<PackageReference Include="Newtonsoft.Json" Version="13.*" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="7.*" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.*" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="MediatR" Version="12.*" />
</ItemGroup>

Expand Down
6 changes: 3 additions & 3 deletions Arbitrer.Kafka/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public bool CanDispatch<TRequest>()
_callbackMapper.TryAdd(correlationId, tcs);

await _producer.ProduceAsync(
topic: queueName ?? typeof(TRequest).TypeQueueName(_arbitrerOptions),
topic: queueName ?? typeof(TRequest).ArbitrerTypeName(_arbitrerOptions),
message: new Message<Null, string> { Value = message }, cancellationToken);

cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out var tmp));
Expand All @@ -149,10 +149,10 @@ public async Task Notify<TRequest>(TRequest request, string queueName = null, Ca
{
var message = JsonConvert.SerializeObject(request, _options.SerializerSettings);

_logger.LogInformation($"Sending message to: {Consts.ArbitrerExchangeName}/{queueName ?? request.GetType().TypeQueueName(_arbitrerOptions)}");
_logger.LogInformation($"Sending message to: {Consts.ArbitrerExchangeName}/{queueName ?? request.GetType().ArbitrerTypeName(_arbitrerOptions)}");

await _producer.ProduceAsync(
topic: queueName ?? typeof(TRequest).TypeQueueName(_arbitrerOptions),
topic: queueName ?? typeof(TRequest).ArbitrerTypeName(_arbitrerOptions),
message: new Message<Null, string> { Value = message }, cancellationToken);
}

Expand Down
14 changes: 7 additions & 7 deletions Arbitrer.Kafka/RequestsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,24 @@ public Task StartAsync(CancellationToken cancellationToken)
foreach (var t in _arbitrer.GetLocalRequestsTypes())
{
if (t is null) continue;
_provider.CreateTopicAsync(_options, t.TypeQueueName(_arbitrerOptions));
var isNotification = typeof(INotification).IsAssignableFrom(t);
_provider.CreateTopicAsync(_options, t.ArbitrerTypeName(_arbitrerOptions));

if (isNotification)

if (t.IsNotification())
{
notificationsSubscriptions.Add(t.TypeQueueName(_arbitrerOptions));
notificationsSubscriptions.Add(t.ArbitrerTypeName(_arbitrerOptions));
var consumerMethod = typeof(RequestsManager)
.GetMethod("ConsumeChannelNotification", BindingFlags.Instance | BindingFlags.NonPublic)?
.MakeGenericMethod(t);
_methods.Add(t.TypeQueueName(_arbitrerOptions), consumerMethod);
_methods.Add(t.ArbitrerTypeName(_arbitrerOptions), consumerMethod);
}
else
{
requestSubscriptions.Add(t.TypeQueueName(_arbitrerOptions));
requestSubscriptions.Add(t.ArbitrerTypeName(_arbitrerOptions));
var consumerMethod = typeof(RequestsManager)
.GetMethod("ConsumeChannelMessage", BindingFlags.Instance | BindingFlags.NonPublic)?
.MakeGenericMethod(t);
_methods.Add(t.TypeQueueName(_arbitrerOptions), consumerMethod);
_methods.Add(t.ArbitrerTypeName(_arbitrerOptions), consumerMethod);
}
}

Expand Down
4 changes: 2 additions & 2 deletions Arbitrer.RabbitMQ/Arbitrer.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.*" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="7.*" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.*" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="MediatR" Version="12.*" />
</ItemGroup>
<ItemGroup>
Expand Down
13 changes: 13 additions & 0 deletions Arbitrer.RabbitMQ/Extensions/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ public static IServiceCollection AddArbitrerRabbitMQMessageDispatcher(this IServ
return services;
}

public static string ArbitrerQueueName(this Type t, ArbitrerOptions options, StringBuilder sb = null)
{
if (options.QueueNames.TryGetValue(t, out string queueName)) return queueName;

sb = sb ?? new StringBuilder();
sb.Append(t.ArbitrerTypeName(options));

sb.Append("$");
if (t.IsNotification())
sb.Append(Guid.NewGuid().ToString());
return sb.ToString();
}

public static MessageDispatcherOptions DispatchOnlyTo(this MessageDispatcherOptions options,
Func<IEnumerable<Assembly>> assemblySelect)
{
Expand Down
6 changes: 3 additions & 3 deletions Arbitrer.RabbitMQ/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public bool CanDispatch<TRequest>()

_sendChannel.BasicPublish(
exchange: Constants.ArbitrerExchangeName,
routingKey: queueName ?? typeof(TRequest).TypeQueueName(arbitrerOptions),
routingKey: queueName ?? typeof(TRequest).ArbitrerTypeName(arbitrerOptions),
mandatory: true,
body: Encoding.UTF8.GetBytes(message),
basicProperties: GetBasicProperties(correlationId));
Expand All @@ -197,11 +197,11 @@ public Task Notify<TRequest>(TRequest request, string queueName = null, Cancella
{
var message = JsonConvert.SerializeObject(request, options.SerializerSettings);

logger.LogInformation($"Sending message to: {Constants.ArbitrerExchangeName}/{queueName ?? request.GetType().TypeQueueName(arbitrerOptions)}");
logger.LogInformation($"Sending message to: {Constants.ArbitrerExchangeName}/{queueName ?? request.GetType().ArbitrerTypeName(arbitrerOptions)}");

_sendChannel.BasicPublish(
exchange: Constants.ArbitrerExchangeName,
routingKey: queueName ?? request.GetType().TypeQueueName(arbitrerOptions),
routingKey: queueName ?? request.GetType().ArbitrerTypeName(arbitrerOptions),
mandatory: false,
body: Encoding.UTF8.GetBytes(message)
);
Expand Down
22 changes: 1 addition & 21 deletions Arbitrer.RabbitMQ/MessageDispatcherOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,32 +72,12 @@ public class MessageDispatcherOptions
public ushort PerConsumerQos { get; set; } = 1;

public string ClientName { get; set; }
/// <summary>
/// Gets or sets the time-to-live value for deduplication.
/// </summary>
/// <remarks>
/// The DeDuplicationTTL property determines the amount of time, in milliseconds, that an item can be considered duplicate
/// before it is removed from the deduplication cache. The default value is 5000 milliseconds (5 seconds).
/// </remarks>
/// <value>
/// The time-to-live value for deduplication.
/// </value>
public int DeDuplicationTTL { get; set; } = 5000;

/// <summary>
/// Gets or sets a value indicating whether duplicate entries are enabled for deduplication.
/// </summary>
/// <value>
/// <c>true</c> if deduplication is enabled; otherwise, <c>false</c>.
/// </value>
public bool DeDuplicationEnabled { get; set; } = false;

/// <summary>
/// Gets or sets the serializer settings for JSON serialization and deserialization.
/// </summary>
public JsonSerializerSettings SerializerSettings { get; set; }

public bool UseRoundRobinNotificationDistribution { get; set; } = false;


public HashSet<Type> DispatchOnly { get; private set; } = new HashSet<Type>();
public HashSet<Type> DontDispatch { get; private set; } = new HashSet<Type>();
Expand Down
37 changes: 3 additions & 34 deletions Arbitrer.RabbitMQ/RequestsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public class RequestsManager : IHostedService
/// </summary>
private IModel _channel = null;

/// <summary>
/// A HashSet used for deduplication cache.
/// </summary>
private readonly HashSet<string> _deduplicationcache = new HashSet<string>();

/// <summary>
/// Represents a SHA256 hash algorithm instance used for hashing data.
Expand Down Expand Up @@ -114,10 +110,8 @@ public Task StartAsync(CancellationToken cancellationToken)
foreach (var t in _arbitrer.GetLocalRequestsTypes())
{
if (t is null) continue;
var isNotification = typeof(INotification).IsAssignableFrom(t) && !typeof(IBaseRequest).IsAssignableFrom(t);
var isExclusive = isNotification && !_options.UseRoundRobinNotificationDistribution;
var queueName = $"{t.TypeQueueName(_arbitrerOptions)}$" +
$"{(isNotification ? (_options.UseRoundRobinNotificationDistribution ? Assembly.GetEntryAssembly()?.FullName : Guid.NewGuid().ToString()) : "")}";
var isNotification = t.IsNotification();
var queueName = t.ArbitrerQueueName(_arbitrerOptions);

var arguments = new Dictionary<string, object>();
var timeout = t.QueueTimeout();
Expand All @@ -128,9 +122,8 @@ public Task StartAsync(CancellationToken cancellationToken)


_channel.QueueDeclare(queue: queueName, durable: _options.Durable,
exclusive: isExclusive,
autoDelete: _options.AutoDelete, arguments: arguments);
_channel.QueueBind(queueName, Constants.ArbitrerExchangeName, t.TypeQueueName(_arbitrerOptions));
_channel.QueueBind(queueName, Constants.ArbitrerExchangeName, t.ArbitrerTypeName(_arbitrerOptions));


var consumer = new AsyncEventingBasicConsumer(_channel);
Expand Down Expand Up @@ -203,30 +196,6 @@ private async Task ConsumeChannelNotification<T>(object sender, BasicDeliverEven
{
var msg = ea.Body.ToArray();

if (_options.DeDuplicationEnabled)
{
var hash = msg.GetHash(_hasher);
lock (_deduplicationcache)
if (_deduplicationcache.Contains(hash))
{
_logger.LogDebug($"duplicated message received : {ea.Exchange}/{ea.RoutingKey}");
return;
}

lock (_deduplicationcache)
_deduplicationcache.Add(hash);

// Do not await this task
#pragma warning disable CS4014
Task.Run(async () =>
{
await Task.Delay(_options.DeDuplicationTTL);
lock (_deduplicationcache)
_deduplicationcache.Remove(hash);
});
#pragma warning restore CS4014
}

_logger.LogDebug("Elaborating notification : {0}", Encoding.UTF8.GetString(msg));
var message = JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(msg), _options.SerializerSettings);

Expand Down
8 changes: 4 additions & 4 deletions Arbitrer/Arbitrer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public HandlerLocation GetLocation(Type t)
/// <returns>The response object.</returns>
public async Task<TResponse> InvokeRemoteHandler<TRequest, TResponse>(TRequest request, string queueName = null)
{
_logger.LogDebug($"Invoking remote handler for: {queueName ?? typeof(TRequest).TypeQueueName(_options)}");
_logger.LogDebug($"Invoking remote handler for: {queueName ?? typeof(TRequest).ArbitrerTypeName(_options)}");

ResponseMessage<TResponse> result = null;
foreach (var dispatcher in this._messageDispatchers)
Expand All @@ -109,7 +109,7 @@ public async Task<TResponse> InvokeRemoteHandler<TRequest, TResponse>(TRequest r
break;
}

_logger.LogDebug($"Remote request for {queueName ?? typeof(TRequest).TypeQueueName(_options)} completed!");
_logger.LogDebug($"Remote request for {queueName ?? typeof(TRequest).ArbitrerTypeName(_options)} completed!");

if (result == null)
{
Expand All @@ -132,10 +132,10 @@ public async Task<TResponse> InvokeRemoteHandler<TRequest, TResponse>(TRequest r
/// <returns>A task representing the asynchronous operation.</returns>
public Task SendRemoteNotification<TRequest>(TRequest request, string queueName = null) where TRequest : INotification
{
_logger.LogDebug($"Invoking remote handler for: {queueName ?? typeof(TRequest).TypeQueueName(_options)}");
_logger.LogDebug($"Invoking remote handler for: {queueName ?? typeof(TRequest).ArbitrerTypeName(_options)}");
Task.WaitAll(_messageDispatchers.Select(i => i.Notify(request, queueName)).ToArray());

_logger.LogDebug($"Remote request for {queueName ?? typeof(TRequest).TypeQueueName(_options)} completed!");
_logger.LogDebug($"Remote request for {queueName ?? typeof(TRequest).ArbitrerTypeName(_options)} completed!");
return Task.CompletedTask;
}

Expand Down
6 changes: 3 additions & 3 deletions Arbitrer/Arbitrer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
<ItemGroup>
<PackageReference Include="MediatR" Version="12.*" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="7.*" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="7.*" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.*" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.*" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion Arbitrer/ArbitrerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ArbitrerOptions
/// <summary>
/// Get the prefix of remote queue
/// </summary>
public Dictionary<string, string> QueuePrefixes { get; private set; } = new Dictionary<string, string>();
public Dictionary<string, string> TypePrefixes { get; private set; } = new Dictionary<string, string>();

public Dictionary<Type, string> QueueNames { get; private set; } = new Dictionary<Type, string>();
}
Expand Down
Loading

0 comments on commit 122d162

Please sign in to comment.