Description
Currently, if a producer times out (e.g., when a broker goes down), undelivered messages remain in an internal queue waiting to be published. The number of remaining messages is returned by flush().
In my use case, I would like failed messages to be cleared from the outgoing queue. However, with the current behavior, the next time I produce a message, both it and the previously queued messages are sent. Looking through the documentation, I didn't see any straightforward way to do this.
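The only workaround I can see is to throw away the producer instance and build a fresh one, since the queued messages live inside the instance and are dropped along with it. A rough sketch of that workaround (producer, kafka_producer_config, and flush_timeout are the same names as in the snippet below):

remaining_messages = producer.flush(flush_timeout)
if remaining_messages > 0:
    # 0.11.6 exposes no API to purge the queue, so rebuild the producer;
    # the undelivered messages are dropped with the old instance.
    producer = AvroProducer(kafka_producer_config)

This is heavy-handed (it also discards broker metadata and connections), which is why a proper way to clear the queue would be preferable.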
How to reproduce
from confluent_kafka.avro import AvroProducer

producer = AvroProducer(kafka_producer_config)
producer.produce(
    key=key,
    value=value,
    topic=topic,
    key_schema=key_schema,
    value_schema=value_schema,
    on_delivery=delivery_callback,
)
# flush() blocks for up to flush_timeout seconds and returns the
# number of messages still waiting in the outgoing queue.
remaining_messages = producer.flush(flush_timeout)
# Here I would like to clear the outgoing queue
if remaining_messages > 0:
    raise KafkaProducerException(
        f"Kafka producer timed out after {flush_timeout} seconds"
        f" with {remaining_messages} messages remaining to be published."
    )
Create any producer and give it a config with invalid port numbers in the bootstrap.servers section to simulate a broker it can't connect to. Attempt to produce several messages; remaining_messages increases each time. I would like a way to clear the outgoing queue, to ensure that the next time a message goes through successfully, none of the previously failed messages are sent alongside it.
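A self-contained sketch of the reproduction (the broker port and topic name here are arbitrary; message.timeout.ms is left at its default of five minutes, so queued messages outlive the short flush timeout):

from confluent_kafka import Producer

# An unreachable port simulates a broker that is down.
producer = Producer({"bootstrap.servers": "localhost:9999"})

for i in range(3):
    producer.produce("test-topic", value=f"message {i}".encode())
    # Wait at most 2 seconds; nothing can be delivered, so the
    # count of queued messages grows by one on every iteration.
    remaining = producer.flush(2)
    print(f"attempt {i}: {remaining} messages still queued")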
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  >>> confluent_kafka.version()
  ('0.11.6', 722432)
  >>> confluent_kafka.libversion()
  ('0.11.6', 722687)
- Apache Kafka broker version: confluentinc/cp-kafka:5.0.0
- Client configuration: {...}
- Operating system:
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue