
Fix concurrency exceptions with global partitioned Kafka topics #2344

Merged
jeremydmiller merged 2 commits into main from fix/global-partitioned-kafka-concurrency
Mar 24, 2026

Conversation

@jeremydmiller
Member

Summary

Fixes concurrency exceptions (EventStreamUnexpectedMaxEventIdException) when using UseShardedKafkaTopics for global partitioned messaging across multiple nodes.

Three root causes were identified and fixed:

  • Shared Kafka consumer GroupId: Each node used its own ServiceName as the Kafka consumer GroupId, so every node formed a separate consumer group and received all messages (broadcast) instead of getting exclusive partition assignments. Fixed by setting a deterministic GroupId based on the baseName in PartitionedMessageTopologyWithTopics.buildListener.

  • Consumer GroupId overwriting business GroupId: The Kafka listener stamped the consumer group name onto Envelope.GroupId, overwriting the business partition key (e.g. aggregate ID) set by ByPropertyNamed. This broke local queue sharding since all messages got the same GroupId hash. Fixed by calling DisableConsumerGroupIdStamping() on sharded topic listeners.

  • GlobalPartitionedReceiverBridge not handled in EnqueueDirectlyAsync: When the durability agent reassigned messages, ListeningAgent threw "no active local queue" because GlobalPartitionedReceiverBridge was not recognized as a valid receiver type. Fixed by adding a case for the bridge in ListeningAgent.EnqueueDirectlyAsync.
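
The first two root causes can be illustrated with a small, language-neutral simulation. This is a minimal sketch in Python, not Wolverine or Kafka client code: the node names, the "orders" group id, the shard count of 4, the round-robin assignment, and the CRC32 hash are all assumptions chosen for illustration. It shows that per-node group ids lead every node to read every partition, and that stamping one consumer group name onto every envelope collapses hash-based local sharding onto a single shard.

```python
import zlib
from collections import defaultdict


def assign_partitions(consumer_groups: dict[str, str], partitions: int) -> dict[str, list[int]]:
    """Simplified model of Kafka consumer groups (hypothetical helper).

    consumer_groups maps node name -> consumer group id. Within one group,
    each partition is owned by exactly one member; separate groups each
    receive every partition independently.
    """
    members_by_group = defaultdict(list)
    for node, group_id in consumer_groups.items():
        members_by_group[group_id].append(node)

    owned = {node: [] for node in consumer_groups}
    for members in members_by_group.values():
        members.sort()
        for partition in range(partitions):
            # Round-robin stand-in for Kafka's real partition assignors.
            owned[members[partition % len(members)]].append(partition)
    return owned


def local_shard(group_id: str, shard_count: int) -> int:
    """Pick a local queue shard from a deterministic hash of the envelope group id.

    CRC32 is an assumption here; the real hash function may differ.
    """
    return zlib.crc32(group_id.encode("utf-8")) % shard_count


# Root cause 1: one group id per node means every node reads every partition.
per_node = assign_partitions(
    {"node-a": "node-a", "node-b": "node-b", "node-c": "node-c"}, 6
)
# With one shared, deterministic group id, each partition has a single owner.
shared = assign_partitions(
    {"node-a": "orders", "node-b": "orders", "node-c": "orders"}, 6
)

# Root cause 2: if the consumer group name replaces the business key,
# every envelope hashes to the same local shard.
aggregate_ids = [f"aggregate-{i}" for i in range(50)]
stamped_shards = {local_shard("orders", 4) for _ in aggregate_ids}
business_shards = {local_shard(aggregate_id, 4) for aggregate_id in aggregate_ids}

print("per-node group ids:", per_node)
print("shared group id:   ", shared)
print("shards used when stamped:", len(stamped_shards))
print("shards used with business keys:", len(business_shards))
```

In this model, the shared group id gives each node a disjoint set of partitions, and keeping the business key on the envelope spreads messages across more than one local shard while still routing a given aggregate to the same shard every time.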

Test plan

  • Bug_concurrency_with_global_partitioning test passes (0 exceptions, 0 duplicate envelope executions across 3 hosts over 30 seconds)
  • Run full Kafka test suite
  • Verify no regressions in CoreTests

🤖 Generated with Claude Code

jeremydmiller and others added 2 commits March 24, 2026 10:42
Three root causes were identified and fixed:

1. Shared Kafka consumer GroupId: When using UseShardedKafkaTopics for
   global partitioning, each node used its own ServiceName as the Kafka
   consumer GroupId. This caused all nodes to receive all messages
   (broadcast) instead of exclusive partition assignment. Fixed by
   setting a deterministic GroupId based on the baseName in
   PartitionedMessageTopologyWithTopics.buildListener.

2. Consumer GroupId overwriting business GroupId: The Kafka listener
   stamped the consumer group name onto Envelope.GroupId, overwriting
   the business partition key (e.g. aggregate ID) set by
   ByPropertyNamed. This broke the local queue sharding since all
   messages got the same GroupId hash. Fixed by calling
   DisableConsumerGroupIdStamping on sharded topic listeners.

3. GlobalPartitionedReceiverBridge not handled in EnqueueDirectlyAsync:
   When the durability agent reassigned messages, ListeningAgent threw
   "no active local queue" because GlobalPartitionedReceiverBridge was
   not recognized as a valid receiver type. Fixed by adding a case for
   the bridge in ListeningAgent.EnqueueDirectlyAsync.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

The concurrency test spins up 3 in-process hosts with Kafka and
Marten, which is resource-intensive and can cause OOM aborts in the
Kafka test suite. Move it to SlowTests where long-running tests belong.

Added Wolverine.Kafka project reference to SlowTests.csproj.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>