Skip to content

Commit 645b0d5

Browse files
[KIP-848 EA] Admin API for listing consumer groups now has (#1830)
an optional filter to return only groups of given types Co-authored-by: mahajanadhitya <amahajan@confluent.io>
1 parent 979343a commit 645b0d5

File tree

8 files changed

+154
-16
lines changed

8 files changed

+154
-16
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# Confluent's Python client for Apache Kafka
22

3+
## v2.6.0
4+
5+
v2.6.0 is a feature release with the following features, fixes and enhancements:
6+
7+
- [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#1830).
8+
9+
confluent-kafka-python is based on librdkafka v2.6.0, see the
10+
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0)
11+
for a complete list of changes, enhancements, fixes and upgrade considerations.
12+
13+
314
## v2.5.3
415

516
v2.5.3 is a maintenance release with the following fixes and enhancements:

examples/adminapi.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
# Example use of AdminClient operations.
1919

2020
from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
21-
TopicPartition, ConsumerGroupState, TopicCollection,
22-
IsolationLevel, ElectionType)
21+
TopicPartition, ConsumerGroupState,
22+
TopicCollection, IsolationLevel,
23+
ConsumerGroupType, ElectionType)
2324
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
2425
ConfigEntry, ConfigSource, AclBinding,
2526
AclBindingFilter, ResourceType, ResourcePatternType,
@@ -30,6 +31,7 @@
3031
import sys
3132
import threading
3233
import logging
34+
import argparse
3335

3436
logging.basicConfig()
3537

@@ -471,18 +473,47 @@ def example_list(a, args):
471473
print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))
472474

473475

476+
def parse_list_consumer_groups_args(args, states, types):
477+
parser = argparse.ArgumentParser(prog='list_consumer_groups')
478+
parser.add_argument('-states')
479+
parser.add_argument('-types')
480+
parsed_args = parser.parse_args(args)
481+
482+
def usage(message):
483+
print(message)
484+
parser.print_usage()
485+
sys.exit(1)
486+
487+
if parsed_args.states:
488+
for arg in parsed_args.states.split(","):
489+
try:
490+
states.add(ConsumerGroupState[arg])
491+
except KeyError:
492+
usage(f"Invalid state: {arg}")
493+
if parsed_args.types:
494+
for arg in parsed_args.types.split(","):
495+
try:
496+
types.add(ConsumerGroupType[arg])
497+
except KeyError:
498+
usage(f"Invalid type: {arg}")
499+
500+
474501
def example_list_consumer_groups(a, args):
475502
"""
476503
List Consumer Groups
477504
"""
478-
states = {ConsumerGroupState[state] for state in args}
479-
future = a.list_consumer_groups(request_timeout=10, states=states)
505+
506+
states = set()
507+
types = set()
508+
parse_list_consumer_groups_args(args, states, types)
509+
510+
future = a.list_consumer_groups(request_timeout=10, states=states, types=types)
480511
try:
481512
list_consumer_groups_result = future.result()
482513
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
483514
for valid in list_consumer_groups_result.valid:
484-
print(" id: {} is_simple: {} state: {}".format(
485-
valid.group_id, valid.is_simple_consumer_group, valid.state))
515+
print(" id: {} is_simple: {} state: {} type: {}".format(
516+
valid.group_id, valid.is_simple_consumer_group, valid.state, valid.type))
486517
print("{} errors".format(len(list_consumer_groups_result.errors)))
487518
for error in list_consumer_groups_result.errors:
488519
print(" error: {}".format(error))
@@ -937,7 +968,8 @@ def example_elect_leaders(a, args):
937968
sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
938969
'<principal1> <host1> <operation1> <permission_type1> ..\n')
939970
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
940-
sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
971+
sys.stderr.write(' list_consumer_groups [-states <state1>,<state2>,..] ' +
972+
'[-types <type1>,<type2>,..]\n')
941973
sys.stderr.write(' describe_consumer_groups <include_authorized_operations> <group1> <group2> ..\n')
942974
sys.stderr.write(' describe_topics <include_authorized_operations> <topic1> <topic2> ..\n')
943975
sys.stderr.write(' describe_cluster <include_authorized_operations>\n')

src/confluent_kafka/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from ._model import (Node, # noqa: F401
2323
ConsumerGroupTopicPartitions,
2424
ConsumerGroupState,
25+
ConsumerGroupType,
2526
TopicCollection,
2627
TopicPartitionInfo,
2728
IsolationLevel,
@@ -49,7 +50,8 @@
4950
'Producer', 'DeserializingConsumer',
5051
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
5152
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
52-
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid',
53+
'ConsumerGroupTopicPartitions', 'ConsumerGroupState',
54+
'ConsumerGroupType', 'Uuid',
5355
'IsolationLevel', 'TopicCollection', 'TopicPartitionInfo']
5456

5557
__version__ = version()[0]

src/confluent_kafka/_model/__init__.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,26 @@ def __lt__(self, other):
9595
return self.value < other.value
9696

9797

98+
class ConsumerGroupType(Enum):
99+
"""
100+
Enumerates the different types of Consumer Group Type.
101+
102+
Values:
103+
-------
104+
"""
105+
#: Type is not known or not set
106+
UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN
107+
#: Consumer Type
108+
CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER
109+
#: Classic Type
110+
CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC
111+
112+
def __lt__(self, other):
113+
if self.__class__ != other.__class__:
114+
return NotImplemented
115+
return self.value < other.value
116+
117+
98118
class TopicCollection:
99119
"""
100120
Represents collection of topics in the form of different identifiers

src/confluent_kafka/admin/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
from ._records import DeletedRecords # noqa: F401
5858

5959
from .._model import (TopicCollection as _TopicCollection,
60+
ConsumerGroupType as _ConsumerGroupType,
6061
ElectionType as _ElectionType)
6162

6263
from ..cimpl import (KafkaException, # noqa: F401
@@ -898,6 +899,8 @@ def list_consumer_groups(self, **kwargs):
898899
on broker, and response. Default: `socket.timeout.ms/1000.0`
899900
:param set(ConsumerGroupState) states: only list consumer groups which are currently in
900901
these states.
902+
:param set(ConsumerGroupType) types: only list consumer groups of
903+
these types.
901904
902905
:returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`.
903906
@@ -917,6 +920,16 @@ def list_consumer_groups(self, **kwargs):
917920
raise TypeError("All elements of states must be of type ConsumerGroupState")
918921
kwargs["states_int"] = [state.value for state in states]
919922
kwargs.pop("states")
923+
if "types" in kwargs:
924+
types = kwargs["types"]
925+
if types is not None:
926+
if not isinstance(types, set):
927+
raise TypeError("'types' must be a set")
928+
for type in types:
929+
if not isinstance(type, _ConsumerGroupType):
930+
raise TypeError("All elements of types must be of type ConsumerGroupType")
931+
kwargs["types_int"] = [type.value for type in types]
932+
kwargs.pop("types")
920933

921934
f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result)
922935

src/confluent_kafka/admin/_group.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
from .._util import ConversionUtil
17-
from .._model import ConsumerGroupState
17+
from .._model import ConsumerGroupState, ConsumerGroupType
1818
from ._acl import AclOperation
1919

2020

@@ -31,13 +31,17 @@ class ConsumerGroupListing:
3131
Whether a consumer group is simple or not.
3232
state : ConsumerGroupState
3333
Current state of the consumer group.
34+
type : ConsumerGroupType
35+
Type of the consumer group.
3436
"""
3537

36-
def __init__(self, group_id, is_simple_consumer_group, state=None):
38+
def __init__(self, group_id, is_simple_consumer_group, state=None, type=None):
3739
self.group_id = group_id
3840
self.is_simple_consumer_group = is_simple_consumer_group
3941
if state is not None:
4042
self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
43+
if type is not None:
44+
self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)
4145

4246

4347
class ListConsumerGroupsResult:

src/confluent_kafka/src/Admin.c

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ struct Admin_options {
8282
rd_kafka_IsolationLevel_t isolation_level;
8383
rd_kafka_consumer_group_state_t* states;
8484
int states_cnt;
85+
rd_kafka_consumer_group_type_t* types;
86+
int types_cnt;
8587
};
8688

8789
/**@brief "unset" value initializers for Admin_options
@@ -96,6 +98,8 @@ struct Admin_options {
9698
Admin_options_def_int, \
9799
Admin_options_def_ptr, \
98100
Admin_options_def_cnt, \
101+
Admin_options_def_ptr, \
102+
Admin_options_def_cnt, \
99103
}
100104

101105
#define Admin_options_is_set_int(v) ((v) != Admin_options_def_int)
@@ -185,6 +189,13 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
185189
goto err;
186190
}
187191

192+
if (Admin_options_is_set_ptr(options->types) &&
193+
(err_obj = rd_kafka_AdminOptions_set_match_consumer_group_types(
194+
c_options, options->types, options->types_cnt))) {
195+
snprintf(errstr, sizeof(errstr), "%s", rd_kafka_error_string(err_obj));
196+
goto err;
197+
}
198+
188199
return c_options;
189200

190201
err:
@@ -1698,24 +1709,28 @@ static const char Admin_delete_acls_doc[] = PyDoc_STR(
16981709
* @brief List consumer groups
16991710
*/
17001711
PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kwargs) {
1701-
PyObject *future, *states_int = NULL;
1712+
PyObject *future, *states_int = NULL, *types_int = NULL;
17021713
struct Admin_options options = Admin_options_INITIALIZER;
17031714
rd_kafka_AdminOptions_t *c_options = NULL;
17041715
CallState cs;
17051716
rd_kafka_queue_t *rkqu;
17061717
rd_kafka_consumer_group_state_t *c_states = NULL;
1718+
rd_kafka_consumer_group_type_t *c_types = NULL;
17071719
int states_cnt = 0;
1720+
int types_cnt = 0;
17081721
int i = 0;
17091722

17101723
static char *kws[] = {"future",
17111724
/* options */
17121725
"states_int",
1726+
"types_int",
17131727
"request_timeout",
17141728
NULL};
17151729

1716-
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Of", kws,
1730+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OOf", kws,
17171731
&future,
17181732
&states_int,
1733+
&types_int,
17191734
&options.request_timeout)) {
17201735
goto err;
17211736
}
@@ -1736,7 +1751,7 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
17361751
PyObject *state = PyList_GET_ITEM(states_int, i);
17371752
if(!cfl_PyInt_Check(state)) {
17381753
PyErr_SetString(PyExc_ValueError,
1739-
"Element of states must be a valid state");
1754+
"Element of states must be valid states");
17401755
goto err;
17411756
}
17421757
c_states[i] = (rd_kafka_consumer_group_state_t) cfl_PyInt_AsInt(state);
@@ -1746,6 +1761,33 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
17461761
}
17471762
}
17481763

1764+
if(types_int != NULL && types_int != Py_None) {
1765+
if(!PyList_Check(types_int)) {
1766+
PyErr_SetString(PyExc_ValueError,
1767+
"types must of type list");
1768+
goto err;
1769+
}
1770+
1771+
types_cnt = (int)PyList_Size(types_int);
1772+
1773+
if(types_cnt > 0) {
1774+
c_types = (rd_kafka_consumer_group_type_t *)
1775+
malloc(types_cnt *
1776+
sizeof(rd_kafka_consumer_group_type_t));
1777+
for(i = 0 ; i < types_cnt ; i++) {
1778+
PyObject *type = PyList_GET_ITEM(types_int, i);
1779+
if(!cfl_PyInt_Check(type)) {
1780+
PyErr_SetString(PyExc_ValueError,
1781+
"Element of types must be valid group types");
1782+
goto err;
1783+
}
1784+
c_types[i] = (rd_kafka_consumer_group_type_t) cfl_PyInt_AsInt(type);
1785+
}
1786+
options.types = c_types;
1787+
options.types_cnt = types_cnt;
1788+
}
1789+
}
1790+
17491791
c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS,
17501792
&options, future);
17511793
if (!c_options) {
@@ -1774,22 +1816,27 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw
17741816
if(c_states) {
17751817
free(c_states);
17761818
}
1819+
if(c_types) {
1820+
free(c_types);
1821+
}
17771822
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
17781823
rd_kafka_AdminOptions_destroy(c_options);
1779-
17801824
Py_RETURN_NONE;
17811825
err:
17821826
if(c_states) {
17831827
free(c_states);
17841828
}
1829+
if(c_types) {
1830+
free(c_types);
1831+
}
17851832
if (c_options) {
17861833
rd_kafka_AdminOptions_destroy(c_options);
17871834
Py_DECREF(future);
17881835
}
17891836
return NULL;
17901837
}
17911838
const char Admin_list_consumer_groups_doc[] = PyDoc_STR(
1792-
".. py:function:: list_consumer_groups(future, [states_int], [request_timeout])\n"
1839+
".. py:function:: list_consumer_groups(future, [states_int], [types_int], [request_timeout])\n"
17931840
"\n"
17941841
" List all the consumer groups.\n"
17951842
"\n"
@@ -3711,6 +3758,8 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py(
37113758

37123759
cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupListing_state(c_valid_responses[i]));
37133760

3761+
cfl_PyDict_SetInt(kwargs, "type", rd_kafka_ConsumerGroupListing_type(c_valid_responses[i]));
3762+
37143763
args = PyTuple_New(0);
37153764

37163765
valid_result = PyObject_Call(ConsumerGroupListing_type, args, kwargs);

src/confluent_kafka/src/AdminTypes.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,8 +570,14 @@ static void AdminTypes_AddObjectsConsumerGroupStates (PyObject *m) {
570570
PyModule_AddIntConstant(m, "CONSUMER_GROUP_STATE_EMPTY", RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY);
571571
}
572572

573+
static void AdminTypes_AddObjectsConsumerGroupTypes (PyObject *m) {
574+
/* rd_kafka_consumer_group_type_t */
575+
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_UNKNOWN", RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN);
576+
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CONSUMER", RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER);
577+
PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CLASSIC", RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC);
578+
}
579+
573580
static void AdminTypes_AddObjectsAlterConfigOpType (PyObject *m) {
574-
/* rd_kafka_consumer_group_state_t */
575581
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_SET", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET);
576582
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_DELETE", RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE);
577583
PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_APPEND", RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND);
@@ -620,6 +626,7 @@ void AdminTypes_AddObjects (PyObject *m) {
620626
AdminTypes_AddObjectsAclOperation(m);
621627
AdminTypes_AddObjectsAclPermissionType(m);
622628
AdminTypes_AddObjectsConsumerGroupStates(m);
629+
AdminTypes_AddObjectsConsumerGroupTypes(m);
623630
AdminTypes_AddObjectsAlterConfigOpType(m);
624631
AdminTypes_AddObjectsScramMechanismType(m);
625632
AdminTypes_AddObjectsIsolationLevel(m);

0 commit comments

Comments
 (0)