
Synchronous Functions Should Release the GIL #412

Closed
@bpowers39

Description


Some functions that take a timeout do not release the GIL while they block. As a result, every other Python thread in the process stalls until the function returns.

Functions that seem to do this include:

  • Consumer.committed()
  • Consumer.get_watermark_offsets()
  • Consumer.offsets_for_times()
  • Producer.flush()

Consumer.get_watermark_offsets() is the only function I'm completely sure of. The others are just a guess from looking at the code.
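The effect described above can be illustrated without Kafka. The sketch below assumes a POSIX system (Linux or macOS) and uses only the standard library. Calls made through `ctypes.CDLL` release the GIL for the duration of the foreign call, while calls through `ctypes.PyDLL` keep holding it, so libc's `sleep()` stands in for a blocking C call such as a broker request. The names `compare`, `held`, and `released` are illustrative only; this is not how confluent-kafka-python is implemented.

```python
import ctypes
import threading
import time

# POSIX only: CDLL(None) / PyDLL(None) give handles to symbols already loaded
# into the process, which include libc's sleep().
released = ctypes.CDLL(None)  # ctypes releases the GIL around these calls
held = ctypes.PyDLL(None)     # ctypes keeps holding the GIL for these calls
for lib in (released, held):
    lib.sleep.argtypes = [ctypes.c_uint]
    lib.sleep.restype = ctypes.c_uint


def compare(seconds=1):
    """Count how often a background thread runs during each kind of call."""
    stop = threading.Event()
    count = [0]

    def ticker():
        # Stand-in for the debug thread: wake up every 50 ms.
        while not stop.is_set():
            count[0] += 1
            time.sleep(0.05)

    t = threading.Thread(target=ticker, daemon=True)
    t.start()
    time.sleep(0.2)  # let the ticker get going first
    try:
        before = count[0]
        held.sleep(seconds)       # GIL held: the ticker cannot run
        held_ticks = count[0] - before

        before = count[0]
        released.sleep(seconds)   # GIL released: the ticker keeps running
        released_ticks = count[0] - before
    finally:
        stop.set()
        t.join()
    return held_ticks, released_ticks


if __name__ == "__main__":
    held_ticks, released_ticks = compare()
    print("ticks while the GIL was held:    ", held_ticks)
    print("ticks while the GIL was released:", released_ticks)
```

With the GIL held the ticker should barely advance, while with the GIL released it should tick roughly once every 50 ms, which mirrors the debug thread going silent in the reproduction below.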

How to reproduce

Create a topic with one partition and run the example below. Once the example has consumed a few messages, kill the broker hosting the partition. Notice that the other thread is no longer printing its debug message.

import time
import sys
import threading
import confluent_kafka
from confluent_kafka import Consumer, KafkaError
from uuid import uuid4

def debug_thread_func():
    while True:
        print("Test...")
        time.sleep(1)

if __name__ == '__main__':
    print(confluent_kafka.version(), confluent_kafka.libversion())

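    # Daemon thread, so it does not keep the process alive once the main loop exits.
    debug_thread = threading.Thread(target=debug_thread_func, daemon=True)
    debug_thread.start()

    client = Consumer({'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()),
                       'default.topic.config': {'auto.offset.reset': 'smallest'}})

    def assigned(consumer, partitions):
        print("Assigned:", partitions)

    client.subscribe(['ibbot'], on_assign=assigned)

    try:
        while True:
            msg = client.poll(timeout=1)
            # Queries the broker for each assigned partition; this is the call
            # that stops the debug thread once the partition leader goes away.
            for partition in client.assignment():
                print(client.get_watermark_offsets(partition))

            if msg is not None:
                if msg.error():
                    print("Error: ", msg.error())
                else:
                    print("Data")
    except KeyboardInterrupt:
        pass
    finally:
        # Always close the consumer, e.g. after Ctrl+C.
        client.close()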

Note that this example probably depends on a bug in librdkafka, so it may not reproduce every time. The problem can also be reproduced with multiple partitions and brokers; in that case, the application only hangs for as long as it takes for a new partition leader to be elected.
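To put a number on how long the application hangs, the printing debug thread could be swapped for a heartbeat that records the longest gap between its own wakeups. The `Heartbeat` class below is a hypothetical helper, not part of confluent-kafka-python. It relies on the fact that a thread cannot wake up while another thread holds the GIL, so the recorded gap should grow to roughly the length of any stall.

```python
import threading
import time


class Heartbeat:
    """Background thread that tracks the longest gap between its wakeups."""

    def __init__(self, interval=0.05):
        self.interval = interval
        self.max_gap = 0.0
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        last = time.monotonic()
        while True:
            # Returns True once stop() is called, False after each interval.
            stopped = self._stop_event.wait(self.interval)
            now = time.monotonic()
            # Also measured on the final wakeup, so a stall right before
            # stop() is still recorded.
            self.max_gap = max(self.max_gap, now - last)
            last = now
            if stopped:
                break

    def start(self):
        self._thread.start()
        return self

    def stop(self):
        """Stop the thread and return the longest gap seen, in seconds."""
        self._stop_event.set()
        self._thread.join()
        return self.max_gap
```

In the reproduction script, `Heartbeat().start()` could replace `debug_thread`, with `stop()` called in the `finally` block. A `max_gap` close to the leader-election time would be consistent with `get_watermark_offsets()` holding the GIL for that long.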

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent-kafka-python: ('0.11.4', 721920), librdkafka: ('0.11.4', 722175)
  • Apache Kafka broker version: 0.11.0.2
  • Client configuration: {'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()), 'default.topic.config': {'auto.offset.reset': 'smallest'}}
  • Operating system: RHEL 7
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
