Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator implementation for DeleteShareGroupOffsets RPC #18976

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest|GroupCoordinatorService).java"/>
<suppress checks="JavaNCSS"
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>

Expand Down
24 changes: 24 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,30 @@ default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareG
return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
}

/**
* Delete offsets for a set of partitions in a share group.
*
* @param groupId The group for which to delete offsets.
* @param partitions The topic-partitions.
* @param options The options to use when deleting offsets in a share group.
* @return The DeleteShareGroupOffsetsResult.
*/
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options);

/**
* Delete offsets for a set of partitions in a share group with the default options.
*
* <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
* See the overload for more details.
*
* @param groupId The group for which to delete offsets.
* @param partitions The topic-partitions.
* @return The DeleteShareGroupOffsetsResult.
*/
default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
}

/**
* Describe some classic groups in the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Set;

/**
* Options for the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* The result of the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResult {

private final KafkaFuture<Map<TopicPartition, Errors>> future;

DeleteShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
this.future = future;
}

/**
* Return a future which succeeds only if all the deletions succeed.
*/
public KafkaFuture<Void> all() {
return this.future.thenApply(topicPartitionErrorsMap -> {
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
.stream()
.filter(e -> e.getValue() != Errors.NONE)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
for (Errors error : topicPartitionErrorsMap.values()) {
if (error != Errors.NONE) {
throw error.exception(
"Failed deleting share group offsets for the following partitions: " + partitionsFailed);
}
}
return null;
});
}

/**
* Return a future which can be used to check the result for a given partition.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();

this.future.whenComplete((topicPartitions, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (!topicPartitions.containsKey(partition)) {
result.completeExceptionally(new IllegalArgumentException(
"Delete offset for partition \"" + partition + "\" was not attempted"));
} else {
final Errors error = topicPartitions.get(partition);
if (error == Errors.NONE) {
result.complete(null);
} else {
result.completeExceptionally(error.exception());
}
}
});

return result;
}

/**
* Return a future which yields a map of topic partitions to errors
*
* @return Future which yields a map of topic partitions to errors
*/
public KafkaFuture<Map<TopicPartition, Errors>> partitionResults() {
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ public ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGr
return delegate.listShareGroupOffsets(groupSpecs, options);
}

@Override
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
return delegate.deleteShareGroupOffsets(groupId, partitions, options);
}

@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DeleteShareGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
Expand Down Expand Up @@ -3818,6 +3819,14 @@ public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListS
return new ListShareGroupOffsetsResult(future.all());
}

@Override
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId);
DeleteShareGroupOffsetsHandler handler = new DeleteShareGroupOffsetsHandler(groupId, partitions, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DeleteShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
}

@Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds,
final DescribeClassicGroupsOptions options) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.DeleteShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.DeleteShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* This class is the handler for {@link KafkaAdminClient#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call
*/
public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Errors>> {

private final CoordinatorKey groupId;

private final Logger log;

private final Set<TopicPartition> partitions;

private final CoordinatorStrategy lookupStrategy;


public DeleteShareGroupOffsetsHandler(String groupId, Set<TopicPartition> partitions, LogContext logContext) {
this.groupId = CoordinatorKey.byGroupId(groupId);
this.partitions = partitions;
this.log = logContext.logger(DeleteShareGroupOffsetsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
}

@Override
public String apiName() {
return "deleteShareGroupOffsets";
}

@Override
public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
return lookupStrategy;
}

public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> newFuture(String groupId) {
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
}

private void validateKeys(Set<CoordinatorKey> groupIds) {
if (!groupIds.equals(Collections.singleton(groupId))) {
throw new IllegalArgumentException("Received unexpected group ids " + groupIds +
" (expected only " + Collections.singleton(groupId) + ")");
}
}

@Override
DeleteShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);

final List<DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic> topics =
new ArrayList<>();
partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(topic)
.setPartitions(topicPartitions.stream()
.map(TopicPartition::partition)
.collect(Collectors.toList())
)
));

return new DeleteShareGroupOffsetsRequest.Builder(
new DeleteShareGroupOffsetsRequestData()
.setGroupId(groupId.idValue)
.setTopics(topics),
true
);
}

@Override
public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(
Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
validateKeys(groupIds);

final DeleteShareGroupOffsetsResponse response = (DeleteShareGroupOffsetsResponse) abstractResponse;

final Errors groupError = Errors.forCode(response.data().errorCode());

if (groupError != Errors.NONE) {
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
final Map<CoordinatorKey, Throwable> groupsFailed = new HashMap<>();
handleGroupError(groupId, groupError, groupsFailed, groupsToUnmap);

return new ApiResult<>(Collections.emptyMap(), groupsFailed, new ArrayList<>(groupsToUnmap));
} else {
final Map<TopicPartition, Errors> partitionResults = new HashMap<>();
response.data().responses().forEach(topic ->
topic.partitions().forEach(partition ->
partitionResults.put(
new TopicPartition(topic.topicName(), partition.partitionIndex()),
Errors.forCode(partition.errorCode())
)
)
);

return ApiResult.completed(groupId, partitionResults);
}
}

private void handleGroupError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case COORDINATOR_LOAD_IN_PROGRESS:
case REBALANCE_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("DeleteShareGroupOffsets request for group id {} failed because the coordinator" +
" is still in the process of loading state. Will retry.", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("DeleteShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry.",
groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
case GROUP_ID_NOT_FOUND:
case NON_EMPTY_GROUP:
case INVALID_REQUEST:
case UNKNOWN_SERVER_ERROR:
case KAFKA_STORAGE_ERROR:
case GROUP_AUTHORIZATION_FAILED:
log.debug("DeleteShareGroupOffsets request for group id {} failed due to error {}.", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
default:
log.error("DeleteShareGroupOffsets request for group id {} failed due to unexpected error {}.", groupId.idValue, error);
failed.put(groupId, error.exception());
}
}
}
Loading
Loading