Add throttle_cb support (#237) #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged 17 commits on Jun 2, 2018
9 changes: 2 additions & 7 deletions README.md
@@ -277,14 +277,9 @@ In order to run full test suite, simply execute:
**NOTE**: Requires `tox` (please install with `pip install tox`), several supported versions of Python on your path, and `librdkafka` [installed](tools/bootstrap-librdkafka.sh) into `tmp-build`.


**Integration tests:**

See [tests/README.md](tests/README.md) for instructions on how to run integration tests.



22 changes: 22 additions & 0 deletions confluent_kafka/__init__.py
@@ -634,3 +634,25 @@ def __repr__(self):

    def __str__(self):
        return "{}".format(self.partition)


class ThrottleEvent(object):
    """
    ThrottleEvent contains details about a throttled request.

    This class is typically not instantiated by the user.

    :ivar str broker_name: The hostname of the broker which throttled the request
    :ivar int broker_id: The broker id
    :ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
    """
    def __init__(self, broker_name, broker_id, throttle_time):
        self.broker_name = broker_name
        self.broker_id = broker_id
        self.throttle_time = throttle_time

    def __str__(self):
        return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id, int(self.throttle_time * 1000))
88 changes: 88 additions & 0 deletions confluent_kafka/src/confluent_kafka.c
@@ -1187,6 +1187,63 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
        CallState_resume(cs);
}

/**
 * @brief librdkafka throttle callback triggered by poll() or flush(), triggers the
 *        corresponding Python throttle_cb
 */
static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker_id,
                         int throttle_time_ms, void *opaque) {
        Handle *h = opaque;
        PyObject *ThrottleEvent_type, *throttle_event;
        PyObject *result, *kwargs, *args;
        CallState *cs;

        cs = CallState_get(h);
        if (!h->throttle_cb) {
                /* No callback defined */
                goto done;
        }

        ThrottleEvent_type = cfl_PyObject_lookup("confluent_kafka",
                                                 "ThrottleEvent");
        if (!ThrottleEvent_type) {
                /* ThrottleEvent class not found */
                goto err;
        }

        args = Py_BuildValue("(sid)", broker_name, broker_id,
                             (double)throttle_time_ms/1000);
        throttle_event = PyObject_Call(ThrottleEvent_type, args, NULL);

        Py_DECREF(args);
        Py_DECREF(ThrottleEvent_type);

        if (!throttle_event) {
                /* Failed to instantiate ThrottleEvent object */
                goto err;
        }

        result = PyObject_CallFunctionObjArgs(h->throttle_cb, throttle_event, NULL);

        Py_DECREF(throttle_event);

        if (result) {
                /* throttle_cb executed successfully */
                Py_DECREF(result);
                goto done;
        }

        /**
         * Stop callback dispatcher, return err to application
         * fall-through to unlock GIL
         */
 err:
        CallState_crash(cs);
        rd_kafka_yield(h->rk);
 done:
        CallState_resume(cs);
}

static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
        Handle *h = opaque;
        PyObject *eo = NULL, *result = NULL;
@@ -1266,6 +1323,9 @@ void Handle_clear (Handle *h) {
        if (h->error_cb)
                Py_DECREF(h->error_cb);

        if (h->throttle_cb)
                Py_DECREF(h->throttle_cb);

        if (h->stats_cb)
                Py_DECREF(h->stats_cb);

@@ -1287,6 +1347,9 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg) {
        if (h->error_cb)
                Py_VISIT(h->error_cb);

        if (h->throttle_cb)
                Py_VISIT(h->throttle_cb);

        if (h->stats_cb)
                Py_VISIT(h->stats_cb);

@@ -1620,6 +1683,28 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
                        Py_XDECREF(ks8);
                        Py_DECREF(ks);
                        continue;
                } else if (!strcmp(k, "throttle_cb")) {
                        if (!PyCallable_Check(vo)) {
                                PyErr_SetString(PyExc_ValueError,
                                                "expected throttle_cb property "
                                                "as a callable function");
                                rd_kafka_topic_conf_destroy(tconf);
                                rd_kafka_conf_destroy(conf);
                                Py_XDECREF(ks8);
                                Py_DECREF(ks);
                                return NULL;
                        }
                        if (h->throttle_cb) {
                                Py_DECREF(h->throttle_cb);
                                h->throttle_cb = NULL;
                        }
                        if (vo != Py_None) {
                                h->throttle_cb = vo;
                                Py_INCREF(h->throttle_cb);
                        }
                        Py_XDECREF(ks8);
                        Py_DECREF(ks);
                        continue;
                } else if (!strcmp(k, "stats_cb")) {
                        if (!PyCallable_Check(vo)) {
                                PyErr_SetString(PyExc_TypeError,
@@ -1719,6 +1804,9 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
        if (h->error_cb)
                rd_kafka_conf_set_error_cb(conf, error_cb);

        if (h->throttle_cb)
                rd_kafka_conf_set_throttle_cb(conf, throttle_cb);

        if (h->stats_cb)
                rd_kafka_conf_set_stats_cb(conf, stats_cb);

1 change: 1 addition & 0 deletions confluent_kafka/src/confluent_kafka.h
@@ -180,6 +180,7 @@ typedef struct {
        PyObject_HEAD
        rd_kafka_t *rk;
        PyObject *error_cb;
        PyObject *throttle_cb;
        PyObject *stats_cb;
        int initiated;

17 changes: 11 additions & 6 deletions docs/index.rst
@@ -89,12 +89,17 @@ The Python bindings also provide some additional configuration properties:
* ``default.topic.config``: value is a dict of topic-level configuration
  properties that are applied to all used topics for the instance.

* ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served upon calling
  ``client.poll()`` or ``producer.flush()``.

* ``throttle_cb(confluent_kafka.ThrottleEvent)``: Callback for throttled request reporting.
  This callback is served upon calling ``client.poll()`` or ``producer.flush()``
  (see the combined configuration sketch after this list).

* ``stats_cb(json_str)``: Callback for statistics data. This callback is triggered every
  ``statistics.interval.ms`` (needs to be configured separately).
  Function argument ``json_str`` is a str instance of a JSON document containing statistics data.
  This callback is served upon calling ``client.poll()`` or ``producer.flush()``. See
  https://github.com/edenhill/librdkafka/wiki/Statistics for more information.

* ``on_delivery(kafka.KafkaError, kafka.Message)`` (**Producer**): value is a Python function reference
  that is called once for each produced message to indicate the final
@@ -103,15 +108,15 @@ The Python bindings also provide some additional configuration properties:
  (or ``on_delivery=callable``) to the confluent_kafka.Producer.produce() function.
  Currently message headers are not supported on the message returned to the
  callback. The ``msg.headers()`` will return None even if the original message
  had headers set. This callback is served upon calling ``producer.poll()`` or ``producer.flush()``.

* ``on_commit(kafka.KafkaError, list(kafka.TopicPartition))`` (**Consumer**): Callback used to indicate success or failure
  of commit requests. This callback is served upon calling ``consumer.poll()``.

* ``logger=logging.Handler`` kwarg: forward logs from the Kafka client to the
  provided ``logging.Handler`` instance.
  To avoid spontaneous calls from non-Python threads the log messages
  will only be forwarded when ``client.poll()`` or ``producer.flush()`` is called.

    mylogger = logging.getLogger()
    mylogger.addHandler(logging.StreamHandler())
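As a rough sketch of how these callback properties fit together (the broker address, topic name and statistics interval are placeholder values, not an authoritative example):

    import logging
    from confluent_kafka import Producer

    def on_error(err):
        print('Error: %s' % err)

    def on_throttle(ev):
        # ev is a confluent_kafka.ThrottleEvent
        print('Throttled: %s' % ev)

    def on_stats(json_str):
        print('Stats: %s' % json_str)

    p = Producer({'bootstrap.servers': 'localhost:9092',  # placeholder
                  'statistics.interval.ms': 10000,        # placeholder
                  'error_cb': on_error,
                  'throttle_cb': on_throttle,
                  'stats_cb': on_stats})
    p.produce('mytopic', b'payload')  # placeholder topic
    p.flush()  # callbacks are served from poll()/flush()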
79 changes: 77 additions & 2 deletions examples/integration_test.py
@@ -61,6 +61,9 @@
# global variable to be set by stats_cb callback function
good_stats_cb_result = False

# global variable to be incremented by throttle_cb callback function
throttled_requests = 0

# Shared between producer and consumer tests and used to verify
# that consumed headers are what was actually produced.
produce_headers = [('foo1', 'bar'),
@@ -85,6 +88,18 @@ def error_cb(err):
    print('Error: %s' % err)


def throttle_cb(throttle_event):
    # validate argument type
    assert isinstance(throttle_event.broker_name, str)
    assert isinstance(throttle_event.broker_id, int)
    assert isinstance(throttle_event.throttle_time, float)

    global throttled_requests
    throttled_requests += 1

    print(throttle_event)


class InMemorySchemaRegistry(object):

    schemas = {}
@@ -842,6 +857,59 @@ def my_on_revoke(consumer, partitions):
    c.close()


def verify_throttle_cb():
    """ Verify throttle_cb is invoked.
        This test requires client quotas to be configured.
        See tests/README.md for more information.
    """
    conf = {'bootstrap.servers': bootstrap_servers,
            'api.version.request': api_version_request,
            'linger.ms': 500,
            'client.id': 'throttled_client',
            'throttle_cb': throttle_cb}

    p = confluent_kafka.Producer(conf)

    msgcnt = 1000
    msgsize = 100
    msg_pattern = 'test.py throttled client'
    msg_payload = (msg_pattern * int(msgsize / len(msg_pattern)))[0:msgsize]

    msgs_produced = 0
    msgs_backpressure = 0
    print('# producing %d messages to topic %s' % (msgcnt, topic))

    if with_progress:
        bar = Bar('Producing', max=msgcnt)
    else:
        bar = None

    for i in range(0, msgcnt):
        while True:
            try:
                p.produce(topic, value=msg_payload)
                break
            except BufferError:
                # Local queue is full (slow broker connection?)
                msgs_backpressure += 1
                if bar is not None and (msgs_backpressure % 1000) == 0:
                    bar.next(n=0)
                p.poll(100)
                continue

        if bar is not None and (msgs_produced % 5000) == 0:
            bar.next(n=5000)
        msgs_produced += 1
        p.poll(0)

    if bar is not None:
        bar.finish()

    p.flush()
    print('# %d of %d produce requests were throttled' % (throttled_requests, msgs_produced))
    assert throttled_requests >= 1


def verify_stats_cb():
    """ Verify stats_cb """

@@ -1053,7 +1121,9 @@ def verify_config(expconfig, configs):
print("Topic {} marked for deletion".format(our_topic))


# Exclude throttle from the default modes since it requires broker-side quota configuration
default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin']
all_modes = default_modes + ['throttle', 'none']
"""All test modes"""


@@ -1094,7 +1164,7 @@ def print_usage(exitcode, reason=None):
        print_usage(1)

    if len(modes) == 0:
        modes = default_modes

    print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
    print('Using librdkafka version %s (0x%x)' % confluent_kafka.libversion())
@@ -1132,6 +1202,11 @@ def print_usage(exitcode, reason=None):
        print('=' * 30, 'Verifying stats_cb', '=' * 30)
        verify_stats_cb()

    # The throttle test uses the producer.
    if 'throttle' in modes:
        print('=' * 30, 'Verifying throttle_cb', '=' * 30)
        verify_throttle_cb()

    if 'avro' in modes:
        print('=' * 30, 'Verifying AVRO', '=' * 30)
        verify_avro()
45 changes: 45 additions & 0 deletions tests/README.md
@@ -6,3 +6,48 @@ From top-level directory run:
$ tox

**NOTE**: This requires `tox` (please install with `pip install tox`) and several supported versions of Python.

Integration tests
=================

**WARNING**: These tests require an active Kafka cluster and will create new topics.


To run all of the integration test `modes`, uncomment the following line in `tox.ini` and add the addresses of your Kafka and Confluent Schema Registry instances.

#python examples/integration_test.py <bootstrap-servers> confluent-kafka-testing [<schema-registry-url>]

You can also run the integration tests outside of `tox` by running this command from the source root directory:

examples/integration_test.py <kafka-broker> [<test-topic>] [<schema-registry>]

To run individual integration test `modes`, use the following syntax:

examples/integration_test.py --<test mode> <kafka-broker> [<test-topic>] [<schema-registry>]

For example:

examples/integration_test.py --producer <kafka-broker> [<test-topic>]

To get a list of modes, run the integration test with the `--help` flag:

    examples/integration_test.py --help


The throttle_cb integration test requires an additional step and as such is not included in the default test modes.
To execute the throttle_cb test, first set a throttle for the client 'throttled_client' with the command below
(`request_percentage=01` restricts the client to roughly 1% of broker request-handler time, which should reliably trigger throttling):

kafka-configs --zookeeper <zookeeper host>:<zookeeper port> \
--alter --add-config 'request_percentage=01' \
--entity-name throttled_client --entity-type clients

> **Contributor:** Out of curiosity, is it possible to do this with alterConfigs now?

> **Contributor (author):** Possibly, worth a try I suppose. I'll push the other review changes first and then revisit.

Once the throttle has been set, you can proceed with the following command:

examples/integration_test.py --throttle <kafka-broker> [<test-topic>]


To remove the throttle, execute the following:

kafka-configs --zookeeper <zookeeper host>:<zookeeper port> \
--alter --delete-config 'request_percentage' \
--entity-name throttled_client --entity-type clients