-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhancement for Kafka Admin Client's "Describe Consumer Group" #2035
Changes from 3 commits
27fad5f
74379e4
5e13d38
c890b8a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,13 @@ | ||
import pytest | ||
|
||
from test.testutil import env_kafka_version | ||
from logging import info | ||
from test.testutil import env_kafka_version, random_string | ||
from threading import Event, Thread | ||
from time import time, sleep | ||
|
||
from kafka.errors import NoError | ||
from kafka.admin import ( | ||
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) | ||
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError) | ||
|
||
|
||
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") | ||
|
@@ -138,3 +141,98 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): | |
|
||
with pytest.raises(ValueError): | ||
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) | ||
|
||
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') | ||
def test_describe_consumer_group_does_not_exist(kafka_admin_client): | ||
"""Tests that the describe consumer group call fails if the group coordinator is not available | ||
""" | ||
with pytest.raises(GroupCoordinatorNotAvailableError): | ||
group_description = kafka_admin_client.describe_consumer_groups(['test']) | ||
|
||
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') | ||
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): | ||
"""Tests that the describe consumer group call returns valid consumer group information | ||
This test takes inspiration from the test 'test_group' in test_consumer_group.py. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Long term we should probably move this to a dedicated There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree! Let me see if I can add this in a subsequent PR. |
||
""" | ||
consumers = {} | ||
stop = {} | ||
threads = {} | ||
random_group_id = 'test-group-' + random_string(6) | ||
group_id_list = [random_group_id, random_group_id + '_2'] | ||
generations = {group_id_list[0]: set(), group_id_list[1]: set()} | ||
def consumer_thread(i, group_id): | ||
assert i not in consumers | ||
assert i not in stop | ||
stop[i] = Event() | ||
consumers[i] = kafka_consumer_factory(group_id=group_id) | ||
while not stop[i].is_set(): | ||
consumers[i].poll(20) | ||
consumers[i].close() | ||
consumers[i] = None | ||
stop[i] = None | ||
|
||
num_consumers = 3 | ||
for i in range(num_consumers): | ||
group_id = group_id_list[i % 2] | ||
t = Thread(target=consumer_thread, args=(i, group_id,)) | ||
t.start() | ||
threads[i] = t | ||
|
||
try: | ||
timeout = time() + 35 | ||
while True: | ||
for c in range(num_consumers): | ||
|
||
# Verify all consumers have been created | ||
if c not in consumers: | ||
break | ||
|
||
# Verify all consumers have an assignment | ||
elif not consumers[c].assignment(): | ||
break | ||
|
||
# If all consumers exist and have an assignment | ||
else: | ||
|
||
info('All consumers have assignment... checking for stable group') | ||
# Verify all consumers are in the same generation | ||
# then log state and break while loop | ||
|
||
for consumer in consumers.values(): | ||
generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id) | ||
|
||
is_same_generation = any([len(consumer_generation) == 1 for consumer_generation in generations.values()]) | ||
|
||
# New generation assignment is not complete until | ||
# coordinator.rejoining = False | ||
rejoining = any([consumer._coordinator.rejoining | ||
for consumer in list(consumers.values())]) | ||
|
||
if not rejoining and is_same_generation: | ||
break | ||
else: | ||
sleep(1) | ||
assert time() < timeout, "timeout waiting for assignments" | ||
|
||
info('Group stabilized; verifying assignment') | ||
output = kafka_admin_client.describe_consumer_groups(group_id_list) | ||
assert len(output) == 2 | ||
consumer_groups = set() | ||
for consumer_group in output: | ||
assert(consumer_group.group in group_id_list) | ||
if consumer_group.group == group_id_list[0]: | ||
assert(len(consumer_group.members) == 2) | ||
else: | ||
assert(len(consumer_group.members) == 1) | ||
for member in consumer_group.members: | ||
assert(member.member_metadata.subscription[0] == topic) | ||
assert(member.member_assignment.assignment[0][0] == topic) | ||
consumer_groups.add(consumer_group.group) | ||
assert(sorted(list(consumer_groups)) == group_id_list) | ||
finally: | ||
info('Shutting down %s consumers', num_consumers) | ||
for c in range(num_consumers): | ||
info('Stopping consumer %s', c) | ||
stop[c].set() | ||
threads[c].join() | ||
threads[c] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one shouldn't be moved since it's merely a vendored import of a standard 3p lib... (and thankfully one that will disappear in not too distant future!)
I'll try to add a commit to this PR fixing this up... otherwise will merge and re-fix after