Description
Some functions that take a timeout do not release the GIL while they wait. As a result, all other Python threads are blocked until the function returns.
Functions that seem to do this include:
- Consumer.committed()
- Consumer.get_watermark_offsets()
- Consumer.offsets_for_times()
- Producer.flush()
Consumer.get_watermark_offsets() is the only function I'm completely sure of. The others are guesses based on reading the code.
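A blocking call that releases the GIL, such as time.sleep(), lets other Python threads keep running while it waits; a call that holds the GIL freezes them for the whole wait. The minimal, standard-library-only sketch below counts how many times a background heartbeat ticks while the main thread is blocked, which turns "the debug thread stopped printing" into a number. count_ticks_during is a hypothetical helper, not part of confluent-kafka-python.

```python
import threading
import time


def count_ticks_during(blocking_call, interval=0.05):
    """Run blocking_call() on the current thread and count how many times a
    background heartbeat thread manages to tick in the meantime.

    If blocking_call releases the GIL while it waits, the heartbeat keeps
    running; if it holds the GIL, the count stays near zero.
    """
    ticks = 0
    stop = threading.Event()

    def heartbeat():
        nonlocal ticks
        # Event.wait() returns False on timeout (one tick) and True once stop is set.
        while not stop.wait(interval):
            ticks += 1

    thread = threading.Thread(target=heartbeat, daemon=True)
    thread.start()
    try:
        blocking_call()
    finally:
        stop.set()
        thread.join()
    return ticks


if __name__ == '__main__':
    # time.sleep() releases the GIL, so roughly 1.0 / 0.05 = 20 ticks are expected.
    print(count_ticks_during(lambda: time.sleep(1.0)))
```

Wrapping one of the suspected calls instead, e.g. lambda: client.get_watermark_offsets(partition) while the partition's leader is down, would be expected to give a count near zero if that call holds the GIL, versus about 20 ticks per second for time.sleep().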
How to reproduce
Create a topic with one partition and run the example below. Once the example has consumed a few messages, kill the broker hosting the partition. Notice that the other thread is no longer printing its debug message.
import time
import sys
import threading
import confluent_kafka
from confluent_kafka import Consumer, KafkaError
from uuid import uuid4


def debug_thread_func():
    # Prints once per second; if this output stops, another thread is holding the GIL.
    while True:
        print("Test...")
        time.sleep(1)


if __name__ == '__main__':
    print(confluent_kafka.version(), confluent_kafka.libversion())

    # daemon=True so the process can exit once the main thread stops (e.g. on Ctrl-C).
    debug_thread = threading.Thread(target=debug_thread_func, daemon=True)
    debug_thread.start()

    client = Consumer({'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()),
                       'default.topic.config': {'auto.offset.reset': 'smallest'}})

    def assigned(consumer, partitions):
        print("Assigned:", partitions)

    client.subscribe(['ibbot'], on_assign=assigned)

    try:
        while True:
            msg = client.poll(timeout=1)
            # get_watermark_offsets() queries the broker; this is the call observed to block other threads.
            for partition in client.assignment():
                print(client.get_watermark_offsets(partition))
            if msg is not None:
                if msg.error():
                    print("Error: ", msg.error())
                else:
                    print("Data")
    finally:
        client.close()
Note that this example probably also depends on a bug in librdkafka, so it may not reproduce 100% of the time. You can also reproduce this with a multi-partition, multi-broker setup. In that case, the application only hangs for as long as it takes for a new leader to be elected.
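To put a number on how long the application stays stuck (for example, while a new leader is elected), debug_thread_func could append time.monotonic() to a list on every tick instead of only printing, and the gaps could be analyzed afterwards. A minimal sketch of that analysis, assuming one timestamp per heartbeat; find_stalls and its tolerance parameter are hypothetical names chosen for illustration.

```python
def find_stalls(timestamps, interval, tolerance=0.5):
    """Return (start_time, gap_seconds) for every gap between consecutive
    heartbeat timestamps that exceeds interval + tolerance.

    timestamps: increasing times (e.g. from time.monotonic()), one per tick.
    """
    stalls = []
    for earlier, later in zip(timestamps, timestamps[1:]):
        gap = later - earlier
        if gap > interval + tolerance:
            stalls.append((earlier, gap))
    return stalls


# Synthetic example: a 1-second heartbeat that froze for about 7 seconds.
ticks = [0.0, 1.0, 2.0, 9.0, 10.0]
print(find_stalls(ticks, interval=1.0))  # [(2.0, 7.0)]
```

The tolerance absorbs normal scheduling jitter, so only gaps clearly longer than the heartbeat interval are reported.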
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent-kafka-python: ('0.11.4', 721920), librdkafka: ('0.11.4', 722175)
- Apache Kafka broker version: 0.11.0.2
- Client configuration:
{'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()), 'default.topic.config': {'auto.offset.reset': 'smallest'}}
- Operating system: RHEL 7
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue