
first attempt at batch consumption of messages #282


Merged
merged 11 commits into from
Dec 7, 2017
110 changes: 103 additions & 7 deletions confluent_kafka/src/Consumer.c
@@ -41,6 +41,10 @@ static int Consumer_clear (Handle *self) {
Py_DECREF(self->u.Consumer.on_commit);
self->u.Consumer.on_commit = NULL;
}
if (self->u.Consumer.rkqu) {
rd_kafka_queue_destroy(self->u.Consumer.rkqu);
self->u.Consumer.rkqu = NULL;
}

Handle_clear(self);

@@ -424,9 +428,9 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
}

if (async) {
/* Async mode: Use consumer queue for offset commit callback,
/* Async mode: Use consumer queue for offset commit
* served by consumer_poll() */
rkqu = rd_kafka_queue_get_consumer(self->rk);
rkqu = self->u.Consumer.rkqu;

} else {
/* Sync mode: Let commit_queue() trigger the callback. */
@@ -446,11 +450,7 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
if (c_offsets)
rd_kafka_topic_partition_list_destroy(c_offsets);

if (async) {
/* Loose reference to consumer queue */
rd_kafka_queue_destroy(rkqu);

} else {
if (!async) {
/* Re-lock GIL */
PyEval_RestoreThread(thread_state);

@@ -743,6 +743,71 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
}


static PyObject *Consumer_consume (Handle *self, PyObject *args,
PyObject *kwargs) {
unsigned int num_messages = 1;
double tmout = -1.0f;
static char *kws[] = { "num_messages", "timeout", NULL };
rd_kafka_message_t **rkmessages;
PyObject *msglist;
rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu;
CallState cs;
Py_ssize_t i;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Id", kws,
&num_messages, &tmout))
return NULL;

if (num_messages > 1000000) {
PyErr_SetString(PyExc_ValueError,
"num_messages must be between 0 and 1000000 (1M)");
return NULL;
}

CallState_begin(self, &cs);

rkmessages = malloc(num_messages * sizeof(rd_kafka_message_t *));

Py_ssize_t n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
tmout >= 0 ? (int)(tmout * 1000.0f) : -1,
rkmessages,
num_messages);

if (!CallState_end(self, &cs)) {
for (i = 0; i < n; i++) {
rd_kafka_message_destroy(rkmessages[i]);
}
free(rkmessages);
return NULL;
}

if (n < 0) {
free(rkmessages);
Contributor

Should raise an exception from rd_kafka_last_error() here, something like:
cfl_PyErr_Format(rd_kafka_last_error(), "%s", rd_kafka_err2str(rd_kafka_last_error()));

and return NULL (to raise the exception)

cfl_PyErr_Format(rd_kafka_last_error(),
"%s", rd_kafka_err2str(rd_kafka_last_error()));
return NULL;
}

msglist = PyList_New(n);

for (i = 0; i < n; i++) {
PyObject *msgobj = Message_new0(self, rkmessages[i]);
PyList_SET_ITEM(msglist, i, msgobj);
rd_kafka_message_destroy(rkmessages[i]);
}

free(rkmessages);

return msglist;
}


static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
CallState cs;

@@ -756,6 +821,11 @@ static PyObject *Consumer_close (Handle *self, PyObject *ignore) {

rd_kafka_consumer_close(self->rk);

if (self->u.Consumer.rkqu) {
rd_kafka_queue_destroy(self->u.Consumer.rkqu);
Contributor

indent looks weird.

Contributor Author

Will fix. I noticed that there are a bunch of tabs in the code currently, and I gather from the majority of the code and the librdkafka style guide that spaces are preferred - I can clean those up as well.

Contributor

Yeah, the tabs are part of the dark past.
But you don't need to change whitespace of existing code, we'll do that later in one big go across the code base as we also instate coding guidelines. But thanks anyway!

self->u.Consumer.rkqu = NULL;
}

rd_kafka_destroy(self->rk);
self->rk = NULL;

@@ -825,6 +895,30 @@ static PyMethodDef Consumer_methods[] = {
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "consume", (PyCFunction)Consumer_consume,
METH_VARARGS|METH_KEYWORDS,
".. py:function:: consume([num_messages=1], [timeout=-1])\n"
"\n"
" Consume messages, calls callbacks and returns list of messages "
"(possibly empty on timeout).\n"
"\n"
" The application must check the returned :py:class:`Message` "
"object's :py:func:`Message.error()` method to distinguish "
"between proper messages (error() returns None), or an event or "
"error for each :py:class:`Message` in the list (see error().code() "
"for specifics).\n"
"\n"
" .. note: Callbacks may be called from this method, "
"such as ``on_assign``, ``on_revoke``, et.al.\n"
"\n"
" :param int num_messages: Maximum number of messages to return (default: 1).\n"
" :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)).\n"
" :returns: A list of Message objects (possibly empty on timeout)\n"
" :rtype: list(Message)\n"
" :raises: RuntimeError if called on a closed consumer, KafkaError "
"in case of internal error, or ValueError if num_messages > 1M.\n"
"\n"
},
{ "assign", (PyCFunction)Consumer_assign, METH_O,
".. py:function:: assign(partitions)\n"
"\n"
@@ -1053,6 +1147,8 @@ static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {

rd_kafka_poll_set_consumer(self->rk);

self->u.Consumer.rkqu = rd_kafka_queue_get_consumer(self->rk);

return 0;
}

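For reference, a minimal usage sketch of the new consume() API from Python; the broker address, topic name and group id are placeholders, and the config mirrors the integration test further down.

    import confluent_kafka
    from confluent_kafka import KafkaError, KafkaException

    # Placeholder configuration; adjust bootstrap.servers, group.id and topic.
    conf = {'bootstrap.servers': 'localhost:9092',
            'group.id': 'batch-consume-example',
            'default.topic.config': {'auto.offset.reset': 'earliest'}}
    c = confluent_kafka.Consumer(**conf)
    c.subscribe(['mytopic'])

    msgcnt = 0
    while msgcnt < 1000:
        # Fetch up to 100 messages, waiting at most 1 second.
        msglist = c.consume(num_messages=100, timeout=1.0)
        for msg in msglist:
            if msg.error():
                # Events and errors come back as messages with error() set.
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                raise KafkaException(msg.error())
            print('%s [%d] @ %d: %s' % (msg.topic(), msg.partition(),
                                        msg.offset(), msg.value()))
            msgcnt += 1

    c.close()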
1 change: 1 addition & 0 deletions confluent_kafka/src/confluent_kafka.h
@@ -186,6 +186,7 @@ typedef struct {
PyObject *on_assign; /* Rebalance: on_assign callback */
PyObject *on_revoke; /* Rebalance: on_revoke callback */
PyObject *on_commit; /* Commit callback */
rd_kafka_queue_t *rkqu; /* Consumer queue */

} Consumer;
} u;
162 changes: 162 additions & 0 deletions examples/integration_test.py
@@ -565,6 +565,162 @@ def my_on_revoke(consumer, partitions):
c.close()


def verify_batch_consumer():
""" Verify basic batch Consumer functionality """

# Consumer config
conf = {'bootstrap.servers': bootstrap_servers,
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'api.version.request': api_version_request,
'on_commit': print_commit_result,
'error_cb': error_cb,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}}

# Create consumer
c = confluent_kafka.Consumer(**conf)

# Subscribe to a list of topics
c.subscribe([topic])

max_msgcnt = 1000
batch_cnt = 100
msgcnt = 0

while msgcnt < max_msgcnt:
# Consume until we hit max_msgcnt

# Consume messages (error()==0) or event (error()!=0)
msglist = c.consume(batch_cnt, 10.0)
assert len(msglist) == batch_cnt

for msg in msglist:
if msg.error():
print('Consumer error: %s: ignoring' % msg.error())
continue

tstype, timestamp = msg.timestamp()
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value(), tstype, timestamp))

if (msg.offset() % 5) == 0:
# Async commit
c.commit(msg, async=True)
elif (msg.offset() % 4) == 0:
offsets = c.commit(msg, async=False)
assert len(offsets) == 1, 'expected 1 offset, not %s' % (offsets)
assert offsets[0].offset == msg.offset()+1, \
'expected offset %d to be committed, not %s' % \
(msg.offset(), offsets)
print('Sync committed offset: %s' % offsets)

msgcnt += 1

print('max_msgcnt %d reached' % msgcnt)

# Get current assignment
assignment = c.assignment()

# Get cached watermark offsets
# Since we're not making use of statistics the low offset is not known so ignore it.
lo, hi = c.get_watermark_offsets(assignment[0], cached=True)
print('Cached offsets for %s: %d - %d' % (assignment[0], lo, hi))

# Query broker for offsets
lo, hi = c.get_watermark_offsets(assignment[0], timeout=1.0)
print('Queried offsets for %s: %d - %d' % (assignment[0], lo, hi))

# Close consumer
c.close()

# Start a new client and get the committed offsets
c = confluent_kafka.Consumer(**conf)
offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p), range(0, 3))))
for tp in offsets:
print(tp)

c.close()


def verify_batch_consumer_performance():
""" Verify batch Consumer performance """

conf = {'bootstrap.servers': bootstrap_servers,
'group.id': uuid.uuid1(),
'session.timeout.ms': 6000,
'error_cb': error_cb,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}}

c = confluent_kafka.Consumer(**conf)

def my_on_assign(consumer, partitions):
print('on_assign:', len(partitions), 'partitions:')
for p in partitions:
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
consumer.assign(partitions)

def my_on_revoke(consumer, partitions):
print('on_revoke:', len(partitions), 'partitions:')
for p in partitions:
print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset))
consumer.unassign()

c.subscribe([topic], on_assign=my_on_assign, on_revoke=my_on_revoke)

max_msgcnt = 1000000
bytecnt = 0
msgcnt = 0
batch_size = 1000

print('Will now consume %d messages' % max_msgcnt)

if with_progress:
bar = Bar('Consuming', max=max_msgcnt,
suffix='%(index)d/%(max)d [%(eta_td)s]')
else:
bar = None

while msgcnt < max_msgcnt:
# Consume until we hit max_msgcnt

msglist = c.consume(num_messages=batch_size, timeout=20.0)

for msg in msglist:
if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
# Reached EOF for a partition, ignore.
continue
else:
raise confluent_kafka.KafkaException(msg.error())

bytecnt += len(msg)
msgcnt += 1

if bar is not None and (msgcnt % 10000) == 0:
bar.next(n=10000)

if msgcnt == 1:
t_first_msg = time.time()

if bar is not None:
bar.finish()

if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
Contributor

What throughput are you getting with the batch interface, compared to the standard one?

Contributor Author

I'm having trouble running the integration test, but from a quick test I wrote against an actual Kafka stream I got:

consume took 1.33 seconds
1000000 messages, 662.56 MB, 497.54 MB/sec

poll took 2.17 seconds
1000000 messages, 662.62 MB, 304.57 MB/sec

So definitely an improvement, though I found I had to tune the batch size to get the best results, e.g. 1k messages performs better than 10k messages or 100 messages.

Contributor

damn!
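For illustration, a minimal sketch of the kind of quick consume()-vs-poll() throughput comparison described above; the broker address, topic name and message count are placeholders, and the consumer config mirrors the integration test.

    import time
    import uuid
    import confluent_kafka

    def run(use_batch, max_msgcnt=1000000, batch_size=1000):
        # Placeholder config; a fresh group id so each run re-reads the topic.
        conf = {'bootstrap.servers': 'localhost:9092',
                'group.id': str(uuid.uuid1()),
                'session.timeout.ms': 6000,
                'default.topic.config': {'auto.offset.reset': 'earliest'}}
        c = confluent_kafka.Consumer(**conf)
        c.subscribe(['mytopic'])

        msgcnt = bytecnt = 0
        t_start = time.time()
        while msgcnt < max_msgcnt:
            if use_batch:
                msgs = c.consume(num_messages=batch_size, timeout=10.0)
            else:
                msg = c.poll(timeout=10.0)
                msgs = [msg] if msg is not None else []
            for msg in msgs:
                if msg.error():
                    continue  # ignore events (e.g. partition EOF) for the benchmark
                bytecnt += len(msg)
                msgcnt += 1
        t_spent = time.time() - t_start
        c.close()
        print('%s: %d msgs, %.2f MB in %.2fs (%.2f MB/s)' %
              ('consume' if use_batch else 'poll', msgcnt,
               bytecnt / (1024.0 * 1024.0), t_spent,
               (bytecnt / t_spent) / (1024.0 * 1024.0)))

    run(use_batch=True)
    run(use_batch=False)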

(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024*1024)))

print('closing consumer')
c.close()


def verify_stats_cb():
""" Verify stats_cb """

@@ -663,6 +819,9 @@ def stats_cb(stats_json_str):
print('=' * 30, 'Verifying Consumer', '=' * 30)
verify_consumer()

print('=' * 30, 'Verifying batch Consumer', '=' * 30)
verify_batch_consumer()

print('=' * 30, 'Verifying Producer performance (with dr_cb)', '=' * 30)
verify_producer_performance(with_dr_cb=True)

@@ -672,6 +831,9 @@ def stats_cb(stats_json_str):
print('=' * 30, 'Verifying Consumer performance', '=' * 30)
verify_consumer_performance()

print('=' * 30, 'Verifying batch Consumer performance', '=' * 30)
verify_batch_consumer_performance()

print('=' * 30, 'Verifying stats_cb', '=' * 30)
verify_stats_cb()

15 changes: 15 additions & 0 deletions tests/test_Consumer.py
@@ -41,6 +41,17 @@ def dummy_assign_revoke(consumer, partitions):
if msg is not None:
assert msg.timestamp() == (TIMESTAMP_NOT_AVAILABLE, -1)

msglist = kc.consume(num_messages=10, timeout=0.001)
assert len(msglist) == 0, "expected 0 messages, not %d" % len(msglist)

with pytest.raises(ValueError) as ex:
kc.consume(-100)
assert 'num_messages must be between 0 and 1000000 (1M)' == str(ex.value)

with pytest.raises(ValueError) as ex:
kc.consume(1000001)
assert 'num_messages must be between 0 and 1000000 (1M)' == str(ex.value)

partitions = list(map(lambda part: TopicPartition("test", part), range(0, 100, 3)))
kc.assign(partitions)

@@ -204,6 +215,10 @@ def test_any_method_after_close_throws_exception():
c.poll()
assert 'Consumer closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.consume()
assert 'Consumer closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.assign([TopicPartition('test', 0)])
assert 'Consumer closed' == str(ex.value)