Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions tests/integration/admin/test_describe_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType,
ResourcePatternType, AclOperation, AclPermissionType)
from confluent_kafka.error import ConsumeError
from confluent_kafka import ConsumerGroupState, TopicCollection

from confluent_kafka import ConsumerGroupState, TopicCollection, ConsumerGroupType
from tests.common import TestUtils

topic_prefix = "test-topic"
Expand All @@ -30,12 +29,16 @@ def verify_commit_result(err, _):
assert err is not None


def consume_messages(sasl_cluster, group_id, topic, num_messages=None):
def consume_messages(sasl_cluster, group_id, group_protocol, topic, num_messages=None):
conf = {'group.id': group_id,
'session.timeout.ms': 6000,
'group.protocol': group_protocol,
'enable.auto.commit': False,
'on_commit': verify_commit_result,
'auto.offset.reset': 'earliest'}

if group_protocol == 'classic':
conf['session.timeout.ms'] = 6000

consumer = sasl_cluster.consumer(conf)
consumer.subscribe([topic])
read_messages = 0
Expand Down Expand Up @@ -164,7 +167,9 @@ def verify_describe_groups(cluster, admin_client, topic):

# Consume some messages for the group
group = 'test-group'
consume_messages(cluster, group, topic, 2)
group_type = ConsumerGroupType.CONSUMER if TestUtils.use_group_protocol_consumer() else ConsumerGroupType.CLASSIC
group_type_str = 'classic' if group_type == ConsumerGroupType.CLASSIC else 'consumer'
consume_messages(cluster, group, group_type_str, topic, 2)

# Verify Describe Consumer Groups
desc = verify_provided_describe_for_authorized_operations(admin_client,
Expand All @@ -177,6 +182,7 @@ def verify_describe_groups(cluster, admin_client, topic):
assert group == desc.group_id
assert desc.is_simple_consumer_group is False
assert desc.state == ConsumerGroupState.EMPTY
assert desc.type == group_type

# Delete group
perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10)
Expand Down Expand Up @@ -217,11 +223,7 @@ def test_describe_operations(sasl_cluster):
verify_describe_topics(admin_client, our_topic)

# Verify Authorized Operations in Describe Groups
# Skip this test if using group protocol `consumer`
# as there is new RPC for describe_groups() in
# group protocol `consumer` case.
if not TestUtils.use_group_protocol_consumer():
verify_describe_groups(sasl_cluster, admin_client, our_topic)
verify_describe_groups(sasl_cluster, admin_client, our_topic)

# Delete Topic
perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10)