Fix GroupIdToPartitionKeyRule to prefer outgoing GroupId over originator's consumer group#2338
Merged
jeremydmiller merged 3 commits intoJasperFx:mainfrom Mar 24, 2026
Conversation
…tor's consumer group Previously, GroupIdToPartitionKeyRule set the Kafka consumer group name (config.GroupId) on every received envelope and propagated it as the partition key on outgoing messages. This caused cascaded messages to use the application's consumer group name (e.g. "my-application-name") as the partition key instead of the actual business key derived from the message property via ByPropertyNamed / UseInferredMessageGrouping. Fix the propagation priority in ApplyCorrelation: 1. Outgoing message's own GroupId (set by ByPropertyNamed) — the business partition key 2. Originator's PartitionKey (the actual Kafka message key of the incoming message) 3. Originator's GroupId as a last resort Also implement Modify so that messages published outside a handler context (e.g. background services) have their GroupId promoted to PartitionKey when no explicit key is set. Stop stamping envelope.GroupId = config.GroupId in KafkaListener and KafkaTopicGroupListener — the consumer group name is not meaningful metadata on the incoming envelope and was the root cause of the incorrect propagation. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| var envelope = mapper!.CreateEnvelope(result.Topic, message); | ||
| envelope.Offset = result.Offset.Value; | ||
| envelope.MessageType ??= _messageTypeName; | ||
| envelope.GroupId = config.GroupId; |
Member
There was a problem hiding this comment.
This change will have to be "opt in" some how
Contributor
Author
There was a problem hiding this comment.
I added a bool on the KafkaTopic, a method on KafkaListenerConfiguration to change its value, and a doc update explaining the change (well, I told Claude what to do). Reckon this is okay?
This was referenced Mar 25, 2026
This was referenced Apr 2, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This fix aims to solve two cases where the outgoing Kafka partition key was observed to be either:
The first case is caused by the Kafka listener setting the envelope.GroupId = config.GroupId on the incoming message. If the handler from the Kafka topic cascades an outgoing message, the GroupIdToPartitionKeyRule runs and copies this GroupId value to be the outgoing partition key.
The second case happens when the handler is processing a message from a local queue, or when publishing fresh from application code. At routing time, DetermineGroupId correctly resolves outgoing.GroupId = stream id (via ByPropertyNamed). But GroupIdToPartitionKeyRule only looks at originator.Envelope?.GroupId, which is null or unrelated, so it sets nothing, and outgoing.PartitionKey stays null. Then CreateMessage in the Kafka producer reaches its fallback: envelope.Id.ToString().
Fix the propagation priority in ApplyCorrelation:
Also implement Modify so that messages published outside a handler context (e.g. background services) have their GroupId promoted to PartitionKey when no explicit key is set.
Stop stamping envelope.GroupId = config.GroupId in KafkaListener and KafkaTopicGroupListener - the consumer group name is not meaningful metadata on the incoming envelope.