KAFKA-19779: Track assignment epochs in streams groups [3/N] #20730
base: trunk
Pull Request Overview
This PR implements assignment epoch tracking for streams groups to support fencing of zombie commits. The core change switches the representation of task assignments from Map<String, Set<Integer>> to Map<String, Map<Integer, Integer>>, where the value of the inner map is the assignment epoch of each partition. This change applies only to active tasks, because standby and warmup tasks don't commit offsets.
Key changes:
- Introduces TasksTupleWithEpochs to carry assignment epochs alongside task assignments
- Updates CurrentAssignmentBuilder to preserve assignment epochs when tasks remain assigned and to apply the target assignment epoch to newly assigned tasks
- Modifies the StreamsGroupCurrentMemberAssignmentValue record schema to include optional assignment epochs for active tasks
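To make the representation change concrete, here is a minimal sketch of the two shapes and a conversion between them. The class and helper names (AssignmentRepresentationSketch, withEpoch, withoutEpochs) are hypothetical and not taken from the PR; only the two map types come from the description above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class AssignmentRepresentationSketch {

    // Hypothetical helper: attach the same epoch to every partition of every subtopology.
    static Map<String, Map<Integer, Integer>> withEpoch(Map<String, Set<Integer>> tasks, int epoch) {
        Map<String, Map<Integer, Integer>> result = new HashMap<>();
        tasks.forEach((subtopologyId, partitions) -> {
            Map<Integer, Integer> epochs = new HashMap<>();
            for (Integer partition : partitions) {
                epochs.put(partition, epoch);
            }
            result.put(subtopologyId, epochs);
        });
        return result;
    }

    // Hypothetical helper: drop the epochs and return to the set-based shape.
    static Map<String, Set<Integer>> withoutEpochs(Map<String, Map<Integer, Integer>> tasks) {
        Map<String, Set<Integer>> result = new HashMap<>();
        tasks.forEach((subtopologyId, epochs) ->
            result.put(subtopologyId, Set.copyOf(epochs.keySet())));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> target = Map.of("subtopology-0", Set.of(0, 1));
        Map<String, Map<Integer, Integer>> current = withEpoch(target, 5);
        System.out.println(current.get("subtopology-0").get(1));   // prints 5
        System.out.println(withoutEpochs(current).equals(target)); // prints true
    }
}
```

The inner map's key set carries the same information as the old Set<Integer>, so code that does not care about epochs can still work from the keys alone.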
Reviewed Changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| TasksTupleWithEpochs.java | New class to represent task assignments with epochs for active tasks |
| TasksTupleWithEpochsTest.java | Comprehensive test coverage for the new TasksTupleWithEpochs class |
| CurrentAssignmentBuilder.java | Updated to track and preserve assignment epochs through state transitions |
| CurrentAssignmentBuilderTest.java | Extended with tests for epoch assignment lookup paths |
| StreamsGroupMember.java | Changed to use TasksTupleWithEpochs instead of TasksTuple |
| StreamsCoordinatorRecordHelpers.java | Added conversion logic for tasks with epochs to records |
| StreamsGroup.java | Updated process ID tracking to work with epoched tasks |
| GroupMetadataManager.java | Modified validation and response building for epoched tasks |
| TaskAssignmentTestUtil.java | Added helper methods for creating tasks with epochs in tests |
| StreamsGroupCurrentMemberAssignmentValue.json | Schema updated with optional assignmentEpochs field |
| Various test files | Mechanical updates to use epoch-aware task creation utilities |
Comments suppressed due to low confidence (2)
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:1
- [nitpick] There's an empty line (16290) between the method chain continuation. While not incorrect, this inconsistent formatting reduces readability. Remove the blank line to maintain consistent method chaining style.
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:1
- [nitpick] The epoch parameter (11) is placed on a separate line, which is inconsistent with other similar calls in the file where the epoch appears on the same line as the method name. Consider moving it to line 18132 for consistency.
b1a708d to 6ca6534
6ca6534 to af9ad43
Did not go over test code yet.
Thanks for your comments @mjsax! Answered / Addressed.
I also updated the code in CurrentAssignmentBuilder to be more direct: it no longer creates intermediate collections and works a lot more like the previous code.
Standby tasks and warmup tasks still use computeAssignmentDifference, while active tasks now use computeAssignmentDifferenceWithEpoch, which updates epochs correctly but otherwise works very similarly. This is also the code that consumer groups can use.
While playing through an example, I realized I can simplify the logic. I was trying to preserve assignment epochs from tasks that were previously pending revocation, in case the task is reassigned. But a member gets no tasks assigned while it has pending revocations, not even a task that is itself pending revocation. So the member only gets the task back after it has confirmed the revocation (and completed commits) for the task, and at that point it is fine for the task to get a new assignment epoch.
Thanks for the patch. I made a first pass on it and it looks pretty good to me overall. However, I am not too familiar with Streams' concepts. I left a few comments for consideration.
    .setPartitions(partitions)
    .setAssignmentEpochs(epochs));
    });
    taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
I guess that this is not strictly necessary but convenient for testing?
Yes, convenient for testing and I like the persistent records to be deterministic, but there is no strict reason why we must do this.
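As a rough illustration of the determinism point, the sketch below converts an epoch-carrying map into a sorted list of record entries. TaskIds here is a hypothetical stand-in for the generated StreamsGroupCurrentMemberAssignmentValue.TaskIds type, and sorting the partitions within each entry is an assumption of this sketch, not something the PR is known to do.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DeterministicRecordSketch {

    // Hypothetical stand-in for the generated record type.
    static final class TaskIds {
        final String subtopologyId;
        final List<Integer> partitions;
        final List<Integer> assignmentEpochs;

        TaskIds(String subtopologyId, List<Integer> partitions, List<Integer> assignmentEpochs) {
            this.subtopologyId = subtopologyId;
            this.partitions = partitions;
            this.assignmentEpochs = assignmentEpochs;
        }

        String subtopologyId() {
            return subtopologyId;
        }
    }

    static List<TaskIds> toTaskIds(Map<String, Map<Integer, Integer>> tasks) {
        List<TaskIds> taskIds = new ArrayList<>();
        tasks.forEach((subtopologyId, epochsByPartition) -> {
            // Assumption: order partitions so that partitions.get(i) pairs with assignmentEpochs.get(i).
            Map<Integer, Integer> sorted = new TreeMap<>(epochsByPartition);
            taskIds.add(new TaskIds(
                subtopologyId,
                new ArrayList<>(sorted.keySet()),
                new ArrayList<>(sorted.values())));
        });
        // Sorting by subtopology ID makes the output independent of map iteration order.
        taskIds.sort(Comparator.comparing(TaskIds::subtopologyId));
        return taskIds;
    }
}
```

Without the final sort, two logically equal assignments held in hash maps could serialize to differently ordered records, which makes exact-match assertions in tests brittle.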
     * This method creates a new map on each call. Consider using {@link #activeTasksWithEpochs()}
     * directly when possible to avoid the conversion.
Is it possible to just get rid of the method if we don't recommend using it?
Good point. I removed it.
    .collect(Collectors.toList());
    }

    private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(
nit: static?
Done
    // If the member comes with the previous epoch and has a subset of the current assignment partitions,
    // we accept it because the response with the bumped epoch may have been lost.
    if (receivedMemberEpoch != member.previousMemberEpoch()
        || !areOwnedTasksContainedInAssignedTasks(ownedActiveTasks, member.assignedTasks().activeTasks())
Is areOwnedTasksContainedInAssignedTasks still used or could we remove it?
Yes, it's still used for standby tasks.
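For active tasks, the owned tasks reported by the member are still plain sets, while the assigned tasks now carry epochs. A containment check over that mixed pair could look roughly like the sketch below; the method name and the handling of empty sets are assumptions of this sketch, not the PR's actual code.

```java
import java.util.Map;
import java.util.Set;

public class OwnedTasksCheckSketch {

    // Hypothetical variant of the containment check: returns true if every owned
    // task appears as a key in the epoch-carrying assigned tasks.
    static boolean ownedTasksContainedInAssignedTasksWithEpochs(
            Map<String, Set<Integer>> ownedTasks,
            Map<String, Map<Integer, Integer>> assignedTasksWithEpochs) {
        for (Map.Entry<String, Set<Integer>> entry : ownedTasks.entrySet()) {
            if (entry.getValue().isEmpty()) {
                continue; // Assumption: an empty owned set is trivially contained.
            }
            Map<Integer, Integer> assigned = assignedTasksWithEpochs.get(entry.getKey());
            if (assigned == null || !assigned.keySet().containsAll(entry.getValue())) {
                return false;
            }
        }
        return true;
    }
}
```

The epochs themselves play no role in this check; only the key set of the inner map is compared against the owned partitions.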
     * - For tasks in resultAssignedTasks and resultTasksPendingRevocation, the epoch from currentAssignment is preserved.
     * - For tasks in resultTasksPendingAssignment, the targetAssignmentEpoch is used.
     */
    private boolean computeAssignmentDifferenceWithEpoch(Map<String, Map<Integer, Integer>> currentAssignment,
Those new methods are basically copies of the previous ones. The only difference is that they operate on Map<String, Map<Integer, Integer>> instead of Map<String, Set<Integer>>. This is a bit unfortunate, but I guess we don't really have a choice unless we use Map<String, Map<Integer, Integer>> everywhere.
Yeah, as I mentioned, the change for consumer groups will be cleaner. For streams we have the added complexity of having to track standby tasks and warmup tasks as well.
We could consider simplifying this code by also tracking assignment epochs for standby tasks and warmup tasks. I don't think there is much runtime overhead, so I'd be open to that. I just did not do it because it may be confusing to track assignment epochs for standby tasks and then never use them.
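The epoch rules from the Javadoc quoted above (preserve the epoch for tasks that stay assigned or move to pending revocation, use the target assignment epoch for newly assigned tasks) can be sketched as follows. This is a simplified illustration with a hypothetical method name and parameter list; the real computeAssignmentDifferenceWithEpoch has a signature and return value that are not fully visible here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class AssignmentDifferenceSketch {

    // Hypothetical, simplified diff: fills the three result maps from the current
    // assignment (with epochs) and the target assignment (without epochs).
    static void diffWithEpochs(
            Map<String, Map<Integer, Integer>> currentAssignment,
            Map<String, Set<Integer>> targetAssignment,
            int targetAssignmentEpoch,
            Map<String, Map<Integer, Integer>> resultAssignedTasks,
            Map<String, Map<Integer, Integer>> resultTasksPendingRevocation,
            Map<String, Map<Integer, Integer>> resultTasksPendingAssignment) {
        // Currently assigned tasks keep their epoch, whether they stay or get revoked.
        currentAssignment.forEach((subtopologyId, epochs) -> {
            Set<Integer> target = targetAssignment.getOrDefault(subtopologyId, Set.of());
            epochs.forEach((partition, epoch) -> {
                Map<String, Map<Integer, Integer>> bucket =
                    target.contains(partition) ? resultAssignedTasks : resultTasksPendingRevocation;
                bucket.computeIfAbsent(subtopologyId, k -> new HashMap<>()).put(partition, epoch);
            });
        });
        // Tasks that appear only in the target get the target assignment epoch.
        targetAssignment.forEach((subtopologyId, partitions) -> {
            Map<Integer, Integer> current = currentAssignment.getOrDefault(subtopologyId, Map.of());
            for (Integer partition : partitions) {
                if (!current.containsKey(partition)) {
                    resultTasksPendingAssignment
                        .computeIfAbsent(subtopologyId, k -> new HashMap<>())
                        .put(partition, targetAssignmentEpoch);
                }
            }
        });
    }
}
```

The set-based version for standby and warmup tasks would have the same control flow with the epoch bookkeeping removed, which is why the two methods end up looking like near copies.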
This PR implements assignment epoch tracking for streams groups.
Fundamentally, this changes the representation of current assignments and pending revocations from Map<String, Set<Integer>> to Map<String, Map<Integer, Integer>>, where the value of the inner map is the assignment epoch. Target assignments, and the tasks reported as currently owned, still use the former representation, since they do not include assignment epochs. This change only applies to active tasks.
From TasksTuple, we create TasksTupleWithEpochs, which carries the assignment epochs.
The core of the change is in CurrentAssignmentBuilder, which takes the previous assignment epochs from assignedTasks and tasksPendingRevocation and applies them to all tasks in the new assignment. If a task in the new assignment was not previously assigned, it gets the targetAssignmentEpoch that the member transitions to.
There are a lot of mechanical follow-up changes to use TasksTupleWithEpochs in place of TasksTuple. In general, I tried to follow this strategy when adapting existing tests:
- When assignment epochs do not play a role in a test, we instantiate all tasks with the same assignment epoch using TaskAssignmentTestUtil.
- The main tests for correctly updating the assignment epoch are in CurrentAssignmentBuilderTest.