Skip to content

Commit

Permalink
Add queue name to RMQ, handle empty stream for ExpectedNew as non-exi…
Browse files Browse the repository at this point in the history
…stent.
  • Loading branch information
alexeyzimarev committed Jul 16, 2024
1 parent db4400b commit 05b6a4c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
_ => throw new ArgumentOutOfRangeException(nameof(registeredHandler.ExpectedState), "Unknown expected state")
};

if (loadedState.StreamVersion == ExpectedStreamVersion.NoStream && registeredHandler.ExpectedState == ExpectedState.New) {
throw new StreamNotFound(streamName);
}

var result = await registeredHandler
.Handler(loadedState.State, loadedState.Events, command, cancellationToken)
.NoContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@

// ReSharper disable UnusedAutoPropertyAccessor.Global
// ReSharper disable AutoPropertyCanBeMadeGetOnly.Global

namespace Eventuous.RabbitMq.Producers;

public class RabbitMqProduceOptions {
public string? RoutingKey { get; init; }
public string? AppId { get; init; }
public byte DeliveryMode { get; init; } = DefaultDeliveryMode;
public string? Expiration { get; init; }
public byte Priority { get; init; }
public string? ReplyTo { get; init; }
public bool Persisted { get; init; } = true;
public string? RoutingKey { get; init; }
public string? AppId { get; init; }

internal const byte DefaultDeliveryMode = 2;
}
/// <summary>
/// Message time-to-live in milliseconds
/// </summary>
public int? Expiration { get; init; }
public byte Priority { get; init; }
public string? ReplyTo { get; init; }
public bool Persisted { get; init; } = true;
}
29 changes: 14 additions & 15 deletions src/RabbitMq/src/Eventuous.RabbitMq/Producers/RabbitMqProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public class RabbitMqProducer : BaseProducer<RabbitMqProduceOptions>, IHostedPro
/// <param name="log">Optional logger</param>
/// <param name="options">Optional additional configuration for the exchange</param>
public RabbitMqProducer(
ConnectionFactory connectionFactory,
IEventSerializer? serializer = null,
ILogger<RabbitMqProducer>? log = null,
RabbitMqExchangeOptions? options = null
)
ConnectionFactory connectionFactory,
IEventSerializer? serializer = null,
ILogger<RabbitMqProducer>? log = null,
RabbitMqExchangeOptions? options = null
)
: base(TracingOptions) {
_log = log;
_options = options;
Expand All @@ -50,6 +50,7 @@ public Task StartAsync(CancellationToken cancellationToken = default) {
_channel = _connection.CreateModel();
_channel.ConfirmSelect();
Ready = true;

return Task.CompletedTask;
}

Expand All @@ -60,11 +61,11 @@ public Task StartAsync(CancellationToken cancellationToken = default) {
};

protected override async Task ProduceMessages(
StreamName stream,
IEnumerable<ProducedMessage> messages,
RabbitMqProduceOptions? options,
CancellationToken cancellationToken = default
) {
StreamName stream,
IEnumerable<ProducedMessage> messages,
RabbitMqProduceOptions? options,
CancellationToken cancellationToken = default
) {
EnsureExchange(stream);
var produced = new List<ProducedMessage>();
var failed = new List<(ProducedMessage Msg, Exception Ex)>();
Expand All @@ -77,8 +78,7 @@ protected override async Task ProduceMessages(
try {
Publish(stream, message, options);
produced.Add(message);
}
catch (Exception e) {
} catch (Exception e) {
_log?.LogError(e, "Failed to produce message to RabbitMQ");
failed.Add((message, e));
}
Expand All @@ -102,7 +102,7 @@ void Publish(string stream, ProducedMessage message, RabbitMqProduceOptions? opt

var prop = _channel.CreateBasicProperties();
prop.ContentType = contentType;
prop.DeliveryMode = options?.DeliveryMode ?? RabbitMqProduceOptions.DefaultDeliveryMode;
prop.Persistent = options?.Persisted != false;
prop.Type = eventType;
prop.CorrelationId = metadata!.GetCorrelationId();
prop.MessageId = message.MessageId.ToString();
Expand All @@ -111,8 +111,7 @@ void Publish(string stream, ProducedMessage message, RabbitMqProduceOptions? opt
prop.Headers = metadata.ToDictionary(x => x.Key, x => x.Value);

if (options != null) {
prop.Expiration = options.Expiration;
prop.Persistent = options.Persisted;
prop.Expiration = options.Expiration?.ToString();
prop.Priority = options.Priority;
prop.AppId = options.AppId;
prop.ReplyTo = options.ReplyTo;
Expand Down

0 comments on commit 05b6a4c

Please sign in to comment.