first attempt at batch consumption of messages #282
@@ -41,6 +41,10 @@ static int Consumer_clear (Handle *self) {
                 Py_DECREF(self->u.Consumer.on_commit);
                 self->u.Consumer.on_commit = NULL;
         }
+        if (self->u.Consumer.rkqu) {
+                rd_kafka_queue_destroy(self->u.Consumer.rkqu);
+                self->u.Consumer.rkqu = NULL;
+        }

         Handle_clear(self);
@@ -424,9 +428,9 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
         }

         if (async) {
-                /* Async mode: Use consumer queue for offset commit callback,
+                /* Async mode: Use consumer queue for offset commit
                  * served by consumer_poll() */
-                rkqu = rd_kafka_queue_get_consumer(self->rk);
+                rkqu = self->u.Consumer.rkqu;

         } else {
                 /* Sync mode: Let commit_queue() trigger the callback. */
@@ -446,11 +450,7 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
         if (c_offsets)
                 rd_kafka_topic_partition_list_destroy(c_offsets);

-        if (async) {
-                /* Loose reference to consumer queue */
-                rd_kafka_queue_destroy(rkqu);
-
-        } else {
+        if (!async) {
                 /* Re-lock GIL */
                 PyEval_RestoreThread(thread_state);

@@ -743,6 +743,71 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
 }


+static PyObject *Consumer_consume (Handle *self, PyObject *args,
+                                   PyObject *kwargs) {
+        unsigned int num_messages = 1;
+        double tmout = -1.0f;
+        static char *kws[] = { "num_messages", "timeout", NULL };
+        rd_kafka_message_t **rkmessages;
+        PyObject *msglist;
+        rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu;
+        CallState cs;
+        Py_ssize_t i;
+
+        if (!self->rk) {
+                PyErr_SetString(PyExc_RuntimeError,
+                                "Consumer closed");
+                return NULL;
+        }
+
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Id", kws,
+                                         &num_messages, &tmout))
+                return NULL;
+
+        if (num_messages > 1000000) {
+                PyErr_SetString(PyExc_ValueError,
+                                "num_messages must be between 0 and 1000000 (1M)");
+                return NULL;
+        }
+
+        CallState_begin(self, &cs);
+
+        rkmessages = malloc(num_messages * sizeof(rd_kafka_message_t *));
+
+        Py_ssize_t n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
+                tmout >= 0 ? (int)(tmout * 1000.0f) : -1,
+                rkmessages,
+                num_messages);
+
+        if (!CallState_end(self, &cs)) {
+                for (i = 0; i < n; i++) {
+                        rd_kafka_message_destroy(rkmessages[i]);
+                }
+                free(rkmessages);
+                return NULL;
+        }
+
+        if (n < 0) {
+                free(rkmessages);
+                cfl_PyErr_Format(rd_kafka_last_error(),
+                                 "%s", rd_kafka_err2str(rd_kafka_last_error()));
+                return NULL;
+        }
+
+        msglist = PyList_New(n);
+
+        for (i = 0; i < n; i++) {
+                PyObject *msgobj = Message_new0(self, rkmessages[i]);
+                PyList_SET_ITEM(msglist, i, msgobj);
+                rd_kafka_message_destroy(rkmessages[i]);
+        }
+
+        free(rkmessages);
+
+        return msglist;
+}
+
+
 static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
         CallState cs;

@@ -756,6 +821,11 @@ static PyObject *Consumer_close (Handle *self, PyObject *ignore) {

         rd_kafka_consumer_close(self->rk);

+        if (self->u.Consumer.rkqu) {
+                rd_kafka_queue_destroy(self->u.Consumer.rkqu);
+                self->u.Consumer.rkqu = NULL;
+        }
+
         rd_kafka_destroy(self->rk);
         self->rk = NULL;

[Review comment on the rd_kafka_queue_destroy() line]

Reviewer: indent looks weird.

Author: Will fix. I noticed that there are a bunch of tabs in the code currently, and I gather from the majority of the code and the librdkafka style guide that spaces are preferred; I can clean those up as well.

Reviewer: Yeah, the tabs are part of the dark past.

@@ -825,6 +895,30 @@ static PyMethodDef Consumer_methods[] = {
           " :raises: RuntimeError if called on a closed consumer\n"
           "\n"
         },
+        { "consume", (PyCFunction)Consumer_consume,
+          METH_VARARGS|METH_KEYWORDS,
+          ".. py:function:: consume([num_messages=1], [timeout=-1])\n"
+          "\n"
+          " Consumes messages, calls callbacks and returns a list of messages "
+          "(possibly empty on timeout).\n"
+          "\n"
+          " The application must check the returned :py:class:`Message` "
+          "object's :py:func:`Message.error()` method to distinguish "
+          "between proper messages (error() returns None) and events or "
+          "errors for each :py:class:`Message` in the list (see error().code() "
+          "for specifics).\n"
+          "\n"
+          " .. note:: Callbacks may be called from this method, "
+          "such as ``on_assign``, ``on_revoke``, et al.\n"
+          "\n"
+          " :param int num_messages: Maximum number of messages to return (default: 1).\n"
+          " :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)).\n"
+          " :returns: A list of Message objects (possibly empty on timeout)\n"
+          " :rtype: list(Message)\n"
+          " :raises: RuntimeError if called on a closed consumer, KafkaError "
+          "in case of internal error, or ValueError if num_messages > 1M.\n"
+          "\n"
+        },
         { "assign", (PyCFunction)Consumer_assign, METH_O,
           ".. py:function:: assign(partitions)\n"
           "\n"

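As a usage illustration (not part of this PR's diff), here is a minimal sketch of calling the new batch API from Python, following the docstring above. The broker address, group id and topic name are placeholders:

    import confluent_kafka

    c = confluent_kafka.Consumer({'bootstrap.servers': 'localhost:9092',
                                  'group.id': 'consume-demo',
                                  'default.topic.config': {
                                      'auto.offset.reset': 'earliest'}})
    c.subscribe(['mytopic'])

    try:
        # Ask for up to 100 messages, waiting at most 5 seconds; the
        # returned list may be shorter (or empty) on timeout.
        msglist = c.consume(num_messages=100, timeout=5.0)
        for msg in msglist:
            if msg.error():
                # Event or error; see error().code() for specifics.
                print('Event/error: %s' % msg.error())
                continue
            print('%s [%d] @ %d: %s' %
                  (msg.topic(), msg.partition(), msg.offset(), msg.value()))
    finally:
        c.close()
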
@@ -1053,6 +1147,8 @@ static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {

         rd_kafka_poll_set_consumer(self->rk);

+        self->u.Consumer.rkqu = rd_kafka_queue_get_consumer(self->rk);
+
         return 0;
 }

@@ -565,6 +565,162 @@ def my_on_revoke(consumer, partitions):
     c.close()


+def verify_batch_consumer():
+    """ Verify basic batch Consumer functionality """
+
+    # Consumer config
+    conf = {'bootstrap.servers': bootstrap_servers,
+            'group.id': 'test.py',
+            'session.timeout.ms': 6000,
+            'enable.auto.commit': False,
+            'api.version.request': api_version_request,
+            'on_commit': print_commit_result,
+            'error_cb': error_cb,
+            'default.topic.config': {
+                'auto.offset.reset': 'earliest'
+            }}
+
+    # Create consumer
+    c = confluent_kafka.Consumer(**conf)
+
+    # Subscribe to a list of topics
+    c.subscribe([topic])
+
+    max_msgcnt = 1000
+    batch_cnt = 100
+    msgcnt = 0
+
+    while msgcnt < max_msgcnt:
+        # Consume until we hit max_msgcnt
+
+        # Consume messages (error()==0) or events (error()!=0)
+        msglist = c.consume(batch_cnt, 10.0)
+        assert len(msglist) == batch_cnt
+
+        for msg in msglist:
+            if msg.error():
+                print('Consumer error: %s: ignoring' % msg.error())
+                continue
+
+            tstype, timestamp = msg.timestamp()
+            print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
+                  (msg.topic(), msg.partition(), msg.offset(),
+                   msg.key(), msg.value(), tstype, timestamp))
+
+            if (msg.offset() % 5) == 0:
+                # Async commit
+                c.commit(msg, async=True)
+            elif (msg.offset() % 4) == 0:
+                offsets = c.commit(msg, async=False)
+                assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
+                assert offsets[0].offset == msg.offset()+1, \
+                    'expected offset %d to be committed, not %s' % \
+                    (msg.offset(), offsets)
+                print('Sync committed offset: %s' % offsets)
+
+            msgcnt += 1
+
+    print('max_msgcnt %d reached' % msgcnt)
+
+    # Get current assignment
+    assignment = c.assignment()
+
+    # Get cached watermark offsets
+    # Since we're not making use of statistics the low offset is not known so ignore it.
+    lo, hi = c.get_watermark_offsets(assignment[0], cached=True)
+    print('Cached offsets for %s: %d - %d' % (assignment[0], lo, hi))
+
+    # Query broker for offsets
+    lo, hi = c.get_watermark_offsets(assignment[0], timeout=1.0)
+    print('Queried offsets for %s: %d - %d' % (assignment[0], lo, hi))
+
+    # Close consumer
+    c.close()
+
+    # Start a new client and get the committed offsets
+    c = confluent_kafka.Consumer(**conf)
+    offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p),
+                                   range(0, 3))))
+    for tp in offsets:
+        print(tp)
+
+    c.close()
+
+
+def verify_batch_consumer_performance():
+    """ Verify batch Consumer performance """
+
+    conf = {'bootstrap.servers': bootstrap_servers,
+            'group.id': uuid.uuid1(),
+            'session.timeout.ms': 6000,
+            'error_cb': error_cb,
+            'default.topic.config': {
+                'auto.offset.reset': 'earliest'
+            }}
+
+    c = confluent_kafka.Consumer(**conf)
+
+    def my_on_assign(consumer, partitions):
+        print('on_assign:', len(partitions), 'partitions:')
+        for p in partitions:
+            print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
+        consumer.assign(partitions)
+
+    def my_on_revoke(consumer, partitions):
+        print('on_revoke:', len(partitions), 'partitions:')
+        for p in partitions:
+            print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
+        consumer.unassign()
+
+    c.subscribe([topic], on_assign=my_on_assign, on_revoke=my_on_revoke)
+
+    max_msgcnt = 1000000
+    bytecnt = 0
+    msgcnt = 0
+    batch_size = 1000
+
+    print('Will now consume %d messages' % max_msgcnt)
+
+    if with_progress:
+        bar = Bar('Consuming', max=max_msgcnt,
+                  suffix='%(index)d/%(max)d [%(eta_td)s]')
+    else:
+        bar = None
+
+    while msgcnt < max_msgcnt:
+        # Consume until we hit max_msgcnt
+
+        msglist = c.consume(num_messages=batch_size, timeout=20.0)
+
+        for msg in msglist:
+            if msg.error():
+                if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+                    # Reached EOF for a partition, ignore.
+                    continue
+                else:
+                    raise confluent_kafka.KafkaException(msg.error())
+
+            bytecnt += len(msg)
+            msgcnt += 1
+
+            if bar is not None and (msgcnt % 10000) == 0:
+                bar.next(n=10000)
+
+            if msgcnt == 1:
+                t_first_msg = time.time()
+
+    if bar is not None:
+        bar.finish()
+
+    if msgcnt > 0:
+        t_spent = time.time() - t_first_msg
+        print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
+              (msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
+               (bytecnt / t_spent) / (1024*1024)))
+
+    print('closing consumer')
+    c.close()
+
+
[Review comment on the throughput print-out]

Reviewer: What throughput are you getting with the batch interface, compared to the standard one?

Author: Having trouble running the integration test, but running a quick test I wrote on an actual Kafka stream I got a definite improvement, though I found I had to tune the batch size to get the best results, e.g. 1k messages performs better than 10k messages or 100 messages.

Reviewer: damn!

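The batch-size observation above suggests a simple experiment. Here is a hypothetical sketch of such a quick test (not the author's actual script; the broker address, topic name and message counts are made up):

    import time
    import confluent_kafka

    def msgs_per_sec(topic, batch_size, total=100000):
        # Drain `total` messages with the given batch size and report the rate.
        c = confluent_kafka.Consumer({'bootstrap.servers': 'localhost:9092',
                                      'group.id': 'batch-bench-%d' % batch_size,
                                      'default.topic.config': {
                                          'auto.offset.reset': 'earliest'}})
        c.subscribe([topic])
        consumed = 0
        start = time.time()
        while consumed < total:
            msglist = c.consume(num_messages=batch_size, timeout=10.0)
            consumed += sum(1 for m in msglist if not m.error())
        rate = consumed / (time.time() - start)
        c.close()
        return rate

    # e.g. compare 100 vs 1k vs 10k message batches on a pre-filled topic
    for size in (100, 1000, 10000):
        print('batch=%d: %.0f msgs/s' % (size, msgs_per_sec('mytopic', size)))
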
 def verify_stats_cb():
     """ Verify stats_cb """

@@ -663,6 +819,9 @@ def stats_cb(stats_json_str):
     print('=' * 30, 'Verifying Consumer', '=' * 30)
     verify_consumer()

+    print('=' * 30, 'Verifying batch Consumer', '=' * 30)
+    verify_batch_consumer()
+
     print('=' * 30, 'Verifying Producer performance (with dr_cb)', '=' * 30)
     verify_producer_performance(with_dr_cb=True)

@@ -672,6 +831,9 @@ def stats_cb(stats_json_str):
     print('=' * 30, 'Verifying Consumer performance', '=' * 30)
    verify_consumer_performance()

+    print('=' * 30, 'Verifying batch Consumer performance', '=' * 30)
+    verify_batch_consumer_performance()
+
     print('=' * 30, 'Verifying stats_cb', '=' * 30)
     verify_stats_cb()

[Review comment]

Reviewer: Should raise an exception from rd_kafka_last_error() here, something like:

    cfl_PyErr_Format(rd_kafka_last_error(), "%s",
                     rd_kafka_err2str(rd_kafka_last_error()));

and return NULL (to raise the exception).
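For reference, the committed diff above already does this on the n < 0 path of Consumer_consume(), so from Python the failure surfaces as an exception. A hypothetical illustration (placeholder broker and topic, not code from this PR):

    import confluent_kafka

    c = confluent_kafka.Consumer({'bootstrap.servers': 'localhost:9092',
                                  'group.id': 'error-demo'})
    c.subscribe(['mytopic'])
    try:
        msglist = c.consume(num_messages=500, timeout=5.0)
    except confluent_kafka.KafkaException as e:
        # Internal error from the batch consume, raised via cfl_PyErr_Format()
        print('consume() failed: %s' % e)
    finally:
        c.close()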