diff --git a/kafka/__init__.py b/kafka/__init__.py
index f108eff1c..ff364d345 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -25,8 +25,8 @@ def emit(self, record):
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message)
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
-from kafka.structs import TopicPartition, OffsetAndMetadata
 from kafka.serializer import Serializer, Deserializer
+from kafka.structs import TopicPartition, OffsetAndMetadata
 
 # To be deprecated when KafkaProducer interface is released
 from kafka.client import SimpleClient
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 1da4a3353..758bb92f8 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -8,7 +8,7 @@
 
 from kafka.vendor.six.moves import queue  # pylint: disable=import-error
 
-from kafka.common import KafkaError
+from kafka.errors import KafkaError
 from kafka.consumer.base import (
     Consumer,
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
@@ -92,7 +92,7 @@ def _mp_consume(client, group, topic, message_queue, size, events, **consumer_op
         except KafkaError as e:
             # Retry with exponential backoff
-            log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+            log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval)
             time.sleep(interval)
             interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index c0c1b1ed3..b60a5865b 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -24,13 +24,13 @@
     ITER_TIMEOUT_SECONDS,
     NO_MESSAGES_WAIT_TIME_SECONDS
 )
-from kafka.common import (
-    FetchRequestPayload, KafkaError, OffsetRequestPayload,
-    ConsumerFetchSizeTooSmall,
+from kafka.errors import (
+    KafkaError, ConsumerFetchSizeTooSmall,
     UnknownTopicOrPartitionError, NotLeaderForPartitionError,
     OffsetOutOfRangeError, FailedPayloadsError, check_error
 )
 from kafka.protocol.message import PartialMessage
+from kafka.structs import FetchRequestPayload, OffsetRequestPayload
 
 
 log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
index a8310338c..2d24a5c8b 100644
--- a/kafka/coordinator/assignors/roundrobin.py
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -7,8 +7,8 @@
 from kafka.vendor import six
 
 from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
-from kafka.common import TopicPartition
 from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+from kafka.structs import TopicPartition
 
 log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cb1de0d2e..f90d1821d 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -11,7 +11,7 @@
 from kafka.coordinator.assignors.range import RangePartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from kafka.coordinator.protocol import ConsumerProtocol
-from kafka import errors as Errors
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics import AnonMeasurable
 from kafka.metrics.stats import Avg, Count, Max, Rate
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index c9dd6c3a1..956cef6c5 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -14,13 +14,13 @@
 
 from kafka.vendor import six
 
-from kafka.structs import (
-    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
 from kafka.errors import (
     kafka_errors, UnsupportedCodecError, FailedPayloadsError,
     RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
     RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+from kafka.structs import (
+    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
 
 log = logging.getLogger('kafka.producer')
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f285ab474..7d52bdfa7 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -10,18 +10,18 @@
 
 from kafka.vendor import six
 
-from kafka import errors as Errors
+import kafka.errors as Errors
 from kafka.client_async import KafkaClient, selectors
 from kafka.codec import has_gzip, has_snappy, has_lz4
 from kafka.metrics import MetricConfig, Metrics
 from kafka.partitioner.default import DefaultPartitioner
+from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
+from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
+from kafka.producer.sender import Sender
 from kafka.record.default_records import DefaultRecordBatchBuilder
 from kafka.record.legacy_records import LegacyRecordBatchBuilder
 from kafka.serializer import Serializer
 from kafka.structs import TopicPartition
-from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
-from kafka.producer.sender import Sender
 
 log = logging.getLogger(__name__)
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 61f1e0e2a..1cd541356 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -6,12 +6,12 @@
 import threading
 import time
 
-from kafka import errors as Errors
+import kafka.errors as Errors
 from kafka.producer.buffer import SimpleBufferPool
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.structs import TopicPartition
 from kafka.record.memory_records import MemoryRecordsBuilder
 from kafka.record.legacy_records import LegacyRecordBatchBuilder
+from kafka.structs import TopicPartition
 
 log = logging.getLogger(__name__)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index b8f84e717..7dd258032 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -15,7 +15,6 @@
 from kafka.codec import gzip_encode, snappy_encode
 from kafka.errors import ProtocolError, UnsupportedCodecError
-from kafka.structs import ConsumerMetadataResponse
 from kafka.util import (
     crc32, read_short_string, relative_unpack,
     write_int_string, group_by_topic_and_partition)
@@ -322,7 +321,7 @@ def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
     @classmethod
     def decode_consumer_metadata_response(cls, data):
         """
-        Decode bytes to a ConsumerMetadataResponse
+        Decode bytes to a kafka.structs.ConsumerMetadataResponse
 
         Arguments:
             data: bytes to decode
@@ -331,7 +330,7 @@
         (host, cur) = read_short_string(data, cur)
         ((port,), cur) = relative_unpack('>i', data, cur)
 
-        return ConsumerMetadataResponse(error, nodeId, host, port)
+        return kafka.structs.ConsumerMetadataResponse(error, nodeId, host, port)
 
     @classmethod
     def encode_offset_commit_request(cls, group, payloads):
diff --git a/kafka/structs.py b/kafka/structs.py
index 62f36dd4c..e15e92ed6 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -93,7 +93,3 @@
 # Limit value: int >= 0, 0 means no retries
 RetryOptions = namedtuple("RetryOptions",
     ["limit", "backoff_ms", "retry_on_timeouts"])
-
-
-# Support legacy imports from kafka.common
-from kafka.errors import *
diff --git a/test/test_client_async.py b/test/test_client_async.py
index eccb56421..09781ac2c 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -13,14 +13,13 @@
 import pytest
 
 from kafka.client_async import KafkaClient, IdleConnectionManager
+from kafka.cluster import ClusterMetadata
 from kafka.conn import ConnectionStates
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.metadata import MetadataResponse, MetadataRequest
 from kafka.protocol.produce import ProduceRequest
 from kafka.structs import BrokerMetadata
-from kafka.cluster import ClusterMetadata
-from kafka.future import Future
 
 
 @pytest.fixture
diff --git a/test/test_conn.py b/test/test_conn.py
index fbdeeb9e7..27d77beb3 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -13,7 +13,7 @@
 from kafka.protocol.metadata import MetadataRequest
 from kafka.protocol.produce import ProduceRequest
 
-import kafka.common as Errors
+import kafka.errors as Errors
 
 
 @pytest.fixture
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 7a2627ea0..4afdcd9ac 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -5,7 +5,6 @@
 import pytest
 
 from kafka.client_async import KafkaClient
-from kafka.structs import TopicPartition, OffsetAndMetadata
 from kafka.consumer.subscription_state import (
     SubscriptionState, ConsumerRebalanceListener)
 from kafka.coordinator.assignors.range import RangePartitionAssignor
@@ -21,6 +20,7 @@
     OffsetCommitRequest, OffsetCommitResponse,
     OffsetFetchRequest, OffsetFetchResponse)
 from kafka.protocol.metadata import MetadataResponse
+from kafka.structs import TopicPartition, OffsetAndMetadata
 from kafka.util import WeakMethod
 
 
@@ -34,7 +34,7 @@ def coordinator(client):
 
 
 def test_init(client, coordinator):
-    # metadata update on init 
+    # metadata update on init
     assert client.cluster._need_update is True
     assert WeakMethod(coordinator._handle_metadata_update) in client.cluster._listeners
 
@@ -542,7 +542,7 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
     response = OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])])
     _f.success(response)
     patched_coord._handle_offset_fetch_response.assert_called_with(
-        future, response) 
+        future, response)
 
 
 @pytest.mark.parametrize('response,error,dead', [
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index fc031f742..c82101818 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -12,16 +12,16 @@
     CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
 )
 from kafka.consumer.subscription_state import SubscriptionState
+from kafka.future import Future
 from kafka.metrics import Metrics
 from kafka.protocol.fetch import FetchRequest, FetchResponse
 from kafka.protocol.offset import OffsetResponse
-from kafka.structs import TopicPartition
-from kafka.future import Future
 from kafka.errors import (
     StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
     UnknownTopicOrPartitionError, OffsetOutOfRangeError
 )
 from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
+from kafka.structs import TopicPartition
 
 
 @pytest.fixture
diff --git a/test/test_util.py b/test/test_util.py
index 58e5ab840..fb592e8e6 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -5,8 +5,8 @@
 from . import unittest
 
 import kafka.errors
-import kafka.util
 import kafka.structs
+import kafka.util
 
 
 class UtilTest(unittest.TestCase):
diff --git a/test/testutil.py b/test/testutil.py
index 365e47f3b..a1383a0a0 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -11,10 +11,12 @@
 from . import unittest
 
 from kafka import SimpleClient, create_message
-from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError
-from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \
-    NotLeaderForPartitionError, UnknownTopicOrPartitionError, \
-    FailedPayloadsError
+from kafka.errors import (
+    LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
+    NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+    FailedPayloadsError
+)
+from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
 from test.fixtures import random_string, version_str_to_list, version as kafka_version  #pylint: disable=wrong-import-order
 
 
 def kafka_versions(*versions):
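Note for reviewers (not part of the patch): the hunks above converge on two conventions. First, exception classes are imported from kafka.errors and the namedtuple payloads/structs from kafka.structs; since kafka/structs.py no longer star-re-exports kafka.errors, error names can no longer be reached through kafka.structs (which test/testutil.py had been relying on). Second, the multiprocess consumer logs retries via log.exception with lazy %-style arguments instead of pre-formatting the message with %. A minimal sketch of both conventions follows; fetch_with_backoff and the fetch callable are hypothetical helpers invented for illustration, and the backoff math mirrors _mp_consume:

import logging
import time

import kafka.errors as Errors
from kafka.structs import TopicPartition

log = logging.getLogger(__name__)


def fetch_with_backoff(fetch, interval=1, max_backoff=60):
    """Keep calling `fetch` until it succeeds, backing off exponentially.

    Illustrative only: `fetch` stands in for any client call that may
    raise a kafka.errors.KafkaError subclass.
    """
    while True:
        try:
            return fetch()
        except Errors.KafkaError:
            # log.exception() logs at ERROR level and appends the active
            # traceback, so the caught exception no longer needs to be
            # interpolated into the message by hand; logging expands %d
            # lazily, only if the record is actually emitted.
            log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval)
            time.sleep(interval)
            interval = min(interval * 2, max_backoff)  # same cap as MAX_BACKOFF_SECONDS


# Structs are plain namedtuples importable from kafka.structs:
tp = TopicPartition(topic='my-topic', partition=0)  # hypothetical topic name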