From 5e461a7e017130fb9115add8d64291d6966267e9 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Fri, 12 Jul 2024 01:37:38 -0400 Subject: [PATCH] Patch pylint warnings so tests pass again (#184) * stop pylint complaint for uncovered conditional flow * add todo to revisit * formatting makes me happy :) * Fix errors raised by new version of Pylint so tests pass again --- kafka/admin/client.py | 5 +++++ kafka/coordinator/consumer.py | 5 +++++ kafka/record/default_records.py | 8 +++++--- kafka/record/legacy_records.py | 2 ++ 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 5b01f8fe6..f74e09a80 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -503,6 +503,8 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): topics=topics, allow_auto_topic_creation=auto_topic_creation ) + else: + raise IncompatibleBrokerVersion(f"MetadataRequest for {version} is not supported") future = self._send_request_to_node( self._client.least_loaded_node(), @@ -1010,6 +1012,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, def _describe_consumer_groups_process_response(self, response): """Process a DescribeGroupsResponse into a group description.""" if response.API_VERSION <= 3: + group_description = None assert len(response.groups) == 1 for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names): if isinstance(response_field, Array): @@ -1045,6 +1048,8 @@ def _describe_consumer_groups_process_response(self, response): if response.API_VERSION <=2: described_group_information_list.append(None) group_description = GroupInformation._make(described_group_information_list) + if group_description is None: + raise Errors.BrokerResponseError("No group description received") error_code = group_description.error_code error_type = Errors.for_code(error_code) # Java has the note: KAFKA-6789, we can retry based on the error code diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index cf82b69fe..351641981 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -628,10 +628,15 @@ def _send_offset_commit_request(self, offsets): ) for partition, offset in partitions.items()] ) for topic, partitions in offset_data.items()] ) + else: + # TODO: We really shouldn't need this here to begin with, but I'd like to get + # pylint to stop complaining. + raise Exception(f"Unsupported Broker API: {self.config['api_version']}") log.debug("Sending offset-commit request with %s for group %s to %s", offsets, self.group_id, node_id) + future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time()) diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 8b630cc8b..06be57621 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -187,12 +187,14 @@ def _maybe_uncompress(self) -> None: data = memoryview(self._buffer)[self._pos:] if compression_type == self.CODEC_GZIP: uncompressed = gzip_decode(data) - if compression_type == self.CODEC_SNAPPY: + elif compression_type == self.CODEC_SNAPPY: uncompressed = snappy_decode(data.tobytes()) - if compression_type == self.CODEC_LZ4: + elif compression_type == self.CODEC_LZ4: uncompressed = lz4_decode(data.tobytes()) - if compression_type == self.CODEC_ZSTD: + elif compression_type == self.CODEC_ZSTD: uncompressed = zstd_decode(data.tobytes()) + else: + raise NotImplementedError(f"Compression type {compression_type} is not supported") self._buffer = bytearray(uncompressed) self._pos = 0 self._decompressed = True diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index 4439462f6..44b365b06 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -461,6 +461,8 @@ def _maybe_compress(self) -> bool: compressed = lz4_encode_old_kafka(data) else: compressed = lz4_encode(data) + else: + raise NotImplementedError(f"Compression type {self._compression_type} is not supported") size = self.size_in_bytes( 0, timestamp=0, key=None, value=compressed) # We will try to reuse the same buffer if we have enough space