Can I clear the Producer output queue when flush() times out? #538

Closed
@peteryin21

Description

Currently, if a producer times out (e.g., when a broker goes down), undelivered messages stay in an internal queue to be published later. flush() returns the number of messages still in that queue.

In my use case, I would like failed messages to be cleared from the outgoing queue. With the current functionality, however, the next time I produce a message, both it and the previously queued messages are sent. Looking through the documentation, I didn't see any straightforward way to clear the queue.

How to reproduce

from confluent_kafka.avro import AvroProducer

producer = AvroProducer(kafka_producer_config)
producer.produce(
    key=key,
    value=value,
    topic=topic,
    key_schema=key_schema,
    value_schema=value_schema,
    on_delivery=delivery_callback,
)
# flush() returns the number of messages still waiting in the queue
remaining_messages = producer.flush(timeout)
# Here I would like to clear the outgoing queue
if remaining_messages > 0:
    raise KafkaProducerException(
        f"Kafka producer timed out after {timeout} seconds"
        f" with {remaining_messages} messages remaining to be published."
    )

Create any producer and give it a config with invalid port numbers in bootstrap.servers to simulate a broker it can't connect to. Attempt to produce several messages; each time, the remaining_messages count returned by flush() goes up. I would like a way to clear the output queue, so that the next time a message is delivered successfully, none of the previously failed messages are sent with it.
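
To make the desired behavior concrete, here is a minimal sketch of what "flush, then clear whatever is left" could look like. It assumes a client release that exposes Producer.purge(), which to my knowledge was added in releases later than the 0.11.6 reported here, so it would not work as-is on this setup. The helper name flush_or_purge and the KafkaProducerException definition are hypothetical; the function is duck-typed and only relies on the producer having flush() and, optionally, purge().

```python
class KafkaProducerException(Exception):
    """Application-level error, as in the snippet above (hypothetical definition)."""


def flush_or_purge(producer, timeout):
    """Flush `producer`; if messages remain after `timeout` seconds, discard them.

    Returns the number of messages that were still queued when flush() timed
    out (0 if everything was delivered in time).
    """
    remaining = producer.flush(timeout)
    if remaining == 0:
        return 0

    # Assumption: the installed client exposes Producer.purge().
    # Older releases such as 0.11.6 do not, so fail loudly instead of guessing.
    purge = getattr(producer, "purge", None)
    if purge is None:
        raise KafkaProducerException(
            f"{remaining} messages remain queued and this client has no purge()"
        )

    # Drop the messages the producer is still holding on to.
    purge()
    # Call flush() once more so delivery callbacks for the purged
    # messages get served before the next produce().
    producer.flush(timeout)
    return remaining
```

A caller would replace the bare flush() call with dropped = flush_or_purge(producer, timeout) and raise or log when dropped > 0. Purged messages should still reach the on_delivery callback with an error set, so the callback remains the place to record which messages were lost.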

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
>>> confluent_kafka.libversion()
('0.11.6', 722687)
>>> confluent_kafka.version()
('0.11.6', 722432)
  • Apache Kafka broker version: confluentinc/cp-kafka:5.0.0
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
