Skip to content

Commit

Permalink
Convert remaining KafkaConsumer tests to pytest (#1886)
Browse files Browse the repository at this point in the history
This makes it so the only remaining use of `unittest` is in the old
tests of the deprecated `Simple*` clients. All `KafkaConsumer` tests are
migrated to `pytest`.

I also had to bump the test iterations up on one of the tests, I think there was a race condition there that was more commonly hit under pytest , planning to cleanup that in a followup PR. See #1886 (comment) for details.
  • Loading branch information
jeffwidman authored Aug 23, 2019
1 parent 6e6d0cc commit 61fa0b2
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 256 deletions.
26 changes: 26 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import absolute_import

import uuid

import pytest

from test.testutil import env_kafka_version, random_string
Expand Down Expand Up @@ -137,3 +139,27 @@ def _set_conn_state(state):
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
return conn


@pytest.fixture()
def send_messages(topic, kafka_producer, request):
"""A factory that returns a send_messages function with a pre-populated
topic topic / producer."""

def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request):
"""
messages is typically `range(0,100)`
partition is an int
"""
messages_and_futures = [] # [(message, produce_future),]
for i in number_range:
# request.node.name provides the test name (including parametrized values)
encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8')
future = kafka_producer.send(topic, value=encoded_msg, partition=partition)
messages_and_futures.append((encoded_msg, future))
kafka_producer.flush()
for (msg, f) in messages_and_futures:
assert f.succeeded()
return [msg for (msg, f) in messages_and_futures]

return _send_messages
2 changes: 2 additions & 0 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def test_consumer(kafka_broker, topic):
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
consumer.close()


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_consumer_topics(kafka_broker, topic):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
Expand All @@ -38,6 +39,7 @@ def test_consumer_topics(kafka_broker, topic):
assert len(consumer.partitions_for_topic(topic)) > 0
consumer.close()


@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_group(kafka_broker, topic):
num_partitions = 4
Expand Down
Loading

0 comments on commit 61fa0b2

Please sign in to comment.