Skip to content

feat(#3266): reset consumer group offsets while keeping consumers 'online'#3267

Merged
superhx merged 3 commits into
1.6from
force_reset_offsets
Mar 31, 2026
Merged

feat(#3266): reset consumer group offsets while keeping consumers 'online'#3267
superhx merged 3 commits into
1.6from
force_reset_offsets

Conversation

@superhx
Copy link
Copy Markdown
Collaborator

@superhx superhx commented Mar 30, 2026

close #3266

…line'

Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
@superhx superhx changed the title feat(#3266): reset consumer group offsets while keeping consumers line' feat(#3266): reset consumer group offsets while keeping consumers 'online' Mar 30, 2026
@daniel-y
Copy link
Copy Markdown
Contributor

Code review

Found 1 issue:

  1. forceRemoveAllClassicGroupMembers makes an invalid state transition: group.transitionTo(ClassicGroupState.EMPTY) is called directly, but ClassicGroupState.EMPTY only accepts PREPARING_REBALANCE as a valid previous state. When the group is in STABLE or COMPLETING_REBALANCE (the normal state for a group with active consumers — which is the primary use case for this feature), assertValidTransition will throw IllegalStateException and crash the coordinator shard. The normal leave flow always transitions through PREPARING_REBALANCE before reaching EMPTY.

for (String memberId : new ArrayList<>(group.allMemberIds())) {
timer.cancel(classicGroupHeartbeatKey(group.groupId(), memberId));
group.remove(memberId);
}
group.transitionTo(ClassicGroupState.EMPTY);
records.add(CoordinatorRecordHelpers.newEmptyGroupMetadataRecord(group, metadataImage.features().metadataVersion()));
}

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
@superhx
Copy link
Copy Markdown
Collaborator Author

superhx commented Mar 31, 2026

Code review

Found 1 issue:

  1. forceRemoveAllClassicGroupMembers makes an invalid state transition: group.transitionTo(ClassicGroupState.EMPTY) is called directly, but ClassicGroupState.EMPTY only accepts PREPARING_REBALANCE as a valid previous state. When the group is in STABLE or COMPLETING_REBALANCE (the normal state for a group with active consumers — which is the primary use case for this feature), assertValidTransition will throw IllegalStateException and crash the coordinator shard. The normal leave flow always transitions through PREPARING_REBALANCE before reaching EMPTY.

for (String memberId : new ArrayList<>(group.allMemberIds())) {
timer.cancel(classicGroupHeartbeatKey(group.groupId(), memberId));
group.remove(memberId);
}
group.transitionTo(ClassicGroupState.EMPTY);
records.add(CoordinatorRecordHelpers.newEmptyGroupMetadataRecord(group, metadataImage.features().metadataVersion()));
}

🤖 Generated with Claude Code

  • If this code review was useful, please react with 👍. Otherwise, react with 👎.

Fixed. This issue only happens when the new coordinator is enabled.

@daniel-y
Copy link
Copy Markdown
Contributor

The fix at e288feb handles STABLE and COMPLETING_REBALANCE, but PREPARING_REBALANCE -> PREPARING_REBALANCE is still an invalid transition per the state machine:

PREPARING_REBALANCE.addValidPreviousStates(STABLE, COMPLETING_REBALANCE, EMPTY);

If a rebalance is already in progress when the force commit arrives, group.transitionTo(ClassicGroupState.PREPARING_REBALANCE) will throw IllegalStateException.

// EMPTY only accepts PREPARING_REBALANCE as a valid previous state,
// so we must transition through it first (mirroring the normal leave flow).
group.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
group.transitionTo(ClassicGroupState.EMPTY);

Suggested fix — guard the transition:

if (!group.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
    group.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
}
group.transitionTo(ClassicGroupState.EMPTY);

🤖 Generated with Claude Code

Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
@superhx
Copy link
Copy Markdown
Collaborator Author

superhx commented Mar 31, 2026

The fix at e288feb handles STABLE and COMPLETING_REBALANCE, but PREPARING_REBALANCE -> PREPARING_REBALANCE is still an invalid transition per the state machine:

PREPARING_REBALANCE.addValidPreviousStates(STABLE, COMPLETING_REBALANCE, EMPTY);

If a rebalance is already in progress when the force commit arrives, group.transitionTo(ClassicGroupState.PREPARING_REBALANCE) will throw IllegalStateException.

// EMPTY only accepts PREPARING_REBALANCE as a valid previous state,
// so we must transition through it first (mirroring the normal leave flow).
group.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
group.transitionTo(ClassicGroupState.EMPTY);

Suggested fix — guard the transition:

if (!group.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
    group.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
}
group.transitionTo(ClassicGroupState.EMPTY);

🤖 Generated with Claude Code

Fixed

@superhx superhx merged commit 5cb6705 into 1.6 Mar 31, 2026
6 checks passed
@superhx superhx deleted the force_reset_offsets branch March 31, 2026 08:41
superhx added a commit that referenced this pull request Mar 31, 2026
…line' (#3267)

Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants