Closed
Description
We have spent a few days trying to resolve this issue. The consumer works in most circumstances but occasionally crashes. We have determined that the crash occurs whenever a message compressed with zstd arrives.
The stack trace follows:
```
    return self.next_v2()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 1200, in next_v2
    return next(self._iterator)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 1115, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 654, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 707, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 816, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
    for record in batch:
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/record/default_records.py", line 271, in __iter__
    self._maybe_uncompress()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/record/default_records.py", line 180, in _maybe_uncompress
    self._assert_has_codec(compression_type)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/record/default_records.py", line 114, in _assert_has_codec
    if not checker():
UnboundLocalError: local variable 'checker' referenced before assignment
```
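
For anyone hitting the same trace: an `UnboundLocalError` on `checker` is what you would expect if the codec check assigns `checker` inside an `if`/`elif` chain that has no branch for the zstd codec id. The sketch below is only an illustration of that pattern, not the library's actual code. The function names, the stand-in availability checks, and the codec constants are all assumptions made up for this example (the ids follow the Kafka record batch convention of 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd).

```python
# Illustrative only: hypothetical names, not kafka-python's real implementation.
# Codec ids follow the Kafka record batch attribute convention (assumed here).
CODEC_GZIP = 1
CODEC_SNAPPY = 2
CODEC_LZ4 = 3
CODEC_ZSTD = 4


def _always_available():
    """Stand-in for a real 'is this codec library installed?' check."""
    return True


def assert_has_codec_buggy(compression_type):
    """Mimics the suspected failure mode: no branch handles zstd."""
    if compression_type == CODEC_GZIP:
        checker, name = _always_available, "gzip"
    elif compression_type == CODEC_SNAPPY:
        checker, name = _always_available, "snappy"
    elif compression_type == CODEC_LZ4:
        checker, name = _always_available, "lz4"
    # No else branch: for any other id, `checker` is never bound, so the
    # next line raises UnboundLocalError instead of a meaningful error.
    if not checker():
        raise RuntimeError("Libraries for %s compression codec not found" % name)
    return name


def assert_has_codec_defensive(compression_type):
    """Same check, but an unknown codec id fails with an explicit message."""
    if compression_type == CODEC_GZIP:
        checker, name = _always_available, "gzip"
    elif compression_type == CODEC_SNAPPY:
        checker, name = _always_available, "snappy"
    elif compression_type == CODEC_LZ4:
        checker, name = _always_available, "lz4"
    else:
        raise ValueError("Unsupported compression type id: %r" % compression_type)
    if not checker():
        raise RuntimeError("Libraries for %s compression codec not found" % name)
    return name


if __name__ == "__main__":
    try:
        assert_has_codec_buggy(CODEC_ZSTD)
    except UnboundLocalError as exc:
        print("buggy version:", type(exc).__name__)
    try:
        assert_has_codec_defensive(CODEC_ZSTD)
    except ValueError as exc:
        print("defensive version:", exc)
```

If this is indeed the cause, the defensive variant would not make zstd messages readable; it would only turn the confusing `UnboundLocalError` into a clear "unsupported codec" error.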