Skip to content

Introduced DI #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ab10cc1
FS clean up
lsfera Jul 4, 2024
977f203
suppress CS1591 (Missing XML comment for publicly visible type or me…
lsfera Jul 4, 2024
ad74d3c
made UseTable() optional
lsfera Jul 4, 2024
b59d386
enforce AOT
lsfera Jul 4, 2024
71c1f14
rename IConsumes to IHandles
lsfera Jul 4, 2024
86656ec
Use double quote
lsfera Jul 4, 2024
438cf21
use ILKE. Replication slot name is forced to lcase
lsfera Jul 4, 2024
b24c01f
unused directive
lsfera Jul 4, 2024
b024c6f
remove static variables to enable multiple instances on same process
lsfera Jul 4, 2024
8cc1fed
Added DependencyIbjection project
lsfera Jul 4, 2024
380d06a
Added demo projrct for DI
lsfera Jul 4, 2024
acf5f4d
Enhanced logging
lsfera Jul 5, 2024
e918dfa
bumped version to 0.1.1
lsfera Jul 5, 2024
2055fed
allow tableDescriptor access
lsfera Jul 5, 2024
eadba73
renamed vars
lsfera Jul 5, 2024
c097047
expose NpgsqlDataSource along with connection string - close #16
lsfera Jul 15, 2024
ef9ac95
rename extension method
lsfera Jul 15, 2024
345cad6
move to files
lsfera Jul 15, 2024
645ec1a
simplified di registration
lsfera Jul 16, 2024
ca28054
collapse project
lsfera Jul 16, 2024
f4cfe0c
rename folder
lsfera Jul 16, 2024
f9bf48f
mark as implicit usage
lsfera Jul 17, 2024
7fff1b4
switch to prepared statement
lsfera Jul 17, 2024
f0c4160
dispose resources
lsfera Jul 17, 2024
8db4187
explicit defaults
lsfera Jul 17, 2024
c446721
add PublisherOptions
lsfera Jul 17, 2024
e71999b
Expose shortcut for table validation when bootstrapping publisher
lsfera Jul 17, 2024
2351af1
explain different available to publisher for validating table
lsfera Jul 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Blumchen.sln
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8A
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberWorker", "src\SubscriberWorker\SubscriberWorker.csproj", "{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -69,6 +71,10 @@ Global
{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.Build.0 = Release|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -79,6 +85,7 @@ Global
{F81E2D5B-FC59-4396-A911-56BE65E4FE80} = {A4044484-FE08-4399-8239-14AABFA30AD7}
{C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
{8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92} = {A4044484-FE08-4399-8239-14AABFA30AD7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Main logic is placed in [EventsSubscription](./src/Blumchen/Subscriptions/Subscr
```shell
docker-compose up
```
2. Run(order doesn't matter) Publisher and Subscriber apps, under 'demo' folder, from vs-studio, and follow Publisher instructions.
2. Run(order doesn't matter) Publisher and (Subscriber or SubscriberWorker) apps, under 'demo' folder, from vs-studio, and follow Publisher instructions.

## Testing (against default docker instance)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FIX: Let's merge it into the regular project. I'm fine with having hosting.abstractions and Polly dependency in the main project. It's much more accessible when user installs just a package and have all available (of course, unless that's enforcing too much from the peer dependency side).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've no strong opinion against it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lsfera, could you change that in the PR?

<VersionPrefix>0.1.1</VersionPrefix>
<TargetFramework>net8.0</TargetFramework>
<GenerateAssemblyTitleAttribute>true</GenerateAssemblyTitleAttribute>
<GenerateAssemblyDescriptionAttribute>true</GenerateAssemblyDescriptionAttribute>
<GenerateAssemblyProductAttribute>true</GenerateAssemblyProductAttribute>
<GenerateAssemblyCopyrightAttribute>false</GenerateAssemblyCopyrightAttribute>
<GenerateAssemblyVersionAttribute>true</GenerateAssemblyVersionAttribute>
<GenerateAssemblyFileVersionAttribute>true</GenerateAssemblyFileVersionAttribute>
<GenerateAssemblyInformationalVersionAttribute>true</GenerateAssemblyInformationalVersionAttribute>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<LangVersion>12.0</LangVersion>
<Authors>Oskar Dudycz</Authors>
<!-- <PackageIconUrl>https://github.com/event-driven-io/Blumchen/content/images/emblem.png</PackageIconUrl>-->
<PackageProjectUrl>https://github.com/event-driven-io/Blumchen</PackageProjectUrl>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<RepositoryUrl>https://github.com/event-driven-io/Blumchen.git</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<Product>Blumchen</Product>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<RootNamespace>Blumchen</RootNamespace>
<PublishAot>true</PublishAot>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1591</NoWarn>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<NoWarn>1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2023.3.0">
<PrivateAssets>all</PrivateAssets>
<ExcludeAssets>none</ExcludeAssets>
<IncludeAssets>all</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Polly" Version="8.4.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Blumchen\Blumchen.csproj" />
</ItemGroup>

</Project>
19 changes: 17 additions & 2 deletions src/Blumchen/Blumchen.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<VersionPrefix>0.1.0</VersionPrefix>
<VersionPrefix>0.1.1</VersionPrefix>
<TargetFramework>net8.0</TargetFramework>
<GenerateAssemblyTitleAttribute>true</GenerateAssemblyTitleAttribute>
<GenerateAssemblyDescriptionAttribute>true</GenerateAssemblyDescriptionAttribute>
Expand All @@ -25,20 +25,35 @@
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<RootNamespace>Blumchen</RootNamespace>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1591</NoWarn>
<WarningsNotAsErrors></WarningsNotAsErrors>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<NoWarn>1591</NoWarn>
<WarningsNotAsErrors></WarningsNotAsErrors>
</PropertyGroup>
<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Tests</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Blumchen.DependencyInjection</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2023.3.0" >
<PackageReference Include="JetBrains.Annotations" Version="2023.3.0">
<PrivateAssets>all</PrivateAssets>
<ExcludeAssets>none</ExcludeAssets>
<IncludeAssets>all</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
<PackageReference Include="Npgsql" Version="8.0.3" />
<PackageReference Include="Polly.Core" Version="8.4.1" />
</ItemGroup>

</Project>
2 changes: 0 additions & 2 deletions src/Blumchen/Database/Run.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
using Blumchen.Subscriptions.ReplicationMessageHandlers;
using Npgsql;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

namespace Blumchen.Database;

public static class Run
Expand Down
23 changes: 23 additions & 0 deletions src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Blumchen.Subscriptions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

#pragma warning disable IL2091

namespace Blumchen.DependencyInjection;

public static class ServiceCollectionExtensions
{

public static IServiceCollection AddBlumchen<T>(
this IServiceCollection service,
Func<IServiceProvider, IWorkerOptionsBuilder, IWorkerOptionsBuilder> workerOptions)
where T : class, IHandler =>
service
.AddKeyedSingleton(typeof(T), (provider, _) => workerOptions(provider, new WorkerOptionsBuilder()).Build())
.AddHostedService(provider =>
new Worker<T>(workerOptions(provider, new WorkerOptionsBuilder()).Build(),
ServiceProviderServiceExtensions.GetRequiredService<ILogger<Worker<T>>>(provider)));


}
41 changes: 41 additions & 0 deletions src/Blumchen/DependencyInjection/Worker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Collections.Concurrent;
using Blumchen.Subscriptions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Blumchen.DependencyInjection;

public class Worker<T>(
WorkerOptions options,
ILogger<Worker<T>> logger): BackgroundService where T : class, IHandler
{
private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>";
private static readonly ConcurrentDictionary<string, Action<ILogger, string, object[]>> LoggingActions = new(StringComparer.OrdinalIgnoreCase);
private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters)
{
static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled) =>
(ll, enabled) switch
{
(LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters),
(LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters),
(_, _) => (_, _, _) => { }
};
LoggingActions.GetOrAdd(template,_ => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters);
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await options.ResiliencePipeline.ExecuteAsync(async token =>
{
await using var subscription = new Subscription();
await using var cursor = subscription.Subscribe(options.SubscriptionOptions, ct: token)
.GetAsyncEnumerator(token);
Notify(logger, LogLevel.Information,"{WorkerName} started", WorkerName);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !token.IsCancellationRequested)
Notify(logger, LogLevel.Debug, "{cursor.Current} processed", cursor.Current);

}, stoppingToken).ConfigureAwait(false);
Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
}

}
37 changes: 37 additions & 0 deletions src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Blumchen.Subscriptions;
using Polly;

namespace Blumchen.DependencyInjection;

public record WorkerOptions(ResiliencePipeline ResiliencePipeline, ISubscriptionOptions SubscriptionOptions);

public interface IWorkerOptionsBuilder
{
IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline);
IWorkerOptionsBuilder Subscription(Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder>? builder);
WorkerOptions Build();
}

internal sealed class WorkerOptionsBuilder: IWorkerOptionsBuilder
{
private ResiliencePipeline? _resiliencePipeline = default;
private Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder>? _builder;

public IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline)
{
_resiliencePipeline = resiliencePipeline;
return this;
}public IWorkerOptionsBuilder Subscription(Func<SubscriptionOptionsBuilder, SubscriptionOptionsBuilder>? builder)
{
_builder = builder;
return this;
}

public WorkerOptions Build()
{
ArgumentNullException.ThrowIfNull(_resiliencePipeline);
ArgumentNullException.ThrowIfNull(_builder);
return new(_resiliencePipeline, _builder(new SubscriptionOptionsBuilder()).Build());
}
}

5 changes: 4 additions & 1 deletion src/Blumchen/MessageTableOptions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using Blumchen.Subscriptions;
using JetBrains.Annotations;
using NpgsqlTypes;

namespace Blumchen;

#pragma warning disable CS1591
public record TableDescriptorBuilder
{
private MessageTable TableDescriptor { get; set; } = new();
Expand Down Expand Up @@ -34,6 +34,9 @@ public TableDescriptorBuilder MessageType(string name, int dimension = 250)
return this;
}

[UsedImplicitly]
public TableDescriptorBuilder UseDefaults() => this;

public record MessageTable(string Name = MessageTable.DefaultName)
{
internal const string DefaultName = "outbox";
Expand Down
25 changes: 13 additions & 12 deletions src/Blumchen/Publications/MessageAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
using Npgsql;

namespace Blumchen.Publications;
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

public static class MessageAppender
{
public static async Task AppendAsync<T>(T @input
, (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) resolver
, PublisherOptions resolver
, NpgsqlConnection connection
, NpgsqlTransaction transaction
, CancellationToken ct
Expand All @@ -19,10 +18,10 @@ public static async Task AppendAsync<T>(T @input
case null:
throw new ArgumentNullException(nameof(@input));
case IEnumerable inputs:
await AppendBatchAsyncOfT(inputs, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
await AppendBatchAsyncOfT(inputs, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
break;
default:
await AppendAsyncOfT(input, resolver.tableDescriptor, resolver.jsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
await AppendAsyncOfT(input, resolver.TableDescriptor, resolver.JsonTypeResolver, connection, transaction, ct).ConfigureAwait(false);
break;
}
}
Expand All @@ -36,30 +35,32 @@ private static async Task AppendAsyncOfT<T>(T input
{
var (typeName, jsonTypeInfo) = typeResolver.Resolve(typeof(T));
var data = JsonSerialization.ToJson(@input, jsonTypeInfo);
var command = new NpgsqlCommand(

await using var command = new NpgsqlCommand(
$"INSERT INTO {tableDescriptor.Name}({tableDescriptor.MessageType.Name}, {tableDescriptor.Data.Name}) values ('{typeName}', '{data}')",
connection,
transaction
);
await command.PrepareAsync(ct).ConfigureAwait(false);
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

public static async Task AppendAsync<T>(T input
, (TableDescriptorBuilder.MessageTable tableDescriptor, IJsonTypeResolver resolver) options
, PublisherOptions options
, string connectionString
, CancellationToken ct)
where T: class
{
var type = typeof(T);
var (typeName, jsonTypeInfo) = options.resolver.Resolve(type);
var (typeName, jsonTypeInfo) = options.JsonTypeResolver.Resolve(type);
var data = JsonSerialization.ToJson(input, jsonTypeInfo);

var connection = new NpgsqlConnection(connectionString);
await using var connection1 = connection.ConfigureAwait(false);
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync(ct).ConfigureAwait(false);
var command = connection.CreateCommand();
await using var command = connection.CreateCommand();
command.CommandText =
$"INSERT INTO {options.tableDescriptor.Name}({options.tableDescriptor.MessageType.Name}, {options.tableDescriptor.Data.Name}) values ('{typeName}', '{data}')";
$"INSERT INTO {options.TableDescriptor.Name}({options.TableDescriptor.MessageType.Name}, {options.TableDescriptor.Data.Name}) values ('{typeName}', '{data}')";
await command.PrepareAsync(ct).ConfigureAwait(false);
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

Expand All @@ -70,7 +71,7 @@ private static async Task AppendBatchAsyncOfT<T>(T inputs
, NpgsqlTransaction transaction
, CancellationToken ct) where T : class, IEnumerable
{
var batch = new NpgsqlBatch(connection, transaction);
await using var batch = new NpgsqlBatch(connection, transaction);
foreach (var input in inputs)
{
var (typeName, jsonTypeInfo) = resolver.Resolve(input.GetType());
Expand Down
20 changes: 20 additions & 0 deletions src/Blumchen/Publications/PublisherOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Blumchen.Database;
using Blumchen.Serialization;
using Npgsql;

namespace Blumchen.Publications;

public record PublisherOptions(TableDescriptorBuilder.MessageTable TableDescriptor, IJsonTypeResolver JsonTypeResolver);

public static class PublisherOptionsExtensions
{
public static async Task<PublisherOptions> EnsureTable(this PublisherOptions publisherOptions, NpgsqlDataSource dataSource, CancellationToken ct)
{
await dataSource.EnsureTableExists(publisherOptions.TableDescriptor, ct);
return publisherOptions;
}

public static Task<PublisherOptions> EnsureTable(this PublisherOptions publisherOptions,
string connectionString, CancellationToken ct)
=> EnsureTable(publisherOptions, new NpgsqlDataSourceBuilder(connectionString).Build(), ct);
}
5 changes: 2 additions & 3 deletions src/Blumchen/Publications/PublisherSetupOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

namespace Blumchen.Publications;

#pragma warning disable CS1591
public class PublisherSetupOptionsBuilder
{
private INamingPolicy? _namingPolicy;
Expand Down Expand Up @@ -34,7 +33,7 @@ public PublisherSetupOptionsBuilder WithTable(Func<TableDescriptorBuilder, Table
return this;
}

public (MessageTable tableDescriptor, IJsonTypeResolver jsonTypeResolver) Build()
public PublisherOptions Build()
{
ArgumentNullException.ThrowIfNull(_jsonSerializerContext);
ArgumentNullException.ThrowIfNull(_namingPolicy);
Expand All @@ -49,6 +48,6 @@ public PublisherSetupOptionsBuilder WithTable(Func<TableDescriptorBuilder, Table
while (typeEnum.MoveNext())
jsonTypeResolver.WhiteList(typeEnum.Current);

return (_tableDescriptor,jsonTypeResolver);
return new(_tableDescriptor,jsonTypeResolver);
}
}
1 change: 0 additions & 1 deletion src/Blumchen/Serialization/IDictionaryExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
namespace Blumchen.Serialization;


Expand Down
Loading
Loading