Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close KafkaConsumer instances during tests #1410

Merged
merged 3 commits into from
Mar 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def test_consumer(kafka_broker, version):
assert len(consumer._client._conns) > 0
node_id = list(consumer._client._conns.keys())[0]
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
consumer.close()


@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
Expand Down Expand Up @@ -153,6 +154,7 @@ def test_paused(kafka_broker, topic):

consumer.unsubscribe()
assert set() == consumer.paused()
consumer.close()


@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
Expand Down Expand Up @@ -183,3 +185,4 @@ def test_heartbeat_thread(kafka_broker, topic):
assert consumer._coordinator.heartbeat.last_poll == last_poll
consumer.poll(timeout_ms=100)
assert consumer._coordinator.heartbeat.last_poll > last_poll
consumer.close()
7 changes: 7 additions & 0 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):

assert len(messages[0]) == 100
assert len(messages[1]) == 100
kafka_consumer.close()


class TestConsumerIntegration(KafkaIntegrationTestCase):
Expand Down Expand Up @@ -558,6 +559,7 @@ def test_kafka_consumer__blocking(self):
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
consumer.close()

@kafka_versions('>=0.8.1')
def test_kafka_consumer__offset_commit_resume(self):
Expand Down Expand Up @@ -597,6 +599,7 @@ def test_kafka_consumer__offset_commit_resume(self):
output_msgs2.append(m)
self.assert_message_count(output_msgs2, 20)
self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
consumer2.close()

@kafka_versions('>=0.10.1')
def test_kafka_consumer_max_bytes_simple(self):
Expand All @@ -617,6 +620,7 @@ def test_kafka_consumer_max_bytes_simple(self):
self.assertEqual(
seen_partitions, set([
TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
consumer.close()

@kafka_versions('>=0.10.1')
def test_kafka_consumer_max_bytes_one_msg(self):
Expand All @@ -642,6 +646,7 @@ def test_kafka_consumer_max_bytes_one_msg(self):

fetched_msgs = [next(consumer) for i in range(10)]
self.assertEqual(len(fetched_msgs), 10)
consumer.close()

@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_for_time(self):
Expand Down Expand Up @@ -695,6 +700,7 @@ def test_kafka_consumer_offsets_for_time(self):
self.assertEqual(offsets, {
tp: late_msg.offset + 1
})
consumer.close()

@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_search_many_partitions(self):
Expand Down Expand Up @@ -733,6 +739,7 @@ def test_kafka_consumer_offsets_search_many_partitions(self):
tp0: p0msg.offset + 1,
tp1: p1msg.offset + 1
})
consumer.close()

@kafka_versions('<0.10.1')
def test_kafka_consumer_offsets_for_time_old(self):
Expand Down
2 changes: 1 addition & 1 deletion test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def test_end_to_end(kafka_broker, compression):
futures.append(producer.send(topic, 'msg %d' % i))
ret = [f.get(timeout=30) for f in futures]
assert len(ret) == messages

producer.close()

consumer.subscribe([topic])
Expand All @@ -67,6 +66,7 @@ def test_end_to_end(kafka_broker, compression):
break

assert msgs == set(['msg %d' % i for i in range(messages)])
consumer.close()


@pytest.mark.skipif(platform.python_implementation() != 'CPython',
Expand Down