Skip to content

Commit

Permalink
Merge pull request #1494 from Particular/set-native-content-type-9-2
Browse files Browse the repository at this point in the history
Allow setting customizing the outgoing messages by setting AMQP attributes just before dispatching the message to the broker
  • Loading branch information
SzymonPobiega authored Nov 18, 2024
2 parents 3f071e3 + 920b89e commit a9e1f03
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
{
using System.Threading.Tasks;
using AcceptanceTesting;
using global::RabbitMQ.Client.Events;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

class When_customizing_outgoing_messages : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_set_native_property_value()
{
var scenario = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b => b.When((bus, c) => bus.SendLocal(new Message())))
.Done(c => c.MessageReceived)
.Run();

Assert.That(scenario.BasicDeliverEventArgs.BasicProperties.AppId, Is.EqualTo("MyValue"));
}

public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>(endpointConfiguration =>
{
endpointConfiguration.ConfigureRabbitMQTransport().OutgoingNativeMessageCustomization =
(operation, properties) =>
{
properties.AppId = "MyValue";
};
});
}

class MyEventHandler : IHandleMessages<Message>
{
Context testContext;

public MyEventHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(Message message, IMessageHandlerContext context)
{
testContext.BasicDeliverEventArgs = context.Extensions.Get<BasicDeliverEventArgs>();
testContext.MessageReceived = true;

return Task.CompletedTask;
}
}
}

public class Message : IMessage
{
}

class Context : ScenarioContext
{
public bool MessageReceived { get; set; }

public BasicDeliverEventArgs BasicDeliverEventArgs { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected override BrokerConnection GetBoundValue(BindingContext bindingContext)
certificateCollection.Add(certificate);
}

var connectionFactory = new ConnectionFactory("rabbitmq-transport", connectionConfiguration, certificateCollection, disableCertificateValidation, useExternalAuth, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), new List<(string, int, bool)>());
var connectionFactory = new ConnectionFactory("rabbitmq-transport", connectionConfiguration, certificateCollection, disableCertificateValidation, useExternalAuth, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(10), []);
var brokerConnection = new BrokerConnection(connectionFactory);

return brokerConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace NServiceBus
public System.TimeSpan HeartbeatInterval { get; set; }
public System.Func<RabbitMQ.Client.Events.BasicDeliverEventArgs, string> MessageIdStrategy { get; set; }
public System.TimeSpan NetworkRecoveryInterval { get; set; }
public System.Action<NServiceBus.Transport.IOutgoingTransportOperation, RabbitMQ.Client.IBasicProperties> OutgoingNativeMessageCustomization { get; set; }
public NServiceBus.PrefetchCountCalculation PrefetchCountCalculation { get; set; }
public System.TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker { get; set; }
public bool UseExternalAuthMechanism { get; set; }
Expand Down
44 changes: 39 additions & 5 deletions src/NServiceBus.Transport.RabbitMQ/RabbitMQTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Transport;
using Transport.RabbitMQ;
Expand Down Expand Up @@ -91,6 +92,21 @@ public TimeSpan TimeToWaitBeforeTriggeringCircuitBreaker
}
}

/// <summary>
/// Gets or sets the action that allows customization of the native <see cref="IBasicProperties"/>
/// just before it is dispatched to the rabbitmq client.
/// </summary>
/// <remarks>
/// <para>
/// When provided the customization is called after all other transport customizations configured, meaning that
/// any changes made by customization logic may override or conflict with previous transport-level adjustments.
/// This extension point should be used with caution, as modifying a native message at this stage can lead to unintended behavior
/// downstream if the message content or properties are altered in ways that do not align
/// with expectations elsewhere in the system.
/// </para>
/// </remarks>
public Action<IOutgoingTransportOperation, IBasicProperties> OutgoingNativeMessageCustomization { get; set; }

/// <summary>
/// The calculation method for the prefetch count. The default is 3 times the maximum concurrency value.
/// </summary>
Expand Down Expand Up @@ -183,17 +199,35 @@ public override Task<TransportInfrastructure> Initialize(HostSettings hostSettin
certCollection = new X509Certificate2Collection(ClientCertificate);
}

var connectionFactory = new ConnectionFactory(hostSettings.Name, ConnectionConfiguration, certCollection, !ValidateRemoteCertificate,
UseExternalAuthMechanism, HeartbeatInterval, NetworkRecoveryInterval, additionalClusterNodes);
var connectionFactory = new ConnectionFactory(
hostSettings.Name,
ConnectionConfiguration,
certCollection,
!ValidateRemoteCertificate,
UseExternalAuthMechanism,
HeartbeatInterval,
NetworkRecoveryInterval,
additionalClusterNodes
);

var channelProvider = new ChannelProvider(connectionFactory, NetworkRecoveryInterval, RoutingTopology);
channelProvider.CreateConnection();

var converter = new MessageConverter(MessageIdStrategy);

var infra = new RabbitMQTransportInfrastructure(hostSettings, receivers, connectionFactory,
RoutingTopology, channelProvider, converter, TimeToWaitBeforeTriggeringCircuitBreaker,
PrefetchCountCalculation, NetworkRecoveryInterval, SupportsDelayedDelivery);
var infra = new RabbitMQTransportInfrastructure(
hostSettings,
receivers,
connectionFactory,
RoutingTopology,
channelProvider,
converter,
OutgoingNativeMessageCustomization,
TimeToWaitBeforeTriggeringCircuitBreaker,
PrefetchCountCalculation,
NetworkRecoveryInterval,
SupportsDelayedDelivery
);

if (hostSettings.SetupInfrastructure)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using global::RabbitMQ.Client;

sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
{
Expand All @@ -17,6 +18,7 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings,
ConnectionFactory connectionFactory, IRoutingTopology routingTopology,
ChannelProvider channelProvider, MessageConverter messageConverter,
Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization,
TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, PrefetchCountCalculation prefetchCountCalculation,
TimeSpan networkRecoveryInterval, bool supportsDelayedDelivery)
{
Expand All @@ -26,7 +28,7 @@ public RabbitMQTransportInfrastructure(HostSettings hostSettings, ReceiveSetting
this.networkRecoveryInterval = networkRecoveryInterval;
this.supportsDelayedDelivery = supportsDelayedDelivery;

Dispatcher = new MessageDispatcher(channelProvider, supportsDelayedDelivery);
Dispatcher = new MessageDispatcher(channelProvider, messageCustomization, supportsDelayedDelivery);
Receivers = receiverSettings.Select(x => CreateMessagePump(hostSettings, x, messageConverter, timeToWaitBeforeTriggeringCircuitBreaker, prefetchCountCalculation))
.ToDictionary(x => x.Id, x => x);
}
Expand Down
11 changes: 10 additions & 1 deletion src/NServiceBus.Transport.RabbitMQ/Sending/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using global::RabbitMQ.Client;

class MessageDispatcher : IMessageDispatcher
{
readonly ChannelProvider channelProvider;
readonly Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization;
readonly bool supportsDelayedDelivery;

public MessageDispatcher(ChannelProvider channelProvider, bool supportsDelayedDelivery)
public MessageDispatcher(
ChannelProvider channelProvider,
Action<IOutgoingTransportOperation, IBasicProperties> messageCustomization,
bool supportsDelayedDelivery
)
{
this.channelProvider = channelProvider;
this.messageCustomization = messageCustomization ?? (static (_, _) => { });
this.supportsDelayedDelivery = supportsDelayedDelivery;
}

Expand Down Expand Up @@ -59,6 +66,7 @@ Task SendMessage(UnicastTransportOperation transportOperation, ConfirmsAwareChan

var properties = channel.CreateBasicProperties();
properties.Fill(message, transportOperation.Properties);
messageCustomization(transportOperation, properties);

return channel.SendMessage(transportOperation.Destination, message, properties, cancellationToken);
}
Expand All @@ -71,6 +79,7 @@ Task PublishMessage(MulticastTransportOperation transportOperation, ConfirmsAwar

var properties = channel.CreateBasicProperties();
properties.Fill(message, transportOperation.Properties);
messageCustomization(transportOperation, properties);

return channel.PublishMessage(transportOperation.MessageType, message, properties, cancellationToken);
}
Expand Down

0 comments on commit a9e1f03

Please sign in to comment.