Skip to content

Commit c2e2807

Browse files
committed
Expose offsets_for_times consumer method. closes #224
Expose offsets_for_times consumer method. closes #224 Expose offsets_for_times consumer method. closes #224
1 parent 7ebc807 commit c2e2807

File tree

2 files changed

+72
-0
lines changed

2 files changed

+72
-0
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,47 @@ static PyObject *Consumer_get_watermark_offsets (Handle *self, PyObject *args,
704704
}
705705

706706

707+
static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args,
708+
PyObject *kwargs) {
709+
710+
PyObject *plist;
711+
double tmout = -1.0f;
712+
rd_kafka_topic_partition_list_t *c_parts;
713+
rd_kafka_resp_err_t err;
714+
static char *kws[] = { "partitions", "timeout", NULL };
715+
716+
if (!self->rk) {
717+
PyErr_SetString(PyExc_RuntimeError,
718+
"Consumer closed");
719+
return NULL;
720+
}
721+
722+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kws,
723+
&plist, &tmout))
724+
return NULL;
725+
726+
if (!(c_parts = py_to_c_parts(plist)))
727+
return NULL;
728+
729+
err = rd_kafka_offsets_for_times(self->rk,
730+
c_parts,
731+
tmout >= 0 ? (int)(tmout * 1000.0f) : -1);
732+
733+
if (err) {
734+
rd_kafka_topic_partition_list_destroy(c_parts);
735+
cfl_PyErr_Format(err,
736+
"Failed to get offsets: %s",
737+
rd_kafka_err2str(err));
738+
return NULL;
739+
}
740+
741+
plist = c_parts_to_py(c_parts);
742+
rd_kafka_topic_partition_list_destroy(c_parts);
743+
744+
return plist;
745+
746+
}
747+
707748

708749
static PyObject *Consumer_poll (Handle *self, PyObject *args,
709750
PyObject *kwargs) {
@@ -936,6 +977,24 @@ static PyMethodDef Consumer_methods[] = {
936977
" :raises: RuntimeError if called on a closed consumer\n"
937978
"\n"
938979
},
980+
{ "offsets_for_times", (PyCFunction)Consumer_offsets_for_times,
981+
METH_VARARGS|METH_KEYWORDS,
982+
".. py:function:: offsets_for_times(partitions, [timeout=None])\n"
983+
"\n"
984+
" offsets_for_times looks up offsets by timestamp for the given partitions.\n"
985+
"\n"
986+
" The returned offsets for each partition is the earliest offset whose\n"
987+
" timestamp is greater than or equal to the given timestamp in the\n"
988+
" corresponding partition.\n"
989+
"\n"
990+
" :param list(TopicPartition) partitions: topic+partitions with timestamps in the TopicPartition.offset field."
991+
" :param float timeout: Request timeout (when cached=False).\n"
992+
" :returns: list of topic+partition with offset field set and possibly error set\n"
993+
" :rtype: list(TopicPartition)\n"
994+
" :raises: KafkaException\n"
995+
" :raises: RuntimeError if called on a closed consumer\n"
996+
"\n"
997+
},
939998
{ "close", (PyCFunction)Consumer_close, METH_NOARGS,
940999
"\n"
9411000
" Close down and terminate the Kafka Consumer.\n"

tests/test_Consumer.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,15 @@ def dummy_assign_revoke(consumer, partitions):
6060
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._WAIT_COORD, KafkaError.LEADER_NOT_AVAILABLE),\
6161
str(e.args([0]))
6262

63+
# Query broker for timestamps for partition
64+
try:
65+
test_topic_partition = TopicPartition("test", 0, 100)
66+
offsets = kc.offsets_for_times([test_topic_partition], timeout=0.1)
67+
except KafkaException as e:
68+
import ipdb; ipdb.set_trace()
69+
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._WAIT_COORD, KafkaError.LEADER_NOT_AVAILABLE),\
70+
str(e.args([0]))
71+
6372
kc.unassign()
6473

6574
kc.commit(async=True)
@@ -232,6 +241,10 @@ def test_any_method_after_close_throws_exception():
232241
lo, hi = c.get_watermark_offsets(TopicPartition("test", 0))
233242
assert 'Consumer closed' == str(ex.value)
234243

244+
with pytest.raises(RuntimeError) as ex:
245+
c.offsets_for_times([TopicPartition("test", 0)])
246+
assert 'Consumer closed' == str(ex.value)
247+
235248

236249
@pytest.mark.skipif(libversion()[1] < 0x000b0000,
237250
reason="requires librdkafka >=0.11.0")

0 commit comments

Comments
 (0)