Skip to content

Commit

Permalink
multiple dispatcher configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
ppossanzini committed Sep 27, 2024
1 parent eb415bf commit 24728a3
Show file tree
Hide file tree
Showing 14 changed files with 715 additions and 476 deletions.
52 changes: 52 additions & 0 deletions Arbitrer.GRPC/Extensions/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using Arbitrer.GRPC;
using Grpc.AspNetCore.Server;
using MediatR;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Cors.Infrastructure;
using Microsoft.AspNetCore.Hosting;
Expand Down Expand Up @@ -30,6 +34,54 @@ public static void AddArbitrerGrpcCors(this CorsOptions corsOptions)
});
}

public static MessageDispatcherOptions DispatchOnlyTo(this MessageDispatcherOptions options,
Func<IEnumerable<Assembly>> assemblySelect)
{
var types = (
from a in assemblySelect()
from t in a.GetTypes()
where typeof(IBaseRequest).IsAssignableFrom(t)
select t).AsEnumerable();

foreach (var t in types)
options.DispatchOnly.Add(t);

return options;
}

public static MessageDispatcherOptions DispatchOnlyTo(this MessageDispatcherOptions options,
Func<IEnumerable<Type>> typesSelect)
{
foreach (var type in typesSelect().Where(t => typeof(IBaseRequest).IsAssignableFrom(t)))
options.DispatchOnly.Add(type);

return options;
}

public static MessageDispatcherOptions DenyDispatchTo(this MessageDispatcherOptions options,
Func<IEnumerable<Type>> typesSelect)
{
foreach (var type in typesSelect().Where(t => typeof(IBaseRequest).IsAssignableFrom(t)))
options.DispatchOnly.Add(type);

return options;
}

public static MessageDispatcherOptions DenyDispatchTo(this MessageDispatcherOptions options,
Func<IEnumerable<Assembly>> assemblySelect)
{
var types = (
from a in assemblySelect()
from t in a.GetTypes()
where typeof(IBaseRequest).IsAssignableFrom(t)
select t).AsEnumerable();

foreach (var t in types)
options.DontDispatch.Add(t);

return options;
}


/// <summary>
/// Add the Arbitrer RabbitMQ message dispatcher to the service collection, allowing it to be resolved and used.
Expand Down
36 changes: 32 additions & 4 deletions Arbitrer.GRPC/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public MessageDispatcher(
this.options = options.Value;
this.logger = logger;
this.arbitrerOptions = arbitrerOptions.Value;

DestinationChannels.Add(this.options.DefaultServiceUri,
GrpcChannel.ForAddress(this.options.DefaultServiceUri, this.options.ChannelOptions));
}


Expand All @@ -51,15 +54,41 @@ public GrpcChannel GetChannelFor<T>()
return DestinationChannels[service.Uri];
}

return null;
return DestinationChannels[options.DefaultServiceUri];
}

public Dictionary<string, GrpcServices.GrpcServicesClient> DestinationClients { get; set; } = new();

public GrpcServices.GrpcServicesClient GetClientFor<T>()
{
var serviceuri = options.DefaultServiceUri;
if (this.options.RemoteTypeServices.TryGetValue(typeof(T), out var service))
{
serviceuri = service.Uri;
}

if (!DestinationClients.ContainsKey(serviceuri))
DestinationClients.Add(serviceuri, new GrpcServices.GrpcServicesClient(GetChannelFor<T>()));
return DestinationClients[serviceuri];
}


public bool CanDispatch<TRequest>()
{
if (options.DispatchOnly.Count > 0)
return options.DispatchOnly.Contains(typeof(TRequest));

if (options.DontDispatch.Count > 0)
return !options.DontDispatch.Contains(typeof(TRequest));

return true;
}

public async Task<Messages.ResponseMessage<TResponse>> Dispatch<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken = default)
{
var message = JsonConvert.SerializeObject(request, options.SerializerSettings);

var grpcClient = new GrpcServices.GrpcServicesClient(this.GetChannelFor<TRequest>());
var grpcClient = GetClientFor<TRequest>();
var result = await grpcClient.ManageArbitrerMessageAsync(new RequestMessage
{
Body = message,
Expand All @@ -83,7 +112,7 @@ public Task Notify<TRequest>(TRequest request, CancellationToken cancellationTok

foreach (var channel in DestinationChannels)
{
var grpcClient = new GrpcServices.GrpcServicesClient(channel.Value);
var grpcClient = GetClientFor<TRequest>();
grpcClient.ManageArbitrerNotificationAsync(new NotifyMessage()
{
Body = message,
Expand All @@ -96,7 +125,6 @@ public Task Notify<TRequest>(TRequest request, CancellationToken cancellationTok

public void Dispose()
{

}
}
}
7 changes: 7 additions & 0 deletions Arbitrer.GRPC/MessageDispatcherOptions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using Grpc.Net.Client;
using Newtonsoft.Json;

namespace Arbitrer.GRPC
Expand All @@ -25,9 +26,15 @@ public class MessageDispatcherOptions
/// <c>true</c> if deduplication is enabled; otherwise, <c>false</c>.
/// </value>
public bool DeDuplicationEnabled { get; set; } = true;

public string DefaultServiceUri { get; set; }
public GrpcChannelOptions ChannelOptions { get; set; }

public Dictionary<Type, RemoteServiceDefinition> RemoteTypeServices { get; set; } = new();

public HashSet<Type> DispatchOnly { get; private set; } = new HashSet<Type>();
public HashSet<Type> DontDispatch { get; private set; } = new HashSet<Type>();

public JsonSerializerSettings SerializerSettings { get; set; }

public MessageDispatcherOptions()
Expand Down
53 changes: 53 additions & 0 deletions Arbitrer.Kafka/Extensions/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using Arbitrer.Kafka;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -34,6 +38,55 @@ public static IServiceCollection ResolveArbitrerCalls(this IServiceCollection se
services.AddHostedService<RequestsManager>();
return services;
}


public static MessageDispatcherOptions DispatchOnlyTo(this MessageDispatcherOptions options,
Func<IEnumerable<Assembly>> assemblySelect)
{
var types = (
from a in assemblySelect()
from t in a.GetTypes()
where typeof(IBaseRequest).IsAssignableFrom(t)
select t).AsEnumerable();

foreach (var t in types)
options.DispatchOnly.Add(t);

return options;
}

public static MessageDispatcherOptions DispatchOnlyTo(this MessageDispatcherOptions options,
Func<IEnumerable<Type>> typesSelect)
{
foreach (var type in typesSelect().Where(t => typeof(IBaseRequest).IsAssignableFrom(t)))
options.DispatchOnly.Add(type);

return options;
}

public static MessageDispatcherOptions DenyDispatchTo(this MessageDispatcherOptions options,
Func<IEnumerable<Type>> typesSelect)
{
foreach (var type in typesSelect().Where(t => typeof(IBaseRequest).IsAssignableFrom(t)))
options.DispatchOnly.Add(type);

return options;
}

public static MessageDispatcherOptions DenyDispatchTo(this MessageDispatcherOptions options,
Func<IEnumerable<Assembly>> assemblySelect)
{
var types = (
from a in assemblySelect()
from t in a.GetTypes()
where typeof(IBaseRequest).IsAssignableFrom(t)
select t).AsEnumerable();

foreach (var t in types)
options.DontDispatch.Add(t);

return options;
}

/// Creates a Kafka topic asynchronously.
/// @param services The service provider to retrieve additional services from.
Expand Down
11 changes: 11 additions & 0 deletions Arbitrer.Kafka/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ public void InitConnection()
_consumerThread.Start();
}

public bool CanDispatch<TRequest>()
{
if (_options.DispatchOnly.Count > 0)
return _options.DispatchOnly.Contains(typeof(TRequest));

if(_options.DontDispatch.Count >0)
return !_options.DontDispatch.Contains(typeof(TRequest));

return true;
}

/// <summary>
/// Dispatches a request message to a Kafka topic and waits for a response.
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions Arbitrer.Kafka/MessageDispatcherOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ namespace Arbitrer.Kafka
/// </summary>
public class MessageDispatcherOptions
{
public HashSet<Type> DispatchOnly { get; private set; } = new HashSet<Type>();
public HashSet<Type> DontDispatch { get; private set; } = new HashSet<Type>();

public int? TopicPartition { get; set; }
public string BootstrapServers { get; set; }
public Offset Offset { get; set; } = Offset.End;
Expand Down
63 changes: 58 additions & 5 deletions Arbitrer.RabbitMQ/Extensions/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using Arbitrer.RabbitMQ;
using MediatR;
using Microsoft.Extensions.DependencyInjection;

namespace Arbitrer
Expand All @@ -17,13 +21,62 @@ public static class Extensions
/// <param name="services">The service collection to add the message dispatcher to.</param>
/// <param name="config">The configuration settings for the message dispatcher.</param>
/// <returns>The updated service collection.</returns>
public static IServiceCollection AddArbitrerRabbitMQMessageDispatcher(this IServiceCollection services, Action<MessageDispatcherOptions> config)
public static IServiceCollection AddArbitrerRabbitMQMessageDispatcher(this IServiceCollection services,
Action<MessageDispatcherOptions> config)
{
services.Configure<MessageDispatcherOptions>(config);
services.AddSingleton<IExternalMessageDispatcher, MessageDispatcher>();
services.AddKeyedSingleton<IExternalMessageDispatcher, MessageDispatcher>(Arbitrer.ArbitrerKeyServicesName);
return services;
}

public static MessageDispatcherOptions DispatchOnlyTo(this MessageDispatcherOptions options,
Func<IEnumerable<Assembly>> assemblySelect)
{
var types = (
from a in assemblySelect()
from t in a.GetTypes()
where typeof(IBaseRequest).IsAssignableFrom(t)
select t).AsEnumerable();

foreach (var t in types)
options.DispatchOnly.Add(t);

return options;
}

public static MessageDispatcherOptions DispatchOnlyTo(this MessageDispatcherOptions options,
Func<IEnumerable<Type>> typesSelect)
{
foreach (var type in typesSelect().Where(t => typeof(IBaseRequest).IsAssignableFrom(t)))
options.DispatchOnly.Add(type);

return options;
}

public static MessageDispatcherOptions DenyDispatchTo(this MessageDispatcherOptions options,
Func<IEnumerable<Type>> typesSelect)
{
foreach (var type in typesSelect().Where(t => typeof(IBaseRequest).IsAssignableFrom(t)))
options.DispatchOnly.Add(type);

return options;
}

public static MessageDispatcherOptions DenyDispatchTo(this MessageDispatcherOptions options,
Func<IEnumerable<Assembly>> assemblySelect)
{
var types = (
from a in assemblySelect()
from t in a.GetTypes()
where typeof(IBaseRequest).IsAssignableFrom(t)
select t).AsEnumerable();

foreach (var t in types)
options.DontDispatch.Add(t);

return options;
}

/// <summary>
/// Resolves the arbitrer calls by adding the RequestManager as a hosted service to the specified service collection.
/// </summary>
Expand All @@ -47,12 +100,12 @@ public static string GetHash(this string input, HashAlgorithm hashAlgorithm)
{
byte[] data = hashAlgorithm.ComputeHash(Encoding.UTF8.GetBytes(input));
var sBuilder = new StringBuilder();

for (int i = 0; i < data.Length; i++)
{
sBuilder.Append(data[i].ToString("x2"));
}

return sBuilder.ToString();
}

Expand All @@ -72,7 +125,7 @@ public static string GetHash(this byte[] input, HashAlgorithm hashAlgorithm)
{
sBuilder.Append(data[i].ToString("x2"));
}

return sBuilder.ToString();
}
}
Expand Down
10 changes: 10 additions & 0 deletions Arbitrer.RabbitMQ/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ private void InitConnection()
}


public bool CanDispatch<TRequest>()
{
if (options.DispatchOnly.Count > 0)
return options.DispatchOnly.Contains(typeof(TRequest));

if(options.DontDispatch.Count >0)
return !options.DontDispatch.Contains(typeof(TRequest));

return true;
}
/// <summary>
/// Dispatches a request and waits for the response.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions Arbitrer.RabbitMQ/MessageDispatcherOptions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

namespace Arbitrer.RabbitMQ
Expand Down Expand Up @@ -97,6 +99,9 @@ public class MessageDispatcherOptions

public bool UseRoundRobinNotificationDistribution { get; set; } = false;

public HashSet<Type> DispatchOnly { get; private set; } = new HashSet<Type>();
public HashSet<Type> DontDispatch { get; private set; } = new HashSet<Type>();

/// Represents the options for message dispatcher.
/// /
public MessageDispatcherOptions()
Expand Down
Loading

0 comments on commit 24728a3

Please sign in to comment.