Skip to content

Commit 71f6e07

Browse files
authored
refactor: Make processor generic (#22)
1 parent 38f502d commit 71f6e07

33 files changed

+485
-122
lines changed

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
11
# underground
22

3-
work in progress...
3+
work in progress...
4+
5+
## Example
6+
7+
Run example:
8+
9+
```bash
10+
dotnet run --project example/ConsoleApp/
11+
```

example/ConsoleApp/AppDbContext.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
namespace ConsoleApp;
66

7-
sealed class AppDbContext(DbContextOptions<AppDbContext> options) : DbContext(options), IOutboxDbContext
7+
sealed class AppDbContext(DbContextOptions<AppDbContext> options) : DbContext(options), IOutboxDbContext, IInboxDbContext
88
{
99
public DbSet<OutboxMessage> OutboxMessages { get; set; }
10+
public DbSet<InboxMessage> InboxMessages { get; set; }
1011

1112
// example on how to apply custom configuration to the outbox model
1213
protected override void OnModelCreating(ModelBuilder modelBuilder)

example/ConsoleApp/ExampleMessageHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ public class ExampleMessageHandler : IOutboxMessageHandler<ExampleMessage>
77
{
88
public Task HandleAsync(ExampleMessage message, CancellationToken cancellationToken)
99
{
10-
Console.WriteLine("received: " + message.Id);
10+
Console.WriteLine("received outbox: " + message.Id);
1111
return Task.CompletedTask;
1212
}
1313
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
2+
using Underground.Outbox;
3+
4+
namespace ConsoleApp;
5+
6+
public class InboxMessageHandler : IInboxMessageHandler<ExampleMessage>
7+
{
8+
public Task HandleAsync(ExampleMessage message, CancellationToken cancellationToken)
9+
{
10+
Console.WriteLine("received inbox: " + message.Id);
11+
return Task.CompletedTask;
12+
}
13+
}

example/ConsoleApp/Program.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,15 @@
2626
{
2727
cfg.AddHandler<ExampleMessageHandler>();
2828
});
29+
builder.Services.AddInboxServices<AppDbContext>(cfg =>
30+
{
31+
cfg.AddHandler<InboxMessageHandler>();
32+
});
2933

3034
IHost host = builder.Build();
3135

3236
var outbox = host.Services.GetRequiredService<IOutbox>();
37+
var inbox = host.Services.GetRequiredService<IInbox>();
3338
var dbContext = host.Services.GetRequiredService<AppDbContext>();
3439
await dbContext.Database.EnsureCreatedAsync();
3540

@@ -40,6 +45,9 @@
4045
var partition = (i % 3).ToString();
4146
var message = new OutboxMessage(Guid.NewGuid(), DateTime.UtcNow, new ExampleMessage($"partition {partition}: {i}"), partition);
4247
await outbox.AddMessageAsync(dbContext, message, CancellationToken.None);
48+
49+
var inboxMessage = new InboxMessage(Guid.NewGuid(), DateTime.UtcNow, new ExampleMessage($"inbox message: {i}"));
50+
await inbox.AddMessageAsync(dbContext, inboxMessage, CancellationToken.None);
4351
}
4452

4553
await transaction.CommitAsync();

src/Underground.Outbox/Configuration/ConfigureOutboxServices.cs

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,34 +23,60 @@ Action<OutboxServiceConfiguration> configuration
2323
var serviceConfig = new OutboxServiceConfiguration();
2424
configuration.Invoke(serviceConfig);
2525

26-
// register all assigned handlers
27-
services.TryAddEnumerable(serviceConfig.HandlersWithLifetime);
28-
2926
services.AddScoped<IOutboxDbContext>(sp => sp.GetRequiredService<TContext>());
3027
services.AddScoped<AddMessageToOutbox>();
3128
services.AddScoped<IOutbox, Outbox>();
32-
services.AddScoped<IMessageDispatcher, DirectInvocationDispatcher>();
33-
services.AddScoped<IMessageExceptionHandler, DiscardMessageOnExceptionHandler>();
34-
services.AddScoped<ProcessExceptionFromHandler>();
29+
services.AddScoped<IMessageDispatcher<OutboxMessage>, OutboxDispatcher>();
30+
31+
AddGenericServices<OutboxMessage, IOutboxDbContext>(services, serviceConfig);
32+
33+
return services;
34+
}
35+
36+
public static IServiceCollection AddInboxServices<TContext>(
37+
this IServiceCollection services,
38+
Action<InboxServiceConfiguration> configuration
39+
) where TContext : DbContext, IInboxDbContext
40+
{
41+
var serviceConfig = new InboxServiceConfiguration();
42+
configuration.Invoke(serviceConfig);
43+
44+
services.AddScoped<IInboxDbContext>(sp => sp.GetRequiredService<TContext>());
45+
services.AddScoped<AddMessageToInbox>();
46+
services.AddScoped<IInbox, Inbox>();
47+
services.AddScoped<IMessageDispatcher<InboxMessage>, InboxDispatcher>();
48+
49+
AddGenericServices<InboxMessage, IInboxDbContext>(services, serviceConfig);
50+
51+
return services;
52+
}
53+
54+
private static void AddGenericServices<TEntity, TContext>(this IServiceCollection services, ServiceConfiguration serviceConfig)
55+
where TEntity : class, IMessage
56+
where TContext : IDbContext
57+
{
58+
// register all assigned handlers
59+
services.TryAddEnumerable(serviceConfig.HandlersWithLifetime);
60+
61+
services.AddScoped<IMessageExceptionHandler<TEntity>, DiscardMessageOnExceptionHandler<TEntity>>();
62+
services.AddScoped<ProcessExceptionFromHandler<TEntity>>();
3563
services.AddSingleton(
36-
provider => new OutboxProcessor(
64+
provider => new Processor<TEntity>(
3765
serviceConfig,
3866
provider.GetRequiredService<IServiceScopeFactory>(),
39-
provider.GetRequiredService<IMessageDispatcher>(),
40-
provider.GetRequiredService<ILogger<OutboxProcessor>>()
67+
provider.GetRequiredService<IMessageDispatcher<TEntity>>(),
68+
provider.GetRequiredService<ILogger<Processor<TEntity>>>()
4169
)
4270
);
43-
services.AddHostedService<OutboxBackgroundService>();
71+
services.AddHostedService<BackgroundService<TEntity>>();
4472

4573
var serviceProvider = services.BuildServiceProvider();
46-
var dbContext = serviceProvider.GetRequiredService<IOutboxDbContext>();
74+
var dbContext = serviceProvider.GetRequiredService<TContext>();
4775
var connectionString = dbContext.Database.GetConnectionString();
4876
if (string.IsNullOrEmpty(connectionString))
4977
{
5078
throw new ArgumentException("Database connection string is not set. Please ensure the DbContext is properly configured.");
5179
}
5280
services.AddSingleton<IDistributedLockProvider>(_ => new PostgresDistributedSynchronizationProvider(connectionString));
53-
54-
return services;
5581
}
5682
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
3+
namespace Underground.Outbox.Configuration;
4+
5+
public class InboxServiceConfiguration : ServiceConfiguration
6+
{
7+
public override ServiceConfiguration AddHandler(HandlerType messageHandlerType, ServiceLifetime serviceLifetime = ServiceLifetime.Transient)
8+
{
9+
var interfaceType = messageHandlerType.GetInterface("Underground.Outbox.IInboxMessageHandler`1");
10+
11+
if (interfaceType?.IsGenericType == true)
12+
{
13+
Console.WriteLine($"Added handler for {interfaceType} with {messageHandlerType} ");
14+
HandlersWithLifetime.Add(new ServiceDescriptor(interfaceType, messageHandlerType, serviceLifetime));
15+
}
16+
17+
return this;
18+
}
19+
}

src/Underground.Outbox/Configuration/OutboxServiceConfiguration.cs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,9 @@
22

33
namespace Underground.Outbox.Configuration;
44

5-
public class OutboxServiceConfiguration
5+
public class OutboxServiceConfiguration : ServiceConfiguration
66
{
7-
/// <summary>
8-
/// Number of messages to process in a single batch.
9-
/// The whole batch is processed within a single transaction. If you want to have a transaction per message, set this to 1.
10-
/// </summary>
11-
public int BatchSize { get; set; } = 5;
12-
13-
public int ParallelProcessingOfPartitions { get; set; } = 4;
14-
15-
internal List<ServiceDescriptor> HandlersWithLifetime = [];
16-
17-
public OutboxServiceConfiguration AddHandler<TMessageHandlerType>()
18-
{
19-
return AddHandler(typeof(TMessageHandlerType));
20-
}
21-
22-
public OutboxServiceConfiguration AddHandler<TMessageHandlerType>(ServiceLifetime serviceLifetime)
23-
{
24-
return AddHandler(typeof(TMessageHandlerType), serviceLifetime);
25-
}
26-
27-
public OutboxServiceConfiguration AddHandler(HandlerType messageHandlerType, ServiceLifetime serviceLifetime = ServiceLifetime.Transient)
7+
public override ServiceConfiguration AddHandler(HandlerType messageHandlerType, ServiceLifetime serviceLifetime = ServiceLifetime.Transient)
288
{
299
var interfaceType = messageHandlerType.GetInterface("Underground.Outbox.IOutboxMessageHandler`1");
3010

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
3+
namespace Underground.Outbox.Configuration;
4+
5+
public abstract class ServiceConfiguration
6+
{
7+
/// <summary>
8+
/// Number of messages to process in a single batch.
9+
/// The whole batch is processed within a single transaction. If you want to have a transaction per message, set this to 1.
10+
/// </summary>
11+
public int BatchSize { get; set; } = 5;
12+
13+
public int ParallelProcessingOfPartitions { get; set; } = 4;
14+
15+
internal List<ServiceDescriptor> HandlersWithLifetime = [];
16+
17+
// TODO: add constraint on the generic type? like IInboxMessageHandler?
18+
public ServiceConfiguration AddHandler<TMessageHandlerType>()
19+
{
20+
return AddHandler(typeof(TMessageHandlerType));
21+
}
22+
23+
public ServiceConfiguration AddHandler<TMessageHandlerType>(ServiceLifetime serviceLifetime)
24+
{
25+
return AddHandler(typeof(TMessageHandlerType), serviceLifetime);
26+
}
27+
28+
#pragma warning disable CA1716 // Identifiers should not match keywords
29+
public abstract ServiceConfiguration AddHandler(HandlerType messageHandlerType, ServiceLifetime serviceLifetime = ServiceLifetime.Transient);
30+
#pragma warning restore CA1716 // Identifiers should not match keywords
31+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using Microsoft.EntityFrameworkCore;
2+
using Microsoft.EntityFrameworkCore.ChangeTracking;
3+
using Microsoft.EntityFrameworkCore.Infrastructure;
4+
5+
namespace Underground.Outbox.Data;
6+
7+
public interface IDbContext : IAsyncDisposable
8+
{
9+
public DatabaseFacade Database { get; }
10+
public ChangeTracker ChangeTracker { get; }
11+
#pragma warning disable CA1716 // Identifiers should not match keywords
12+
public DbSet<TEntity> Set<TEntity>() where TEntity : class;
13+
#pragma warning restore CA1716 // Identifiers should not match keywords
14+
15+
/// <summary>
16+
/// Asynchronously saves all changes made in this context to the database.
17+
/// </summary>
18+
/// <param name="cancellationToken">A <see cref="CancellationToken" /> to observe while waiting for the task to complete.</param>
19+
/// <returns>
20+
/// A task that represents the asynchronous save operation. The task result contains the
21+
/// number of state entries written to the database.
22+
/// </returns>
23+
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
24+
}

0 commit comments

Comments
 (0)