From 66421807d846efe6b4fdc3bdcfcb97586ed2d796 Mon Sep 17 00:00:00 2001 From: PratRanj07 <156985928+PratRanj07@users.noreply.github.com> Date: Thu, 10 Oct 2024 04:34:24 +0530 Subject: [PATCH] ElectLeaders api(KIP-460) implemented (#1818) * electLeaders * requested changes * requested changes * style fix * requested changes * whitespace error * changes acc to new format * requested changes * whitespace error * line break * line break * indentation changes --- examples/adminapi.py | 36 ++++++- src/confluent_kafka/admin/__init__.py | 53 ++++++++++ src/confluent_kafka/admin/_election.py | 29 ++++++ src/confluent_kafka/src/Admin.c | 119 ++++++++++++++++++++++ src/confluent_kafka/src/AdminTypes.c | 9 ++ src/confluent_kafka/src/confluent_kafka.c | 61 ++++++++++- src/confluent_kafka/src/confluent_kafka.h | 4 + tests/test_Admin.py | 38 ++++++- 8 files changed, 345 insertions(+), 4 deletions(-) create mode 100644 src/confluent_kafka/admin/_election.py diff --git a/examples/adminapi.py b/examples/adminapi.py index 8442d6453..08b244670 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -26,7 +26,7 @@ AclOperation, AclPermissionType, AlterConfigOpType, ScramMechanism, ScramCredentialInfo, UserScramCredentialUpsertion, UserScramCredentialDeletion, - OffsetSpec) + OffsetSpec, ElectionType) import sys import threading import logging @@ -878,6 +878,36 @@ def example_delete_records(a, args): f" before offset {partition.offset}: {e}") +def example_elect_leaders(a, args): + partitions = [] + if (len(args) - 1) % 2 != 0: + raise ValueError("Invalid number of arguments for elect_leaders, Expected format: " + + "elect_leaders [ " + + " ..]") + + try: + election_type = ElectionType[args[0]] + except KeyError: + raise ValueError(f"Invalid election_type: {args[0]}, expected 'PREFERRED' or 'UNCLEAN'") + + for topic, partition in zip(args[1::2], args[2::2]): + partitions.append(TopicPartition(topic, int(partition))) + + f = a.elect_leaders(election_type, partitions) + try: + 
results = f.result() + for partition, exception in results.items(): + if exception is None: + print(f"Leader Election Successful for topic: '{partition.topic}'" + + f" partition: '{partition.partition}'") + else: + print( + "Leader Election Failed for topic: " + + f"'{partition.topic}' partition: '{partition.partition}': {exception}") + except KafkaException as e: + print(f"Error electing leaders: {e}") + + if __name__ == '__main__': if len(sys.argv) < 3: sys.stderr.write('Usage: %s \n\n' % sys.argv[0]) @@ -917,6 +947,7 @@ def example_delete_records(a, args): sys.stderr.write(' list_offsets ' + '[ ..]\n') sys.stderr.write(' delete_records [ ..]\n') + sys.stderr.write(' elect_leaders [ ..]\n') sys.exit(1) broker = sys.argv[1] @@ -947,7 +978,8 @@ def example_delete_records(a, args): 'describe_user_scram_credentials': example_describe_user_scram_credentials, 'alter_user_scram_credentials': example_alter_user_scram_credentials, 'list_offsets': example_list_offsets, - 'delete_records': example_delete_records} + 'delete_records': example_delete_records, + 'elect_leaders': example_elect_leaders} if operation not in opsmap: sys.stderr.write('Unknown operation: %s\n' % operation) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 9101b651f..f128ef81c 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -54,6 +54,8 @@ from ._listoffsets import (OffsetSpec, # noqa: F401 ListOffsetsResultInfo) +from ._election import (ElectionType) # noqa: F401 + from ._records import DeletedRecords # noqa: F401 from .._model import TopicCollection as _TopicCollection @@ -548,6 +550,22 @@ def _check_delete_records(request): if req.partition < 0: raise ValueError("'partition' cannot be negative") + @staticmethod + def _check_elect_leaders(election_type, partitions): + if not isinstance(election_type, ElectionType): + raise TypeError("Expected 'election_type' to be of type 'ElectionType'") + if partitions 
is not None: + if not isinstance(partitions, list): + raise TypeError("Expected 'partitions' to be a list, got " + + f"'{type(partitions).__name__}'") + for partition in partitions: + if not isinstance(partition, _TopicPartition): + raise TypeError("Element of the 'partitions' list must be of type 'TopicPartition'" + + f" got '{type(partition).__name__}' ") + if partition.partition < 0: + raise ValueError("Elements of the 'partitions' list must not have negative value" + + " for 'partition' field") + def create_topics(self, new_topics, **kwargs): """ Create one or more new topics. @@ -1258,3 +1276,38 @@ def delete_records(self, topic_partition_offsets, **kwargs): super(AdminClient, self).delete_records(topic_partition_offsets, f, **kwargs) return futmap + + def elect_leaders(self, election_type, partitions=None, **kwargs): + """ + Perform Preferred or Unclean leader election for + all the specified topic partitions. + + :param ElectionType election_type: The type of election to perform. + :param List[TopicPartition]|None partitions: The topic partitions to perform + the election on. Use ``None`` to perform on all the topic partitions. + :param float request_timeout: The overall request timeout in seconds, + including broker lookup, request transmission, operation time + on broker, and response. Default: `socket.timeout.ms/1000.0` + :param float operation_timeout: The operation timeout in seconds, + controlling how long the 'elect_leaders' request will block + on the broker waiting for the election to propagate + in the cluster. A value of 0 returns immediately. + Default: `socket.timeout.ms/1000.0` + + :returns: A future. Method result() of the future returns + dict[TopicPartition, KafkaException|None]. + + :rtype: future + + :raises KafkaException: Operation failed locally or on broker. + :raises TypeError: Invalid input type. + :raises ValueError: Invalid input value. 
+ """ + + AdminClient._check_elect_leaders(election_type, partitions) + + f = AdminClient._create_future() + + super(AdminClient, self).elect_leaders(election_type.value, partitions, f, **kwargs) + + return f diff --git a/src/confluent_kafka/admin/_election.py b/src/confluent_kafka/admin/_election.py new file mode 100644 index 000000000..adc3d7790 --- /dev/null +++ b/src/confluent_kafka/admin/_election.py @@ -0,0 +1,29 @@ +# Copyright 2024 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from .. import cimpl as _cimpl + + +class ElectionType(Enum): + """ + Enumerates the different types of leader elections. 
+ """ + PREFERRED = _cimpl.ELECTION_TYPE_PREFERRED #: Preferred election + UNCLEAN = _cimpl.ELECTION_TYPE_UNCLEAN #: Unclean election + + def __lt__(self, other): + if self.__class__ != other.__class__: + return NotImplemented + return self.value < other.value diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index c58166d6e..d03d78e27 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -3059,6 +3059,104 @@ const char Admin_delete_records_doc[] = PyDoc_STR( "\n" " This method should not be used directly, use confluent_kafka.AdminClient.delete_records()\n"); +/** + * @brief Elect leaders + */ +PyObject *Admin_elect_leaders(Handle *self, PyObject *args, PyObject *kwargs) { + PyObject *election_type = NULL, *partitions = NULL, *future; + rd_kafka_ElectLeaders_t *c_elect_leaders = NULL; + rd_kafka_ElectionType_t c_election_type; + struct Admin_options options = Admin_options_INITIALIZER; + rd_kafka_AdminOptions_t *c_options = NULL; + rd_kafka_topic_partition_list_t *c_partitions = NULL; + CallState cs; + rd_kafka_queue_t *rkqu; + + static char *kws[] = {"election_type", + "partitions" + "future", + /* options */ + "request_timeout", "operation_timeout", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OOO|ff", kws, + &election_type, &partitions, &future, + &options.request_timeout, + &options.operation_timeout)) { + goto err; + } + + c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ELECTLEADERS, + &options, future); + if (!c_options) { + goto err; /* Exception raised by options_to_c() */ + } + + /* options_to_c() sets future as the opaque, which is used in the + * background_event_cb to set the results on the future as the + * admin operation is finished, so we need to keep our own refcount. 
*/ + Py_INCREF(future); + + c_election_type = (rd_kafka_ElectionType_t)cfl_PyInt_AsInt(election_type); + + if (partitions != Py_None && !PyList_Check(partitions)) { + PyErr_SetString(PyExc_ValueError, "partitions must be None or a list"); + goto err; + } + + if (partitions != Py_None) { + c_partitions = py_to_c_parts(partitions); + } + + c_elect_leaders = rd_kafka_ElectLeaders_new(c_election_type, c_partitions); + + if(c_partitions) { + rd_kafka_topic_partition_list_destroy(c_partitions); + } + + /* Use librdkafka's background thread queue to automatically dispatch + * Admin_background_event_cb() when the admin operation is finished. */ + rkqu = rd_kafka_queue_get_background(self->rk); + + /** + * + * Call ElectLeaders + * + * We need to set up a CallState and release GIL here since + * the event_cb may be triggered immediately. + * + */ + CallState_begin(self, &cs); + rd_kafka_ElectLeaders(self->rk, c_elect_leaders, c_options, rkqu); + CallState_end(self, &cs); + + rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */ + + rd_kafka_AdminOptions_destroy(c_options); + rd_kafka_ElectLeaders_destroy(c_elect_leaders); + + Py_RETURN_NONE; + +err: + if (c_elect_leaders) { + rd_kafka_ElectLeaders_destroy(c_elect_leaders); + } + if (c_options) { + rd_kafka_AdminOptions_destroy(c_options); + Py_DECREF(future); + } + return NULL; +} + +const char Admin_elect_leaders_doc[] = PyDoc_STR( + ".. py:function:: elect_leaders(election_type, partitions, " + "future, [request_timeout, operation_timeout])\n" + "\n" + " Perform Preferred or Unclean election for the specified " + "Topic Partitions.\n" + "\n" + " This method should not be used directly, use " + "confluent_kafka.AdminClient.elect_leaders()\n"); + /** * @brief Call rd_kafka_poll() and keep track of crashing callbacks. 
* @returns -1 if callback crashed (or poll() failed), else the number @@ -3225,6 +3323,10 @@ static PyMethodDef Admin_methods[] = { Admin_delete_records_doc }, + { "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS, + Admin_elect_leaders_doc + }, + { NULL } }; @@ -4875,6 +4977,23 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, break; } + case RD_KAFKA_EVENT_ELECTLEADERS_RESULT: + { + size_t c_result_cnt; + + const rd_kafka_ElectLeaders_result_t + *c_elect_leaders_res_event = + rd_kafka_event_ElectLeaders_result(rkev); + + const rd_kafka_topic_partition_result_t **partition_results = + rd_kafka_ElectLeaders_result_partitions( + c_elect_leaders_res_event, &c_result_cnt); + + result = c_topic_partition_result_to_py_dict(partition_results, c_result_cnt); + + break; + } + default: Py_DECREF(error); /* Py_None */ error = KafkaError_new0(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, diff --git a/src/confluent_kafka/src/AdminTypes.c b/src/confluent_kafka/src/AdminTypes.c index 43ca665ba..4a9d37c1e 100644 --- a/src/confluent_kafka/src/AdminTypes.c +++ b/src/confluent_kafka/src/AdminTypes.c @@ -597,6 +597,14 @@ static void AdminTypes_AddObjectsOffsetSpec (PyObject *m) { PyModule_AddIntConstant(m,"OFFSET_SPEC_LATEST", RD_KAFKA_OFFSET_SPEC_LATEST); } +static void AdminTypes_AddObjectsElectionType(PyObject *m) { + /* rd_kafka_ElectionType_t */ + PyModule_AddIntConstant(m, "ELECTION_TYPE_PREFERRED", + RD_KAFKA_ELECTION_TYPE_PREFERRED); + PyModule_AddIntConstant(m, "ELECTION_TYPE_UNCLEAN", + RD_KAFKA_ELECTION_TYPE_UNCLEAN); +} + /** * @brief Add Admin types to module */ @@ -616,4 +624,5 @@ void AdminTypes_AddObjects (PyObject *m) { AdminTypes_AddObjectsScramMechanismType(m); AdminTypes_AddObjectsIsolationLevel(m); AdminTypes_AddObjectsOffsetSpec(m); + AdminTypes_AddObjectsElectionType(m); } diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 506c9a995..a25f289cc 100644 
--- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -340,7 +340,6 @@ static PyTypeObject KafkaErrorType = { KafkaError_new /* tp_new */ }; - /** * @brief Internal factory to create KafkaError object. */ @@ -409,6 +408,30 @@ static void cfl_PyErr_Fatal (rd_kafka_resp_err_t err, const char *reason) { PyErr_SetObject(KafkaException, eo); } +/** + * @brief Creates a KafkaException from error code and error string. + */ +PyObject *KafkaException_new_or_none (rd_kafka_resp_err_t err, const char *str) { + if (err) { + PyObject *excargs , *exc; + PyObject *error = KafkaError_new0(err, str); + + excargs = PyTuple_New(1); + PyTuple_SetItem(excargs, 0, error); + + exc = ((PyTypeObject *)KafkaException)->tp_new( + (PyTypeObject *)KafkaException, NULL, NULL); + exc->ob_type->tp_init(exc, excargs, NULL); + + Py_DECREF(excargs); + Py_DECREF(error); + + return exc; + } + else + Py_RETURN_NONE; +} + @@ -1357,6 +1380,42 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) { return c_parts; } +/** + * @brief Convert C rd_kafka_topic_partition_result_t to Python dict(TopicPartition, KafkaException). + * + * @returns The new Python dict object. 
+ */ +PyObject *c_topic_partition_result_to_py_dict( + const rd_kafka_topic_partition_result_t **partition_results, + size_t cnt) { + PyObject *result = NULL; + size_t i; + + result = PyDict_New(); + + for (i = 0; i < cnt; i++) { + PyObject *key; + PyObject *value; + const rd_kafka_topic_partition_t *c_topic_partition; + const rd_kafka_error_t *c_error; + + c_topic_partition = + rd_kafka_topic_partition_result_partition(partition_results[i]); + c_error = rd_kafka_topic_partition_result_error(partition_results[i]); + + value = KafkaException_new_or_none(rd_kafka_error_code(c_error), + rd_kafka_error_string(c_error)); + key = c_part_to_py(c_topic_partition); + + PyDict_SetItem(result, key, value); + + Py_DECREF(key); + Py_DECREF(value); + } + + return result; +} + #ifdef RD_KAFKA_V_HEADERS diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index d6ad31635..2895a16bc 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -188,6 +188,7 @@ extern PyObject *KafkaException; PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...); PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str); PyObject *KafkaError_new_from_error_destroy (rd_kafka_error_t *error); +PyObject *KafkaException_new_or_none (rd_kafka_resp_err_t err, const char *str); /** * @brief Raise an exception using KafkaError. 
@@ -387,6 +388,9 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts); PyObject *c_Node_to_py(const rd_kafka_Node_t *c_node); PyObject *c_Uuid_to_py(const rd_kafka_Uuid_t *c_uuid); rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist); +PyObject *c_topic_partition_result_to_py_dict( + const rd_kafka_topic_partition_result_t **partition_results, + size_t cnt); PyObject *list_topics (Handle *self, PyObject *args, PyObject *kwargs); PyObject *list_groups (Handle *self, PyObject *args, PyObject *kwargs); PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs); diff --git a/tests/test_Admin.py b/tests/test_Admin.py index 5e8ff2e6c..2f01c4da3 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -6,7 +6,8 @@ ResourcePatternType, AclOperation, AclPermissionType, AlterConfigOpType, \ ScramCredentialInfo, ScramMechanism, \ UserScramCredentialAlteration, UserScramCredentialDeletion, \ - UserScramCredentialUpsertion, OffsetSpec + UserScramCredentialUpsertion, OffsetSpec, \ + ElectionType from confluent_kafka import KafkaException, KafkaError, libversion, \ TopicPartition, ConsumerGroupTopicPartitions, ConsumerGroupState, \ IsolationLevel, TopicCollection @@ -1194,3 +1195,38 @@ def test_delete_records(): with pytest.raises(ValueError): a.delete_records([TopicPartition("test-topic1")]) + + +def test_elect_leaders(): + a = AdminClient({"socket.timeout.ms": 10}) + + correct_partitions = TopicPartition("test-topic1", 0) + incorrect_partitions = TopicPartition("test-topic1", -1) + + correct_election_type = ElectionType.PREFERRED + + # Incorrect Election Type + with pytest.raises(TypeError): + a.elect_leaders(None, [correct_partitions]) + + with pytest.raises(TypeError): + a.elect_leaders("1", [correct_partitions]) + + # Incorrect Partitions type + with pytest.raises(TypeError, match="Expected 'partitions' to be a list, got 'str'"): + a.elect_leaders(correct_election_type, "1") + + # Partition-specific tests + with 
pytest.raises(TypeError, + match="Element of the 'partitions' list must be of type 'TopicPartition' got 'str'"): + a.elect_leaders(correct_election_type, ["test-1"]) + + with pytest.raises(TypeError, + match="Element of the 'partitions' list must be of type 'TopicPartition' got 'NoneType'"): + a.elect_leaders(correct_election_type, [None]) + + with pytest.raises(ValueError): + a.elect_leaders(correct_election_type, [TopicPartition("")]) + + with pytest.raises(ValueError): + a.elect_leaders(correct_election_type, [incorrect_partitions])