Skip to content

Commit a26969c

Browse files
authored
Merge pull request #133 from cnblogs/add-event-queue
feat: add event buffer
2 parents 54ec918 + 41bcb76 commit a26969c

32 files changed

+526
-89
lines changed

src/Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection/CqrsInjector.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,17 +111,17 @@ public CqrsInjector AddRemoteQueryCache<TRemote>(Action<CacheableRequestOptions>
111111
/// Use default implementation of <see cref="IFileProvider"/> that accesses file system directly.
112112
/// </summary>
113113
/// <returns></returns>
114-
public CqrsInjector UseDefaultFileProvider()
114+
public CqrsInjector AddDefaultFileProvider()
115115
{
116-
return UseFileProvider<DefaultFileProvider>();
116+
return AddFileProvider<DefaultFileProvider>();
117117
}
118118

119119
/// <summary>
120120
/// Use given implementation of <see cref="IFileProvider"/>.
121121
/// </summary>
122122
/// <typeparam name="TProvider">The implementation type.</typeparam>
123123
/// <returns></returns>
124-
public CqrsInjector UseFileProvider<TProvider>()
124+
public CqrsInjector AddFileProvider<TProvider>()
125125
where TProvider : class, IFileProvider
126126
{
127127
Services.AddScoped<IFileProvider, TProvider>();
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
2+
3+
/// <summary>
4+
/// The integration event stored in buffer.
5+
/// </summary>
6+
/// <param name="Name">The event name.</param>
7+
/// <param name="Event">The event data.</param>
8+
public record BufferedIntegrationEvent(string Name, IntegrationEvent Event);

src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/Cnblogs.Architecture.Ddd.EventBus.Abstractions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@
66
</Description>
77
</PropertyGroup>
88
<ItemGroup>
9-
<PackageReference Include="MediatR" Version="12.1.1" />
9+
<ProjectReference Include="..\Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection\Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection.csproj" />
1010
</ItemGroup>
1111
</Project>
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,44 @@
1-
using Cnblogs.Architecture.Ddd.EventBus.Abstractions;
2-
using Dapr.Client;
31
using MediatR;
42
using Microsoft.Extensions.Logging;
5-
using Microsoft.Extensions.Options;
63

7-
namespace Cnblogs.Architecture.Ddd.EventBus.Dapr;
4+
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
85

96
/// <summary>
10-
/// Dapr EventBus 实现。
7+
/// Default implementation for <see cref="IEventBus"/>
118
/// </summary>
12-
public class DaprEventBus : IEventBus
9+
public class DefaultEventBus : IEventBus
1310
{
14-
private readonly DaprClient _daprClient;
15-
private readonly DaprOptions _daprOptions;
11+
private readonly IEventBuffer _eventBuffer;
1612
private readonly IMediator _mediator;
17-
private readonly ILogger<DaprEventBus> _logger;
13+
private readonly ILogger<DefaultEventBus> _logger;
1814

1915
/// <summary>
20-
/// 创建一个 DaprEventBus
16+
/// Create a <see cref="DefaultEventBus"/> instance.
2117
/// </summary>
22-
/// <param name="daprOptions"><see cref="DaprOptions"/></param>
23-
/// <param name="daprClient"><see cref="DaprClient"/></param>
24-
/// <param name="logger">日志记录器。</param>
25-
/// <param name="mediator"><see cref="IMediator"/></param>
26-
public DaprEventBus(
27-
IOptions<DaprOptions> daprOptions,
28-
DaprClient daprClient,
29-
IMediator mediator,
30-
ILogger<DaprEventBus> logger)
18+
/// <param name="eventBuffer">The underlying event buffer.</param>
19+
/// <param name="mediator">The IMediator.</param>
20+
/// <param name="logger">The logger.</param>
21+
public DefaultEventBus(IEventBuffer eventBuffer, IMediator mediator, ILogger<DefaultEventBus> logger)
3122
{
32-
_daprClient = daprClient;
23+
_eventBuffer = eventBuffer;
3324
_logger = logger;
3425
_mediator = mediator;
35-
_daprOptions = daprOptions.Value;
3626
}
3727

3828
/// <inheritdoc />
39-
public async Task PublishAsync<TEvent>(TEvent @event)
29+
public Task PublishAsync<TEvent>(TEvent @event)
4030
where TEvent : IntegrationEvent
4131
{
42-
await PublishAsync(typeof(TEvent).Name, @event);
32+
return PublishAsync(typeof(TEvent).Name, @event);
4333
}
4434

4535
/// <inheritdoc />
46-
public async Task PublishAsync<TEvent>(string eventName, TEvent @event)
36+
public Task PublishAsync<TEvent>(string eventName, TEvent @event)
4737
where TEvent : IntegrationEvent
4838
{
49-
_logger.LogInformation(
50-
"Publishing IntegrationEvent, Name: {EventName}, Body: {Event}, TraceId: {TraceId}",
51-
eventName,
52-
@event,
53-
@event.TraceId ?? @event.Id);
5439
@event.TraceId = TraceId;
55-
await _daprClient.PublishEventAsync(
56-
DaprOptions.PubSubName,
57-
DaprUtils.GetDaprTopicName(_daprOptions.AppName, eventName),
58-
@event);
40+
_eventBuffer.Add(eventName, @event);
41+
return Task.CompletedTask;
5942
}
6043

6144
/// <inheritdoc />
@@ -97,4 +80,4 @@ public Task ReceiveAsync<TEvent>(TEvent receivedEvent)
9780

9881
/// <inheritdoc />
9982
public Guid? TraceId { get; set; }
100-
}
83+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
3+
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
4+
5+
/// <summary>
6+
/// Options for event bus.
7+
/// </summary>
8+
public class EventBusOptions
9+
{
10+
/// <summary>
11+
/// The service collection for
12+
/// </summary>
13+
public IServiceCollection? Services { get; set; }
14+
15+
/// <summary>
16+
/// Interval for publish integration event.
17+
/// </summary>
18+
public int Interval { get; set; } = 1;
19+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
3+
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
4+
5+
/// <summary>
6+
/// Builder for <see cref="EventBusOptions"/>.
7+
/// </summary>
8+
public class EventBusOptionsBuilder
9+
{
10+
/// <summary>
11+
/// Create a <see cref="EventBusOptionsBuilder"/>.
12+
/// </summary>
13+
/// <param name="services"></param>
14+
public EventBusOptionsBuilder(IServiceCollection services)
15+
{
16+
Services = services;
17+
}
18+
19+
/// <summary>
20+
/// Internal service collection.
21+
/// </summary>
22+
public IServiceCollection Services { get; }
23+
24+
/// <summary>
25+
/// The interval in milliseconds for checking pending integration events.
26+
/// </summary>
27+
public int Interval { get; set; } = 1000;
28+
29+
internal Action<EventBusOptions> GetConfiguration()
30+
{
31+
return o =>
32+
{
33+
o.Interval = Interval;
34+
};
35+
}
36+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System.Reflection;
2+
using Cnblogs.Architecture.Ddd.Cqrs.DependencyInjection;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using Microsoft.Extensions.DependencyInjection.Extensions;
5+
6+
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
7+
8+
/// <summary>
9+
/// Extension methods for injecting <see cref="IEventBus"/> to service collection.
10+
/// </summary>
11+
public static class EventBusServiceInjector
12+
{
13+
/// <summary>
14+
/// Add event bus for integration event support.
15+
/// </summary>
16+
/// <param name="services">The services.</param>
17+
/// <param name="configuration">Extra configurations for event bus.</param>
18+
/// <param name="handlerAssemblies">The assemblies for handlers.</param>
19+
/// <returns><see cref="IServiceCollection"/>.</returns>
20+
public static IServiceCollection AddEventBus(
21+
this IServiceCollection services,
22+
Action<EventBusOptionsBuilder>? configuration = null,
23+
params Assembly[] handlerAssemblies)
24+
{
25+
services.TryAddSingleton<IEventBuffer, InMemoryEventBuffer>();
26+
services.TryAddScoped<IEventBus, DefaultEventBus>();
27+
services.AddHostedService<PublishIntegrationEventHostedService>();
28+
var builder = new EventBusOptionsBuilder(services);
29+
configuration?.Invoke(builder);
30+
services.Configure(builder.GetConfiguration());
31+
if (handlerAssemblies.Length > 0)
32+
{
33+
services.AddMediatR(cfg => cfg.RegisterServicesFromAssemblies(handlerAssemblies));
34+
}
35+
36+
return services;
37+
}
38+
39+
/// <summary>
40+
/// Add event bus for integration event support.
41+
/// </summary>
42+
/// <param name="cqrsInjector">The <see cref="CqrsInjector"/>.</param>
43+
/// <param name="configuration">The configuration.</param>
44+
/// <param name="handlerAssemblies">The assemblies for handlers.</param>
45+
/// <returns></returns>
46+
public static CqrsInjector AddEventBus(
47+
this CqrsInjector cqrsInjector,
48+
Action<EventBusOptionsBuilder>? configuration = null,
49+
params Assembly[] handlerAssemblies)
50+
{
51+
cqrsInjector.Services.AddEventBus(configuration, handlerAssemblies);
52+
return cqrsInjector;
53+
}
54+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
2+
3+
/// <summary>
4+
/// Buffer for integration events.
5+
/// </summary>
6+
public interface IEventBuffer
7+
{
8+
/// <summary>
9+
/// Number of pending events.
10+
/// </summary>
11+
int Count { get; }
12+
13+
/// <summary>
14+
/// Add an event to buffer.
15+
/// </summary>
16+
/// <param name="name">The name of integration event.</param>
17+
/// <param name="event">The event.</param>
18+
/// <typeparam name="TEvent">The type of integration event.</typeparam>
19+
void Add<TEvent>(string name, TEvent @event)
20+
where TEvent : IntegrationEvent;
21+
22+
/// <summary>
23+
/// Get an integration event without removing it.
24+
/// </summary>
25+
/// <returns>The integration event, <c>null</c> will be returned if buffer is empty.</returns>
26+
BufferedIntegrationEvent? Peek();
27+
28+
/// <summary>
29+
/// Get an integration event and remove it.
30+
/// </summary>
31+
/// <returns>The integration event, <c>null</c> will be returned if buffer is empty.</returns>
32+
BufferedIntegrationEvent? Pop();
33+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
2+
3+
/// <summary>
4+
/// Provider contract for event bus.
5+
/// </summary>
6+
public interface IEventBusProvider
7+
{
8+
/// <summary>
9+
/// Emit an integration event.
10+
/// </summary>
11+
/// <param name="eventName">The name of the event.</param>
12+
/// <param name="event">The event body.</param>
13+
/// <returns></returns>
14+
Task PublishAsync(string eventName, IntegrationEvent @event);
15+
}

src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusHandler.cs renamed to src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBusRequestHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
33
/// <summary>
44
/// The empty interface as a generic type constraint
55
/// </summary>
6-
public interface IEventBusHandler
6+
public interface IEventBusRequestHandler
77
{
88
}

src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IIntegrationEventHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
66
/// 集成事件处理器。
77
/// </summary>
88
/// <typeparam name="TEvent">集成事件。</typeparam>
9-
public interface IIntegrationEventHandler<TEvent> : INotificationHandler<TEvent>, IEventBusHandler
9+
public interface IIntegrationEventHandler<TEvent> : INotificationHandler<TEvent>, IEventBusRequestHandler
1010
where TEvent : IntegrationEvent
1111
{
1212
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using System.Collections.Concurrent;
2+
3+
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
4+
5+
/// <summary>
6+
/// Implementation of <see cref="IEventBuffer"/> using <see cref="ConcurrentQueue{T}"/>.
7+
/// </summary>
8+
public class InMemoryEventBuffer : IEventBuffer
9+
{
10+
private readonly ConcurrentQueue<BufferedIntegrationEvent> _queue = new();
11+
12+
/// <inheritdoc />
13+
public int Count => _queue.Count;
14+
15+
/// <inheritdoc />
16+
public void Add<TEvent>(string name, TEvent @event)
17+
where TEvent : IntegrationEvent
18+
{
19+
_queue.Enqueue(new BufferedIntegrationEvent(name, @event));
20+
}
21+
22+
/// <inheritdoc />
23+
public BufferedIntegrationEvent? Peek()
24+
{
25+
return _queue.TryPeek(out var @event) ? @event : null;
26+
}
27+
28+
/// <inheritdoc />
29+
public BufferedIntegrationEvent? Pop()
30+
{
31+
return _queue.TryDequeue(out var @event) ? @event : null;
32+
}
33+
}

0 commit comments

Comments
 (0)