
Commit d788937

Merge pull request #245 from ctrochalakis/store_offsets
Added Consumer.store_offsets() API
2 parents 4549c1d + 3602487 commit d788937

File tree (2 files changed, +114 −0 lines):
  confluent_kafka/src/Consumer.c
  tests/test_Consumer.py


confluent_kafka/src/Consumer.c

Lines changed: 92 additions & 0 deletions
@@ -325,6 +325,82 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
+static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
+                                         PyObject *kwargs) {
+
+#if RD_KAFKA_VERSION < 0x000b0000
+        PyErr_Format(PyExc_NotImplementedError,
+                     "Consumer store_offsets require "
+                     "confluent-kafka-python built for librdkafka "
+                     "version >=v0.11.0 (librdkafka runtime 0x%x, "
+                     "buildtime 0x%x)",
+                     rd_kafka_version(), RD_KAFKA_VERSION);
+        return NULL;
+#else
+        rd_kafka_resp_err_t err;
+        PyObject *msg = NULL, *offsets = NULL;
+        rd_kafka_topic_partition_list_t *c_offsets;
+        static char *kws[] = { "message", "offsets", NULL };
+
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO", kws,
+                                         &msg, &offsets))
+                return NULL;
+
+        if (msg && offsets) {
+                PyErr_SetString(PyExc_ValueError,
+                                "message and offsets are mutually exclusive");
+                return NULL;
+        }
+
+        if (!msg && !offsets) {
+                PyErr_SetString(PyExc_ValueError,
+                                "expected either message or offsets");
+                return NULL;
+        }
+
+        if (offsets) {
+                if (!(c_offsets = py_to_c_parts(offsets)))
+                        return NULL;
+        } else {
+                Message *m;
+                PyObject *uo8;
+
+                if (PyObject_Type((PyObject *)msg) !=
+                    (PyObject *)&MessageType) {
+                        PyErr_Format(PyExc_TypeError,
+                                     "expected %s", MessageType.tp_name);
+                        return NULL;
+                }
+
+                m = (Message *)msg;
+
+                c_offsets = rd_kafka_topic_partition_list_new(1);
+                rd_kafka_topic_partition_list_add(
+                        c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
+                        m->partition)->offset = m->offset + 1;
+                Py_XDECREF(uo8);
+        }
+
+        err = rd_kafka_offsets_store(self->rk, c_offsets);
+        rd_kafka_topic_partition_list_destroy(c_offsets);
+
+        if (err) {
+                cfl_PyErr_Format(err,
+                                 "StoreOffsets failed: %s", rd_kafka_err2str(err));
+                return NULL;
+        }
+
+        Py_RETURN_NONE;
+#endif
+}
+
+
 static PyObject *Consumer_committed (Handle *self, PyObject *args,
                                      PyObject *kwargs) {
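In the message path, the stored offset is the message's offset plus one, i.e. the position of the next message to be consumed. As a hedged illustration (not part of the commit), the two call forms map onto each other roughly as follows, where msg is assumed to be a confluent_kafka.Message returned by poll():

from confluent_kafka import Consumer, Message, TopicPartition

def store_position(consumer: Consumer, msg: Message) -> None:
    # Roughly what store_offsets(message=msg) does: store offset+1 for the
    # message's topic/partition, to be picked up by a later commit.
    consumer.store_offsets(offsets=[TopicPartition(msg.topic(),
                                                   msg.partition(),
                                                   msg.offset() + 1)])
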
@@ -570,6 +646,22 @@ static PyMethodDef Consumer_methods[] = {
          " :raises: KafkaException\n"
          "\n"
        },
+       { "store_offsets", (PyCFunction)Consumer_store_offsets, METH_VARARGS|METH_KEYWORDS,
+         ".. py:function:: store_offsets([message=None], [offsets=None])\n"
+         "\n"
+         "  Store offsets for a message or a list of offsets.\n"
+         "\n"
+         "  ``message`` and ``offsets`` are mutually exclusive. "
+         "The stored offsets will be committed according to 'auto.commit.interval.ms' or manual "
+         "offset-less :py:meth:`commit`. "
+         "Note that 'enable.auto.offset.store' must be set to False when using this API.\n"
+         "\n"
+         "  :param confluent_kafka.Message message: Store message's offset+1.\n"
+         "  :param list(TopicPartition) offsets: List of topic+partitions+offsets to store.\n"
+         "  :rtype: None\n"
+         "  :raises: KafkaException\n"
+         "\n"
+       },
        { "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: commit([message=None], [offsets=None], [async=True])\n"
          "\n"

tests/test_Consumer.py

Lines changed: 22 additions & 0 deletions
@@ -81,6 +81,28 @@ def dummy_assign_revoke(consumer, partitions):
     kc.close()


+@pytest.mark.skipif(libversion()[1] < 0x000b0000,
+                    reason="requires librdkafka >=0.11.0")
+def test_store_offsets():
+    """ Basic store_offsets() tests """
+
+    c = Consumer({'group.id': 'test',
+                  'enable.auto.commit': True,
+                  'enable.auto.offset.store': False,
+                  'socket.timeout.ms': 50,
+                  'session.timeout.ms': 100})
+
+    c.subscribe(["test"])
+
+    try:
+        c.store_offsets(offsets=[TopicPartition("test", 0, 42)])
+    except KafkaException as e:
+        assert e.args[0].code() == KafkaError._UNKNOWN_PARTITION
+
+    c.unsubscribe()
+    c.close()
+
+
 # librdkafka <=0.9.2 has a race-issue where it will hang indefinitely
 # if a commit is issued when no coordinator is available.
 @pytest.mark.skipif(libversion()[1] <= 0x000902ff,
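The argument-validation branches added in Consumer.c can also be exercised without a broker. A hedged companion sketch, not part of this commit (the test name is illustrative; the skip condition mirrors the test above):

import pytest

from confluent_kafka import Consumer, libversion


@pytest.mark.skipif(libversion()[1] < 0x000b0000,
                    reason="requires librdkafka >=0.11.0")
def test_store_offsets_arg_validation():
    """ Sketch: exercise the ValueError/TypeError paths of store_offsets() """
    c = Consumer({'group.id': 'test',
                  'enable.auto.offset.store': False,
                  'socket.timeout.ms': 50,
                  'session.timeout.ms': 100})

    with pytest.raises(ValueError):
        c.store_offsets()                         # neither message nor offsets given
    with pytest.raises(TypeError):
        c.store_offsets(message="not a Message")  # message must be a Message object

    c.close()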
