Skip to content

Commit d7d17cf

Browse files
[KIP-848 EA] Admin API for listing consumer groups now has an optional filter to return only groups of given types
Co-authored-by: mahajanadhitya <amahajan@confluent.io>
1 parent a356f2a commit d7d17cf

File tree

8 files changed

+164
-28
lines changed

8 files changed

+164
-28
lines changed

CHANGELOG.md

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# Confluent's Python client for Apache Kafka
22

3+
## v2.6.0
4+
5+
v2.6.0 is a feature release with the following features, fixes and enhancements:
6+
7+
- [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#).
8+
9+
confluent-kafka-python is based on librdkafka v2.6.0, see the
10+
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0)
11+
for a complete list of changes, enhancements, fixes and upgrade considerations.
12+
13+
314
## v2.5.3
415

516
v2.5.3 is a maintenance release with the following fixes and enhancements:
@@ -18,15 +29,15 @@ for a complete list of changes, enhancements, fixes and upgrade considerations.
1829
## v2.5.0
1930

2031
> [!WARNING]
21-
This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
32+
This version has introduced a regression in which an assert is triggered during **PushTelemetry** call. This happens when no metric is matched on the client side among those requested by broker subscription.
2233
>
2334
> You won't face any problem if:
2435
> * Broker doesn't support [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability).
2536
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the broker side.
2637
> * [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) feature is disabled on the client side. This is enabled by default. Set configuration `enable.metrics.push` to `false`.
2738
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side and there is no subscription configured there.
2839
> * If [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) is enabled on the broker side with subscriptions that match the [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) metrics defined on the client.
29-
>
40+
>
3041
> Having said this, we strongly recommend using `v2.5.3` and above to not face this regression at all.
3142
3243
v2.5.0 is a feature release with the following features, fixes and enhancements:

examples/adminapi.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
# Example use of AdminClient operations.
1919

2020
from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
21-
TopicPartition, ConsumerGroupState, TopicCollection,
22-
IsolationLevel)
21+
TopicPartition, ConsumerGroupState,
22+
TopicCollection, IsolationLevel,
23+
ConsumerGroupType)
2324
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
2425
ConfigEntry, ConfigSource, AclBinding,
2526
AclBindingFilter, ResourceType, ResourcePatternType,
@@ -471,18 +472,51 @@ def example_list(a, args):
471472
print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))
472473

473474

475+
def parse_list_consumer_groups_args(args, states, types):
476+
def usage(message):
477+
raise Exception(f"{message}\nUsage: list_consumer_groups [-states <state1> <state2> ..] "
478+
"[-types <type1> <type2> ..]")
479+
480+
if len(args) > 0:
481+
typeArray = False
482+
stateArray = False
483+
lastArray = 0
484+
for i in range(0, len(args)):
485+
if (args[i] == "-states"):
486+
if (stateArray):
487+
usage("Cannot pass the states flag (-states) more than once")
488+
lastArray = 1
489+
stateArray = True
490+
elif (args[i] == "-types"):
491+
if (typeArray):
492+
usage("Cannot pass the types flag (-types) more than once")
493+
lastArray = 2
494+
typeArray = True
495+
else:
496+
if (lastArray == 1):
497+
states.add(ConsumerGroupState[args[i]])
498+
elif (lastArray == 2):
499+
types.add(ConsumerGroupType[args[i]])
500+
else:
501+
usage(f"Unknown argument: {args[i]}")
502+
503+
474504
def example_list_consumer_groups(a, args):
475505
"""
476506
List Consumer Groups
477507
"""
478-
states = {ConsumerGroupState[state] for state in args}
479-
future = a.list_consumer_groups(request_timeout=10, states=states)
508+
509+
states = set()
510+
types = set()
511+
parse_list_consumer_groups_args(args, states, types)
512+
513+
future = a.list_consumer_groups(request_timeout=10, states=states, types=types)
480514
try:
481515
list_consumer_groups_result = future.result()
482516
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
483517
for valid in list_consumer_groups_result.valid:
484-
print(" id: {} is_simple: {} state: {}".format(
485-
valid.group_id, valid.is_simple_consumer_group, valid.state))
518+
print(" id: {} is_simple: {} state: {} type: {}".format(
519+
valid.group_id, valid.is_simple_consumer_group, valid.state, valid.type))
486520
print("{} errors".format(len(list_consumer_groups_result.errors)))
487521
for error in list_consumer_groups_result.errors:
488522
print(" error: {}".format(error))

src/confluent_kafka/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from ._model import (Node, # noqa: F401
2323
ConsumerGroupTopicPartitions,
2424
ConsumerGroupState,
25+
ConsumerGroupType,
2526
TopicCollection,
2627
TopicPartitionInfo,
2728
IsolationLevel)
@@ -48,7 +49,8 @@
4849
'Producer', 'DeserializingConsumer',
4950
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
5051
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
51-
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
52+
'ConsumerGroupTopicPartitions', 'ConsumerGroupState',
53+
'ConsumerGroupType', 'Uuid',
5254
'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo']
5355

5456
__version__ = version()[0]

src/confluent_kafka/_model/__init__.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,23 @@ def __lt__(self, other):
9595
return self.value < other.value
9696

9797

98+
class ConsumerGroupType(Enum):
99+
"""
100+
Enumerates the different types of Consumer Group Type.
101+
"""
102+
#: Type is not known or not set
103+
UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN
104+
#: Consumer Type
105+
CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER
106+
#: Classic Type
107+
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC
108+
109+
def __lt__(self, other):
110+
if self.__class__ != other.__class__:
111+
return NotImplemented
112+
return self.value < other.value
113+
114+
98115
class TopicCollection:
99116
"""
100117
Represents collection of topics in the form of different identifiers

src/confluent_kafka/admin/__init__.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656

5757
from ._records import DeletedRecords # noqa: F401
5858

59-
from .._model import TopicCollection as _TopicCollection
59+
from .._model import TopicCollection as _TopicCollection, ConsumerGroupType as _ConsumerGroupType
6060

6161
from ..cimpl import (KafkaException, # noqa: F401
6262
KafkaError,
@@ -881,6 +881,8 @@ def list_consumer_groups(self, **kwargs):
881881
on broker, and response. Default: `socket.timeout.ms/1000.0`
882882
:param set(ConsumerGroupState) states: only list consumer groups which are currently in
883883
these states.
884+
:param set(ConsumerGroupType) types: only list consumer groups of
885+
these types.
884886
885887
:returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`.
886888
@@ -900,6 +902,16 @@ def list_consumer_groups(self, **kwargs):
900902
raise TypeError("All elements of states must be of type ConsumerGroupState")
901903
kwargs["states_int"] = [state.value for state in states]
902904
kwargs.pop("states")
905+
if "types" in kwargs:
906+
types = kwargs["types"]
907+
if types is not None:
908+
if not isinstance(types, set):
909+
raise TypeError("'types' must be a set")
910+
for type in types:
911+
if not isinstance(type, _ConsumerGroupType):
912+
raise TypeError("All elements of types must be of type ConsumerGroupType")
913+
kwargs["types_int"] = [type.value for type in types]
914+
kwargs.pop("types")
903915

904916
f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result)
905917

src/confluent_kafka/admin/_group.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
from .._util import ConversionUtil
17-
from .._model import ConsumerGroupState
17+
from .._model import ConsumerGroupState, ConsumerGroupType
1818
from ._acl import AclOperation
1919

2020

@@ -31,13 +31,17 @@ class ConsumerGroupListing:
3131
Whether a consumer group is simple or not.
3232
state : ConsumerGroupState
3333
Current state of the consumer group.
34+
type : ConsumerGroupType
35+
Type of the consumer group.
3436
"""
3537

36-
def __init__(self, group_id, is_simple_consumer_group, state=None):
38+
def __init__(self, group_id, is_simple_consumer_group, state=None, type=None):
3739
self.group_id = group_id
3840
self.is_simple_consumer_group = is_simple_consumer_group
3941
if state is not None:
4042
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
43+
if type is not None:
44+
self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)
4145

4246

4347
class ListConsumerGroupsResult:

0 commit comments

Comments
 (0)