Skip to content

Commit

Permalink
rename IConsumes to IHandles
Browse files Browse the repository at this point in the history
  • Loading branch information
lsfera committed Jul 4, 2024
1 parent b59d386 commit 71c1f14
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 29 deletions.
8 changes: 0 additions & 8 deletions src/Blumchen/Subscriptions/IConsume.cs

This file was deleted.

8 changes: 8 additions & 0 deletions src/Blumchen/Subscriptions/IHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Blumchen.Subscriptions;

public interface IHandler;

public interface IHandler<in T>: IHandler where T : class
{
Task Handle(T value);
}
4 changes: 2 additions & 2 deletions src/Blumchen/Subscriptions/ISubscriptionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void Deconstruct(
out ReplicationSlotSetupOptions replicationSlotSetupOptions,
out IErrorProcessor errorProcessor,
out IReplicationDataMapper dataMapper,
out Dictionary<Type, IConsume> registry);
out Dictionary<Type, IHandler> registry);
}

internal record SubscriptionOptions(
Expand All @@ -28,4 +28,4 @@ internal record SubscriptionOptions(
ReplicationSlotSetupOptions ReplicationOptions,
IErrorProcessor ErrorProcessor,
IReplicationDataMapper DataMapper,
Dictionary<Type, IConsume> Registry): ISubscriptionOptions;
Dictionary<Type, IHandler> Registry): ISubscriptionOptions;
12 changes: 6 additions & 6 deletions src/Blumchen/Subscriptions/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public async IAsyncEnumerable<IEnvelope> Subscribe(

private static async IAsyncEnumerable<T> ProcessEnvelope<T>(
IEnvelope envelope,
Dictionary<Type, IConsume> registry,
Dictionary<Type, IHandler> registry,
IErrorProcessor errorProcessor
) where T:class
{
Expand All @@ -109,22 +109,22 @@ IErrorProcessor errorProcessor
}
}

private static readonly Dictionary<Type, (IConsume consumer, MethodInfo methodInfo)> Cache = [];
private static readonly Dictionary<Type, (IHandler consumer, MethodInfo methodInfo)> Cache = [];


private static (IConsume consumer, MethodInfo methodInfo) Memoize
private static (IHandler consumer, MethodInfo methodInfo) Memoize
(
Dictionary<Type, IConsume> registry,
Dictionary<Type, IHandler> registry,
Type objType,
Func<Dictionary<Type, IConsume>, Type, (IConsume consumer, MethodInfo methodInfo)> func
Func<Dictionary<Type, IHandler>, Type, (IHandler consumer, MethodInfo methodInfo)> func
)
{
if (!Cache.TryGetValue(objType, out var entry))
entry = func(registry, objType);
Cache[objType] = entry;
return entry;
}
private static (IConsume consumer, MethodInfo methodInfo) Consumer(Dictionary<Type, IConsume> registry, Type objType)
private static (IHandler consumer, MethodInfo methodInfo) Consumer(Dictionary<Type, IHandler> registry, Type objType)
{
var consumer = registry[objType] ?? throw new NotSupportedException($"Unregistered type for {objType.AssemblyQualifiedName}");
var methodInfos = consumer.GetType().GetMethods(BindingFlags.Instance|BindingFlags.Public);
Expand Down
10 changes: 5 additions & 5 deletions src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public sealed class SubscriptionOptionsBuilder
private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions;
private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions;
private static IReplicationDataMapper? _dataMapper;
private readonly Dictionary<Type, IConsume> _registry = [];
private readonly Dictionary<Type, IHandler> _registry = [];
private IErrorProcessor? _errorProcessor;
private INamingPolicy? _namingPolicy;
private JsonSerializerContext? _jsonSerializerContext;
Expand Down Expand Up @@ -74,10 +74,10 @@ public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManageme
}

[UsedImplicitly]
public SubscriptionOptionsBuilder Consumes<T, TU>(TU consumer) where T : class
where TU : class, IConsumes<T>
public SubscriptionOptionsBuilder Handles<T, TU>(TU handler) where T : class
where TU : class, IHandler<T>
{
_registry.TryAdd(typeof(T), consumer);
_registry.TryAdd(typeof(T), handler);
return this;
}

Expand Down Expand Up @@ -119,7 +119,7 @@ static void Ensure(Func<IEnumerable<Type>> evalFn, string formattedMsg)
}
}

public class ObjectTracingConsumer: IConsumes<object>
public class ObjectTracingConsumer: IHandler<object>
{
private static ulong _counter = 0;
public Task Handle(object value)
Expand Down
8 changes: 4 additions & 4 deletions src/Subscriber/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
)
.NamingPolicy(new AttributeNamingPolicy())
.JsonContext(SourceGenerationContext.Default)
.Consumes<UserCreatedContract, Consumer>(consumer)
.Consumes<UserDeletedContract, Consumer>(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct
.Handles<UserCreatedContract, Consumer>(consumer)
.Handles<UserDeletedContract, Consumer>(consumer), LoggerFactory.Create(builder => builder.AddConsole()), ct
).GetAsyncEnumerator(ct);
await using var cursor1 = cursor.ConfigureAwait(false);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested);
Expand All @@ -46,8 +46,8 @@
namespace Subscriber
{
internal class Consumer:
IConsumes<UserCreatedContract>,
IConsumes<UserDeletedContract>
IHandler<UserCreatedContract>,
IHandler<UserDeletedContract>
{
public Task Handle(UserCreatedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserCreatedContract));
public Task Handle(UserDeletedContract value) => Console.Out.WriteLineAsync(JsonSerialization.ToJson(value, SourceGenerationContext.Default.UserDeletedContract));
Expand Down
8 changes: 4 additions & 4 deletions src/Tests/DatabaseFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public abstract class DatabaseFixture(ITestOutputHelper output): IAsyncLifetime
{
protected ITestOutputHelper Output { get; } = output;
protected readonly Func<CancellationTokenSource> TimeoutTokenSource = () => new(Debugger.IsAttached ? TimeSpan.FromHours(1) : TimeSpan.FromSeconds(2));
protected class TestConsumer<T>(Action<string> log, JsonTypeInfo info): IConsumes<T> where T : class
protected class TestHandler<T>(Action<string> log, JsonTypeInfo info): IHandler<T> where T : class
{
public async Task Handle(T value)
{
Expand Down Expand Up @@ -67,7 +67,7 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri
await command.ExecuteNonQueryAsync(ct);
}

protected (TestConsumer<T> consumer, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor<T>(
protected (TestHandler<T> handler, SubscriptionOptionsBuilder subscriptionOptionsBuilder) SetupFor<T>(
string connectionString,
string eventsTable,
JsonSerializerContext info,
Expand All @@ -78,13 +78,13 @@ protected static async Task InsertPoisoningMessage(string connectionString, stri
{
var jsonTypeInfo = info.GetTypeInfo(typeof(T));
ArgumentNullException.ThrowIfNull(jsonTypeInfo);
var consumer = new TestConsumer<T>(log, jsonTypeInfo);
var consumer = new TestHandler<T>(log, jsonTypeInfo);
var subscriptionOptionsBuilder = new SubscriptionOptionsBuilder()
.WithErrorProcessor(new TestOutErrorProcessor(Output))
.ConnectionString(connectionString)
.JsonContext(info)
.NamingPolicy(namingPolicy)
.Consumes<T, TestConsumer<T>>(consumer)
.Handles<T, TestHandler<T>>(consumer)
.WithTable(o => o.Name(eventsTable))
.WithPublicationOptions(
new PublicationManagement.PublicationSetupOptions(PublicationName: publicationName ?? Randomise("events_pub"))
Expand Down

0 comments on commit 71c1f14

Please sign in to comment.