first attempt at batch consumption of messages #282
Conversation
It looks like @tburmeister hasn't signed our Contributor License Agreement yet. You can read and sign our full Contributor License Agreement here. Once you've signed, reply with [clabot:check]. Appreciation of efforts, clabot
confluent_kafka/src/Consumer.c
Outdated
@@ -15,6 +15,7 @@
 */
 #include "confluent_kafka.h"
 #include <stdio.h>
Had some debug printfs in earlier, will remove this.
confluent_kafka/src/Consumer.c
Outdated
                (Py_ssize_t *)&num_messages, &tmout))
                return NULL;

        CallState_begin(self, &cs); // This unlocks GIL
The comment was for me; will remove.
confluent_kafka/src/Consumer.c
Outdated
        CallState_begin(self, &cs); // This unlocks GIL

        rkqu = rd_kafka_queue_get_consumer(self->rk);
Think this returns NULL if consumer was not properly initialized, which leads to a segfault further down; maybe just need to check this and exit if NULL?
That should not happen, so no need to check
confluent_kafka/src/Consumer.c
Outdated
                                   PyObject *kwargs) {
        size_t num_messages = 100;
        double tmout = -1.0f;
        static char *kws[] = { "timeout", "num_messages", NULL };
This is backwards, should be { "num_messages", "timeout", NULL }.
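In Python terms, the keyword array defines how positional arguments bind to parameter names, which is why the order matters. A tiny sketch (the signature here is a hypothetical Python-level equivalent, not the actual binding code):

```python
# Positional arguments bind to names in declaration order. If the
# array were {"timeout", "num_messages"}, then consume(100, 1.0)
# would bind 100 to timeout instead of num_messages.
def consume(num_messages=1, timeout=-1.0):
    """Hypothetical Python-level signature for Consumer.consume()."""
    return num_messages, timeout
```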
confluent_kafka/src/Consumer.c
Outdated
@@ -743,6 +743,67 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
}

static PyObject *Consumer_consume (Handle *self, PyObject *args,
                                   PyObject *kwargs) {
        size_t num_messages = 100;
1 seems like a better default.
confluent_kafka/src/Consumer.c
Outdated
        CallState_begin(self, &cs);

        rkqu = rd_kafka_queue_get_consumer(self->rk);
optional future improvement: cache this on self
confluent_kafka/src/Consumer.c
Outdated
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|nd", kws,
Use I for num_messages (unsigned int) to avoid checking for < 0.
        }

        if (n < 0) {
                free(rkmessages);
Should raise an exception from rd_kafka_last_error() here, something like:
cfl_PyErr_Format(rd_kafka_last_error(), "%s", rd_kafka_err2str(rd_kafka_last_error()));
and return NULL (to raise the exception).
confluent_kafka/src/Consumer.c
Outdated
        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|nd", kws,
                                         (Py_ssize_t *)&num_messages, &tmout))
                return NULL;
Might want to make sure num_messages is not completely out of bounds, since we're using the value directly to allocate memory. If it is higher than 1M messages, raise a ValueError with a proper error message.
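The suggested bounds check could look something like this sketch, shown in Python for illustration (the real fix belongs in the C extension; the 1M cap and the error message wording are assumptions taken from this comment):

```python
# Hypothetical Python-level sketch of the bounds check suggested above.
MAX_NUM_MESSAGES = 1000000  # cap mentioned in the review comment

def check_num_messages(num_messages):
    """Reject values that would drive an oversized message-array allocation."""
    if num_messages < 0 or num_messages > MAX_NUM_MESSAGES:
        raise ValueError(
            "num_messages must be between 0 and %d, got %d"
            % (MAX_NUM_MESSAGES, num_messages))
    return num_messages
```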
confluent_kafka/src/Consumer.c
Outdated
        Py_ssize_t n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
                tmout >= 0 ? (int)(tmout * 1000.0f) : -1,
                rkmessages,
                num_messages > 0 ? (size_t)num_messages : 1);
can drop this check if using unsigned int
confluent_kafka/src/Consumer.c
Outdated
        " .. note: Callbacks may be called from this method, "
        "such as ``on_assign``, ``on_revoke``, et.al.\n"
        "\n"
        " :param int num_messages: Maximum number of messages to return.\n"
(default: 1)
confluent_kafka/src/Consumer.c
Outdated
        "\n"
        " :param int num_messages: Maximum number of messages to return.\n"
        " :param float timeout: Maximum time to block waiting for message, event or callback.\n"
        " :returns: A list of Message objects or None on timeout\n"
Do we really need to use a different return type for timeout? An empty list seems easier to handle from the application's perspective.
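The empty-list convention can be illustrated with a small sketch; FakeConsumer here is a hypothetical stand-in, not the real client API:

```python
class FakeConsumer:
    """Hypothetical stand-in for the client, for illustration only."""
    def __init__(self, batches):
        self._batches = list(batches)

    def consume(self, num_messages=1, timeout=-1.0):
        # Return [] on timeout instead of None: callers can iterate
        # unconditionally, with no special case for "nothing arrived".
        return self._batches.pop(0) if self._batches else []

received = []
c = FakeConsumer([["m1", "m2"], ["m3"]])
for _ in range(4):                 # the last two calls "time out"
    for msg in c.consume(num_messages=2, timeout=0.1):
        received.append(msg)       # no None check needed
```

With a None return, every call site would first need `if msglist is not None:` before the loop.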
confluent_kafka/src/Consumer.c
Outdated
        "such as ``on_assign``, ``on_revoke``, et.al.\n"
        "\n"
        " :param int num_messages: Maximum number of messages to return.\n"
        " :param float timeout: Maximum time to block waiting for message, event or callback.\n"
(default: infinite (-1))
confluent_kafka/src/Consumer.c
Outdated
        CallState_begin(self, &cs); // This unlocks GIL

        rkqu = rd_kafka_queue_get_consumer(self->rk);
That should not happen, so no need to check
Good stuff!
What needs to be done:
- minor edits from code review
- unit test in tests/test_Consumer.py (to verify the API)
- integration test in examples/integration_test.py (to verify it works in practice).
[clabot:check]
@confluentinc It looks like @tburmeister just signed our Contributor License Agreement. 👍 Always at your service, clabot
Made suggested changes. Will work on unit tests and integration tests next, and might take a stab at caching the queue.
Great! For caching the queue, to avoid locking issues, I suggest extracting the queue in Consumer_init() right after poll_set_consumer(), destroying the queue in Consumer_close() after the rd_kafka_consumer_close() call, and in Consumer_dealloc() prior to rd_kafka_destroy() (and outside the CallState).
confluent_kafka/src/Consumer.c
Outdated
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Id", kws,
                                         (unsigned int *)&num_messages, &tmout))
It is not safe to cast to integer pointers of a possibly different size. Use unsigned int for num_messages' type directly to avoid the cast.
confluent_kafka/src/Consumer.c
Outdated
        " :param int num_messages: Maximum number of messages to return (default: 1).\n"
        " :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)).\n"
        " :returns: A list of Message objects\n"
        " :rtype: list(Message) or None\n"
Does not return None anymore.
confluent_kafka/src/Consumer.c
Outdated
        "\n"
        " :param int num_messages: Maximum number of messages to return (default: 1).\n"
        " :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)).\n"
        " :returns: A list of Message objects\n"
(possibly empty on timeout)
confluent_kafka/src/Consumer.c
Outdated
        " :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)).\n"
        " :returns: A list of Message objects\n"
        " :rtype: list(Message) or None\n"
        " :raises: RuntimeError if called on a closed consumer\n"
or KafkaError in case of internal error.
Missed your comment before pushing my last commit with caching, but that all makes sense.
I added some tests, but I'm having trouble running the tests because synchronous commits crash on my machine.
What librdkafka version?
Version 0.11.0. Edit: upgraded to 0.11.3 but still segfaulting.
Ack, do you have a gdb backtrace to share?
Getting closer! :)
examples/integration_test.py
Outdated
        if msgcnt == 1:
            t_first_msg = time.time()
        if msgcnt >= max_msgcnt:
elif
examples/integration_test.py
Outdated
        if msgcnt >= max_msgcnt:
            break

    if msgcnt >= max_msgcnt:
Move this into the while condition.
    if msgcnt > 0:
        t_spent = time.time() - t_first_msg
        print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
What throughput are you getting with the batch interface, compared to the standard one?
Having trouble running the integration test, but running a quick test I wrote on an actual Kafka stream I got:
consume took 1.33 seconds
1000000 messages, 662.56 MB, 497.54 MB/sec
poll took 2.17 seconds
1000000 messages, 662.62 MB, 304.57 MB/sec
So definitely an improvement, though I found I had to tune the batch size to get the best results, e.g. 1k messages performs better than 10k messages or 100 messages.
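The batch-size tuning described above can be explored with a small harness like this sketch; FakeConsumer is an in-memory stand-in (an assumption, not the real client), so only the measurement pattern carries over, not the absolute numbers:

```python
import time

class FakeConsumer:
    """In-memory stand-in for the client, so the harness runs without a broker."""
    def __init__(self, n):
        self._msgs = ["msg-%d" % i for i in range(n)]
        self._pos = 0

    def consume(self, num_messages=1, timeout=-1.0):
        batch = self._msgs[self._pos:self._pos + num_messages]
        self._pos += len(batch)
        return batch

def drain(consumer, batch_size, total):
    """Time how long pulling `total` messages takes at a given batch size."""
    start, got = time.time(), 0
    while got < total:
        batch = consumer.consume(batch_size, 1.0)
        if not batch:
            break
        got += len(batch)
    return got, time.time() - start

# Run the same drain at several batch sizes to find the sweet spot
got, elapsed = drain(FakeConsumer(100000), 1000, 100000)
```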
damn!
examples/integration_test.py
Outdated
        # Consume until EOF or error

        # Consume message (error()==0) or event (error()!=0)
        msglist = c.consume(max_msgcnt, 1.0)
Knowing that there are at least N messages in the topic (thanks to the produce tests), and where N > max_msgcnt (which it should be), we should check that we actually get max_msgcnt back for each call until we've hit our expected message count (which is some arbitrary value <= N).
Relying on EOF is not safe, since the topic may feature multiple partitions and we don't know what the message distribution is like.
tests/test_Consumer.py
Outdated
@@ -41,6 +41,17 @@ def dummy_assign_revoke(consumer, partitions):
    if msg is not None:
        assert msg.timestamp() == (TIMESTAMP_NOT_AVAILABLE, -1)

    msglist = kc.consume(num_messages=10, timeout=0.001)
    if len(msglist) == 0:
Aren't we expecting exactly 0 messages here? Use assert len(msglist) == 0, "expected 0 messages, not %d" % len(msglist)
@@ -756,6 +815,11 @@ static PyObject *Consumer_close (Handle *self, PyObject *ignore) {

        rd_kafka_consumer_close(self->rk);

        if (self->u.Consumer.rkqu) {
                rd_kafka_queue_destroy(self->u.Consumer.rkqu);
indent looks weird.
Will fix. I noticed that there are a bunch of tabs in the code currently, and I gather from the majority of the code and the librdkafka style guide that spaces are preferred - I can clean those up as well.
Yeah, the tabs are part of the dark past.
But you don't need to change whitespace of existing code, we'll do that later in one big go across the code base as we also instate coding guidelines. But thanks anyway!
tests/test_Consumer.py
Outdated
            break

    print('OK: consumed messages')
Please add a failing test for invalid number of num_messages (e.g., -100 and > 1M)
Struggling to get debug symbols working properly on my Mac, so the best I can give you right now is:
I do not have this issue on my Ubuntu dev machine.
That is weird, since there is an assert just above that checks that commit_return.c_parts is not NULL. Can you add some printfs around that part to figure out what is going on?
The assert seems to not actually be checked. I added:
at line 477, and get:
only nits left now!
examples/integration_test.py
Outdated
@@ -589,21 +589,17 @@ def verify_batch_consumer():
    max_msgcnt = 100
    msgcnt = 0

-    while True:
+    while msgcnt < max_msgcnt:
let max_msgcnt=1000 and batch_cnt=100
examples/integration_test.py
Outdated
@@ -715,12 +706,9 @@ def my_on_revoke(consumer, partitions):

        if msgcnt == 1:
            t_first_msg = time.time()
-        if msgcnt >= max_msgcnt:
+        elif msgcnt >= max_msgcnt:
can get rid of this one now with the while loop
The crash might be that no offsets were committed; it is most likely not related to your changes, though, so we can fix it separately. You can ignore it for now (you can change it to return None if c_parts is empty in your local code).
examples/integration_test.py
Outdated
-        # Consume message (error()==0) or event (error()!=0)
-        msglist = c.consume(max_msgcnt, 1.0)
+        # Consume messages (error()==0) or event (error()!=0)
+        msglist = c.consume(batch_cnt, 1.0)
         assert len(msglist) == max_msgcnt
This test is failing, right?
The assert should check for batch_cnt, not max_msgcnt.
Yep, good catch.
examples/integration_test.py
Outdated
-        # Consume message (error()==0) or event (error()!=0)
-        msglist = c.consume(max_msgcnt, 1.0)
+        # Consume messages (error()==0) or event (error()!=0)
+        msglist = c.consume(batch_cnt, 1.0)
Make the test more robust by setting timeout to >session.timeout.ms, such as 10
examples/integration_test.py
Outdated
        # Consume messages (error()==0) or event (error()!=0)
        msglist = c.consume(batch_cnt, 1.0)
        assert len(msglist) == max_msgcnt
this should compare to batch_cnt
examples/integration_test.py
Outdated
        msgcnt += 1

        print('max_msgcnt %d reached' % msgcnt)
wrong indent, should be outside while loop.
    if msgcnt > 0:
        t_spent = time.time() - t_first_msg
        print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
damn!
Ohhhh, I figured out why commits were crashing and it is my fault - will push a fix soon.
I can now run the full integration test locally, without errors. I really should have confirmed the tests worked locally on master first.
So what needs to be done to get this merged? Looking at the CI tests, it looks like it's passing the actual tests but failing because of pep8 violations, which I believe are addressed in #279.
Huge thanks for doing this! 😍
Awesome! Thanks! This feature will be super useful to me. By the way, we are big librdkafka users at AppNexus.
Thanks! This is very useful to us. Any timeline for when this becomes part of a PyPI release?
@isamaru We're aiming for a feature release end of January.
@edenhill Maybe you can upload a pre-release version to PyPI so we can try it? Thanks! https://packaging.python.org/tutorials/distributing-packages/#pre-release-versioning
Awesome work adding this feature! Any updates as to when the PyPI release will happen?
Final release in about 2 weeks; we'll have an rc out hopefully this week.
Is the batch consumer released yet? Any links to usage / documentation etc. are highly appreciated! Thanks!
@aswinjoseroy Take a look at the
This is a first attempt at adding batch consumption, a la #252
Have not done a ton of testing yet, but I am able to retrieve messages. Let me know what you think.