Skip to content

Conversation

@lucasbru
Copy link
Member

@lucasbru lucasbru commented Oct 20, 2025

This PR implements assignment epoch tracking for streams groups.

Fundamentally, this replaces the representation of current assignments
and pending revocations from Map<String, Set> to Map<String,
Map<Integer, Integer>> where the value of the inner map is the
assignment epoch. Target assignments, and the tasks reported as
currently owned, still use the former representation, since they do not
include assignment epochs. This change only applies to active tasks.
From TasksTuple, we create TasksTupleWithEpochs, which carries the
assignment epochs.

The core of the change is in CurrentAssignmentBuilder, which takes the
previous assignment epochs from assignedTasks and
tasksPendingRevocation and applies them to all tasks in the new
assignment. If a task in the new assignment was not previously assigned,
it gets the targetAssignmentEpoch, that the member transitions to.

There are a lot of mechanic follow-up changes to use
TasksTupleWithEpochs in-place of TasksTuple. In general, I tried to
follow the following strategy when adapting existing tests: - When
assignment epochs do no play a role in a test, we instantiate all tasks
with the same assignments epoch using TestAssignmentUtils. - The main
tests for correctly updating the assignment epoch are in
CurrentAssignmentBuilderTest.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements assignment epoch tracking for streams groups to support fencing of zombie commits. The core change replaces task assignments from Map<String, Set<Integer>> to Map<String, Map<Integer, Integer>> where the inner map value represents the assignment epoch for each partition. This change only applies to active tasks, as standby and warmup tasks don't commit offsets.

Key changes:

  • Introduces TasksTupleWithEpochs to carry assignment epochs alongside task assignments
  • Updates CurrentAssignmentBuilder to preserve assignment epochs when tasks remain assigned and applies the target assignment epoch to newly assigned tasks
  • Modifies the StreamsGroupCurrentMemberAssignmentValue record schema to include optional assignment epochs for active tasks

Reviewed Changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
TasksTupleWithEpochs.java New class to represent task assignments with epochs for active tasks
TasksTupleWithEpochsTest.java Comprehensive test coverage for the new TasksTupleWithEpochs class
CurrentAssignmentBuilder.java Updated to track and preserve assignment epochs through state transitions
CurrentAssignmentBuilderTest.java Extended with tests for epoch assignment lookup paths
StreamsGroupMember.java Changed to use TasksTupleWithEpochs instead of TasksTuple
StreamsCoordinatorRecordHelpers.java Added conversion logic for tasks with epochs to records
StreamsGroup.java Updated process ID tracking to work with epoched tasks
GroupMetadataManager.java Modified validation and response building for epoched tasks
TaskAssignmentTestUtil.java Added helper methods for creating tasks with epochs in tests
StreamsGroupCurrentMemberAssignmentValue.json Schema updated with optional assignmentEpochs field
Various test files Mechanical updates to use epoch-aware task creation utilities
Comments suppressed due to low confidence (2)

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:1

  • [nitpick] There's an empty line (16290) between the method chain continuation. While not incorrect, this inconsistent formatting reduces readability. Remove the blank line to maintain consistent method chaining style.
/*

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:1

  • [nitpick] The epoch parameter (11) is placed on a separate line, which is inconsistent with other similar calls in the file where the epoch appears on the same line as the method name. Consider moving it to line 18132 for consistency.
/*

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@lucasbru lucasbru force-pushed the KAFKA-19779-3 branch 4 times, most recently from b1a708d to 6ca6534 Compare October 20, 2025 15:10
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not go over test code yet.

@lucasbru lucasbru changed the title KAFKA-19779: Track assignment epochs to streams groups KAFKA-19779: Track assignment epochs in streams groups Oct 21, 2025
Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for you comments @mjsax ! Answered / Addressed.

I also updated the code in CurrentAssingmentBuilder to be more direct / not create intermediate collections and work a lot more like the previous code.

Instead of using computeAssignmentDifference in standby tasks and warmup tasks, active tasks now use computeAssignmentDifferenceWithEpoch, which updates epochs correctly but works otherwise very similar. This is also the code that consumer groups can use.

@lucasbru
Copy link
Member Author

While playing through an example, I realized I can simplify the logic - I was trying to preserve assignment epochs from tasks that were previously pending revocation, in case the task is reassigned. But we will get no tasks assigned while pending revocations, even not the task that we are currently pending revocation. So we will only get the task back after having confirmed revocations (and completing commits) for the task, so then it is fine to get a new assignment epoch in this case.

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch. I made a first pass on it and it looks pretty good to me overall. However, I am not too familiar with Streams' concepts. I left a few comments for consideration.

.setPartitions(partitions)
.setAssignmentEpochs(epochs));
});
taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that this is not strictly necessary but convenient for testing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, convenient for testing and I like the persistent records to be deterministic, but there is no strict reason why we must do this.

Comment on lines 67 to 68
* This method creates a new map on each call. Consider using {@link #activeTasksWithEpochs()}
* directly when possible to avoid the conversion.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to just get rid of the method if we don't recommend using it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I removed it

.collect(Collectors.toList());
}

private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: static?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// If the member comes with the previous epoch and has a subset of the current assignment partitions,
// we accept it because the response with the bumped epoch may have been lost.
if (receivedMemberEpoch != member.previousMemberEpoch()
|| !areOwnedTasksContainedInAssignedTasks(ownedActiveTasks, member.assignedTasks().activeTasks())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is areOwnedTasksContainedInAssignedTasks still used or could we remove it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's still used for standby tasks.

* - For tasks in resultAssignedTasks and resultTasksPendingRevocation, the epoch from currentAssignment is preserved.
* - For tasks in resultTasksPendingAssignment, the targetAssignmentEpoch is used.
*/
private boolean computeAssignmentDifferenceWithEpoch(Map<String, Map<Integer, Integer>> currentAssignment,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those new methods are basically copies of the previous ones. The only difference is that they operate on Map<String, Map<Integer, Integer>> vs Map<String, Set<Integer>>. This is a bit unfortunate but I guess that we don't really have the choice unless we use Map<String, Map<Integer, Integer>> everywhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, as I mentioned, the change for consumer groups will be cleaner. For streams we have the added complexity of having to track standby tasks and warmup tasks as well.

We could consider simplifying this code and tracking assignment epochs also for standby tasks and warmup tasks. I don't think there is much runtime overhead, so I'd be open to that. I just did not do it since it may become confusing that we are tracking assingment epochs for standby tasks but then never use them.

Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mjsax and @dajac for the review comments. All addressed

.setPartitions(partitions)
.setAssignmentEpochs(epochs));
});
taskIds.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, convenient for testing and I like the persistent records to be deterministic, but there is no strict reason why we must do this.

.collect(Collectors.toList());
}

private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 67 to 68
* This method creates a new map on each call. Consider using {@link #activeTasksWithEpochs()}
* directly when possible to avoid the conversion.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I removed it

@lucasbru lucasbru changed the title KAFKA-19779: Track assignment epochs in streams groups KAFKA-19779: Track assignment epochs in streams groups [3/N] Oct 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants