Skip to content

Commit

Permalink
Replaced MediatR event bus with the custom one to enable scoping, tra…
Browse files Browse the repository at this point in the history
…cing etc.

Plugged that fully into Marten and EventStoreDB
  • Loading branch information
oskardudycz committed Mar 5, 2022
1 parent 810ae76 commit a28a3e6
Show file tree
Hide file tree
Showing 32 changed files with 188 additions and 176 deletions.
5 changes: 2 additions & 3 deletions Core.ElasticSearch/Projections/ElasticSearchProjection.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
using Core.ElasticSearch.Indices;
using Core.Events;
using Core.Events.NoMediator;
using Core.Projections;
using Elasticsearch.Net;
using Microsoft.Extensions.DependencyInjection;
using Nest;

namespace Core.ElasticSearch.Projections;

public class ElasticSearchProjection<TEvent, TView> : INoMediatorEventHandler<EventEnvelope<TEvent>>
public class ElasticSearchProjection<TEvent, TView> : IEventHandler<EventEnvelope<TEvent>>
where TView : class, IProjection
where TEvent : IEvent
{
Expand Down Expand Up @@ -49,7 +48,7 @@ public static IServiceCollection Project<TEvent, TView>(this IServiceCollection
where TView : class, IProjection
where TEvent : IEvent
{
services.AddTransient<INoMediatorEventHandler<EventEnvelope<TEvent>>>(sp =>
services.AddTransient<IEventHandler<EventEnvelope<TEvent>>>(sp =>
{
var session = sp.GetRequiredService<IElasticClient>();
Expand Down
5 changes: 1 addition & 4 deletions Core.ElasticSearch/Repository/ElasticSearchRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@ namespace Core.ElasticSearch.Repository;
public class ElasticSearchRepository<T>: IElasticSearchRepository<T> where T : class, IAggregate, new()
{
private readonly IElasticClient elasticClient;
private readonly IEventBus eventBus;

public ElasticSearchRepository(
IElasticClient elasticClient,
IEventBus eventBus
IElasticClient elasticClient
)
{
this.elasticClient = elasticClient ?? throw new ArgumentNullException(nameof(elasticClient));
this.eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
}

public async Task<T?> Find(Guid id, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void DeSerialize_ForNonEmptyEventMetadata_ShouldSucceed()
var json = $"{{\"$correlationId\":\"{correlationId.Value}\",\"$causationId\":\"{causationId.Value}\"}}";

// When
var eventMetadata = JsonConvert.DeserializeObject<EventMetadata>(json, jsonConverter);
var eventMetadata = JsonConvert.DeserializeObject<TraceMetadata>(json, jsonConverter);


eventMetadata.Should().Be(expectedEventMetadata);
Expand Down
2 changes: 0 additions & 2 deletions Core.EventStoreDB/Config.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Core.BackgroundWorkers;
using Core.Events.NoMediator;
using Core.EventStoreDB.OptimisticConcurrency;
using Core.EventStoreDB.Subscriptions;
using EventStore.Client;
Expand Down Expand Up @@ -27,7 +26,6 @@ public static IServiceCollection AddEventStoreDB(this IServiceCollection service
var eventStoreDBConfig = config.GetSection(DefaultConfigKey).Get<EventStoreDBConfig>();

services
.AddEventBus()
.AddSingleton(new EventStoreClient(EventStoreClientSettings.Create(eventStoreDBConfig.ConnectionString)))
.AddEventStoreDBAppendScope()
.AddTransient<EventStoreDBSubscriptionToAll, EventStoreDBSubscriptionToAll>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class EventStoreDBEventMetadataJsonConverter : JsonConverter

public override bool CanConvert(Type objectType)
{
return objectType == typeof(EventMetadata);
return objectType == typeof(TraceMetadata);
}

public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Core.Events;
using Core.Events.NoMediator;
using Core.EventStoreDB.Events;
using Core.Threading;
using EventStore.Client;
Expand All @@ -23,7 +22,7 @@ public class EventStoreDBSubscriptionToAllOptions

public class EventStoreDBSubscriptionToAll
{
private readonly INoMediatorEventBus eventBus;
private readonly IEventBus eventBus;
private readonly EventStoreClient eventStoreClient;
private readonly ISubscriptionCheckpointRepository checkpointRepository;
private readonly ILogger<EventStoreDBSubscriptionToAll> logger;
Expand All @@ -34,7 +33,7 @@ public class EventStoreDBSubscriptionToAll

public EventStoreDBSubscriptionToAll(
EventStoreClient eventStoreClient,
INoMediatorEventBus eventBus,
IEventBus eventBus,
ISubscriptionCheckpointRepository checkpointRepository,
ILogger<EventStoreDBSubscriptionToAll> logger
)
Expand Down
6 changes: 5 additions & 1 deletion Core.Marten/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Marten.Events.Projections;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Weasel.Core;

namespace Core.Marten;
Expand Down Expand Up @@ -71,7 +72,10 @@ private static StoreOptions SetStoreOptions(
);

options.Projections.Add(
new MartenSubscription(new[] { new MartenEventPublisher(serviceProvider) }),
new MartenSubscription(
new[] { new MartenEventPublisher(serviceProvider) },
serviceProvider.GetRequiredService<ILogger<MartenSubscription>>()
),
ProjectionLifecycle.Async,
"MartenSubscription"
);
Expand Down
13 changes: 7 additions & 6 deletions Core.Marten/ExternalProjections/MartenExternalProjection.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
using Core.Events;
using Core.Events.NoMediator;
using Core.Projections;
using Marten;
using Microsoft.Extensions.DependencyInjection;

namespace Core.Marten.ExternalProjections;

public class MartenExternalProjection<TEvent, TView>: INoMediatorEventHandler<EventEnvelope<TEvent>>
public class MartenExternalProjection<TEvent, TView>: IEventHandler<EventEnvelope<TEvent>>
where TView : IVersionedProjection
where TEvent : notnull
{
Expand All @@ -24,15 +23,17 @@ Func<TEvent, Guid> getId

public async Task Handle(EventEnvelope<TEvent> eventEnvelope, CancellationToken ct)
{
var entity = await session.LoadAsync<TView>(getId(eventEnvelope.Data), ct) ??
var (@event, eventMetadata) = eventEnvelope;

var entity = await session.LoadAsync<TView>(getId(@event), ct) ??
(TView)Activator.CreateInstance(typeof(TView), true)!;

var eventLogPosition = eventEnvelope.Metadata.LogPosition;
var eventLogPosition = eventMetadata.LogPosition;

if (entity.LastProcessedPosition >= eventLogPosition)
return;

entity.When(eventEnvelope.Data);
entity.When(@event);

entity.LastProcessedPosition = eventLogPosition;

Expand All @@ -59,7 +60,7 @@ public static IServiceCollection Project<TEvent, TView>(this IServiceCollection
where TEvent : notnull
{
services
.AddTransient<INoMediatorEventHandler<EventEnvelope<TEvent>>>(sp =>
.AddTransient<IEventHandler<EventEnvelope<TEvent>>>(sp =>
{
var session = sp.GetRequiredService<IDocumentSession>();
Expand Down
21 changes: 17 additions & 4 deletions Core.Marten/Subscriptions/MartenSubscription.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Microsoft.Extensions.Logging;

namespace Core.Marten.Subscriptions;

public class MartenSubscription: IProjection
{
private readonly IEnumerable<IMartenEventsConsumer> consumers;
private readonly ILogger<MartenSubscription> logger;

public MartenSubscription(IEnumerable<IMartenEventsConsumer> consumers)
public MartenSubscription(
IEnumerable<IMartenEventsConsumer> consumers,
ILogger<MartenSubscription> logger
)
{
this.consumers = consumers;
this.logger = logger;
}

public void Apply(
Expand All @@ -25,14 +31,21 @@ public async Task ApplyAsync(
CancellationToken ct
)
{
foreach (var consumer in consumers)
try
{
foreach (var consumer in consumers)
{
await consumer.ConsumeAsync(operations, streams, ct);
}
}
catch (Exception exc)
{
await consumer.ConsumeAsync(operations, streams, ct);
logger.LogError("Error while processing Marten Subscription: {ExceptionMessage}", exc.Message);
throw;
}
}
}


public interface IMartenEventsConsumer
{
Task ConsumeAsync(
Expand Down
4 changes: 1 addition & 3 deletions Core.Testing/ApiFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
using Core.Events.External;
using Core.Requests;
using FluentAssertions;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using IEventBus = Core.Events.IEventBus;

namespace Core.Testing;

Expand All @@ -22,7 +20,7 @@ public override TestContext CreateTestContext() =>
{
SetupServices?.Invoke(services);
services.AddSingleton(eventsLog);
services.AddSingleton(typeof(INotificationHandler<>), typeof(EventListener<>));
services.AddSingleton(typeof(IEventHandler<>), typeof(EventListener<>));
services.AddSingleton<IExternalEventProducer>(externalEventProducer);
services.AddSingleton<IExternalCommandBus>(externalCommandBus);
services.AddSingleton<IExternalEventConsumer, DummyExternalEventConsumer>();
Expand Down
7 changes: 3 additions & 4 deletions Core.Testing/EventListener.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
using Core.Events;
using MediatR;

namespace Core.Testing;

public class EventsLog
{
public List<IEvent> PublishedEvents { get; } = new List<IEvent>();
public List<IEvent> PublishedEvents { get; } = new();
}

public class EventListener<TEvent>: INotificationHandler<TEvent>
public class EventListener<TEvent>: IEventHandler<TEvent>
where TEvent : IEvent
{
private readonly EventsLog eventsLog;
Expand All @@ -24,4 +23,4 @@ public Task Handle(TEvent @event, CancellationToken cancellationToken)

return Task.CompletedTask;
}
}
}
3 changes: 0 additions & 3 deletions Core.WebApi/Tracing/TracingMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using Core.Tracing;
using Core.Tracing.Causation;
using Core.Tracing.Correlation;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;

namespace Core.WebApi.Tracing;
Expand Down
4 changes: 2 additions & 2 deletions Core/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public static IServiceCollection AddCoreServices(this IServiceCollection service
services.AddMediatR()
.AddScoped<ICommandBus, CommandBus>()
.AddScoped<IQueryBus, QueryBus>()
.AddTracing();
.AddTracing()
.AddEventBus();

services.TryAddScoped<IEventBus, EventBus>();
services.TryAddScoped<IExternalEventProducer, NulloExternalEventProducer>();
services.TryAddScoped<IExternalCommandBus, ExternalCommandBus>();

Expand Down
7 changes: 3 additions & 4 deletions Core/Events/Config.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;

namespace Core.Events;

Expand All @@ -11,8 +10,8 @@ this IServiceCollection services
where TEvent : IEvent
where TEventHandler : class, IEventHandler<TEvent>
{
return services.AddTransient<TEventHandler>()
.AddTransient<INotificationHandler<TEvent>>(sp => sp.GetRequiredService<TEventHandler>())
return services
.AddTransient<TEventHandler>()
.AddTransient<IEventHandler<TEvent>>(sp => sp.GetRequiredService<TEventHandler>());
}
}
79 changes: 62 additions & 17 deletions Core/Events/EventBus.cs
Original file line number Diff line number Diff line change
@@ -1,41 +1,86 @@
using Core.Events.External;
using MediatR;
using System.Collections.Concurrent;
using System.Reflection;
using Core.Tracing;
using Microsoft.Extensions.DependencyInjection;
using Polly;

namespace Core.Events;

public interface IEventBus
{
Task Publish(IEvent @event, CancellationToken ct);
Task Publish(IEvent[] events, CancellationToken ct);
Task Publish(object @event, CancellationToken ct);
}

public class EventBus: IEventBus
{
private readonly IMediator mediator;
private readonly IExternalEventProducer externalEventProducer;
private readonly IServiceProvider serviceProvider;
private readonly Func<IServiceProvider, EventEnvelope?, TracingScope> createTracingScope;
private readonly AsyncPolicy retryPolicy;
private static readonly ConcurrentDictionary<Type, MethodInfo> PublishMethods = new();

public EventBus(
IMediator mediator,
IExternalEventProducer externalEventProducer
IServiceProvider serviceProvider,
Func<IServiceProvider, EventEnvelope?, TracingScope> createTracingScope,
AsyncPolicy retryPolicy
)
{
this.mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
this.externalEventProducer = externalEventProducer?? throw new ArgumentNullException(nameof(externalEventProducer));
this.serviceProvider = serviceProvider;
this.createTracingScope = createTracingScope;
this.retryPolicy = retryPolicy;
}

public async Task Publish(IEvent[] events, CancellationToken ct)
private async Task Publish<TEvent>(TEvent @event, CancellationToken ct)
{
foreach (var @event in events)
var eventEnvelope = @event as EventEnvelope;
// You can consider adding here a retry policy for event handling
using var scope = serviceProvider.CreateScope();
using var tracingScope = createTracingScope(serviceProvider, eventEnvelope);

var eventHandlers =
scope.ServiceProvider.GetServices<IEventHandler<TEvent>>();

foreach (var eventHandler in eventHandlers)
{
await Publish(@event, ct);
await retryPolicy.ExecuteAsync(async token =>
{
await eventHandler.Handle(@event, token);
}, ct);
}
}

public async Task Publish(IEvent @event, CancellationToken ct)
public async Task Publish(object @event, CancellationToken ct)
{
// if it's an event envelope, publish also just event data
// thanks to that both handlers with envelope and without will be called
if (@event is EventEnvelope(var data, _))
await (Task)GetGenericPublishFor(data)
.Invoke(this, new[] { data, ct })!;

await (Task)GetGenericPublishFor(@event)
.Invoke(this, new[] { @event, ct })!;
}

private static MethodInfo GetGenericPublishFor(object @event) =>
PublishMethods.GetOrAdd(@event.GetType(), eventType =>
typeof(EventBus)
.GetMethods(BindingFlags.Instance | BindingFlags.NonPublic)
.Single(m => m.Name == nameof(Publish) && m.GetGenericArguments().Any())
.MakeGenericMethod(eventType)
);
}

public static class EventBusExtensions
{
public static IServiceCollection AddEventBus(this IServiceCollection services, AsyncPolicy? asyncPolicy = null)
{
await mediator.Publish(@event, ct);
services.AddScoped<IEventBus, EventBus>(sp =>
new EventBus(
sp,
sp.GetRequiredService<ITracingScopeFactory>().CreateTraceScope,
asyncPolicy ?? Policy.NoOpAsync()
)
);

if (@event is IExternalEvent externalEvent)
await externalEventProducer.Publish(externalEvent, ct);
return services;
}
}
Loading

0 comments on commit a28a3e6

Please sign in to comment.