Consumer crashes when consuming messages that are compressed with zstd #2044

Closed
@Green-Angry-Bird

Description

We have been working to resolve this issue for a few days now. The consumer works in most circumstances but sometimes crashes, and we have narrowed it down: it crashes whenever a message that was compressed with zstd arrives.

The stack trace follows:

```
    return self.next_v2()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 1200, in next_v2
    return next(self._iterator)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 1115, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 654, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/group.py", line 707, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 816, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
    for record in batch:
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/record/default_records.py", line 271, in __iter__
    self._maybe_uncompress()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/record/default_records.py", line 180, in _maybe_uncompress
    self._assert_has_codec(compression_type)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/kafka/record/default_records.py", line 114, in _assert_has_codec
    if not checker():
UnboundLocalError: local variable 'checker' referenced before assignment
```
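The traceback suggests that `_assert_has_codec` in `kafka/record/default_records.py` only binds `checker` for the compression codecs it recognizes, so a batch compressed with a codec the installed version does not know about (zstd here, added to Kafka in KIP-110) falls through every branch and hits `checker` unbound. The sketch below is not kafka-python's actual source; the codec ids and checker functions are illustrative stand-ins used to reproduce the failure mode and show a defensive alternative that raises a clear, catchable error instead:

```python
# Minimal sketch of the suspected bug and a defensive fix.
# Names and codec ids are illustrative, not kafka-python's real code.

class UnsupportedCodecError(Exception):
    pass

# Illustrative codec ids (Kafka's batch attributes use small ints like these).
CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4, CODEC_ZSTD = 1, 2, 3, 4


def assert_has_codec_buggy(compression_type):
    """Mirrors the reported bug: there is no branch for zstd."""
    if compression_type == CODEC_GZIP:
        checker, name = (lambda: True), "gzip"
    elif compression_type == CODEC_SNAPPY:
        checker, name = (lambda: True), "snappy"
    elif compression_type == CODEC_LZ4:
        checker, name = (lambda: True), "lz4"
    # No zstd branch: for CODEC_ZSTD, `checker` is never assigned, so the
    # next line raises UnboundLocalError instead of a meaningful error.
    if not checker():
        raise UnsupportedCodecError("Libraries for %s not installed" % name)


def assert_has_codec_fixed(compression_type):
    """Defensive variant: an unknown codec raises a clear error."""
    checkers = {
        CODEC_GZIP: ((lambda: True), "gzip"),
        CODEC_SNAPPY: ((lambda: True), "snappy"),
        CODEC_LZ4: ((lambda: True), "lz4"),
    }
    try:
        checker, name = checkers[compression_type]
    except KeyError:
        raise UnsupportedCodecError(
            "Unknown compression type %d" % compression_type)
    if not checker():
        raise UnsupportedCodecError("Libraries for %s not installed" % name)
```

With the buggy variant, consuming a zstd batch surfaces as an opaque `UnboundLocalError` deep in the fetcher (exactly what the traceback shows); the dictionary-lookup variant turns the same situation into an `UnsupportedCodecError` the application can catch. In practice the remedy is to run a kafka-python release that supports zstd and to have the zstd bindings installed.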
