Skip to content

Commit

Permalink
Added demo projrct for DI
Browse files Browse the repository at this point in the history
  • Loading branch information
lsfera committed Jul 5, 2024
1 parent 8cc1fed commit 380d06a
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 1 deletion.
7 changes: 7 additions & 0 deletions Blumchen.sln
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-F
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Blumchen.DependencyInjection", "src\Blumchen.DependencyInjection\Blumchen.DependencyInjection.csproj", "{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberWorker", "src\SubscriberWorker\SubscriberWorker.csproj", "{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -75,6 +77,10 @@ Global
{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A07167E3-4CF7-40EF-8E55-A37A0F57B89D}.Release|Any CPU.Build.0 = Release|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -85,6 +91,7 @@ Global
{F81E2D5B-FC59-4396-A911-56BE65E4FE80} = {A4044484-FE08-4399-8239-14AABFA30AD7}
{C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
{8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92} = {A4044484-FE08-4399-8239-14AABFA30AD7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Main logic is placed in [EventsSubscription](./src/Blumchen/Subscriptions/Subscr
```shell
docker-compose up
```
2. Run(order doesn't matter) Publisher and Subscriber apps, under 'demo' folder, from vs-studio, and follow Publisher instructions.
2. Run(order doesn't matter) Publisher and (Subscriber or SubscriberWorker) apps, under 'demo' folder, from vs-studio, and follow Publisher instructions.

## Testing (against default docker instance)

Expand Down
22 changes: 22 additions & 0 deletions src/SubscriberWorker/Contracts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System.Text.Json.Serialization;
using Blumchen.Serialization;

namespace SubscriberWorker
{
[MessageUrn("user-created:v1")]
public record UserCreatedContract(
Guid Id,
string Name
);

[MessageUrn("user-deleted:v1")]
public record UserDeletedContract(
Guid Id,
string Name
);

[JsonSourceGenerationOptions(WriteIndented = true)]
[JsonSerializable(typeof(UserCreatedContract))]
[JsonSerializable(typeof(UserDeletedContract))]
internal partial class SourceGenerationContext: JsonSerializerContext;
}
25 changes: 25 additions & 0 deletions src/SubscriberWorker/Handler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Blumchen.Subscriptions;
using Microsoft.Extensions.Logging;
#pragma warning disable CS9113 // Parameter is unread.

namespace SubscriberWorker;


public class Handler<T>(ILoggerFactory loggerFactory): IHandler<T> where T : class
{
private readonly ILogger _logger = loggerFactory.CreateLogger<Handler<T>>();
private Task ReportSuccess(int count)
{
if(_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug($"Read #{count} messages {typeof(T).FullName}");
return Task.CompletedTask;
}

private int _counter;
private int _completed;
public Task Handle(T value)
=> Interlocked.Increment(ref _counter) % 10 == 0
//Simulating some exception on out of process dependencies
? Task.FromException(new Exception($"Error on publishing {nameof(T)}"))
: ReportSuccess(Interlocked.Increment(ref _completed));
}
59 changes: 59 additions & 0 deletions src/SubscriberWorker/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System.Text.Json.Serialization;
using Blumchen.Configuration;
using Blumchen.Serialization;
using Blumchen.Subscriptions;
using Blumchen.Workers;
using Commons;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Polly.Retry;
using Polly;
using SubscriberWorker;


#pragma warning disable CS8601 // Possible null reference assignment.
Console.Title = typeof(Program).Assembly.GetName().Name;
#pragma warning restore CS8601 // Possible null reference assignment.



AppDomain.CurrentDomain.UnhandledException += (_, e) => Console.Out.WriteLine(e.ExceptionObject.ToString());
TaskScheduler.UnobservedTaskException += (_, e) => Console.Out.WriteLine(e.Exception.ToString());

var cancellationTokenSource = new CancellationTokenSource();
var builder = Host.CreateApplicationBuilder(args);

builder.Services
.AddBlumchen<SubscriberWorker<UserCreatedContract>, UserCreatedContract>()
.AddSingleton<IHandler<UserCreatedContract>, Handler<UserCreatedContract>>()
.AddBlumchen<SubscriberWorker<UserDeletedContract>, UserDeletedContract>()
.AddSingleton<IHandler<UserDeletedContract>, Handler<UserDeletedContract>>()

.AddSingleton<INamingPolicy, AttributeNamingPolicy>()
.AddSingleton<IErrorProcessor, ConsoleOutErrorProcessor>()
.AddSingleton<JsonSerializerContext, SourceGenerationContext>()
.AddSingleton(new DatabaseOptions(Settings.ConnectionString))
.AddResiliencePipeline("default",(pipelineBuilder,context) =>
pipelineBuilder
.AddRetry(new RetryStrategyOptions
{
BackoffType = DelayBackoffType.Constant,
Delay = TimeSpan.FromSeconds(5),
MaxRetryAttempts = int.MaxValue
}).Build())
.AddLogging(loggingBuilder =>
{
loggingBuilder
.AddFilter("Microsoft", LogLevel.Warning)
.AddFilter("System", LogLevel.Warning)
.AddFilter("Npgsql", LogLevel.Information)
.AddFilter("Blumchen", LogLevel.Debug)
.AddFilter("SubscriberWorker", LogLevel.Debug)
.AddSimpleConsole();
});

await builder
.Build()
.RunAsync(cancellationTokenSource.Token)
.ConfigureAwait(false);
28 changes: 28 additions & 0 deletions src/SubscriberWorker/SubscriberWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Text.Json.Serialization;
using Blumchen.Configuration;
using Blumchen.Serialization;
using Blumchen.Subscriptions;
using Blumchen.Subscriptions.Management;
using Blumchen.Workers;
using Microsoft.Extensions.Logging;
using Polly.Registry;
// ReSharper disable ClassNeverInstantiated.Global

namespace SubscriberWorker;
public class SubscriberWorker<T>(
DatabaseOptions databaseOptions,
IHandler<T> handler,
JsonSerializerContext jsonSerializerContext,
ResiliencePipelineProvider<string> pipelineProvider,
INamingPolicy namingPolicy,
IErrorProcessor errorProcessor,
ILoggerFactory loggerFactory
): Worker<T>(databaseOptions
, handler
, jsonSerializerContext
, errorProcessor
, pipelineProvider.GetPipeline("default")
, namingPolicy
, new PublicationManagement.PublicationSetupOptions($"{typeof(T).Name}_pub")
, new ReplicationSlotManagement.ReplicationSlotSetupOptions($"{typeof(T).Name}_slot")
, loggerFactory) where T : class;
23 changes: 23 additions & 0 deletions src/SubscriberWorker/SubscriberWorker.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PublishAot>true</PublishAot>
<InvariantGlobalization>true</InvariantGlobalization>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Polly.Extensions" Version="8.4.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Blumchen.DependencyInjection\Blumchen.DependencyInjection.csproj" />
<ProjectReference Include="..\Commons\Commons.csproj" />
</ItemGroup>

</Project>

0 comments on commit 380d06a

Please sign in to comment.