Skip to content

Add AdminClient.list_groups API #948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,3 +573,57 @@ def __repr__(self):

def __str__(self):
return "{}".format(self.id)


class GroupMember(object):
    """Group member information.

    For more information on the metadata format, see
    https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI

    This class is typically not user instantiated.

    :ivar str id: Member id (generated by broker)
    :ivar str client_id: Client id
    :ivar str client_host: Client hostname
    :ivar bytes metadata: Member metadata (binary), format depends on protocol type
    :ivar bytes assignment: Member assignment (binary), format depends on protocol type
    """
    def __init__(self):
        # Attributes are populated by the C extension after instantiation;
        # the defaults below are placeholders only.
        self.id = None
        self.client_id = None
        self.client_host = None
        self.metadata = None
        self.assignment = None


class GroupMetadata(object):
    """GroupMetadata contains information about a Kafka consumer group.

    This class is typically not user instantiated.

    :ivar BrokerMetadata broker: Originating broker metadata
    :ivar str id: Group name
    :ivar KafkaError error: Broker-originated error, or None. Value is a KafkaError object.
    :ivar str state: Group state
    :ivar str protocol_type: Group protocol type
    :ivar str protocol: Group protocol
    :ivar list(GroupMember) members: Group members
    """
    def __init__(self):
        # Attributes are populated by the C extension after instantiation;
        # the defaults below are placeholders only.
        self.broker = None
        self.id = None
        self.error = None
        self.state = None
        self.protocol_type = None
        self.protocol = None
        self.members = []

    def __repr__(self):
        """Include the broker-reported error, when one is set."""
        if self.error is not None:
            return "GroupMetadata({}, {})".format(self.id, self.error)
        else:
            return "GroupMetadata({})".format(self.id)

    def __str__(self):
        # Format through str.format() so a not-yet-populated (None) id does
        # not raise a TypeError; consistent with the other metadata classes.
        return "{}".format(self.id)
4 changes: 4 additions & 0 deletions confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,10 @@ static PyMethodDef Admin_methods[] = {
list_topics_doc
},

{ "list_groups", (PyCFunction)list_groups, METH_VARARGS|METH_KEYWORDS,
list_groups_doc
},

{ NULL }
};

Expand Down
262 changes: 246 additions & 16 deletions confluent_kafka/src/Metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,38 @@ c_topics_to_py (Handle *self, const rd_kafka_metadata_topic_t *c_topics,
}


/**
 * @brief Convert a C broker metadata struct to a Python BrokerMetadata object.
 *
 * @param self Handle (unused; kept for symmetry with the other converters).
 * @param BrokerMetadata_type Borrowed reference to the BrokerMetadata class.
 * @param c_broker Broker metadata to convert.
 *
 * @returns a new BrokerMetadata object, or NULL (and exception) on error.
 */
static PyObject *c_broker_to_py(Handle *self, PyObject *BrokerMetadata_type,
                                const rd_kafka_metadata_broker_t c_broker) {
        PyObject *broker;
        PyObject *key;

        broker = PyObject_CallObject(BrokerMetadata_type, NULL);
        if (!broker)
                return NULL;

        key = cfl_PyInt_FromInt(c_broker.id);
        if (!key) {
                /* Don't pass a NULL value to PyObject_SetAttrString (that
                 * would delete the attribute) nor Py_DECREF it. */
                Py_DECREF(broker);
                return NULL;
        }

        if (PyObject_SetAttrString(broker, "id", key) == -1) {
                Py_DECREF(key);
                Py_DECREF(broker);
                return NULL;
        }
        Py_DECREF(key);

        if (cfl_PyObject_SetString(broker, "host",
                                   c_broker.host) == -1) {
                Py_DECREF(broker);
                return NULL;
        }
        if (cfl_PyObject_SetInt(broker, "port",
                                (int)c_broker.port) == -1) {
                Py_DECREF(broker);
                return NULL;
        }
        return broker;
}


/**
* @returns a dict<broker_id, BrokerMetadata>, or NULL (and exception) on error.
*/
Expand All @@ -213,7 +245,7 @@ static PyObject *c_brokers_to_py (Handle *self,
PyObject *broker;
PyObject *key;

broker = PyObject_CallObject(BrokerMetadata_type, NULL);
broker = c_broker_to_py(self, BrokerMetadata_type, c_brokers[i]);
if (!broker)
goto err;

Expand All @@ -226,19 +258,6 @@ static PyObject *c_brokers_to_py (Handle *self,
}

Py_DECREF(broker);

if (PyObject_SetAttrString(broker, "id", key) == -1) {
Py_DECREF(key);
goto err;
}
Py_DECREF(key);

if (cfl_PyObject_SetString(broker, "host",
c_brokers[i].host) == -1)
goto err;
if (cfl_PyObject_SetInt(broker, "port",
(int)c_brokers[i].port) == -1)
goto err;
}

Py_DECREF(BrokerMetadata_type);
Expand Down Expand Up @@ -403,5 +422,216 @@ const char list_topics_doc[] = PyDoc_STR(
"\n"
" :param str topic: If specified, only request info about this topic, else return for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified it will be created.\n"
" :param float timeout: Maximum response time before timing out, or -1 for infinite timeout.\n"
" :rtype: ClusterMetadata \n"
" :raises: KafkaException \n");
" :rtype: ClusterMetadata\n"
" :raises: KafkaException\n");


/**
 * @brief Convert an array of C group member info structs to a Python list
 *        of GroupMember objects.
 *
 * @param self Handle (unused; kept for symmetry with the other converters).
 * @param c_members Array of member info structs from librdkafka.
 * @param member_cnt Number of elements in \p c_members.
 *
 * @returns a new list of GroupMember, or NULL (and exception) on error.
 */
static PyObject *
c_group_members_to_py(Handle *self, const struct rd_kafka_group_member_info *c_members,
                      int member_cnt) {
        PyObject *GroupMember_type, *list = NULL;
        int i;

        GroupMember_type = cfl_PyObject_lookup("confluent_kafka.admin",
                                               "GroupMember");
        if (!GroupMember_type)
                return NULL;

        list = PyList_New(member_cnt);
        if (!list)
                goto err;

        for (i = 0; i < member_cnt; i++) {
                PyObject *member, *metadata, *assignment;

                member = PyObject_CallObject(GroupMember_type, NULL);
                if (!member)
                        goto err;

                if (cfl_PyObject_SetString(member, "id",
                                           c_members[i].member_id) == -1) {
                        Py_DECREF(member);
                        goto err;
                }

                if (cfl_PyObject_SetString(member, "client_id",
                                           c_members[i].client_id) == -1) {
                        Py_DECREF(member);
                        goto err;
                }

                if (cfl_PyObject_SetString(member, "client_host",
                                           c_members[i].client_host) == -1) {
                        Py_DECREF(member);
                        goto err;
                }

                metadata = PyBytes_FromStringAndSize(c_members[i].member_metadata,
                                                     c_members[i].member_metadata_size);
                if (!metadata) {
                        Py_DECREF(member);
                        goto err;
                }

                if (PyObject_SetAttrString(member, "metadata", metadata) == -1) {
                        Py_DECREF(metadata);
                        Py_DECREF(member);
                        goto err;
                }
                Py_DECREF(metadata);

                assignment = PyBytes_FromStringAndSize(c_members[i].member_assignment,
                                                       c_members[i].member_assignment_size);
                if (!assignment) {
                        Py_DECREF(member);
                        goto err;
                }

                if (PyObject_SetAttrString(member, "assignment", assignment) == -1) {
                        Py_DECREF(assignment);
                        Py_DECREF(member);
                        goto err;
                }
                Py_DECREF(assignment);

                /* PyList_SET_ITEM steals the reference to member. */
                PyList_SET_ITEM(list, i, member);
        }
        Py_DECREF(GroupMember_type);
        return list;
err:
        /* Not-yet-filled list slots are NULL; list deallocation handles
         * that. The original leaked both list and the current member on
         * error paths. */
        Py_XDECREF(list);
        Py_DECREF(GroupMember_type);
        return NULL;
}


/**
* @returns a GroupMetadata object populated with all metadata information
* from \p metadata, or NULL on error in which case an exception
* has been raised.
*/
/**
 * @returns a list of GroupMetadata objects populated with all metadata
 *          information from \p group_list, or NULL on error in which case
 *          an exception has been raised.
 */
static PyObject *
c_groups_to_py (Handle *self, const struct rd_kafka_group_list *group_list) {
        PyObject *GroupMetadata_type, *BrokerMetadata_type;
        PyObject *groups = NULL;
        int i;

        GroupMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
                                                 "GroupMetadata");
        if (!GroupMetadata_type)
                return NULL;

        BrokerMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
                                                  "BrokerMetadata");
        if (!BrokerMetadata_type) {
                Py_DECREF(GroupMetadata_type);
                return NULL;
        }

        groups = PyList_New(group_list->group_cnt);
        if (!groups)
                goto err;

        for (i = 0; i < group_list->group_cnt; i++) {
                PyObject *group, *error, *broker, *members;

                group = PyObject_CallObject(GroupMetadata_type, NULL);
                if (!group)
                        goto err;

                /* The original leaked group on every error path below;
                 * each failure now drops its reference before bailing. */
                if (cfl_PyObject_SetString(group, "id",
                                           group_list->groups[i].group) == -1) {
                        Py_DECREF(group);
                        goto err;
                }

                error = KafkaError_new_or_None(group_list->groups[i].err, NULL);
                if (PyObject_SetAttrString(group, "error", error) == -1) {
                        Py_DECREF(error);
                        Py_DECREF(group);
                        goto err;
                }
                Py_DECREF(error);

                if (cfl_PyObject_SetString(group, "state",
                                           group_list->groups[i].state) == -1) {
                        Py_DECREF(group);
                        goto err;
                }

                if (cfl_PyObject_SetString(group, "protocol_type",
                                           group_list->groups[i].protocol_type) == -1) {
                        Py_DECREF(group);
                        goto err;
                }

                if (cfl_PyObject_SetString(group, "protocol",
                                           group_list->groups[i].protocol) == -1) {
                        Py_DECREF(group);
                        goto err;
                }

                broker = c_broker_to_py(self, BrokerMetadata_type,
                                        group_list->groups[i].broker);
                if (!broker) {
                        Py_DECREF(group);
                        goto err;
                }
                if (PyObject_SetAttrString(group, "broker", broker) == -1) {
                        Py_DECREF(broker);
                        Py_DECREF(group);
                        goto err;
                }
                Py_DECREF(broker);

                members = c_group_members_to_py(self, group_list->groups[i].members,
                                                group_list->groups[i].member_cnt);
                if (!members) {
                        Py_DECREF(group);
                        goto err;
                }
                if (PyObject_SetAttrString(group, "members", members) == -1) {
                        Py_DECREF(members);
                        Py_DECREF(group);
                        goto err;
                }
                Py_DECREF(members);

                /* PyList_SET_ITEM steals the reference to group. */
                PyList_SET_ITEM(groups, i, group);
        }
        Py_DECREF(GroupMetadata_type);
        Py_DECREF(BrokerMetadata_type);
        return groups;
err:
        Py_DECREF(GroupMetadata_type);
        Py_DECREF(BrokerMetadata_type);
        Py_XDECREF(groups);
        return NULL;
}


/**
* @brief List consumer groups
*/
/**
 * @brief List consumer groups known to the cluster.
 *
 * Accepts an optional group name (restricting the listing to that group)
 * and an optional timeout in seconds (-1 = infinite).
 *
 * @returns a Python object from c_groups_to_py(), or NULL (and exception)
 *          on error.
 */
PyObject *
list_groups (Handle *self, PyObject *args, PyObject *kwargs) {
        static char *kws[] = {"group", "timeout", NULL};
        const char *group_name = NULL;
        double timeout = -1.0;
        const struct rd_kafka_group_list *grplist = NULL;
        rd_kafka_resp_err_t c_err;
        PyObject *ret = NULL;
        CallState cs;

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|zd", kws,
                                         &group_name, &timeout))
                return NULL;

        /* Release the GIL while blocking in librdkafka. */
        CallState_begin(self, &cs);

        c_err = rd_kafka_list_groups(self->rk, group_name, &grplist,
                                     cfl_timeout_ms(timeout));

        if (!CallState_end(self, &cs))
                goto done;  /* Exception raised from a callback */

        if (c_err != RD_KAFKA_RESP_ERR_NO_ERROR) {
                cfl_PyErr_Format(c_err,
                                 "Failed to list groups: %s",
                                 rd_kafka_err2str(c_err));
                goto done;
        }

        ret = c_groups_to_py(self, grplist);
done:
        if (grplist)
                rd_kafka_group_list_destroy(grplist);
        return ret;
}

/* Sphinx/reST docstring for AdminClient.list_groups().
 * Fixes: the group param line previously lacked a trailing "\n" and ran
 * into the timeout line; :rtype: now reflects that c_groups_to_py()
 * returns a list of GroupMetadata, not a single object. */
const char list_groups_doc[] = PyDoc_STR(
        ".. py:function:: list_groups([group=None], [timeout=-1])\n"
        "\n"
        " Request Group Metadata from cluster.\n"
        " This method provides the same information as"
        " listGroups(), describeGroups() in the Java Admin client.\n"
        "\n"
        " :param str group: If specified, only request info about this group, else return for all groups in cluster\n"
        " :param float timeout: Maximum response time before timing out, or -1 for infinite timeout.\n"
        " :rtype: list(GroupMetadata)\n"
        " :raises: KafkaException\n");
2 changes: 2 additions & 0 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts);
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
PyObject *list_topics (Handle *self, PyObject *args, PyObject *kwargs);
PyObject *list_groups (Handle *self, PyObject *args, PyObject *kwargs);


extern const char list_topics_doc[];
extern const char list_groups_doc[];


#ifdef RD_KAFKA_V_HEADERS
Expand Down
Loading