Skip to content

Commit 7768191

Browse files
authored
Add AdminClient.list_groups API (@messense, #948)
* Add AdminClient.list_groups API * Require fastavro<1.0 on Python 2.7 fastavro 1.0 dropped Python 2 support * Reuse c_broker_to_py * Address review comments * Refactor adminapi list groups example * Add list_groups intergration test * Print protocol and protocol type in list_groups example
1 parent e8bdc65 commit 7768191

File tree

8 files changed

+398
-34
lines changed

8 files changed

+398
-34
lines changed

confluent_kafka/admin/__init__.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,3 +573,57 @@ def __repr__(self):
573573

574574
def __str__(self):
575575
return "{}".format(self.id)
576+
577+
578+
class GroupMember(object):
579+
"""Group member information
580+
581+
For more information on metadata format, see
582+
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI
583+
584+
This class is typically not user instantiated.
585+
586+
:ivar str id: Member id (generated by broker)
587+
:ivar str client_id: Client id
588+
:ivar str client_host: Client hostname
589+
:ivar bytes metadata: Member metadata(binary), format depends on protocol type
590+
:ivar bytes assignment: Member assignment(binary), format depends on protocol type
591+
"""
592+
def __init__(self,):
593+
self.id = None
594+
self.client_id = None
595+
self.client_host = None
596+
self.metadata = None
597+
self.assignment = None
598+
599+
600+
class GroupMetadata(object):
601+
"""GroupMetadata contains information about a Kafka consumer group
602+
603+
This class is typically not user instantiated.
604+
605+
:ivar BrokerMetadata broker: Originating broker metadata
606+
:ivar str id: Group name
607+
:ivar KafkaError -error: Broker-originated error, or None. Value is a KafkaError object.
608+
:ivar str state: Group state
609+
:ivar str protocol_type: Group protocol type
610+
:ivar str protocol: Group protocol
611+
:ivar list(GroupMember) members: Group members
612+
"""
613+
def __init__(self):
614+
self.broker = None
615+
self.id = None
616+
self.error = None
617+
self.state = None
618+
self.protocol_type = None
619+
self.protocol = None
620+
self.members = []
621+
622+
def __repr__(self):
623+
if self.error is not None:
624+
return "GroupMetadata({}, {})".format(self.id, self.error)
625+
else:
626+
return "GroupMetadata({})".format(self.id)
627+
628+
def __str__(self):
629+
return self.id

confluent_kafka/src/Admin.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,10 @@ static PyMethodDef Admin_methods[] = {
10741074
list_topics_doc
10751075
},
10761076

1077+
{ "list_groups", (PyCFunction)list_groups, METH_VARARGS|METH_KEYWORDS,
1078+
list_groups_doc
1079+
},
1080+
10771081
{ NULL }
10781082
};
10791083

confluent_kafka/src/Metadata.c

Lines changed: 246 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,38 @@ c_topics_to_py (Handle *self, const rd_kafka_metadata_topic_t *c_topics,
190190
}
191191

192192

193+
static PyObject *c_broker_to_py(Handle *self, PyObject *BrokerMetadata_type,
194+
const rd_kafka_metadata_broker_t c_broker) {
195+
PyObject *broker;
196+
PyObject *key;
197+
198+
broker = PyObject_CallObject(BrokerMetadata_type, NULL);
199+
if (!broker)
200+
return NULL;
201+
202+
key = cfl_PyInt_FromInt(c_broker.id);
203+
204+
if (PyObject_SetAttrString(broker, "id", key) == -1) {
205+
Py_DECREF(key);
206+
Py_DECREF(broker);
207+
return NULL;
208+
}
209+
Py_DECREF(key);
210+
211+
if (cfl_PyObject_SetString(broker, "host",
212+
c_broker.host) == -1) {
213+
Py_DECREF(broker);
214+
return NULL;
215+
}
216+
if (cfl_PyObject_SetInt(broker, "port",
217+
(int)c_broker.port) == -1) {
218+
Py_DECREF(broker);
219+
return NULL;
220+
}
221+
return broker;
222+
}
223+
224+
193225
/**
194226
* @returns a dict<broker_id, BrokerMetadata>, or NULL (and exception) on error.
195227
*/
@@ -213,7 +245,7 @@ static PyObject *c_brokers_to_py (Handle *self,
213245
PyObject *broker;
214246
PyObject *key;
215247

216-
broker = PyObject_CallObject(BrokerMetadata_type, NULL);
248+
broker = c_broker_to_py(self, BrokerMetadata_type, c_brokers[i]);
217249
if (!broker)
218250
goto err;
219251

@@ -226,19 +258,6 @@ static PyObject *c_brokers_to_py (Handle *self,
226258
}
227259

228260
Py_DECREF(broker);
229-
230-
if (PyObject_SetAttrString(broker, "id", key) == -1) {
231-
Py_DECREF(key);
232-
goto err;
233-
}
234-
Py_DECREF(key);
235-
236-
if (cfl_PyObject_SetString(broker, "host",
237-
c_brokers[i].host) == -1)
238-
goto err;
239-
if (cfl_PyObject_SetInt(broker, "port",
240-
(int)c_brokers[i].port) == -1)
241-
goto err;
242261
}
243262

244263
Py_DECREF(BrokerMetadata_type);
@@ -403,5 +422,216 @@ const char list_topics_doc[] = PyDoc_STR(
403422
"\n"
404423
" :param str topic: If specified, only request info about this topic, else return for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified it will be created.\n"
405424
" :param float timeout: Maximum response time before timing out, or -1 for infinite timeout.\n"
406-
" :rtype: ClusterMetadata \n"
407-
" :raises: KafkaException \n");
425+
" :rtype: ClusterMetadata\n"
426+
" :raises: KafkaException\n");
427+
428+
429+
static PyObject *
430+
c_group_members_to_py(Handle *self, const struct rd_kafka_group_member_info *c_members,
431+
int member_cnt) {
432+
PyObject *GroupMember_type, *list;
433+
int i;
434+
435+
GroupMember_type = cfl_PyObject_lookup("confluent_kafka.admin",
436+
"GroupMember");
437+
if (!GroupMember_type)
438+
return NULL;
439+
440+
list = PyList_New(member_cnt);
441+
if (!list)
442+
goto err;
443+
444+
for (i = 0; i < member_cnt; i++) {
445+
PyObject *member, *metadata, *assignment;
446+
447+
member = PyObject_CallObject(GroupMember_type, NULL);
448+
if (!member)
449+
goto err;
450+
451+
if (cfl_PyObject_SetString(member, "id", c_members[i].member_id) == -1) {
452+
goto err;
453+
}
454+
455+
if (cfl_PyObject_SetString(member, "client_id", c_members[i].client_id) == -1) {
456+
goto err;
457+
}
458+
459+
if (cfl_PyObject_SetString(member, "client_host", c_members[i].client_host) == -1) {
460+
goto err;
461+
}
462+
463+
metadata = PyBytes_FromStringAndSize(c_members[i].member_metadata,
464+
c_members[i].member_metadata_size);
465+
if (!metadata)
466+
goto err;
467+
468+
if (PyObject_SetAttrString(member, "metadata", metadata) == -1) {
469+
Py_DECREF(metadata);
470+
goto err;
471+
}
472+
Py_DECREF(metadata);
473+
474+
assignment = PyBytes_FromStringAndSize(c_members[i].member_assignment,
475+
c_members[i].member_assignment_size);
476+
if (!assignment)
477+
goto err;
478+
479+
if (PyObject_SetAttrString(member, "assignment", assignment) == -1) {
480+
Py_DECREF(assignment);
481+
goto err;
482+
}
483+
Py_DECREF(assignment);
484+
485+
PyList_SET_ITEM(list, i, member);
486+
}
487+
Py_DECREF(GroupMember_type);
488+
return list;
489+
err:
490+
Py_DECREF(GroupMember_type);
491+
return NULL;
492+
}
493+
494+
495+
/**
496+
* @returns a GroupMetadata object populated with all metadata information
497+
* from \p metadata, or NULL on error in which case an exception
498+
* has been raised.
499+
*/
500+
static PyObject *
501+
c_groups_to_py (Handle *self, const struct rd_kafka_group_list *group_list) {
502+
PyObject *GroupMetadata_type, *BrokerMetadata_type;
503+
PyObject *groups;
504+
int i;
505+
506+
GroupMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
507+
"GroupMetadata");
508+
if (!GroupMetadata_type)
509+
return NULL;
510+
511+
BrokerMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
512+
"BrokerMetadata");
513+
if (!BrokerMetadata_type) {
514+
Py_DECREF(GroupMetadata_type);
515+
return NULL;
516+
}
517+
518+
groups = PyList_New(group_list->group_cnt);
519+
if (!groups)
520+
goto err;
521+
for (i = 0; i < group_list->group_cnt; i++) {
522+
PyObject *group, *error, *broker, *members;
523+
524+
group = PyObject_CallObject(GroupMetadata_type, NULL);
525+
if (!group)
526+
goto err;
527+
528+
if (cfl_PyObject_SetString(group, "id",
529+
group_list->groups[i].group) == -1)
530+
goto err;
531+
532+
error = KafkaError_new_or_None(group_list->groups[i].err, NULL);
533+
534+
if (PyObject_SetAttrString(group, "error", error) == -1) {
535+
Py_DECREF(error);
536+
goto err;
537+
}
538+
539+
Py_DECREF(error);
540+
541+
if (cfl_PyObject_SetString(group, "state",
542+
group_list->groups[i].state) == -1)
543+
goto err;
544+
545+
if (cfl_PyObject_SetString(group, "protocol_type",
546+
group_list->groups[i].protocol_type) == -1)
547+
goto err;
548+
549+
if (cfl_PyObject_SetString(group, "protocol",
550+
group_list->groups[i].protocol) == -1)
551+
goto err;
552+
553+
broker = c_broker_to_py(self, BrokerMetadata_type, group_list->groups[i].broker);
554+
if (!broker)
555+
goto err;
556+
if (PyObject_SetAttrString(group, "broker", broker) == -1) {
557+
Py_DECREF(broker);
558+
goto err;
559+
}
560+
Py_DECREF(broker);
561+
562+
members = c_group_members_to_py(self, group_list->groups[i].members,
563+
group_list->groups[i].member_cnt);
564+
if (!members)
565+
goto err;
566+
if (PyObject_SetAttrString(group, "members", members) == -1) {
567+
Py_DECREF(members);
568+
goto err;
569+
}
570+
Py_DECREF(members);
571+
572+
PyList_SET_ITEM(groups, i, group);
573+
}
574+
Py_DECREF(GroupMetadata_type);
575+
Py_DECREF(BrokerMetadata_type);
576+
return groups;
577+
err:
578+
Py_DECREF(GroupMetadata_type);
579+
Py_DECREF(BrokerMetadata_type);
580+
Py_XDECREF(groups);
581+
return NULL;
582+
}
583+
584+
585+
/**
586+
* @brief List consumer groups
587+
*/
588+
PyObject *
589+
list_groups (Handle *self, PyObject *args, PyObject *kwargs) {
590+
CallState cs;
591+
PyObject *result = NULL;
592+
rd_kafka_resp_err_t err;
593+
const struct rd_kafka_group_list *group_list = NULL;
594+
const char *group = NULL;
595+
double tmout = -1.0f;
596+
static char *kws[] = {"group", "timeout", NULL};
597+
598+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|zd", kws,
599+
&group, &tmout))
600+
return NULL;
601+
602+
CallState_begin(self, &cs);
603+
604+
err = rd_kafka_list_groups(self->rk, group, &group_list,
605+
cfl_timeout_ms(tmout));
606+
607+
if (!CallState_end(self, &cs)) {
608+
/* Exception raised */
609+
goto end;
610+
}
611+
612+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
613+
cfl_PyErr_Format(err,
614+
"Failed to list groups: %s",
615+
rd_kafka_err2str(err));
616+
617+
goto end;
618+
}
619+
result = c_groups_to_py(self, group_list);
620+
end:
621+
if (group_list != NULL) {
622+
rd_kafka_group_list_destroy(group_list);
623+
}
624+
return result;
625+
}
626+
627+
const char list_groups_doc[] = PyDoc_STR(
628+
".. py:function:: list_groups([group=None], [timeout=-1])\n"
629+
"\n"
630+
" Request Group Metadata from cluster.\n"
631+
" This method provides the same information as"
632+
" listGroups(), describeGroups() in the Java Admin client.\n"
633+
"\n"
634+
" :param str group: If specified, only request info about this group, else return for all groups in cluster"
635+
" :param float timeout: Maximum response time before timing out, or -1 for infinite timeout.\n"
636+
" :rtype: GroupMetadata\n"
637+
" :raises: KafkaException\n");

confluent_kafka/src/confluent_kafka.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,9 +373,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
373373
PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts);
374374
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
375375
PyObject *list_topics (Handle *self, PyObject *args, PyObject *kwargs);
376+
PyObject *list_groups (Handle *self, PyObject *args, PyObject *kwargs);
376377

377378

378379
extern const char list_topics_doc[];
380+
extern const char list_groups_doc[];
379381

380382

381383
#ifdef RD_KAFKA_V_HEADERS

0 commit comments

Comments
 (0)