Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration tests migration to pytest framework #1293

Merged
merged 4 commits into from
Feb 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
return [responses[tp] for tp in original_ordering]

def __repr__(self):
return '<KafkaClient client_id=%s>' % (self.client_id)
return '<SimpleClient client_id=%s>' % (self.client_id)
jeffwidman marked this conversation as resolved.
Show resolved Hide resolved

def _raise_on_response_error(self, resp):

Expand Down
2 changes: 2 additions & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,8 @@ def poll(self, timeout_ms=None, future=None):
timeout_ms = 100
elif timeout_ms is None:
timeout_ms = self.config['request_timeout_ms']
elif not isinstance(timeout_ms, (int, float)):
raise RuntimeError('Invalid type for timeout: %s' % type(timeout_ms))

# Loop for futures, break after first loop if None
responses = []
Expand Down
1 change: 1 addition & 0 deletions pylint.rc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[TYPECHECK]
ignored-classes=SyncManager,_socketobject
generated-members=py.*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The py module seems to have some dynamically generated members and pylint complains about not finding the ones used by the module:

Module 'py' has no 'path' member (no-member)

This is the same issue reported here. Adding this option to pylint just silences this error.

This will be temporary anyway, as I explain below.


[MESSAGES CONTROL]
disable=E1129
113 changes: 96 additions & 17 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,117 @@
from __future__ import absolute_import

import os
import inspect

import pytest
from decorator import decorate

from test.fixtures import KafkaFixture, ZookeeperFixture

from test.testutil import kafka_version, random_string

@pytest.fixture(scope="module")
def version():
if 'KAFKA_VERSION' not in os.environ:
return ()
return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))

"""Return the Kafka version set in the OS environment"""
return kafka_version()

@pytest.fixture(scope="module")
def zookeeper(version, request):
assert version
jeffwidman marked this conversation as resolved.
Show resolved Hide resolved
zk = ZookeeperFixture.instance()
yield zk
zk.close()
def zookeeper():
"""Return a Zookeeper fixture"""
zk_instance = ZookeeperFixture.instance()
yield zk_instance
zk_instance.close()

@pytest.fixture(scope="module")
def kafka_broker(kafka_broker_factory):
"""Return a Kafka broker fixture"""
return kafka_broker_factory()[0]

@pytest.fixture(scope="module")
def kafka_broker(version, zookeeper, request):
assert version
k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
partitions=4)
yield k
k.close()
def kafka_broker_factory(version, zookeeper):
"""Return a Kafka broker fixture factory"""
assert version, 'KAFKA_VERSION must be specified to run integration tests'

_brokers = []
def factory(**broker_params):
params = {} if broker_params is None else broker_params.copy()
params.setdefault('partitions', 4)
num_brokers = params.pop('num_brokers', 1)
brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
for x in range(num_brokers))
_brokers.extend(brokers)
return brokers

yield factory

for broker in _brokers:
broker.close()

@pytest.fixture
def simple_client(kafka_broker, request, topic):
"""Return a SimpleClient fixture"""
client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
client.ensure_topic_exists(topic)
yield client
client.close()

@pytest.fixture
def kafka_client(kafka_broker, request):
"""Return a KafkaClient fixture"""
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
yield client
client.close()

@pytest.fixture
def kafka_consumer(kafka_consumer_factory):
"""Return a KafkaConsumer fixture"""
return kafka_consumer_factory()

@pytest.fixture
def kafka_consumer_factory(kafka_broker, topic, request):
"""Return a KafkaConsumer factory fixture"""
_consumer = [None]

def factory(**kafka_consumer_params):
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
return _consumer[0]

yield factory

if _consumer[0]:
_consumer[0].close()

@pytest.fixture
def kafka_producer(kafka_producer_factory):
"""Return a KafkaProducer fixture"""
yield kafka_producer_factory()

@pytest.fixture
def kafka_producer_factory(kafka_broker, request):
"""Return a KafkaProduce factory fixture"""
_producer = [None]

def factory(**kafka_producer_params):
params = {} if kafka_producer_params is None else kafka_producer_params.copy()
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
_producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
return _producer[0]

yield factory

if _producer[0]:
_producer[0].close()

@pytest.fixture
def topic(kafka_broker, request):
"""Return a topic fixture"""
topic_name = '%s_%s' % (request.node.name, random_string(10))
kafka_broker.create_topics([topic_name])
return topic_name

@pytest.fixture
def conn(mocker):
"""Return a connection mocker fixture"""
from kafka.conn import ConnectionStates
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse
Expand Down
Loading