Description
I have a consumer implemented like this:
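```python
from typing import Dict, List

from confluent_kafka import Consumer

# `logger` and `KafkaMessage` are application helpers not shown here.


class KafkaReader:
    def __init__(self, topics: List[str], config: Dict, timeout: float = 1):
        self._consumer = Consumer(config)
        self._consumer.subscribe(topics)
        self._topics = topics
        self._timeout = timeout

    def read(self):
        while True:
            # poll() returns None if nothing arrived within the timeout.
            message = self._consumer.poll(timeout=self._timeout)
            if message is None:
                continue
            # Manual offset store (enable.auto.offset.store is False).
            self._consumer.store_offsets(message)
            if message.error():
                logger.error(
                    "Kafka error", topic=message.topic(), error=message.error()
                )
                continue
            yield KafkaMessage(...)
```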
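For comparison, here is a variant of the loop that checks `message.error()` before calling `store_offsets()`, so offsets are only stored for real messages and never for error events (which may not carry a topic). It is a sketch: `read_valid_messages` is a hypothetical standalone function standing in for `KafkaReader.read()`, it yields the raw message instead of the elided `KafkaMessage(...)`, and it uses the standard `logging` module in place of the `logger` above. Whether this ordering avoids the crash described below has not been verified.

```python
import logging
from typing import Any, Iterator

logger = logging.getLogger(__name__)


def read_valid_messages(consumer: Any, timeout: float = 1.0) -> Iterator[Any]:
    """Yield non-error messages from an already-subscribed Consumer.

    Hypothetical helper: offsets are stored only after the error check,
    so error events never reach store_offsets().
    """
    while True:
        message = consumer.poll(timeout=timeout)
        if message is None:
            # Timed out without receiving a message or event.
            continue
        if message.error():
            # Log and skip error events; no offset is stored for them.
            logger.error("Kafka error on topic %s: %s", message.topic(), message.error())
            continue
        # Only real messages get here, so storing the offset is meaningful.
        consumer.store_offsets(message)
        yield message
```

Because the generator only takes a consumer-like object with `poll()` and `store_offsets()`, it can be exercised with a fake consumer without a running broker.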
It fails intermittently with SIGSEGV when reconnecting to the broker. I captured the following log under gdb (edited for anonymization); it shows one successful reconnection followed by a later reconnection attempt that ends in a segmentation fault.
```
%7|1589814027.729|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id " is rebalancing in state up (join-state started) with assignment: group is rebalancing
%7|1589814027.729|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": new assignment of 0 partition(s) in join state wait-revoke-rebalance_cb
%7|1589814027.729|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": unassigning 5 partition(s) (v6)
%7|1589814027.729|JOIN|rdkafka#consumer-1| [thrd:main]: 10.61.32.88:9092/1004: Joining group "kafka-group-id " with 1 subscribed topic(s)
%7|1589814028.733|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": new assignment of 5 partition(s) in join state wait-assign-rebalance_cb
%7|1589814028.734|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1004: Fetch committed offsets for 5/5 partition(s)
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [0] start fetching at offset 176905
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [1] start fetching at offset 177036
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [2] start fetching at offset 183135
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [3] start fetching at offset 175680
%7|1589814028.734|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [4] start fetching at offset 177818
%7|1589815172.579|BROKERFAIL|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection reset by peer)
%7|1589815172.579|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state UP -> DOWN
%7|1589815172.579|REQERR|rdkafka#consumer-1| [thrd:main]: broker.test.com:9092/bootstrap: MetadataRequest failed: Local: Broker transport failure: actions Retry
%7|1589815172.579|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state DOWN -> INIT
%7|1589815172.679|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1589815172.679|RETRY|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Moved 1 retry buffer(s) to output queue
%7|1589815173.579|CONNECT|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1589815173.579|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1589815173.604|CONNECT|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 9
%7|1589815173.605|CONNECT|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Connected to ipv4#127.0.0.1:9092
%7|1589815173.605|CONNECTED|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Connected (#2)
%7|1589815173.605|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1589815173.606|PROTOERR|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
%7|1589815173.606|PROTOERR|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: ApiArrayCnt -1 out of range
%7|1589815173.606|APIVERSION|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: ApiVersionRequest v3 failed due to UNSUPPORTED_VERSION: retrying with v0
%7|1589815173.606|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1589815776.829|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id " is rebalancing in state up (join-state started) with assignment: group is rebalancing
%7|1589815776.829|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": new assignment of 0 partition(s) in join state wait-revoke-rebalance_cb
%7|1589815776.829|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": unassigning 5 partition(s) (v11)
%7|1589815776.829|JOIN|rdkafka#consumer-1| [thrd:main]: 10.61.32.88:9092/1004: Joining group "kafka-group-id " with 1 subscribed topic(s)
%7|1589815776.831|ASSIGNOR|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": "range" assignor run for 3 member(s)
%7|1589815776.833|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "kafka-group-id ": new assignment of 5 partition(s) in join state wait-assign-rebalance_cb
%7|1589815776.833|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1004: Fetch committed offsets for 5/5 partition(s)
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [0] start fetching at offset 176905
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [1] start fetching at offset 177036
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [2] start fetching at offset 183135
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [3] start fetching at offset 175680
%7|1589815776.834|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my.topic [4] start fetching at offset 177818
%7|1589816373.600|BROKERFAIL|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection reset by peer)
%7|1589816373.600|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state UP -> DOWN
%7|1589816373.600|STATE|rdkafka#consumer-1| [thrd:broker.test.com:9092/bootstrap]: broker.test.com:9092/bootstrap: Broker changed state DOWN -> INIT
3818 Objects/unicodeobject.c: No such file or directory.
Thread 1 "python" received signal SIGSEGV, Segmentation fault.
0x00007ffff7dc441a in PyUnicode_AsUTF8AndSize (unicode=0x0, psize=0x0)
at Objects/unicodeobject.c:3818
(gdb) quit
```
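The gdb session stops inside `PyUnicode_AsUTF8AndSize` with `unicode=0x0` (a NULL string object) but does not show which Python call was running at the time. As an optional diagnostic, Python's standard `faulthandler` module can dump the Python-level traceback of every thread when the process receives SIGSEGV, which would show whether the crash happens inside `poll()`, `store_offsets()`, or elsewhere. A minimal sketch, assuming it runs at application startup before the `Consumer` is created:

```python
import faulthandler
import sys

# Install handlers for fatal signals (SIGSEGV, SIGFPE, SIGABRT, SIGBUS, SIGILL)
# that write the Python traceback of all threads to stderr before the
# process terminates. Call this as early as possible.
faulthandler.enable(file=sys.stderr, all_threads=True)
```

The same behaviour is available without code changes by starting the interpreter with `python -X faulthandler` or by setting `PYTHONFAULTHANDLER=1` in the environment.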
Is there some additional configuration I need in order to avoid this crash?
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`):
  - `confluent_kafka.version()`: `('1.4.0', 17039360)`
  - `confluent_kafka.libversion()`: `('1.4.0', 17039615)`
- Apache Kafka broker version:
- Client configuration:
  ```python
  {
      "bootstrap.servers": ...,
      "group.id": ...,
      "enable.auto.offset.store": False,
      "auto.offset.reset": "latest",
      "security.protocol": ...,
      "debug": "consumer,broker",
  }
  ```
- Operating system:
  - `PRETTY_NAME="Debian GNU/Linux 10 (buster)"`
  - `NAME="Debian GNU/Linux"`
  - `VERSION_ID="10"`
  - `VERSION="10 (buster)"`
  - `VERSION_CODENAME=buster`
  - `ID=debian`
- Provide client logs (with `'debug': '..'` as necessary)
- Provide broker log excerpts
- Critical issue