Skip to content

Commit 16a0b31

Browse files
authored
Feature: delete consumergroups (dpkp#2040)
* Add consumergroup related errors * Add DeleteGroups to protocol.admin * Implement delete_groups feature on KafkaAdminClient
1 parent e485a6e commit 16a0b31

File tree

4 files changed

+219
-5
lines changed

4 files changed

+219
-5
lines changed

kafka/admin/client.py

+89-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
from kafka.metrics import MetricConfig, Metrics
2020
from kafka.protocol.admin import (
2121
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
22-
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
22+
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
23+
DeleteGroupsRequest
24+
)
2325
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2426
from kafka.protocol.metadata import MetadataRequest
2527
from kafka.protocol.types import Array
@@ -337,12 +339,34 @@ def _find_coordinator_id(self, group_id):
337339
name as a string.
338340
:return: The node_id of the broker that is the coordinator.
339341
"""
340-
# Note: Java may change how this is implemented in KAFKA-6791.
341342
future = self._find_coordinator_id_send_request(group_id)
342343
self._wait_for_futures([future])
343344
response = future.value
344345
return self._find_coordinator_id_process_response(response)
345346

347+
def _find_many_coordinator_ids(self, group_ids):
348+
"""Find the broker node_id of the coordinator for each of the given groups.
349+
350+
Sends a FindCoordinatorRequest message to the cluster for each group_id.
351+
Will block until the FindCoordinatorResponse is received for all groups.
352+
Any errors are immediately raised.
353+
354+
:param group_ids: A list of consumer group IDs. This is typically the group
355+
name as a string.
356+
:return: A list of tuples (group_id, node_id) where node_id is the id
357+
of the broker that is the coordinator for the corresponding group.
358+
"""
359+
futures = {
360+
group_id: self._find_coordinator_id_send_request(group_id)
361+
for group_id in group_ids
362+
}
363+
self._wait_for_futures(list(futures.values()))
364+
groups_coordinators = [
365+
(group_id, self._find_coordinator_id_process_response(f.value))
366+
for group_id, f in futures.items()
367+
]
368+
return groups_coordinators
369+
346370
def _send_request_to_node(self, node_id, request):
347371
"""Send a Kafka protocol message to a specific broker.
348372
@@ -1261,8 +1285,69 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
12611285
response = future.value
12621286
return self._list_consumer_group_offsets_process_response(response)
12631287

1264-
# delete groups protocol not yet implemented
1265-
# Note: send the request to the group's coordinator.
1288+
def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
1289+
"""Delete Consumer Group Offsets for given consumer groups.
1290+
1291+
Note:
1292+
This does not verify that the group ids actually exist and
1293+
group_coordinator_id is the correct coordinator for all these groups.
1294+
1295+
The result needs checking for potential errors.
1296+
1297+
:param group_ids: The consumer group ids of the groups which are to be deleted.
1298+
:param group_coordinator_id: The node_id of the broker which is the coordinator for
1299+
all the groups. Use only if all groups are coordinated by the same broker.
1300+
If set to None, will query the cluster to find the coordinator for every single group.
1301+
Explicitly specifying this can be useful to prevent
1302+
that extra network round trips if you already know the group
1303+
coordinator. Default: None.
1304+
:return: A list of tuples (group_id, KafkaError)
1305+
"""
1306+
if group_coordinator_id is not None:
1307+
futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
1308+
else:
1309+
groups_coordinators = defaultdict(list)
1310+
for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids):
1311+
groups_coordinators[group_coordinator_id].append(group_id)
1312+
futures = [
1313+
self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)
1314+
for group_coordinator_id, group_ids in groups_coordinators.items()
1315+
]
1316+
1317+
self._wait_for_futures(futures)
1318+
1319+
results = []
1320+
for f in futures:
1321+
results.extend(self._convert_delete_groups_response(f.value))
1322+
return results
1323+
1324+
def _convert_delete_groups_response(self, response):
1325+
if response.API_VERSION <= 1:
1326+
results = []
1327+
for group_id, error_code in response.results:
1328+
results.append((group_id, Errors.for_code(error_code)))
1329+
return results
1330+
else:
1331+
raise NotImplementedError(
1332+
"Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient."
1333+
.format(response.API_VERSION))
1334+
1335+
def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
1336+
"""Send a DeleteGroups request to a broker.
1337+
1338+
:param group_ids: The consumer group ids of the groups which are to be deleted.
1339+
:param group_coordinator_id: The node_id of the broker which is the coordinator for
1340+
all the groups.
1341+
:return: A message future
1342+
"""
1343+
version = self._matching_api_version(DeleteGroupsRequest)
1344+
if version <= 1:
1345+
request = DeleteGroupsRequest[version](group_ids)
1346+
else:
1347+
raise NotImplementedError(
1348+
"Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient."
1349+
.format(version))
1350+
return self._send_request_to_node(group_coordinator_id, request)
12661351

12671352
def _wait_for_futures(self, futures):
12681353
while not all(future.succeeded() for future in futures):

kafka/errors.py

+12
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,18 @@ class SecurityDisabledError(BrokerResponseError):
449449
description = 'Security features are disabled.'
450450

451451

452+
class NonEmptyGroupError(BrokerResponseError):
453+
errno = 68
454+
message = 'NON_EMPTY_GROUP'
455+
description = 'The group is not empty.'
456+
457+
458+
class GroupIdNotFoundError(BrokerResponseError):
459+
errno = 69
460+
message = 'GROUP_ID_NOT_FOUND'
461+
description = 'The group id does not exist.'
462+
463+
452464
class KafkaUnavailableError(KafkaError):
453465
pass
454466

kafka/protocol/admin.py

+41
Original file line numberDiff line numberDiff line change
@@ -882,3 +882,44 @@ class CreatePartitionsRequest_v1(Request):
882882
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
883883
]
884884

885+
886+
class DeleteGroupsResponse_v0(Response):
887+
API_KEY = 42
888+
API_VERSION = 0
889+
SCHEMA = Schema(
890+
("throttle_time_ms", Int32),
891+
("results", Array(
892+
("group_id", String("utf-8")),
893+
("error_code", Int16)))
894+
)
895+
896+
897+
class DeleteGroupsResponse_v1(Response):
898+
API_KEY = 42
899+
API_VERSION = 1
900+
SCHEMA = DeleteGroupsResponse_v0.SCHEMA
901+
902+
903+
class DeleteGroupsRequest_v0(Request):
904+
API_KEY = 42
905+
API_VERSION = 0
906+
RESPONSE_TYPE = DeleteGroupsResponse_v0
907+
SCHEMA = Schema(
908+
("groups_names", Array(String("utf-8")))
909+
)
910+
911+
912+
class DeleteGroupsRequest_v1(Request):
913+
API_KEY = 42
914+
API_VERSION = 1
915+
RESPONSE_TYPE = DeleteGroupsResponse_v1
916+
SCHEMA = DeleteGroupsRequest_v0.SCHEMA
917+
918+
919+
DeleteGroupsRequest = [
920+
DeleteGroupsRequest_v0, DeleteGroupsRequest_v1
921+
]
922+
923+
DeleteGroupsResponse = [
924+
DeleteGroupsResponse_v0, DeleteGroupsResponse_v1
925+
]

test/test_admin_integration.py

+77-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from kafka.admin import (
99
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
10-
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
10+
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError)
1111

1212

1313
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -142,13 +142,15 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
142142
with pytest.raises(ValueError):
143143
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
144144

145+
145146
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
146147
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
147148
"""Tests that the describe consumer group call fails if the group coordinator is not available
148149
"""
149150
with pytest.raises(GroupCoordinatorNotAvailableError):
150151
group_description = kafka_admin_client.describe_consumer_groups(['test'])
151152

153+
152154
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
153155
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
154156
"""Tests that the describe consumer group call returns valid consumer group information
@@ -236,3 +238,77 @@ def consumer_thread(i, group_id):
236238
stop[c].set()
237239
threads[c].join()
238240
threads[c] = None
241+
242+
243+
@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
244+
def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages):
245+
random_group_id = 'test-group-' + random_string(6)
246+
group1 = random_group_id + "_1"
247+
group2 = random_group_id + "_2"
248+
group3 = random_group_id + "_3"
249+
250+
send_messages(range(0, 100), partition=0)
251+
consumer1 = kafka_consumer_factory(group_id=group1)
252+
next(consumer1)
253+
consumer1.close()
254+
255+
consumer2 = kafka_consumer_factory(group_id=group2)
256+
next(consumer2)
257+
consumer2.close()
258+
259+
consumer3 = kafka_consumer_factory(group_id=group3)
260+
next(consumer3)
261+
consumer3.close()
262+
263+
consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
264+
assert group1 in consumergroups
265+
assert group2 in consumergroups
266+
assert group3 in consumergroups
267+
268+
delete_results = {
269+
group_id: error
270+
for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2])
271+
}
272+
assert delete_results[group1] == NoError
273+
assert delete_results[group2] == NoError
274+
assert group3 not in delete_results
275+
276+
consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
277+
assert group1 not in consumergroups
278+
assert group2 not in consumergroups
279+
assert group3 in consumergroups
280+
281+
282+
@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
283+
def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages):
284+
random_group_id = 'test-group-' + random_string(6)
285+
group1 = random_group_id + "_1"
286+
group2 = random_group_id + "_2"
287+
group3 = random_group_id + "_3"
288+
289+
send_messages(range(0, 100), partition=0)
290+
consumer1 = kafka_consumer_factory(group_id=group1)
291+
next(consumer1)
292+
consumer1.close()
293+
294+
consumer2 = kafka_consumer_factory(group_id=group2)
295+
next(consumer2)
296+
297+
consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
298+
assert group1 in consumergroups
299+
assert group2 in consumergroups
300+
assert group3 not in consumergroups
301+
302+
delete_results = {
303+
group_id: error
304+
for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2, group3])
305+
}
306+
307+
assert delete_results[group1] == NoError
308+
assert delete_results[group2] == NonEmptyGroupError
309+
assert delete_results[group3] == GroupIdNotFoundError
310+
311+
consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
312+
assert group1 not in consumergroups
313+
assert group2 in consumergroups
314+
assert group3 not in consumergroups

0 commit comments

Comments
 (0)