Skip to content

Expose offsets_for_times consumer method. closes #224 #268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,56 @@ static PyObject *Consumer_get_watermark_offsets (Handle *self, PyObject *args,
}


/**
 * @brief Look up offsets by timestamp for the partitions in \p plist.
 *
 * Each TopicPartition's offset field carries the query timestamp (ms);
 * on success a new Python list is returned with the offset field set to
 * the earliest offset whose timestamp is >= the queried timestamp.
 *
 * Requires librdkafka >= v0.11.0; raises NotImplementedError otherwise.
 * Raises RuntimeError on a closed consumer, KafkaException on broker error.
 */
static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args,
                                             PyObject *kwargs) {
#if RD_KAFKA_VERSION < 0x000b0000
        PyErr_Format(PyExc_NotImplementedError,
                     "Consumer offsets_for_times requires "
                     "confluent-kafka-python built for librdkafka "
                     "version >=v0.11.0 (librdkafka runtime 0x%x, "
                     "buildtime 0x%x)",
                     rd_kafka_version(), RD_KAFKA_VERSION);
        return NULL;
#else
        PyObject *plist;
        double tmout = -1.0;    /* seconds; < 0 means infinite wait */
        rd_kafka_topic_partition_list_t *c_parts;
        rd_kafka_resp_err_t err;
        static char *kws[] = { "partitions", "timeout", NULL };

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kws,
                                         &plist, &tmout))
                return NULL;

        /* Convert the Python TopicPartition list to librdkafka's
         * native representation; ownership of c_parts is ours. */
        if (!(c_parts = py_to_c_parts(plist)))
                return NULL;

        /* librdkafka takes the timeout in milliseconds. */
        err = rd_kafka_offsets_for_times(self->rk,
                                         c_parts,
                                         tmout >= 0 ? (int)(tmout * 1000.0) : -1);

        if (err) {
                rd_kafka_topic_partition_list_destroy(c_parts);
                cfl_PyErr_Format(err,
                                 "Failed to get offsets: %s",
                                 rd_kafka_err2str(err));
                return NULL;
        }

        /* Build the result list (c_parts now holds the resolved offsets,
         * and possibly per-partition errors), then free the C list. */
        plist = c_parts_to_py(c_parts);
        rd_kafka_topic_partition_list_destroy(c_parts);

        return plist;
#endif
}


static PyObject *Consumer_poll (Handle *self, PyObject *args,
PyObject *kwargs) {
Expand Down Expand Up @@ -936,6 +986,24 @@ static PyMethodDef Consumer_methods[] = {
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "offsets_for_times", (PyCFunction)Consumer_offsets_for_times,
  METH_VARARGS|METH_KEYWORDS,
  ".. py:function:: offsets_for_times(partitions, [timeout=None])\n"
  "\n"
  " offsets_for_times looks up offsets by timestamp for the given partitions.\n"
  "\n"
  " The returned offsets for each partition is the earliest offset whose\n"
  " timestamp is greater than or equal to the given timestamp in the\n"
  " corresponding partition.\n"
  "\n"
  /* NOTE: each doc line must end in \n or adjacent string literals
   * concatenate onto one line in the rendered Sphinx docs. */
  " :param list(TopicPartition) partitions: topic+partitions with timestamps in the TopicPartition.offset field.\n"
  " :param float timeout: Request timeout.\n"
  " :returns: list of topic+partition with offset field set and possibly error set\n"
  " :rtype: list(TopicPartition)\n"
  " :raises: KafkaException\n"
  " :raises: RuntimeError if called on a closed consumer\n"
  "\n"
},
{ "close", (PyCFunction)Consumer_close, METH_NOARGS,
"\n"
" Close down and terminate the Kafka Consumer.\n"
Expand Down
8 changes: 8 additions & 0 deletions examples/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,18 @@ def verify_consumer():
lo, hi = c.get_watermark_offsets(assignment[0], cached=True)
print('Cached offsets for %s: %d - %d' % (assignment[0], lo, hi))


# Query broker for offsets
lo, hi = c.get_watermark_offsets(assignment[0], timeout=1.0)
print('Queried offsets for %s: %d - %d' % (assignment[0], lo, hi))

# Query offsets for timestamps by setting the topic partition offset to a timestamp. 123456789000 + 1
topic_partions_to_search = list(map(lambda p: confluent_kafka.TopicPartition(topic, p, 123456789001), range(0, 3)))
print("Searching for offsets with %s" % topic_partions_to_search)

offsets = c.offsets_for_times(topic_partions_to_search, timeout=1.0)
print("offsets_for_times results: %s" % offsets)

# Close consumer
c.close()

Expand Down
22 changes: 22 additions & 0 deletions tests/test_Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,24 @@ def poll(self, somearg):
sc.close()


@pytest.mark.skipif(libversion()[1] < 0x000b0000,
                    reason="requires librdkafka >=0.11.0")
def test_offsets_for_times():
    """offsets_for_times() on an unreachable broker should time out with a
    transient KafkaError rather than crash or hang."""
    c = Consumer({'group.id': 'test',
                  'enable.auto.commit': True,
                  'enable.auto.offset.store': False,
                  'socket.timeout.ms': 50,
                  'session.timeout.ms': 100})
    # Query broker for timestamps for partition
    try:
        test_topic_partition = TopicPartition("test", 0, 100)
        c.offsets_for_times([test_topic_partition], timeout=0.1)
    except KafkaException as e:
        # e.args is a tuple; index it -- e.args([0]) would raise TypeError.
        assert e.args[0].code() in (KafkaError._TIMED_OUT,
                                    KafkaError._WAIT_COORD,
                                    KafkaError.LEADER_NOT_AVAILABLE),\
            str(e.args[0])
    c.close()


def test_multiple_close_throw_exception():
""" Calling Consumer.close() multiple times should throw Runtime Exception
"""
Expand Down Expand Up @@ -251,3 +269,7 @@ def test_calling_store_offsets_after_close_throws_erro():
with pytest.raises(RuntimeError) as ex:
c.store_offsets(offsets=[TopicPartition("test", 0, 42)])
assert 'Consumer closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.offsets_for_times([TopicPartition("test", 0)])
assert 'Consumer closed' == str(ex.value)