Don't use kafka.common internally
This finishes the split from `kafka.common` to `kafka.errors`/`kafka.structs`.
jeffwidman committed Jun 5, 2018
1 parent 81cda59 commit bc4cc43
Showing 16 changed files with 32 additions and 36 deletions.
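
Every hunk below follows the same pattern: names that previously came from the legacy `kafka.common` module (or leaked through its wildcard re-exports) are now imported from the module that actually defines them: `kafka.errors` for exception classes, `kafka.structs` for the payload namedtuples. A minimal before/after sketch, assuming a kafka-python tree from around this commit is importable:

    # Before: kafka.common re-exported everything, so both names resolved there.
    # from kafka.common import KafkaError, TopicPartition

    # After: each name comes from its canonical module.
    from kafka.errors import KafkaError
    from kafka.structs import TopicPartition

    print(issubclass(KafkaError, Exception))  # True
    print(TopicPartition._fields)             # ('topic', 'partition')
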
2 changes: 1 addition & 1 deletion kafka/__init__.py
@@ -25,8 +25,8 @@ def emit(self, record):
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message)
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
-from kafka.structs import TopicPartition, OffsetAndMetadata
 from kafka.serializer import Serializer, Deserializer
+from kafka.structs import TopicPartition, OffsetAndMetadata

 # To be deprecated when KafkaProducer interface is released
 from kafka.client import SimpleClient
4 changes: 2 additions & 2 deletions kafka/consumer/multiprocess.py
@@ -8,7 +8,7 @@

 from kafka.vendor.six.moves import queue # pylint: disable=import-error

-from kafka.common import KafkaError
+from kafka.errors import KafkaError
 from kafka.consumer.base import (
     Consumer,
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
@@ -92,7 +92,7 @@ def _mp_consume(client, group, topic, message_queue, size, events, **consumer_op

         except KafkaError as e:
             # Retry with exponential backoff
-            log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+            log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval)
             time.sleep(interval)
             interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
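
Besides the import fix, the second hunk swaps log.error with eager %-interpolation for log.exception with deferred arguments: the active traceback is attached automatically, so the exception no longer has to be spliced into the message, and formatting is deferred until the record is actually emitted. A standalone sketch of the idiom, not code from this repo:

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    interval = 1
    try:
        raise ConnectionError("broker unreachable")  # stand-in for a KafkaError
    except ConnectionError:
        # logging formats the %-args lazily, and .exception() logs at ERROR
        # level with the current traceback appended.
        log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval)
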
6 changes: 3 additions & 3 deletions kafka/consumer/simple.py
@@ -24,13 +24,13 @@
     ITER_TIMEOUT_SECONDS,
     NO_MESSAGES_WAIT_TIME_SECONDS
 )
-from kafka.common import (
-    FetchRequestPayload, KafkaError, OffsetRequestPayload,
-    ConsumerFetchSizeTooSmall,
+from kafka.errors import (
+    KafkaError, ConsumerFetchSizeTooSmall,
     UnknownTopicOrPartitionError, NotLeaderForPartitionError,
     OffsetOutOfRangeError, FailedPayloadsError, check_error
 )
 from kafka.protocol.message import PartialMessage
+from kafka.structs import FetchRequestPayload, OffsetRequestPayload


 log = logging.getLogger(__name__)
2 changes: 1 addition & 1 deletion kafka/coordinator/assignors/roundrobin.py
@@ -7,8 +7,8 @@
 from kafka.vendor import six

 from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
-from kafka.common import TopicPartition
 from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+from kafka.structs import TopicPartition

 log = logging.getLogger(__name__)
2 changes: 1 addition & 1 deletion kafka/coordinator/consumer.py
@@ -11,7 +11,7 @@
 from kafka.coordinator.assignors.range import RangePartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from kafka.coordinator.protocol import ConsumerProtocol
-from kafka import errors as Errors
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics import AnonMeasurable
 from kafka.metrics.stats import Avg, Count, Max, Rate
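
The `from kafka import errors as Errors` form resolves `errors` as an attribute of the `kafka` package object, which only works once `kafka/__init__.py` has bound that name; `import kafka.errors as Errors` names the submodule by its full dotted path and does not depend on the package's re-exports. A small comparison, assuming kafka-python is installed:

    # Old style: attribute lookup on the kafka package.
    # from kafka import errors as Errors

    # New style: direct submodule import.
    import kafka.errors as Errors

    # Either form binds the same module object:
    print(Errors.__name__)    # 'kafka.errors'
    print(Errors.KafkaError)  # <class 'kafka.errors.KafkaError'>
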
4 changes: 2 additions & 2 deletions kafka/producer/base.py
@@ -14,13 +14,13 @@

 from kafka.vendor import six

-from kafka.structs import (
-    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
 from kafka.errors import (
     kafka_errors, UnsupportedCodecError, FailedPayloadsError,
     RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
     RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+from kafka.structs import (
+    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)

 log = logging.getLogger('kafka.producer')
8 changes: 4 additions & 4 deletions kafka/producer/kafka.py
@@ -10,18 +10,18 @@

 from kafka.vendor import six

-from kafka import errors as Errors
+import kafka.errors as Errors
 from kafka.client_async import KafkaClient, selectors
 from kafka.codec import has_gzip, has_snappy, has_lz4
 from kafka.metrics import MetricConfig, Metrics
 from kafka.partitioner.default import DefaultPartitioner
+from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
+from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
+from kafka.producer.sender import Sender
 from kafka.record.default_records import DefaultRecordBatchBuilder
 from kafka.record.legacy_records import LegacyRecordBatchBuilder
 from kafka.serializer import Serializer
 from kafka.structs import TopicPartition
-from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
-from kafka.producer.sender import Sender


 log = logging.getLogger(__name__)
4 changes: 2 additions & 2 deletions kafka/producer/record_accumulator.py
@@ -6,12 +6,12 @@
 import threading
 import time

-from kafka import errors as Errors
+import kafka.errors as Errors
 from kafka.producer.buffer import SimpleBufferPool
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.structs import TopicPartition
 from kafka.record.memory_records import MemoryRecordsBuilder
 from kafka.record.legacy_records import LegacyRecordBatchBuilder
+from kafka.structs import TopicPartition


 log = logging.getLogger(__name__)
5 changes: 2 additions & 3 deletions kafka/protocol/legacy.py
@@ -15,7 +15,6 @@

 from kafka.codec import gzip_encode, snappy_encode
 from kafka.errors import ProtocolError, UnsupportedCodecError
-from kafka.structs import ConsumerMetadataResponse
 from kafka.util import (
     crc32, read_short_string, relative_unpack,
     write_int_string, group_by_topic_and_partition)
@@ -322,7 +321,7 @@ def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
     @classmethod
     def decode_consumer_metadata_response(cls, data):
         """
-        Decode bytes to a ConsumerMetadataResponse
+        Decode bytes to a kafka.structs.ConsumerMetadataResponse

         Arguments:
             data: bytes to decode
@@ -331,7 +330,7 @@ def decode_consumer_metadata_response(cls, data):
         (host, cur) = read_short_string(data, cur)
         ((port,), cur) = relative_unpack('>i', data, cur)

-        return ConsumerMetadataResponse(error, nodeId, host, port)
+        return kafka.structs.ConsumerMetadataResponse(error, nodeId, host, port)

     @classmethod
     def encode_offset_commit_request(cls, group, payloads):
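
Dropping the top-level `from kafka.structs import ConsumerMetadataResponse` in favor of the fully qualified reference moves the name lookup from import time to call time, a standard way to keep a protocol module out of an import cycle. It presumably relies on a module-level `import kafka.structs` elsewhere in legacy.py, which this excerpt does not show. A sketch of the pattern:

    import kafka.structs  # bind the module itself, not a name inside it

    def decode(error, node_id, host, port):
        # The attribute is resolved here, at call time, after all modules
        # have finished importing, so a half-initialized kafka.structs
        # during startup can no longer break this module's import.
        return kafka.structs.ConsumerMetadataResponse(error, node_id, host, port)
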
4 changes: 0 additions & 4 deletions kafka/structs.py
@@ -93,7 +93,3 @@
 # Limit value: int >= 0, 0 means no retries
 RetryOptions = namedtuple("RetryOptions",
     ["limit", "backoff_ms", "retry_on_timeouts"])
-
-
-# Support legacy imports from kafka.common
-from kafka.errors import *
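
That trailing `from kafka.errors import *` is why error classes were importable from `kafka.structs` at all, a quirk the old test/testutil.py below actually depended on. With the shim gone, only the canonical import works:

    # Previously resolvable only via the wildcard shim in kafka/structs.py:
    # from kafka.structs import FailedPayloadsError  # now raises ImportError

    # Canonical import after this commit:
    from kafka.errors import FailedPayloadsError

    print(FailedPayloadsError.__module__)  # 'kafka.errors'
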
3 changes: 1 addition & 2 deletions test/test_client_async.py
@@ -13,14 +13,13 @@
 import pytest

 from kafka.client_async import KafkaClient, IdleConnectionManager
+from kafka.cluster import ClusterMetadata
 from kafka.conn import ConnectionStates
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.metadata import MetadataResponse, MetadataRequest
 from kafka.protocol.produce import ProduceRequest
 from kafka.structs import BrokerMetadata
-from kafka.cluster import ClusterMetadata
-from kafka.future import Future


 @pytest.fixture
2 changes: 1 addition & 1 deletion test/test_conn.py
@@ -13,7 +13,7 @@
 from kafka.protocol.metadata import MetadataRequest
 from kafka.protocol.produce import ProduceRequest

-import kafka.common as Errors
+import kafka.errors as Errors


 @pytest.fixture
6 changes: 3 additions & 3 deletions test/test_coordinator.py
@@ -5,7 +5,6 @@
 import pytest

 from kafka.client_async import KafkaClient
-from kafka.structs import TopicPartition, OffsetAndMetadata
 from kafka.consumer.subscription_state import (
     SubscriptionState, ConsumerRebalanceListener)
 from kafka.coordinator.assignors.range import RangePartitionAssignor
@@ -21,6 +20,7 @@
     OffsetCommitRequest, OffsetCommitResponse,
     OffsetFetchRequest, OffsetFetchResponse)
 from kafka.protocol.metadata import MetadataResponse
+from kafka.structs import TopicPartition, OffsetAndMetadata
 from kafka.util import WeakMethod


@@ -34,7 +34,7 @@ def coordinator(client):


 def test_init(client, coordinator):
-    # metadata update on init
+    # metadata update on init
     assert client.cluster._need_update is True
     assert WeakMethod(coordinator._handle_metadata_update) in client.cluster._listeners

@@ -542,7 +542,7 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
     response = OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])])
     _f.success(response)
     patched_coord._handle_offset_fetch_response.assert_called_with(
-        future, response)
+        future, response)
4 changes: 2 additions & 2 deletions test/test_fetcher.py
@@ -12,16 +12,16 @@
     CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
 )
 from kafka.consumer.subscription_state import SubscriptionState
+from kafka.future import Future
 from kafka.metrics import Metrics
 from kafka.protocol.fetch import FetchRequest, FetchResponse
 from kafka.protocol.offset import OffsetResponse
-from kafka.structs import TopicPartition
-from kafka.future import Future
 from kafka.errors import (
     StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
     UnknownTopicOrPartitionError, OffsetOutOfRangeError
 )
 from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
+from kafka.structs import TopicPartition


 @pytest.fixture
2 changes: 1 addition & 1 deletion test/test_util.py
@@ -5,8 +5,8 @@
 from . import unittest

 import kafka.errors
-import kafka.util
 import kafka.structs
+import kafka.util


 class UtilTest(unittest.TestCase):
10 changes: 6 additions & 4 deletions test/testutil.py
@@ -11,10 +11,12 @@
 from . import unittest

 from kafka import SimpleClient, create_message
-from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError
-from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \
-    NotLeaderForPartitionError, UnknownTopicOrPartitionError, \
-    FailedPayloadsError
+from kafka.errors import (
+    LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
+    NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+    FailedPayloadsError
+)
+from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
 from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order

 def kafka_versions(*versions):
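
Note that the testutil.py change also trades backslash line continuations for a parenthesized import list, the style PEP 8 prefers because an invisible trailing space after a backslash is a SyntaxError. Illustrated with stdlib names rather than the repo's:

    # Fragile: whitespace after "\" breaks the continuation.
    # from os.path import join, split, \
    #     basename

    # Robust: parentheses let the list span lines freely.
    from os.path import (
        join,
        split,
        basename,
    )

    print(join("tmp", basename("/var/log")))  # e.g. 'tmp/log'
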
