Skip to content

Commit

Permalink
ElectLeaders api(KIP-460) implemented (#1818)
Browse files Browse the repository at this point in the history
* electLeaders

* requested changes

* requested changes

* style fix

* requested changes

* whitespace error

* changes acc to new format

* requested changes

* whitespace error

* line break

* line break

* indentation changes
  • Loading branch information
PratRanj07 authored Oct 9, 2024
1 parent a356f2a commit 6642180
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 4 deletions.
36 changes: 34 additions & 2 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
AclOperation, AclPermissionType, AlterConfigOpType,
ScramMechanism, ScramCredentialInfo,
UserScramCredentialUpsertion, UserScramCredentialDeletion,
OffsetSpec)
OffsetSpec, ElectionType)
import sys
import threading
import logging
Expand Down Expand Up @@ -878,6 +878,36 @@ def example_delete_records(a, args):
f" before offset {partition.offset}: {e}")


def example_elect_leaders(a, args):
partitions = []
if (len(args) - 1) % 2 != 0:
raise ValueError("Invalid number of arguments for elect_leaders, Expected format: " +
"elect_leaders <election_type> [<topic1> <partition1>" +
" <topic2> <partition2> ..]")

try:
election_type = ElectionType[args[0]]
except KeyError:
raise ValueError(f"Invalid election_type: {args[0]}, expected 'PREFERRED' or 'UNCLEAN'")

for topic, partition in zip(args[1::2], args[2::2]):
partitions.append(TopicPartition(topic, int(partition)))

f = a.elect_leaders(election_type, partitions)
try:
results = f.result()
for partition, exception in results.items():
if exception is None:
print(f"Leader Election Successful for topic: '{partition.topic}'" +
f" partition: '{partition.partition}'")
else:
print(
"Leader Election Failed for topic: " +
f"'{partition.topic}' partition: '{partition.partition}': {exception}")
except KafkaException as e:
print(f"Error electing leaders: {e}")


if __name__ == '__main__':
if len(sys.argv) < 3:
sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
Expand Down Expand Up @@ -917,6 +947,7 @@ def example_delete_records(a, args):
sys.stderr.write(' list_offsets <isolation_level> <topic1> <partition1> <offset_spec1> ' +
'[<topic2> <partition2> <offset_spec2> ..]\n')
sys.stderr.write(' delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..]\n')
sys.stderr.write(' elect_leaders <election_type> [<topic1> <partition1> <topic2> <partition2> ..]\n')
sys.exit(1)

broker = sys.argv[1]
Expand Down Expand Up @@ -947,7 +978,8 @@ def example_delete_records(a, args):
'describe_user_scram_credentials': example_describe_user_scram_credentials,
'alter_user_scram_credentials': example_alter_user_scram_credentials,
'list_offsets': example_list_offsets,
'delete_records': example_delete_records}
'delete_records': example_delete_records,
'elect_leaders': example_elect_leaders}

if operation not in opsmap:
sys.stderr.write('Unknown operation: %s\n' % operation)
Expand Down
53 changes: 53 additions & 0 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
from ._listoffsets import (OffsetSpec, # noqa: F401
ListOffsetsResultInfo)

from ._election import (ElectionType) # noqa: F401

from ._records import DeletedRecords # noqa: F401

from .._model import TopicCollection as _TopicCollection
Expand Down Expand Up @@ -548,6 +550,22 @@ def _check_delete_records(request):
if req.partition < 0:
raise ValueError("'partition' cannot be negative")

@staticmethod
def _check_elect_leaders(election_type, partitions):
if not isinstance(election_type, ElectionType):
raise TypeError("Expected 'election_type' to be of type 'ElectionType'")
if partitions is not None:
if not isinstance(partitions, list):
raise TypeError("Expected 'partitions' to be a list, got " +
f"'{type(partitions).__name__}'")
for partition in partitions:
if not isinstance(partition, _TopicPartition):
raise TypeError("Element of the 'partitions' list must be of type 'TopicPartition'" +
f" got '{type(partition).__name__}' ")
if partition.partition < 0:
raise ValueError("Elements of the 'partitions' list must not have negative value" +
" for 'partition' field")

def create_topics(self, new_topics, **kwargs):
"""
Create one or more new topics.
Expand Down Expand Up @@ -1258,3 +1276,38 @@ def delete_records(self, topic_partition_offsets, **kwargs):

super(AdminClient, self).delete_records(topic_partition_offsets, f, **kwargs)
return futmap

def elect_leaders(self, election_type, partitions=None, **kwargs):
"""
Perform Preferred or Unclean leader election for
all the specified topic partitions.
:param ElectionType election_type: The type of election to perform.
:param List[TopicPartition]|None partitions: The topic partitions to perform
the election on. Use ``None`` to perform on all the topic partitions.
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
:param float operation_timeout: The operation timeout in seconds,
controlling how long the 'elect_leaders' request will block
on the broker waiting for the election to propagate
in the cluster. A value of 0 returns immediately.
Default: `socket.timeout.ms/1000.0`
:returns: A future. Method result() of the future returns
dict[TopicPartition, KafkaException|None].
:rtype: future
:raises KafkaException: Operation failed locally or on broker.
:raises TypeError: Invalid input type.
:raises ValueError: Invalid input value.
"""

AdminClient._check_elect_leaders(election_type, partitions)

f = AdminClient._create_future()

super(AdminClient, self).elect_leaders(election_type.value, partitions, f, **kwargs)

return f
29 changes: 29 additions & 0 deletions src/confluent_kafka/admin/_election.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2024 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
from .. import cimpl as _cimpl


class ElectionType(Enum):
"""
Enumerates the different types of leader elections.
"""
PREFERRED = _cimpl.ELECTION_TYPE_PREFERRED #: Preferred election
UNCLEAN = _cimpl.ELECTION_TYPE_UNCLEAN #: Unclean election

def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
119 changes: 119 additions & 0 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3059,6 +3059,104 @@ const char Admin_delete_records_doc[] = PyDoc_STR(
"\n"
" This method should not be used directly, use confluent_kafka.AdminClient.delete_records()\n");

/**
* @brief Elect leaders
*/
PyObject *Admin_elect_leaders(Handle *self, PyObject *args, PyObject *kwargs) {
PyObject *election_type = NULL, *partitions = NULL, *future;
rd_kafka_ElectLeaders_t *c_elect_leaders = NULL;
rd_kafka_ElectionType_t c_election_type;
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
rd_kafka_topic_partition_list_t *c_partitions = NULL;
CallState cs;
rd_kafka_queue_t *rkqu;

static char *kws[] = {"election_type",
"partitions"
"future",
/* options */
"request_timeout", "operation_timeout", NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OOO|ff", kws,
&election_type, &partitions, &future,
&options.request_timeout,
&options.operation_timeout)) {
goto err;
}

c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ELECTLEADERS,
&options, future);
if (!c_options) {
goto err; /* Exception raised by options_to_c() */
}

/* options_to_c() sets future as the opaque, which is used in the
* background_event_cb to set the results on the future as the
* admin operation is finished, so we need to keep our own refcount. */
Py_INCREF(future);

c_election_type = (rd_kafka_ElectionType_t)cfl_PyInt_AsInt(election_type);

if (partitions != Py_None && !PyList_Check(partitions)) {
PyErr_SetString(PyExc_ValueError, "partitions must be None or a list");
goto err;
}

if (partitions != Py_None) {
c_partitions = py_to_c_parts(partitions);
}

c_elect_leaders = rd_kafka_ElectLeaders_new(c_election_type, c_partitions);

if(c_partitions) {
rd_kafka_topic_partition_list_destroy(c_partitions);
}

/* Use librdkafka's background thread queue to automatically dispatch
* Admin_background_event_cb() when the admin operation is finished. */
rkqu = rd_kafka_queue_get_background(self->rk);

/**
*
* Call ElectLeaders
*
* We need to set up a CallState and release GIL here since
* the event_cb may be triggered immediately.
*
*/
CallState_begin(self, &cs);
rd_kafka_ElectLeaders(self->rk, c_elect_leaders, c_options, rkqu);
CallState_end(self, &cs);

rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

rd_kafka_AdminOptions_destroy(c_options);
rd_kafka_ElectLeaders_destroy(c_elect_leaders);

Py_RETURN_NONE;

err:
if (c_elect_leaders) {
rd_kafka_ElectLeaders_destroy(c_elect_leaders);
}
if (c_options) {
rd_kafka_AdminOptions_destroy(c_options);
Py_DECREF(future);
}
return NULL;
}

const char Admin_elect_leaders_doc[] = PyDoc_STR(
".. py:function:: elect_leaders(election_type, partitions, "
"future, [request_timeout, operation_timeout])\n"
"\n"
" Perform Preferred or Unclean election for the specified "
"Topic Partitions.\n"
"\n"
" This method should not be used directly, use "
"confluent_kafka.AdminClient.elect_leaders()\n");

/**
* @brief Call rd_kafka_poll() and keep track of crashing callbacks.
* @returns -1 if callback crashed (or poll() failed), else the number
Expand Down Expand Up @@ -3225,6 +3323,10 @@ static PyMethodDef Admin_methods[] = {
Admin_delete_records_doc
},

{ "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS,
Admin_elect_leaders_doc
},

{ NULL }
};

Expand Down Expand Up @@ -4875,6 +4977,23 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
break;
}

case RD_KAFKA_EVENT_ELECTLEADERS_RESULT:
{
size_t c_result_cnt;

const rd_kafka_ElectLeaders_result_t
*c_elect_leaders_res_event =
rd_kafka_event_ElectLeaders_result(rkev);

const rd_kafka_topic_partition_result_t **partition_results =
rd_kafka_ElectLeaders_result_partitions(
c_elect_leaders_res_event, &c_result_cnt);

result = c_topic_partition_result_to_py_dict(partition_results, c_result_cnt);

break;
}

default:
Py_DECREF(error); /* Py_None */
error = KafkaError_new0(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
Expand Down
9 changes: 9 additions & 0 deletions src/confluent_kafka/src/AdminTypes.c
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,14 @@ static void AdminTypes_AddObjectsOffsetSpec (PyObject *m) {
PyModule_AddIntConstant(m,"OFFSET_SPEC_LATEST", RD_KAFKA_OFFSET_SPEC_LATEST);
}

static void AdminTypes_AddObjectsElectionType(PyObject *m) {
/* rd_kafka_ElectionType_t */
PyModule_AddIntConstant(m, "ELECTION_TYPE_PREFERRED",
RD_KAFKA_ELECTION_TYPE_PREFERRED);
PyModule_AddIntConstant(m, "ELECTION_TYPE_UNCLEAN",
RD_KAFKA_ELECTION_TYPE_UNCLEAN);
}

/**
* @brief Add Admin types to module
*/
Expand All @@ -616,4 +624,5 @@ void AdminTypes_AddObjects (PyObject *m) {
AdminTypes_AddObjectsScramMechanismType(m);
AdminTypes_AddObjectsIsolationLevel(m);
AdminTypes_AddObjectsOffsetSpec(m);
AdminTypes_AddObjectsElectionType(m);
}
Loading

0 comments on commit 6642180

Please sign in to comment.