Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/guide/messaging/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,39 @@ opts.MessagePartitioning.PublishToShardedAmazonSqsQueues("letters", 4, topology
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/AWS/Wolverine.AmazonSqs.Tests/concurrency_resilient_sharded_processing.cs#L72-L87' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_partitioned_publishing_through_amazon_sqs' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Propagating GroupId to PartitionKey <Badge type="tip" text="5.17" />

When using Kafka (or any transport that uses `PartitionKey`), you may want cascaded messages from a handler to
automatically inherit the originating message's `GroupId` as their `PartitionKey`. This ensures that cascaded messages
land on the same Kafka partition as the originating message without manually specifying `DeliveryOptions` on every
outgoing message.

This is especially useful when you have a chain of message handlers where the first message arrives at a Kafka topic
with a consumer group id, and you want all downstream cascaded messages to be published to the same partition.

<!-- snippet: sample_propagate_group_id_to_partition_key -->
<a id='snippet-sample_propagate_group_id_to_partition_key'></a>
```cs
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// Automatically propagate the originating message's GroupId
// to the PartitionKey of all cascaded outgoing messages.
// This is particularly useful with Kafka where you want
// cascaded messages to land on the same partition as the
// originating message without manually specifying
// DeliveryOptions on every outgoing message.
opts.Policies.PropagateGroupIdToPartitionKey();
});
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Samples/DocumentationSamples/PartitioningSamples.cs#L144-L158' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_propagate_group_id_to_partition_key' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: tip
The rule will not override an explicitly set `PartitionKey` on an outgoing envelope. If you set `PartitionKey` via
`DeliveryOptions`, that value takes precedence.
:::

## Partitioning Messages Received from External Systems

::: warning
Expand Down
14 changes: 14 additions & 0 deletions docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,20 @@ public static ValueTask publish_by_partition_key(IMessageBus bus)
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Kafka/Wolverine.Kafka.Tests/when_publishing_and_receiving_by_partition_key.cs#L13-L20' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_publish_to_kafka_by_partition_key' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Propagating GroupId to PartitionKey <Badge type="tip" text="5.17" />

When consuming from a Kafka topic, the incoming envelope's `GroupId` is automatically set from the Kafka consumer's
configured `GroupId`. If your handler produces cascaded messages that should land on the same partition, you can
enable automatic propagation of the originating `GroupId` to the outgoing `PartitionKey`:

```csharp
opts.Policies.PropagateGroupIdToPartitionKey();
```

This eliminates the need to manually set `DeliveryOptions.PartitionKey` on every outgoing message from your handlers.
The rule will never override an explicitly set `PartitionKey`. See the [Partitioned Sequential Messaging](/guide/messaging/partitioning#propagating-groupid-to-partitionkey)
documentation for more details and a code sample.

## Interoperability

::: tip
Expand Down
21 changes: 20 additions & 1 deletion src/Samples/DocumentationSamples/PartitioningSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,30 @@ public static async Task configuring_by_property_name()
public static async Task SendMessageToGroup(IMessageBus bus)
{
await bus.PublishAsync(
new ApproveInvoice("AAA"),
new ApproveInvoice("AAA"),
new() { GroupId = "agroup" });
}

#endregion

public static async Task propagate_group_id_to_partition_key()
{
#region sample_propagate_group_id_to_partition_key

var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// Automatically propagate the originating message's GroupId
// to the PartitionKey of all cascaded outgoing messages.
// This is particularly useful with Kafka where you want
// cascaded messages to land on the same partition as the
// originating message without manually specifying
// DeliveryOptions on every outgoing message.
opts.Policies.PropagateGroupIdToPartitionKey();
});

#endregion
}
}

public record PayInvoice(string Id);
Expand Down
75 changes: 75 additions & 0 deletions src/Testing/CoreTests/GroupIdToPartitionKeyRuleTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using NSubstitute;
using Wolverine;
using Wolverine.ComplianceTests;
using Xunit;

namespace CoreTests;

public class GroupIdToPartitionKeyRuleTests
{
[Fact]
public void propagates_group_id_to_partition_key()
{
var rule = new GroupIdToPartitionKeyRule();
var originator = Substitute.For<IMessageContext>();
var originatingEnvelope = ObjectMother.Envelope();
originatingEnvelope.GroupId = "stream-123";
originator.Envelope.Returns(originatingEnvelope);

var outgoing = ObjectMother.Envelope();
outgoing.PartitionKey = null;

rule.ApplyCorrelation(originator, outgoing);

outgoing.PartitionKey.ShouldBe("stream-123");
}

[Fact]
public void does_not_override_existing_partition_key()
{
var rule = new GroupIdToPartitionKeyRule();
var originator = Substitute.For<IMessageContext>();
var originatingEnvelope = ObjectMother.Envelope();
originatingEnvelope.GroupId = "stream-123";
originator.Envelope.Returns(originatingEnvelope);

var outgoing = ObjectMother.Envelope();
outgoing.PartitionKey = "explicit-key";

rule.ApplyCorrelation(originator, outgoing);

outgoing.PartitionKey.ShouldBe("explicit-key");
}

[Fact]
public void does_nothing_when_originator_has_no_group_id()
{
var rule = new GroupIdToPartitionKeyRule();
var originator = Substitute.For<IMessageContext>();
var originatingEnvelope = ObjectMother.Envelope();
originatingEnvelope.GroupId = null;
originator.Envelope.Returns(originatingEnvelope);

var outgoing = ObjectMother.Envelope();
outgoing.PartitionKey = null;

rule.ApplyCorrelation(originator, outgoing);

outgoing.PartitionKey.ShouldBeNull();
}

[Fact]
public void does_nothing_when_originator_has_no_envelope()
{
var rule = new GroupIdToPartitionKeyRule();
var originator = Substitute.For<IMessageContext>();
originator.Envelope.Returns((Envelope?)null);

var outgoing = ObjectMother.Envelope();
outgoing.PartitionKey = null;

rule.ApplyCorrelation(originator, outgoing);

outgoing.PartitionKey.ShouldBeNull();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Tracking;

namespace Wolverine.Kafka.Tests;

public class propagate_group_id_to_partition_key : IAsyncLifetime
{
private IHost _host;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.ServiceName = "PropagateGroupIdTests";

opts.UseKafka("localhost:9092").AutoProvision();

// Enable the feature under test
opts.Policies.PropagateGroupIdToPartitionKey();

opts.Policies.DisableConventionalLocalRouting();

// Listen to source topic with an explicit GroupId
opts.ListenToKafkaTopic("groupid-source")
.ProcessInline()
.ConfigureConsumer(config =>
{
config.GroupId = "source-group-123";
});

// Listen to target topic where cascaded messages arrive
opts.ListenToKafkaTopic("groupid-target")
.ProcessInline();

// Route cascaded TargetFromGroupId messages to the target topic
opts.PublishMessage<TargetFromGroupId>()
.ToKafkaTopic("groupid-target")
.SendInline();

opts.Discovery.IncludeAssembly(GetType().Assembly);
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
}

[Fact]
public async Task cascaded_message_receives_partition_key_from_originating_group_id()
{
var session = await _host.TrackActivity()

Check failure on line 52 in src/Transports/Kafka/Wolverine.Kafka.Tests/propagate_group_id_to_partition_key.cs

View workflow job for this annotation

GitHub Actions / test

Wolverine.Kafka.Tests.propagate_group_id_to_partition_key.cascaded_message_receives_partition_key_from_originating_group_id

System.TimeoutException : This TrackedSession timed out before all activity completed. Activity detected: ------------------------------------------------------------------------------------------------------------------ | Message Id | Message Type | Time (ms) | Event | ------------------------------------------------------------------------------------------------------------------ | 08de7897-ee39-43eb-7ced-8d2b94fe0000 | Wolverine.Kafka.Tests.TriggerFromGroupId | 0| NoRoutes | ------------------------------------------------------------------------------------------------------------------ Conditions: Wait for message of type Wolverine.Kafka.Tests.TargetFromGroupId to be received at node bbb6693d-e2a8-44f2-9bab-b2a26d43a9bd (False)
.IncludeExternalTransports()
.Timeout(30.Seconds())
.WaitForMessageToBeReceivedAt<TargetFromGroupId>(_host)
.PublishMessageAndWaitAsync(new TriggerFromGroupId("hello"));

var envelope = session.Received.SingleEnvelope<TargetFromGroupId>();
envelope.PartitionKey.ShouldBe("source-group-123");
}

public async Task DisposeAsync()
{
await _host.StopAsync();
}
}

public record TriggerFromGroupId(string Name);

public record TargetFromGroupId(string Name);

public static class TriggerFromGroupIdHandler
{
public static TargetFromGroupId Handle(TriggerFromGroupId message)
{
return new TargetFromGroupId(message.Name);
}
}

public static class TargetFromGroupIdHandler
{
public static void Handle(TargetFromGroupId message)
{
// no-op, just receive
}
}
25 changes: 25 additions & 0 deletions src/Wolverine/IEnvelopeRule.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx.Core;
using Wolverine.Util;

namespace Wolverine;
Expand Down Expand Up @@ -108,6 +109,30 @@ public override int GetHashCode()
}
}

/// <summary>
/// Propagates the originating message's GroupId to the outgoing envelope's PartitionKey.
/// This is useful for automatically carrying forward a stream/group id as a Kafka partition key
/// on cascaded or published messages without manually setting DeliveryOptions on every message.
/// </summary>
internal class GroupIdToPartitionKeyRule : IEnvelopeRule
{
public void Modify(Envelope envelope)
{
// No-op when used without an originator context
}

public void ApplyCorrelation(IMessageContext originator, Envelope outgoing)
{
if (outgoing.PartitionKey.IsNotEmpty()) return;

var groupId = originator.Envelope?.GroupId;
if (groupId.IsNotEmpty())
{
outgoing.PartitionKey = groupId;
}
}
}

internal class LambdaEnvelopeRule : IEnvelopeRule
{
private readonly Action<Envelope> _configure;
Expand Down
8 changes: 8 additions & 0 deletions src/Wolverine/IPolicies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,12 @@ public interface IPolicies : IEnumerable<IWolverinePolicy>, IWithFailurePolicies
/// history, and endpoint events to the message bus? The default is false;
/// </summary>
bool PublishAgentEvents { get; set; }

/// <summary>
/// Automatically propagate the originating message's GroupId to the PartitionKey of outgoing
/// envelopes. This is useful when using Kafka and you want cascaded messages to inherit
/// the partition key from the originating message's group/stream id without manually
/// specifying DeliveryOptions on every outgoing message.
/// </summary>
void PropagateGroupIdToPartitionKey();
}
19 changes: 18 additions & 1 deletion src/Wolverine/Runtime/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,12 @@ public ValueTask BroadcastToTopicAsync(string topicName, object message, Deliver
internal async ValueTask PersistOrSendAsync(Envelope envelope)
{
if (envelope is null) return; // Not sure how this would happen


foreach (var rule in Runtime.Options.MetadataRules)
{
rule.ApplyCorrelation(this, envelope);
}

if (envelope.Sender is null)
{
throw new InvalidOperationException("Envelope has not been routed");
Expand Down Expand Up @@ -306,6 +311,18 @@ internal virtual void TrackEnvelopeCorrelation(Envelope outbound, Activity? acti

internal async ValueTask PersistOrSendAsync(params Envelope[] outgoing)
{
var metadataRules = Runtime.Options.MetadataRules;
if (metadataRules.Count > 0)
{
foreach (var envelope in outgoing)
{
foreach (var rule in metadataRules)
{
rule.ApplyCorrelation(this, envelope);
}
}
}

if (Transaction != null)
{
// This filtering is done to only persist envelopes where
Expand Down
5 changes: 5 additions & 0 deletions src/Wolverine/WolverineOptions.Policies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ void IPolicies.LogMessageStarting(LogLevel logLevel)
RegisteredPolicies.Insert(0, new LogStartingActivityPolicy(logLevel));
}

void IPolicies.PropagateGroupIdToPartitionKey()
{
MetadataRules.Add(new GroupIdToPartitionKeyRule());
}

internal MiddlewarePolicy FindOrCreateMiddlewarePolicy()
{
var policy = RegisteredPolicies.OfType<MiddlewarePolicy>().FirstOrDefault();
Expand Down
8 changes: 7 additions & 1 deletion src/Wolverine/WolverineOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@

}

public WolverineOptions(string? assemblyName)

Check warning on line 108 in src/Wolverine/WolverineOptions.cs

View workflow job for this annotation

GitHub Actions / test

Non-nullable property 'ServiceName' must contain a non-null value when exiting constructor. Consider adding the 'required' modifier or declaring the property as nullable.
{
Transports = new TransportCollection();

Expand Down Expand Up @@ -199,7 +199,13 @@
/// This will be automatically applied to all outgoing messages, but will never override
/// any explicitly defined Envelope.GroupId
/// </summary>
public MessagePartitioningRules MessagePartitioning { get; }
public MessagePartitioningRules MessagePartitioning { get; }

/// <summary>
/// Internal list of IEnvelopeRule instances that are applied via ApplyCorrelation
/// to outgoing envelopes in PersistOrSendAsync
/// </summary>
internal List<IEnvelopeRule> MetadataRules { get; } = new();


/// For advanced usages, this gives you the ability to register pre-canned message handling
Expand Down Expand Up @@ -228,7 +234,7 @@
public void AddMessageHandler<T>(MessageHandler<T> handler)
{
AddMessageHandler(typeof(T), handler);
handler.ConfigureChain(handler.Chain); // Yeah, this is 100% a tell, don't ask violation

Check warning on line 237 in src/Wolverine/WolverineOptions.cs

View workflow job for this annotation

GitHub Actions / test

Possible null reference argument for parameter 'chain' in 'void MessageHandler<T>.ConfigureChain(HandlerChain chain)'.
}

[IgnoreDescription]
Expand Down
Loading