Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class RoutingToDispatchConnectorTests
[Test]
public async Task Should_preserve_message_state_for_one_routing_strategy_for_allocation_reasons()
{
var behavior = new RoutingToDispatchConnector();
var behavior = new RoutingToDispatchConnector([]);
IEnumerable<TransportOperation> operations = null;
var testableRoutingContext = new TestableRoutingContext
{
Expand Down Expand Up @@ -59,7 +59,7 @@ await behavior.Invoke(testableRoutingContext, context =>
[Test]
public async Task Should_copy_message_state_for_multiple_routing_strategies()
{
var behavior = new RoutingToDispatchConnector();
var behavior = new RoutingToDispatchConnector([]);
IEnumerable<TransportOperation> operations = null;
var testableRoutingContext = new TestableRoutingContext
{
Expand Down Expand Up @@ -124,7 +124,7 @@ await behavior.Invoke(testableRoutingContext, context =>
[Test]
public async Task Should_preserve_headers_generated_by_custom_routing_strategy()
{
var behavior = new RoutingToDispatchConnector();
var behavior = new RoutingToDispatchConnector([]);
Dictionary<string, string> headers = null;
await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = [new HeaderModifyingRoutingStrategy()] }, context =>
{
Expand All @@ -142,7 +142,7 @@ public async Task Should_dispatch_immediately_if_user_requested()
options.RequireImmediateDispatch();

var dispatched = false;
var behavior = new RoutingToDispatchConnector();
var behavior = new RoutingToDispatchConnector([]);
var message = new OutgoingMessage("ID", [], Array.Empty<byte>());

await behavior.Invoke(new RoutingContext(message,
Expand All @@ -159,7 +159,7 @@ await behavior.Invoke(new RoutingContext(message,
public async Task Should_dispatch_immediately_if_not_sending_from_a_handler()
{
var dispatched = false;
var behavior = new RoutingToDispatchConnector();
var behavior = new RoutingToDispatchConnector([]);
var message = new OutgoingMessage("ID", [], Array.Empty<byte>());

await behavior.Invoke(new RoutingContext(message,
Expand All @@ -176,7 +176,7 @@ await behavior.Invoke(new RoutingContext(message,
public async Task Should_not_dispatch_by_default()
{
var dispatched = false;
var behavior = new RoutingToDispatchConnector();
var behavior = new RoutingToDispatchConnector([]);
var message = new OutgoingMessage("ID", [], Array.Empty<byte>());

await behavior.Invoke(new RoutingContext(message,
Expand All @@ -192,7 +192,7 @@ await behavior.Invoke(new RoutingContext(message,
[Test]
public async Task Should_promote_message_headers_to_pipeline_activity()
{
var behavior = new RoutingToDispatchConnector();
var behavior = new RoutingToDispatchConnector([]);
var routingContext = new TestableRoutingContext();
routingContext.Message.Headers[Headers.ContentType] = "test content type"; // one of the headers that will be mapped to tags

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,16 @@

namespace NServiceBus;

using System.Linq;
using System.Threading.Tasks;
using Pipeline;
using Transport;

class ImmediateDispatchTerminator : PipelineTerminator<IDispatchContext>
class ImmediateDispatchTerminator(IMessageDispatcher dispatcher) : PipelineTerminator<IDispatchContext>
{
public ImmediateDispatchTerminator(IMessageDispatcher dispatcher)
{
this.dispatcher = dispatcher;
}

protected override Task Terminate(IDispatchContext context)
{
var transaction = context.Extensions.GetOrCreate<TransportTransaction>();
var operations = context.Operations as TransportOperation[] ?? context.Operations.ToArray();
var operations = context.Operations as TransportOperation[] ?? [.. context.Operations];
return dispatcher.Dispatch(new TransportOperations(operations), transaction, context.CancellationToken);
}

readonly IMessageDispatcher dispatcher;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace NServiceBus;

using System;
using System.Collections.Frozen;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -12,7 +13,7 @@ namespace NServiceBus;
using Routing;
using Transport;

class RoutingToDispatchConnector : StageConnector<IRoutingContext, IDispatchContext>
class RoutingToDispatchConnector(FrozenSet<string> dispatchPropertyNamesToPropagate) : StageConnector<IRoutingContext, IDispatchContext>
{
public override Task Invoke(IRoutingContext context, Func<IDispatchContext, Task> stage)
{
Expand All @@ -26,13 +27,36 @@ public override Task Invoke(IRoutingContext context, Func<IDispatchContext, Task
// This may not be the outgoing message activity created by NServiceBus.
ContextPropagation.PropagateContextToHeaders(Activity.Current, context.Message.Headers, context.Extensions);

DispatchProperties? dispatchProperties;
bool shouldPropagate;
if (dispatchPropertyNamesToPropagate.Count > 0)
{
shouldPropagate = context.Extensions.TryGet("IncomingMessage.DispatchProperties", out dispatchProperties);
}
else
{
dispatchProperties = null;
shouldPropagate = false;
}

var operations = new TransportOperation[context.RoutingStrategies.Count];
var index = 0;
// when there are more than one routing strategy we want to make sure each transport operation is independent
var copySharedMutableMessageState = context.RoutingStrategies.Count > 1;
foreach (var strategy in context.RoutingStrategies)
{
operations[index] = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState);
var transportOperation = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState);
if (shouldPropagate)
{
foreach (var propertyName in dispatchPropertyNamesToPropagate)
{
if (dispatchProperties?.TryGetValue(propertyName, out var propertyValue) is true && !transportOperation.Properties.TryAdd(propertyName, propertyValue))
{
// explicitly set on the outgoing operation, do not override
}
}
}
operations[index] = transportOperation;
index++;
}

Expand Down
3 changes: 2 additions & 1 deletion src/NServiceBus.Core/Pipeline/Outgoing/SendComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace NServiceBus;
using MessageInterfaces;
using Microsoft.Extensions.DependencyInjection;
using Pipeline;
using Settings;
using Transport;

class SendComponent
Expand All @@ -27,7 +28,7 @@ public static SendComponent Initialize(PipelineSettings pipelineSettings, Hostin


pipelineSettings.Register(new OutgoingPhysicalToRoutingConnector(), "Starts the message dispatch pipeline");
pipelineSettings.Register(new RoutingToDispatchConnector(), "Decides if the current message should be batched or immediately be dispatched to the transport");
pipelineSettings.Register(b => new RoutingToDispatchConnector(b.GetRequiredService<IReadOnlySettings>().Get<TransportDefinition>().DispatchPropertyNamesToPreserve), "Decides if the current message should be batched or immediately be dispatched to the transport");
pipelineSettings.Register(new BatchToDispatchConnector(), "Passes batched messages over to the immediate dispatch part of the pipeline");
pipelineSettings.Register(b => new ImmediateDispatchTerminator(b.GetRequiredService<IMessageDispatcher>()), "Hands the outgoing messages over to the transport for immediate delivery");

Expand Down
7 changes: 7 additions & 0 deletions src/NServiceBus.Core/Transports/ErrorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public ErrorContext(Exception exception, Dictionary<string, string> headers, str

DelayedDeliveriesPerformed = Message.GetDelayedDeliveriesPerformed();
Extensions = context;

if (context.TryGet<DispatchProperties>(out var dispatchProperties))
{
context.Remove<DispatchProperties>();
// Hack hardcoded string for now
context.Set("IncomingMessage.DispatchProperties", dispatchProperties);
}
}

/// <summary>
Expand Down
7 changes: 7 additions & 0 deletions src/NServiceBus.Core/Transports/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ public MessageContext(string nativeMessageId, Dictionary<string, string> headers
ReceiveAddress = receiveAddress;
TransportTransaction = transportTransaction;

if (context.TryGet<DispatchProperties>(out var dispatchProperties))
{
context.Remove<DispatchProperties>();
// Hack hardcoded string for now
context.Set("IncomingMessage.DispatchProperties", dispatchProperties);
}

context.GetOrCreate<IncomingPipelineMetricTags>();
}

Expand Down
6 changes: 6 additions & 0 deletions src/NServiceBus.Core/Transports/TransportDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace NServiceBus.Transport;

using System;
using System.Collections.Frozen;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -84,6 +85,11 @@ public virtual TransportTransactionMode TransportTransactionMode
/// </summary>
public bool SupportsTTBR { get; }

/// <summary>
/// TBD
/// </summary>
protected internal virtual FrozenSet<string> DispatchPropertyNamesToPreserve { get; } = [];

/// <summary>
/// Allows the transport to register required services into the service collection.
/// </summary>
Expand Down
Loading