Skip to content

Commit

Permalink
simplified di registration
Browse files Browse the repository at this point in the history
  • Loading branch information
lsfera committed Jul 16, 2024
1 parent 345cad6 commit 645ec1a
Show file tree
Hide file tree
Showing 16 changed files with 186 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2023.3.0" >
<PrivateAssets>all</PrivateAssets>
<ExcludeAssets>none</ExcludeAssets>
<IncludeAssets>all</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Polly" Version="8.4.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
using Blumchen.Subscriptions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Polly;

#pragma warning disable IL2091

namespace Blumchen.Workers;

public static class ServiceCollectionExtensions
{
public static IServiceCollection AddBlumchen<T,TU>(this IServiceCollection service, T? instance = default)
where T : Worker<TU> where TU : class =>
instance is null
? service.AddHostedService<T>()
: service.AddHostedService(_=>instance);

public static IServiceCollection AddBlumchen<T>(
this IServiceCollection service,
Func<IServiceProvider, IWorkerOptionsBuilder, IWorkerOptionsBuilder> workerOptions)
where T : class, IHandler =>
service
.AddKeyedSingleton(typeof(T), (provider, _) => workerOptions(provider, new WorkerOptionsBuilder()).Build())
.AddHostedService(provider =>
new Worker<T>(workerOptions(provider, new WorkerOptionsBuilder()).Build(),
provider.GetRequiredService<ILogger<Worker<T>>>()));


}
38 changes: 6 additions & 32 deletions src/Blumchen.DependencyInjection/Workers/Worker.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,13 @@
using System.Collections.Concurrent;
using System.Text.Json.Serialization;
using Blumchen.Serialization;
using Blumchen.Subscriptions;
using Blumchen.Subscriptions.Management;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Npgsql;
using Polly;


namespace Blumchen.Workers;

public abstract class Worker<T>(
NpgsqlDataSource dataSource,
string connectionString,
IHandler<T> handler,
JsonSerializerContext jsonSerializerContext,
IErrorProcessor errorProcessor,
ResiliencePipeline pipeline,
INamingPolicy namingPolicy,
PublicationManagement.PublicationSetupOptions publicationSetupOptions,
ReplicationSlotManagement.ReplicationSlotSetupOptions replicationSlotSetupOptions,
Func<TableDescriptorBuilder,TableDescriptorBuilder> tableDescriptorBuilder,
ILogger logger): BackgroundService where T : class
public class Worker<T>(
WorkerOptions options,
ILogger<Worker<T>> logger): BackgroundService where T : class, IHandler
{
private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>";
private static readonly ConcurrentDictionary<string, Action<ILogger, string, object[]>> LoggingActions = new(StringComparer.OrdinalIgnoreCase);
Expand All @@ -40,28 +25,17 @@ static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled)

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await pipeline.ExecuteAsync(async token =>
await options.ResiliencePipeline.ExecuteAsync(async token =>
{
await using var subscription = new Subscription();
await using var cursor = subscription.Subscribe(builder =>
builder
.DataSource(dataSource)
.ConnectionString(connectionString)
.WithTable(tableDescriptorBuilder)
.WithErrorProcessor(errorProcessor)
.Consumes<T, IHandler<T>>(handler)
.NamingPolicy(namingPolicy)
.JsonContext(jsonSerializerContext)
.WithPublicationOptions(publicationSetupOptions)
.WithReplicationOptions(replicationSlotSetupOptions)
, ct: token).GetAsyncEnumerator(token);
await using var cursor = subscription.Subscribe(options.SubscriptionOptions, ct: token)
.GetAsyncEnumerator(token);
Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested)
Notify(logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current);
}, stoppingToken).ConfigureAwait(false);
Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
return;
}

}
37 changes: 37 additions & 0 deletions src/Blumchen.DependencyInjection/Workers/WorkerOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Blumchen.Subscriptions;
using Polly;

namespace Blumchen.Workers;

public record WorkerOptions(ResiliencePipeline ResiliencePipeline, ISubscriptionOptions SubscriptionOptions);

public interface IWorkerOptionsBuilder
{
IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline);
IWorkerOptionsBuilder Subscription(Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder>? builder);
WorkerOptions Build();
}

internal sealed class WorkerOptionsBuilder: IWorkerOptionsBuilder
{
private ResiliencePipeline? _resiliencePipeline = default;
private Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder>? _builder;

public IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline)
{
_resiliencePipeline = resiliencePipeline;
return this;
}public IWorkerOptionsBuilder Subscription(Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder>? builder)
{
_builder = builder;
return this;
}

public WorkerOptions Build()
{
ArgumentNullException.ThrowIfNull(_resiliencePipeline);
ArgumentNullException.ThrowIfNull(_builder);
return new(_resiliencePipeline, _builder(new SubscriptionOptionsBuilder()).Build());
}
}

3 changes: 3 additions & 0 deletions src/Blumchen/Blumchen.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Tests</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Blumchen.DependencyInjection</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Blumchen/Subscriptions/ISubscriptionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Blumchen.Subscriptions;

internal interface ISubscriptionOptions
public interface ISubscriptionOptions
{
[UsedImplicitly] NpgsqlDataSource DataSource { get; }
[UsedImplicitly] NpgsqlConnectionStringBuilder ConnectionStringBuilder { get; }
Expand Down
20 changes: 14 additions & 6 deletions src/Blumchen/Subscriptions/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Blumchen.Subscriptions.Management;
using Blumchen.Subscriptions.ReplicationMessageHandlers;
using Blumchen.Subscriptions.SnapshotReader;
using Microsoft.Extensions.Logging;
using Npgsql;
using Npgsql.Replication;
using Npgsql.Replication.PgOutput;
Expand Down Expand Up @@ -33,9 +32,18 @@ public async IAsyncEnumerable<IEnvelope> Subscribe(
[EnumeratorCancellation] CancellationToken ct = default
)
{
_options = builder(_builder).Build();
var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = _options;

await foreach (var _ in Subscribe(builder(_builder).Build(), ct))
yield return _;
}

internal async IAsyncEnumerable<IEnvelope> Subscribe(
ISubscriptionOptions subscriptionOptions,
[EnumeratorCancellation] CancellationToken ct = default
)
{
_options = subscriptionOptions;
var (dataSource, connectionStringBuilder, publicationSetupOptions, replicationSlotSetupOptions, errorProcessor, replicationDataMapper, registry) = subscriptionOptions;

await dataSource.EnsureTableExists(publicationSetupOptions.TableDescriptor, ct).ConfigureAwait(false);

_connection = new LogicalReplicationConnection(connectionStringBuilder.ConnectionString);
Expand All @@ -60,8 +68,8 @@ public async IAsyncEnumerable<IEnvelope> Subscribe(
);

await foreach (var envelope in ReadExistingRowsFromSnapshot(dataSource, created.SnapshotName, _options, ct).ConfigureAwait(false))
await foreach (var subscribe in ProcessEnvelope<IEnvelope>(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false))
yield return subscribe;
await foreach (var subscribe in ProcessEnvelope<IEnvelope>(envelope, registry, errorProcessor).WithCancellation(ct).ConfigureAwait(false))
yield return subscribe;
}

await foreach (var message in
Expand Down
31 changes: 10 additions & 21 deletions src/Blumchen/Subscriptions/SubscriptionOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,23 @@ namespace Blumchen.Subscriptions;

public sealed class SubscriptionOptionsBuilder
{
private static NpgsqlConnectionStringBuilder? _connectionStringBuilder;
private static NpgsqlDataSource? _dataSource;
private static PublicationManagement.PublicationSetupOptions _publicationSetupOptions;
private static ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions;
private static IReplicationDataMapper? _dataMapper;
private NpgsqlConnectionStringBuilder? _connectionStringBuilder;
private NpgsqlDataSource? _dataSource;
private PublicationManagement.PublicationSetupOptions _publicationSetupOptions = new();
private ReplicationSlotManagement.ReplicationSlotSetupOptions? _replicationSlotSetupOptions;
private IReplicationDataMapper? _dataMapper;
private readonly Dictionary<Type, IHandler> _registry = [];
private IErrorProcessor? _errorProcessor;
private INamingPolicy? _namingPolicy;
private JsonSerializerContext? _jsonSerializerContext;
private static readonly TableDescriptorBuilder TableDescriptorBuilder = new();
private readonly TableDescriptorBuilder _tableDescriptorBuilder = new();
private TableDescriptorBuilder.MessageTable? _messageTable;


static SubscriptionOptionsBuilder()
{
_connectionStringBuilder = default;
_publicationSetupOptions = new();
_replicationSlotSetupOptions = default;
_dataMapper = default;
}


[UsedImplicitly]
public SubscriptionOptionsBuilder WithTable(
Func<TableDescriptorBuilder, TableDescriptorBuilder> builder)
{
_messageTable = builder(TableDescriptorBuilder).Build();
_messageTable = builder(_tableDescriptorBuilder).Build();
return this;
}

Expand Down Expand Up @@ -83,8 +73,7 @@ public SubscriptionOptionsBuilder WithReplicationOptions(ReplicationSlotManageme
}

[UsedImplicitly]
public SubscriptionOptionsBuilder Consumes<T, TU>(TU handler) where T : class
where TU : class, IHandler<T>
public SubscriptionOptionsBuilder Consumes<T>(IHandler<T> handler) where T : class
{
_registry.TryAdd(typeof(T), handler);
return this;
Expand All @@ -99,7 +88,7 @@ public SubscriptionOptionsBuilder WithErrorProcessor(IErrorProcessor? errorProce

internal ISubscriptionOptions Build()
{
_messageTable ??= TableDescriptorBuilder.Build();
_messageTable ??= _tableDescriptorBuilder.Build();
ArgumentNullException.ThrowIfNull(_connectionStringBuilder);
ArgumentNullException.ThrowIfNull(_dataSource);
ArgumentNullException.ThrowIfNull(_jsonSerializerContext);
Expand All @@ -121,11 +110,11 @@ internal ISubscriptionOptions Build()
_errorProcessor ?? new ConsoleOutErrorProcessor(),
_dataMapper,
_registry);

static void Ensure(Func<IEnumerable<Type>> evalFn, string formattedMsg)
{
var misses = evalFn().ToArray();
if (misses.Length > 0) throw new Exception(string.Format(formattedMsg, string.Join(", ", misses.Select(t => $"'{t.Name}'"))));
}

}
}
8 changes: 7 additions & 1 deletion src/Publisher/Contracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@ internal record UserDeleted(
string Name = "Deleted"
): IContract;

[MessageUrn("user-modified:v1")] //subscription ignored
[MessageUrn("user-modified:v1")]
internal record UserModified(
Guid Id,
string Name = "Modified"
): IContract;

[MessageUrn("user-subscribed:v1")]
internal record UserSubscribed(
Guid Id,
string Name = "Subscribed"
): IContract;

[JsonSourceGenerationOptions(WriteIndented = true)]
[JsonSerializable(typeof(UserCreated))]
[JsonSerializable(typeof(UserDeleted))]
[JsonSerializable(typeof(UserModified))]
[JsonSerializable(typeof(UserSubscribed))]
internal partial class SourceGenerationContext: JsonSerializerContext;
14 changes: 10 additions & 4 deletions src/Publisher/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using UserCreated = Publisher.UserCreated;
using UserDeleted = Publisher.UserDeleted;
using UserModified = Publisher.UserModified;
using UserSubscribed = Publisher.UserSubscribed;

Console.Title = typeof(Program).Assembly.GetName().Name!;
Console.WriteLine("How many messages do you want to publish?(press CTRL+C to exit):");
Expand All @@ -22,23 +23,25 @@
if (line != null && int.TryParse(line, out var result))
{
var cts = new CancellationTokenSource();
var messages = result / 3;
var messages = result / 4;
var ct = cts.Token;
var connection = new NpgsqlConnection(Settings.ConnectionString);
await using var connection1 = connection.ConfigureAwait(false);
await connection.OpenAsync(ct).ConfigureAwait(false);
//use a command for each message
{
var @events = Enumerable.Range(0, result).Select(i =>
(i % 3) switch
(i % 4) switch
{
0 => new UserCreated(Guid.NewGuid()) as object,
1 => new UserDeleted(Guid.NewGuid()),
_ => new UserModified(Guid.NewGuid())
2 => new UserModified(Guid.NewGuid()),
_ => new UserSubscribed(Guid.NewGuid())
});
await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 0) ? 1 : 0)} {nameof(UserCreated)}");
await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 1) ? 1 : 0)} {nameof(UserDeleted)}");
await Console.Out.WriteLineAsync($"Publishing {messages} {nameof(UserModified)}");
await Console.Out.WriteLineAsync($"Publishing {messages + ((result % 3 > 2) ? 1 : 0)} {nameof(UserModified)}");
await Console.Out.WriteLineAsync($"Publishing {messages} {nameof(UserSubscribed)}");
foreach (var @event in @events)
{
var transaction = await connection.BeginTransactionAsync(ct).ConfigureAwait(false);
Expand All @@ -55,6 +58,9 @@
case UserModified m:
await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false);
break;
case UserSubscribed m:
await MessageAppender.AppendAsync(m, resolver, connection, transaction, ct).ConfigureAwait(false);
break;
}

await transaction.CommitAsync(ct).ConfigureAwait(false);
Expand Down
4 changes: 2 additions & 2 deletions src/Subscriber/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
)
.NamingPolicy(new AttributeNamingPolicy())
.JsonContext(SourceGenerationContext.Default)
.Consumes<UserCreatedContract, Consumer>(consumer)
.Consumes<UserDeletedContract, Consumer>(consumer), ct:ct
.Consumes<UserCreatedContract>(consumer)
.Consumes<UserDeletedContract>(consumer), ct:ct
).GetAsyncEnumerator(ct);
await using var cursor1 = cursor.ConfigureAwait(false);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested);
Expand Down
7 changes: 7 additions & 0 deletions src/SubscriberWorker/Contracts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@ public record UserDeletedContract(
string Name
);

[MessageUrn("user-modified:v1")] //subscription ignored
public record UserModifiedContract(
Guid Id,
string Name = "Modified"
);

[JsonSourceGenerationOptions(WriteIndented = true)]
[JsonSerializable(typeof(UserCreatedContract))]
[JsonSerializable(typeof(UserDeletedContract))]
[JsonSerializable(typeof(UserModifiedContract))]
internal partial class SourceGenerationContext: JsonSerializerContext;
}
Loading

0 comments on commit 645ec1a

Please sign in to comment.