Description
When I try to commit a message after not polling for longer than max.poll.interval.ms, I get a non-recoverable segmentation fault that I can't handle in an exception handler, and the Python interpreter exits.
How to reproduce
```python
import json
import logging

from confluent_kafka import Consumer

logger = logging.getLogger(__name__)

consumer = Consumer({
    "bootstrap.servers": cls._bootstrap_servers,
    "group.id": group_id,
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest"
})
consumer.subscribe([topic_name])
while True:
    logger.info("polling...")
    message = consumer.poll(timeout=10.0)
    if message is None:
        continue
    consumer.pause(consumer.assignment())
    logger.info("consumer has been paused")
    try:
        output = message.value()
        logger.info(f"received message from kafka {output}")
        output = json.loads(output)
        logger.info("-------Consumed Message------")
        logger.info(output)
        # this processing can take more than max.poll.interval.ms (5 minutes by default)
        processing_output_function(output)
    except TypeError as e:
        logger.error(f"json type error: {e}")
    except json.decoder.JSONDecodeError as e:
        logger.error(f"json decode error: {e}")
    except Exception as e:
        logger.error(f"JAIS General Exception: {e}")
    finally:
        logger.info("committing message")
        consumer.commit(message)
        logger.info("message has been committed")
        consumer.resume(consumer.assignment())
        logger.info("consumer has been resumed")
```
In the snippet above, if processing_output_function(output) takes longer than max.poll.interval.ms, that particular iteration will finish correctly and commit the message. On the next iteration, I consume a "message" that says:

Application maximum poll interval (300000ms) exceeded by 231ms

(presumably an error event rather than a regular record). Decoding it fails with a json.decoder.JSONDecodeError, and when the finally block goes on to commit the message, the log shows:

committing message
segmentation fault

and execution never reaches the "message has been committed" line.
I'm not sure whether this is working as intended, but it seems wrong that it kills the Python process outright, with no way to catch the failure or run any cleanup code.
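The best I can suggest for diagnostics is the standard-library faulthandler module, which should at least dump the Python-level traceback when the process dies; it cannot recover from the segfault, only report where it happened:

```python
import faulthandler

# Dumps the Python traceback to stderr when the process receives a fatal
# signal such as SIGSEGV; it does not prevent the crash itself.
faulthandler.enable()
```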
For now I have increased max.poll.interval.ms as a workaround, but I can fetch logs later if requested.
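Roughly what the workaround looks like; the 30-minute value is illustrative, sized to comfortably exceed my processing time:

```python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": cls._bootstrap_servers,
    "group.id": group_id,
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest",
    # raised from the 300000 ms (5 minute) default so long processing
    # no longer triggers the poll-interval error event
    "max.poll.interval.ms": 1800000,
})
```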
Checklist
Please provide the following information:
- confluent-kafka-python: ('2.3.0', 33751040)
- librdkafka version: ('2.3.0', 33751295)
- Apache Kafka broker version: 2.13-2.8.1
- Client configuration: {..., "enable.auto.commit": False, "auto.offset.reset": "earliest"}
- Operating system: Linux
- Critical issue: yes, I think so