Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Kafka offset checking test #1212

Merged
Prev Previous commit
Next Next commit
Expose seek to beginning logic as a function, create kafka_consumer w…
…ith a group_id
  • Loading branch information
dagardner-nv committed Sep 22, 2023
commit 577bd5bf66c976ddc8e1a0636bc5faa23f2a71ec
27 changes: 17 additions & 10 deletions tests/_utils/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,28 @@ def kafka_bootstrap_servers_fixture(kafka_server: (subprocess.Popen, int)): # p
yield f"localhost:{kafka_port}"


@pytest.fixture(name='kafka_consumer', scope='function')
def kafka_consumer_fixture(kafka_topics: KafkaTopics, _kafka_consumer: "KafkaConsumer"):
_kafka_consumer.subscribe([kafka_topics.output_topic])

# Wait until we have assigned partitions
def seek_to_beginning(kafka_consumer: "KafkaConsumer", timeout: int = PARTITION_ASSIGNMENT_TIMEOUT):
"""
Seeks to the beginning of the Kafka topic
"""
start = time.time()
end = start + PARTITION_ASSIGNMENT_TIMEOUT
end = start + timeout
partitions_assigned = False
while not partitions_assigned and time.time() <= end:
_kafka_consumer.poll(timeout_ms=20)
partitions_assigned = len(_kafka_consumer.assignment()) > 0
kafka_consumer.poll(timeout_ms=20)
partitions_assigned = len(kafka_consumer.assignment()) > 0
if not partitions_assigned:
time.sleep(0.1)

assert partitions_assigned

_kafka_consumer.seek_to_beginning()
kafka_consumer.seek_to_beginning()


@pytest.fixture(name='kafka_consumer', scope='function')
def kafka_consumer_fixture(kafka_topics: KafkaTopics, _kafka_consumer: "KafkaConsumer"):
_kafka_consumer.subscribe([kafka_topics.output_topic])
seek_to_beginning(_kafka_consumer)

yield _kafka_consumer

Expand Down Expand Up @@ -103,7 +108,9 @@ def _init_pytest_kafka() -> (bool, Exception):
'zookeeper_proc',
teardown_fn=teardown_fn,
scope='session')
_kafka_consumer = pytest_kafka.make_kafka_consumer('kafka_server', scope='function')
_kafka_consumer = pytest_kafka.make_kafka_consumer('kafka_server',
scope='function',
group_id='morpheus_unittest_consumer')

return (True, None)
except Exception as e:
Expand Down