Skip to content

[KIP-460] ElectLeaders implementation public API changes and documentation improvement #1833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Supporting classes
- :ref:`ConsumerGroupTopicPartitions <pythonclient_consumer_group_topic_partition>`
- :ref:`ConsumerGroupState <pythonclient_consumer_group_state>`
- :ref:`Uuid <pythonclient_uuid>`
- :ref:`ElectionType <pythonclient_election_type>`

- Errors:
- :ref:`KafkaError <pythonclient_kafkaerror>`
Expand Down Expand Up @@ -701,6 +702,15 @@ Uuid
.. autoclass:: confluent_kafka.Uuid
:members:

.. _pythonclient_election_type:

************
ElectionType
************

.. autoclass:: confluent_kafka.ElectionType
:members:

.. _serde_field:

************
Expand Down
17 changes: 12 additions & 5 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
TopicPartition, ConsumerGroupState, TopicCollection,
IsolationLevel)
IsolationLevel, ElectionType)
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
ConfigEntry, ConfigSource, AclBinding,
AclBindingFilter, ResourceType, ResourcePatternType,
AclOperation, AclPermissionType, AlterConfigOpType,
ScramMechanism, ScramCredentialInfo,
UserScramCredentialUpsertion, UserScramCredentialDeletion,
OffsetSpec, ElectionType)
OffsetSpec)
import sys
import threading
import logging
Expand Down Expand Up @@ -893,17 +893,24 @@ def example_elect_leaders(a, args):
for topic, partition in zip(args[1::2], args[2::2]):
partitions.append(TopicPartition(topic, int(partition)))

if len(partitions) == 0:
# When passing None as partitions, election is triggered for
# all partitions in the cluster
partitions = None

f = a.elect_leaders(election_type, partitions)
try:
results = f.result()
for partition, exception in results.items():
if exception is None:
print(f"Elect leaders call returned {len(results)} result(s):")
for partition, error in results.items():
if error is None:
print(f"Leader Election Successful for topic: '{partition.topic}'" +
f" partition: '{partition.partition}'")
else:
print(
"Leader Election Failed for topic: " +
f"'{partition.topic}' partition: '{partition.partition}': {exception}")
f"'{partition.topic}' partition: '{partition.partition}' " +
f"error code: {error.code()} error message: {error.str()}")
except KafkaException as e:
print(f"Error electing leaders: {e}")

Expand Down
3 changes: 2 additions & 1 deletion src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
ConsumerGroupState,
TopicCollection,
TopicPartitionInfo,
IsolationLevel)
IsolationLevel,
ElectionType)

from .cimpl import (Producer,
Consumer,
Expand Down
19 changes: 19 additions & 0 deletions src/confluent_kafka/_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,22 @@ def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value


class ElectionType(Enum):
"""
Enumerates the different types of leader elections.

Values:
-------
"""

#: Preferred election
PREFERRED = cimpl.ELECTION_TYPE_PREFERRED
#: Unclean election
UNCLEAN = cimpl.ELECTION_TYPE_UNCLEAN

def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
9 changes: 4 additions & 5 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@
from ._listoffsets import (OffsetSpec, # noqa: F401
ListOffsetsResultInfo)

from ._election import (ElectionType) # noqa: F401

from ._records import DeletedRecords # noqa: F401

from .._model import TopicCollection as _TopicCollection
from .._model import (TopicCollection as _TopicCollection,
ElectionType as _ElectionType)

from ..cimpl import (KafkaException, # noqa: F401
KafkaError,
Expand Down Expand Up @@ -552,7 +551,7 @@ def _check_delete_records(request):

@staticmethod
def _check_elect_leaders(election_type, partitions):
if not isinstance(election_type, ElectionType):
if not isinstance(election_type, _ElectionType):
raise TypeError("Expected 'election_type' to be of type 'ElectionType'")
if partitions is not None:
if not isinstance(partitions, list):
Expand Down Expand Up @@ -1280,7 +1279,7 @@ def delete_records(self, topic_partition_offsets, **kwargs):
def elect_leaders(self, election_type, partitions=None, **kwargs):
"""
Perform Preferred or Unclean leader election for
all the specified topic partitions.
all the specified partitions or all partitions in the cluster.

:param ElectionType election_type: The type of election to perform.
:param List[TopicPartition]|None partitions: The topic partitions to perform
Expand Down
29 changes: 0 additions & 29 deletions src/confluent_kafka/admin/_election.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3152,7 +3152,7 @@ const char Admin_elect_leaders_doc[] = PyDoc_STR(
"future, [request_timeout, operation_timeout])\n"
"\n"
" Perform Preferred or Unclean election for the specified "
"Topic Partitions.\n"
"partion or all partition in the cluster.\n"
"\n"
" This method should not be used directly, use "
"confluent_kafka.AdminClient.elect_leaders()\n");
Expand Down
31 changes: 2 additions & 29 deletions src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -408,33 +408,6 @@ static void cfl_PyErr_Fatal (rd_kafka_resp_err_t err, const char *reason) {
PyErr_SetObject(KafkaException, eo);
}

/**
* @brief Creates a KafkaException from error code and error string.
*/
PyObject *KafkaException_new_or_none (rd_kafka_resp_err_t err, const char *str) {
if (err) {
PyObject *excargs , *exc;
PyObject *error = KafkaError_new0(err, str);

excargs = PyTuple_New(1);
PyTuple_SetItem(excargs, 0, error);

exc = ((PyTypeObject *)KafkaException)->tp_new(
(PyTypeObject *)KafkaException, NULL, NULL);
exc->ob_type->tp_init(exc, excargs, NULL);

Py_DECREF(excargs);
Py_DECREF(error);

return exc;
}
else
Py_RETURN_NONE;
}





/****************************************************************************
*
Expand Down Expand Up @@ -1403,8 +1376,8 @@ PyObject *c_topic_partition_result_to_py_dict(
rd_kafka_topic_partition_result_partition(partition_results[i]);
c_error = rd_kafka_topic_partition_result_error(partition_results[i]);

value = KafkaException_new_or_none(rd_kafka_error_code(c_error),
rd_kafka_error_string(c_error));
value = KafkaError_new_or_None(rd_kafka_error_code(c_error),
rd_kafka_error_string(c_error));
key = c_part_to_py(c_topic_partition);

PyDict_SetItem(result, key, value);
Expand Down
1 change: 0 additions & 1 deletion src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ extern PyObject *KafkaException;
PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...);
PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str);
PyObject *KafkaError_new_from_error_destroy (rd_kafka_error_t *error);
PyObject *KafkaException_new_or_none (rd_kafka_resp_err_t err, const char *str);

/**
* @brief Raise an exception using KafkaError.
Expand Down
9 changes: 6 additions & 3 deletions tests/test_Admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
ResourcePatternType, AclOperation, AclPermissionType, AlterConfigOpType, \
ScramCredentialInfo, ScramMechanism, \
UserScramCredentialAlteration, UserScramCredentialDeletion, \
UserScramCredentialUpsertion, OffsetSpec, \
ElectionType
UserScramCredentialUpsertion, OffsetSpec
from confluent_kafka import KafkaException, KafkaError, libversion, \
TopicPartition, ConsumerGroupTopicPartitions, ConsumerGroupState, \
IsolationLevel, TopicCollection
IsolationLevel, TopicCollection, ElectionType
import concurrent.futures


Expand Down Expand Up @@ -1230,3 +1229,7 @@ def test_elect_leaders():

with pytest.raises(ValueError):
a.elect_leaders(correct_election_type, [incorrect_partitions])

with pytest.raises(KafkaException):
a.elect_leaders(correct_election_type, [correct_partitions])\
.result(timeout=1)