Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,16 @@
*/
@InterfaceStability.Evolving
public class AlterConsumerGroupOffsetsOptions extends AbstractOptions<AlterConsumerGroupOffsetsOptions> {
// AutoMQ inject start
private boolean force = false;

public AlterConsumerGroupOffsetsOptions force(boolean force) {
this.force = force;
return this;
}

public boolean force() {
return force;
}
// AutoMQ inject end
}
Original file line number Diff line number Diff line change
Expand Up @@ -4208,7 +4208,7 @@ public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(
) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> future =
AlterConsumerGroupOffsetsHandler.newFuture(groupId);
AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, offsets, logContext);
AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, offsets, logContext, options.force()); // AutoMQ
invokeDriver(handler, future, options.timeoutMs);
return new AlterConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
Expand All @@ -44,21 +45,43 @@

public class AlterConsumerGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {

// AutoMQ inject start
// Full name: __automq_consumer_group_force_commit
public static final String FORCE_COMMIT_SENTINEL_TOPIC = Topic.FORCE_COMMIT_SENTINEL_TOPIC;
// AutoMQ inject end

private final CoordinatorKey groupId;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
// AutoMQ inject start
private final boolean force;
// AutoMQ inject end

public AlterConsumerGroupOffsetsHandler(
String groupId,
Map<TopicPartition, OffsetAndMetadata> offsets,
LogContext logContext
) {
// AutoMQ inject start
this(groupId, offsets, logContext, false);
// AutoMQ inject end
}

// AutoMQ inject start
public AlterConsumerGroupOffsetsHandler(
String groupId,
Map<TopicPartition, OffsetAndMetadata> offsets,
LogContext logContext,
boolean force
) {
this.groupId = CoordinatorKey.byGroupId(groupId);
this.offsets = offsets;
this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
this.force = force;
}
// AutoMQ inject end

@Override
public String apiName() {
Expand Down Expand Up @@ -108,6 +131,17 @@ public OffsetCommitRequest.Builder buildBatchedRequest(
.setGroupId(groupId.idValue)
.setTopics(new ArrayList<>(offsetData.values()));

// AutoMQ inject start
if (force) {
OffsetCommitRequestTopic sentinelTopic = new OffsetCommitRequestTopic()
.setName(FORCE_COMMIT_SENTINEL_TOPIC);
sentinelTopic.partitions().add(new OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(0));
data.topics().add(sentinelTopic);
}
// AutoMQ inject end

return new OffsetCommitRequest.Builder(data);
}

Expand All @@ -125,6 +159,11 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(
final Map<TopicPartition, Errors> partitionResults = new HashMap<>();

for (OffsetCommitResponseTopic topic : response.data().topics()) {
// AutoMQ inject start
if (FORCE_COMMIT_SENTINEL_TOPIC.equals(topic.name())) {
continue;
}
// AutoMQ inject end
for (OffsetCommitResponsePartition partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
Errors error = Errors.forCode(partition.errorCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class Topic {
public static final String AUTO_BALANCER_METRICS_TOPIC_NAME = "__auto_balancer_metrics";
public static final String TABLE_TOPIC_CONTROL_TOPIC_NAME = "__automq_table_control";
public static final String TABLE_TOPIC_DATA_TOPIC_NAME = "__automq_table_data";
// Full name: __automq_consumer_group_force_commit
public static final String FORCE_COMMIT_SENTINEL_TOPIC = "__a.c.g.f.c";
// AutoMQ inject end

public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new TopicPartition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,20 @@ class GroupCoordinator(
offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
// AutoMQ inject start
val forceCommitSentinelTopic = Topic.FORCE_COMMIT_SENTINEL_TOPIC
val forceCommit = offsetMetadata.keys.exists(_.topic == forceCommitSentinelTopic)
val realOffsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata] =
if (forceCommit) offsetMetadata.filter(_._1.topic != forceCommitSentinelTopic).toMap else offsetMetadata
val wrappedCallback: immutable.Map[TopicIdPartition, Errors] => Unit = if (forceCommit) {
commitStatus => {
val sentinelResponse = offsetMetadata.keys.filter(_.topic == forceCommitSentinelTopic).map(_ -> Errors.NONE).toMap
responseCallback(commitStatus ++ sentinelResponse)
}
} else {
responseCallback
}
// AutoMQ inject end
validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
case None =>
Expand All @@ -977,16 +991,16 @@ class GroupCoordinator(
if (generationId < 0) {
// the group is not relying on Kafka for group management, so allow the commit
val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata,
responseCallback, requestLocal)
doCommitOffsets(group, memberId, groupInstanceId, generationId, realOffsetMetadata,
wrappedCallback, requestLocal, forceCommit) // AutoMQ
} else {
// or this is a request coming from an older generation. either way, reject the commit
responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
}

case Some(group) =>
doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata,
responseCallback, requestLocal)
doCommitOffsets(group, memberId, groupInstanceId, generationId, realOffsetMetadata,
wrappedCallback, requestLocal, forceCommit) // AutoMQ
}
}
}
Expand Down Expand Up @@ -1072,8 +1086,29 @@ class GroupCoordinator(
generationId: Int,
offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
requestLocal: RequestLocal): Unit = {
requestLocal: RequestLocal,
forceCommit: Boolean // AutoMQ
): Unit = {
group.inLock {
// AutoMQ inject start
if (forceCommit && generationId < 0 && !group.is(Empty)) {
// Force path: remove all members, transition to Empty, then commit offsets.
group.allMembers.toList.foreach { mid =>
val member = group.get(mid)
group.maybeInvokeJoinCallback(member, JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
group.remove(mid)
}
if (!group.is(PreparingRebalance)) {
group.transitionTo(PreparingRebalance)
}
group.initNextGeneration()
// Now group is Empty. Store offsets.
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
groupManager.storeOffsets(group, memberId, offsetTopicPartition, offsetMetadata, responseCallback, verificationGuard = None)
return
}
// AutoMQ inject end

val validationErrorOpt = validateOffsetCommit(
group,
generationId,
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.internals.Topic.{FORCE_COMMIT_SENTINEL_TOPIC, GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
Expand Down Expand Up @@ -485,7 +485,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseBuilder = new OffsetCommitResponse.Builder()
val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
offsetCommitRequest.data.topics.forEach { topic =>
if (!authorizedTopics.contains(topic.name)) {
// AutoMQ inject start
if (topic.name == FORCE_COMMIT_SENTINEL_TOPIC) {
// Sentinel topic for force offset commit — bypass auth and metadata checks.
authorizedTopicsRequest += topic
} else if (!authorizedTopics.contains(topic.name)) {
// AutoMQ inject end
// If the topic is not authorized, we add the topic and all its partitions
// to the response with TOPIC_AUTHORIZATION_FAILED.
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5768,6 +5768,45 @@ private void removeCurrentMemberFromClassicGroup(
group.remove(member.memberId());
}

// AutoMQ inject start
/**
* Force-remove all members from a classic group and generate an empty group metadata record.
* Used by the force offset commit path.
*
* @param group The classic group.
* @param records The record list to populate.
*/
public void forceRemoveAllClassicGroupMembers(ClassicGroup group, List<CoordinatorRecord> records) {
for (String memberId : new ArrayList<>(group.allMemberIds())) {
timer.cancel(classicGroupHeartbeatKey(group.groupId(), memberId));
group.remove(memberId);
}
// EMPTY only accepts PREPARING_REBALANCE as a valid previous state,
// so we must transition through it first (mirroring the normal leave flow).
if (!group.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
group.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
}
group.transitionTo(ClassicGroupState.EMPTY);
records.add(CoordinatorRecordHelpers.newEmptyGroupMetadataRecord(group, metadataImage.features().metadataVersion()));
}

/**
* Force-remove all members from a consumer group and generate tombstone + epoch records.
* Used by the force offset commit path.
*
* @param group The consumer group.
* @param records The record list to populate.
*/
public void forceRemoveAllConsumerGroupMembers(ConsumerGroup group, List<CoordinatorRecord> records) {
for (String memberId : new ArrayList<>(group.members().keySet())) {
removeMember(records, group.groupId(), memberId);
cancelTimers(group.groupId(), memberId);
}
int groupEpoch = group.groupEpoch() + 1;
records.add(CoordinatorRecordHelpers.newGroupEpochRecord(group.groupId(), groupEpoch));
}
// AutoMQ inject end

/**
* Handles a DeleteGroups request.
* Populates the record list passed in with record to update the state machine.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
Expand Down Expand Up @@ -82,6 +83,15 @@
*/
public class OffsetMetadataManager {

// AutoMQ inject start
public static final String FORCE_COMMIT_SENTINEL_TOPIC = Topic.FORCE_COMMIT_SENTINEL_TOPIC;

private static boolean isForceCommit(OffsetCommitRequestData request) {
return request.topics().stream()
.anyMatch(t -> FORCE_COMMIT_SENTINEL_TOPIC.equals(t.name()));
}
// AutoMQ inject end

public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
Expand Down Expand Up @@ -302,6 +312,19 @@ private Group validateOffsetCommit(
RequestContext context,
OffsetCommitRequestData request
) throws ApiException {
// AutoMQ inject start
if (isForceCommit(request) && request.generationIdOrMemberEpoch() < 0) {
if (!request.memberId().isEmpty() || request.groupInstanceId() != null) {
throw Errors.INVALID_REQUEST.exception();
}
try {
return groupMetadataManager.group(request.groupId());
} catch (GroupIdNotFoundException ex) {
return groupMetadataManager.getOrMaybeCreateClassicGroup(request.groupId(), true);
}
}
// AutoMQ inject end

Group group;
try {
group = groupMetadataManager.group(request.groupId());
Expand Down Expand Up @@ -451,9 +474,15 @@ public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffs
) throws ApiException {
Group group = validateOffsetCommit(context, request);

// AutoMQ inject start
boolean force = isForceCommit(request);
// AutoMQ inject end

// In the old consumer group protocol, the offset commits maintain the session if
// the group is in Stable or PreparingRebalance state.
if (group.type() == Group.GroupType.CLASSIC) {
// AutoMQ inject start
if (!force && group.type() == Group.GroupType.CLASSIC) {
// AutoMQ inject end
ClassicGroup classicGroup = (ClassicGroup) group;
if (classicGroup.isInState(ClassicGroupState.STABLE) || classicGroup.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
groupMetadataManager.rescheduleClassicGroupMemberHeartbeat(
Expand All @@ -468,7 +497,35 @@ public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffs
final long currentTimeMs = time.milliseconds();
final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs);

// AutoMQ inject start
// Step 1: If force and group is non-empty, fence all members.
if (force && !group.isEmpty()) {
if (group.type() == Group.GroupType.CLASSIC) {
ClassicGroup classicGroup = (ClassicGroup) group;
classicGroup.completeAllJoinFutures(Errors.UNKNOWN_MEMBER_ID);
classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
groupMetadataManager.forceRemoveAllClassicGroupMembers(classicGroup, records);
} else if (group.type() == Group.GroupType.CONSUMER) {
groupMetadataManager.forceRemoveAllConsumerGroupMembers(
(org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup) group, records);
}
}
// AutoMQ inject end

request.topics().forEach(topic -> {
// AutoMQ inject start
if (FORCE_COMMIT_SENTINEL_TOPIC.equals(topic.name())) {
// Add success response for sentinel topic but don't write offset records.
OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic().setName(topic.name());
topic.partitions().forEach(p -> topicResponse.partitions().add(
new OffsetCommitResponsePartition()
.setPartitionIndex(p.partitionIndex())
.setErrorCode(Errors.NONE.code())));
response.topics().add(topicResponse);
return;
}
// AutoMQ inject end

final OffsetCommitResponseTopic topicResponse = new OffsetCommitResponseTopic().setName(topic.name());
response.topics().add(topicResponse);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,10 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
withTimeoutMs(new DescribeConsumerGroupsOptions())
).describedGroups();

// AutoMQ inject start
boolean force = opts.options.has(opts.forceOpt);
// AutoMQ inject end

Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();

consumerGroups.forEach((groupId, groupDescription) -> {
Expand All @@ -550,8 +554,26 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {

break;
default:
printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
result.put(groupId, Collections.emptyMap());
// AutoMQ inject start
if (force) {
Collection<TopicPartition> partitions = getPartitionsToReset(groupId);
Map<TopicPartition, OffsetAndMetadata> offsets = prepareOffsetsToReset(groupId, partitions);

boolean isDryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
if (!isDryRun) {
adminClient.alterConsumerGroupOffsets(
groupId,
offsets,
withTimeoutMs(new AlterConsumerGroupOffsetsOptions().force(true))
).all().get();
}

result.put(groupId, offsets);
} else {
printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
result.put(groupId, Collections.emptyMap());
}
// AutoMQ inject end
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
Expand Down
Loading
Loading