There seems to be a KafkaException thrown consistently the first time a Consumer's get_watermark_offsets method is called. After this first failure, subsequent calls succeed. Below is a minimal example in the IPython shell.
I would like to use this method to obtain the earliest offsets and pass them to Spark Streaming. I am using confluent-kafka-python v0.9.4 with librdkafka 0.9.4 and Confluent 3.0.1 (Kafka 0.10.0.1) on CentOS 7. Thanks for any help with this!
```
In [1]: from confluent_kafka import Consumer, TopicPartition

In [2]: CONF = {
   ...:     'api.version.request': True,
   ...:     'bootstrap.servers': 'host1:9092,host2:9092,host3:9092',
   ...:     'client.id': 'client-test-earliest',
   ...:     'group.id': 'group-test-earliest',
   ...: }

In [3]: TOPIC = 'topic-test-earliest'

In [4]: c = Consumer(CONF)

In [5]: c.get_watermark_offsets(TopicPartition(TOPIC, 0))
---------------------------------------------------------------------------
KafkaException                            Traceback (most recent call last)
<ipython-input-5-9a7ab522527d> in <module>()
----> 1 c.get_watermark_offsets(TopicPartition(TOPIC, 0))

KafkaException: KafkaError{code=LEADER_NOT_AVAILABLE,val=5,str="Failed to get watermark offsets: Broker: Leader not available"}

In [6]: c.get_watermark_offsets(TopicPartition(TOPIC, 0))
Out[6]: (0L, 1L)

In [7]: c.get_watermark_offsets(TopicPartition(TOPIC, 0))
Out[7]: (0L, 1L)
```
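Since the error disappears on the second call, one possible client-side workaround (a sketch, not a fix for the underlying cause) is to retry get_watermark_offsets when it fails with LEADER_NOT_AVAILABLE, then take each partition's low watermark as its earliest offset. The helper names (is_leader_not_available, fetch_with_retry, earliest_offsets), the attempt count and the backoff are hypothetical choices. The error check assumes, as in current confluent_kafka releases, that KafkaException wraps a KafkaError in args[0] and that KafkaError.name() returns the name shown in the traceback. The fetch callable is injected so the sketch itself does not import confluent_kafka.

```python
import time
from functools import partial
from typing import Callable, Dict, Iterable, Tuple

# Hypothetical helpers: only get_watermark_offsets() (passed in via `fetch`)
# comes from the confluent_kafka Consumer API.
Watermarks = Tuple[int, int]  # (low, high) as returned by get_watermark_offsets


def is_leader_not_available(exc: Exception) -> bool:
    """True if exc wraps an error object whose name() is LEADER_NOT_AVAILABLE.

    Assumes confluent_kafka's KafkaException carries a KafkaError as args[0];
    duck-typing on name() avoids importing confluent_kafka here.
    """
    err = exc.args[0] if exc.args else None
    name = getattr(err, "name", None)
    return callable(name) and name() == "LEADER_NOT_AVAILABLE"


def fetch_with_retry(
    fetch: Callable[[], Watermarks],
    attempts: int = 5,
    delay_s: float = 0.5,
    is_retriable: Callable[[Exception], bool] = is_leader_not_available,
) -> Watermarks:
    """Call fetch(); on a retriable error, sleep and try again.

    Non-retriable errors are re-raised immediately; the last retriable
    error is re-raised once all attempts are used up.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fetch()
        except Exception as exc:
            if attempt == attempts or not is_retriable(exc):
                raise
            # Linear backoff: delay_s, 2 * delay_s, 3 * delay_s, ...
            time.sleep(delay_s * attempt)
    raise RuntimeError("unreachable")


def earliest_offsets(
    fetch: Callable[[str, int], Watermarks],
    topic: str,
    partitions: Iterable[int],
    **retry_kwargs,
) -> Dict[Tuple[str, int], int]:
    """Map (topic, partition) to its low watermark, i.e. the earliest offset."""
    result = {}
    for p in partitions:
        low, _high = fetch_with_retry(partial(fetch, topic, p), **retry_kwargs)
        result[(topic, p)] = low
    return result
```

With the consumer from the session above, fetch could be lambda t, p: c.get_watermark_offsets(TopicPartition(t, p)); the resulting {(topic, partition): offset} dict would then be translated into whatever starting-offset structure the Spark Streaming job expects. Retrying only on LEADER_NOT_AVAILABLE keeps genuine errors (for example a misspelled topic) from being silently retried.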