Fix Kafka offset checking test #1212

Merged

This change removes the in-pipeline OffsetChecker stage and instead verifies committed offsets after the pipeline finishes, comparing the 'morpheus' consumer group's committed offsets (via KafkaAdminClient) against the expected end offsets captured before the run. It also extracts the partition-assignment wait loop into a reusable seek_to_beginning() helper, gives the test consumer fixture a dedicated group_id, and parametrizes the commit test over async_commits.

27 changes: 17 additions & 10 deletions tests/_utils/kafka.py
```diff
@@ -53,23 +53,28 @@ def kafka_bootstrap_servers_fixture(kafka_server: (subprocess.Popen, int)):  # p
     yield f"localhost:{kafka_port}"
 
 
-@pytest.fixture(name='kafka_consumer', scope='function')
-def kafka_consumer_fixture(kafka_topics: KafkaTopics, _kafka_consumer: "KafkaConsumer"):
-    _kafka_consumer.subscribe([kafka_topics.output_topic])
-
-    # Wait until we have assigned partitions
+def seek_to_beginning(kafka_consumer: "KafkaConsumer", timeout: int = PARTITION_ASSIGNMENT_TIMEOUT):
+    """
+    Seeks to the beginning of the Kafka topic
+    """
     start = time.time()
-    end = start + PARTITION_ASSIGNMENT_TIMEOUT
+    end = start + timeout
     partitions_assigned = False
     while not partitions_assigned and time.time() <= end:
-        _kafka_consumer.poll(timeout_ms=20)
-        partitions_assigned = len(_kafka_consumer.assignment()) > 0
+        kafka_consumer.poll(timeout_ms=20)
+        partitions_assigned = len(kafka_consumer.assignment()) > 0
         if not partitions_assigned:
             time.sleep(0.1)
 
     assert partitions_assigned
 
-    _kafka_consumer.seek_to_beginning()
+    kafka_consumer.seek_to_beginning()
+
+
+@pytest.fixture(name='kafka_consumer', scope='function')
+def kafka_consumer_fixture(kafka_topics: KafkaTopics, _kafka_consumer: "KafkaConsumer"):
+    _kafka_consumer.subscribe([kafka_topics.output_topic])
+    seek_to_beginning(_kafka_consumer)
 
     yield _kafka_consumer
```
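Extracting seek_to_beginning() lets the new commit test reuse the same wait logic as the fixture. The poll-before-seek loop is needed because, with group-managed subscriptions in kafka-python, assignment() stays empty until poll() has completed the group join, so seeking earlier would have no partitions to act on. A minimal standalone sketch of the pattern, assuming kafka-python and a hypothetical broker and topic:

```python
import time

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',  # hypothetical broker
                         group_id='demo')
consumer.subscribe(['demo_topic'])  # hypothetical topic

# Partitions are only assigned once poll() completes the group join, so poll
# until assignment() is non-empty (the helper above additionally bounds this
# loop with a timeout).
while not consumer.assignment():
    consumer.poll(timeout_ms=20)
    time.sleep(0.1)

consumer.seek_to_beginning()  # rewinds every assigned partition to its earliest offset
```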
```diff
@@ -103,7 +108,9 @@ def _init_pytest_kafka() -> (bool, Exception):
                                                       'zookeeper_proc',
                                                       teardown_fn=teardown_fn,
                                                       scope='session')
-        _kafka_consumer = pytest_kafka.make_kafka_consumer('kafka_server', scope='function')
+        _kafka_consumer = pytest_kafka.make_kafka_consumer('kafka_server',
+                                                           scope='function',
+                                                           group_id='morpheus_unittest_consumer')
 
         return (True, None)
     except Exception as e:
```
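The dedicated group_id for the fixture's consumer is what makes the offset assertions in the reworked test below trustworthy: committed offsets are tracked per consumer group, so the test consumer in morpheus_unittest_consumer can subscribe, seek, and read without disturbing what the source stage commits under the 'morpheus' group. (pytest_kafka.make_kafka_consumer appears to pass extra keyword arguments such as group_id through to the underlying kafka.KafkaConsumer, as the diff above relies on.) A sketch of the per-group bookkeeping, with a hypothetical broker and topic:

```python
from kafka import KafkaConsumer, TopicPartition

BOOTSTRAP = 'localhost:9092'           # hypothetical broker
tp = TopicPartition('some_topic', 0)   # hypothetical topic/partition

# Committed offsets are stored per group: each consumer sees only the position
# its own group last committed, so reads by one group never move the other's.
test_consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP, group_id='morpheus_unittest_consumer')
source_like = KafkaConsumer(bootstrap_servers=BOOTSTRAP, group_id='morpheus')

print(test_consumer.committed(tp))  # e.g. None if this group never committed
print(source_like.committed(tp))    # the pipeline group's committed offset
```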
104 changes: 33 additions & 71 deletions tests/test_kafka_source_stage_pipe.py
```diff
@@ -17,25 +17,26 @@
 import os
 import typing
 
-import mrc
 import pandas as pd
 import pytest
-from mrc.core import operators as ops
 
 from _utils import TEST_DIRS
 from _utils import assert_results
+from _utils.kafka import seek_to_beginning
 from _utils.kafka import write_data_to_kafka
 from _utils.kafka import write_file_to_kafka
 from _utils.stages.dfp_length_checker import DFPLengthChecker
 from morpheus.config import Config
 from morpheus.pipeline.linear_pipeline import LinearPipeline
-from morpheus.pipeline.single_port_stage import SinglePortStage
 from morpheus.stages.general.trigger_stage import TriggerStage
 from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
 from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
 from morpheus.stages.postprocess.serialize_stage import SerializeStage
 from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
 
+if (typing.TYPE_CHECKING):
+    from kafka import KafkaConsumer
+
 
 @pytest.mark.kafka
 def test_kafka_source_stage_pipe(config, kafka_bootstrap_servers: str, kafka_topics: typing.Tuple[str, str]) -> None:
```
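The guarded import keeps kafka-python optional: the module still imports when the Kafka test dependencies are absent (e.g. when running without the --run_kafka flag), while type checkers still see the KafkaConsumer name used in the new annotations. A minimal sketch of the pattern:

```python
import typing

if typing.TYPE_CHECKING:
    # Only evaluated by static type checkers (mypy, pyright); never at runtime,
    # so this module imports cleanly even when kafka-python is not installed.
    from kafka import KafkaConsumer


def drain(consumer: "KafkaConsumer") -> None:
    # The string annotation defers resolution, matching the import guard above.
    consumer.poll(timeout_ms=20)
```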
```diff
@@ -93,90 +93,40 @@ def test_multi_topic_kafka_source_stage_pipe(config, kafka_bootstrap_servers: st
     assert_results(comp_stage.get_results())
 
 
-class OffsetChecker(SinglePortStage):
-    """
-    Verifies that the kafka offsets are being updated as a way of verifying that the
-    consumer is performing a commit.
-    """
-
-    def __init__(self, c: Config, bootstrap_servers: str, group_id: str):
-        super().__init__(c)
-
-        # Importing here so that running without the --run_kafka flag won't fail due
-        # to not having the kafka libs installed
-        from kafka import KafkaAdminClient
-
-        self._client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
-        self._group_id = group_id
-        self._offsets = None
-
-    @property
-    def name(self) -> str:
-        return "morpheus_offset_checker"
-
-    def accepted_types(self) -> typing.Tuple:
-        """
-        Accepted input types for this stage are returned.
-
-        Returns
-        -------
-        typing.Tuple
-            Accepted input types.
-
-        """
-        return (typing.Any, )
-
-    def supports_cpp_node(self):
-        return False
-
-    def _offset_checker(self, x):
-        at_least_one_gt = False
-        new_offsets = self._client.list_consumer_group_offsets(self._group_id)
-
-        if self._offsets is not None:
-            for (topic_partition, prev_offset) in self._offsets.items():
-                new_offset = new_offsets[topic_partition]
-
-                assert new_offset.offset >= prev_offset.offset
-
-                if new_offset.offset > prev_offset.offset:
-                    at_least_one_gt = True
-
-            assert at_least_one_gt
-
-        self._offsets = new_offsets
-
-        return x
-
-    def _build_single(self, builder: mrc.Builder, input_stream):
-        node = builder.make_node(self.unique_name, ops.map(self._offset_checker))
-        builder.make_edge(input_stream[0], node)
-
-        return node, input_stream[1]
-
-
 @pytest.mark.kafka
+@pytest.mark.parametrize('async_commits', [True, False])
 @pytest.mark.parametrize('num_records', [10, 100, 1000])
-def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,
-                             kafka_topics: typing.Tuple[str, str]) -> None:
+def test_kafka_source_commit(num_records: int,
+                             async_commits: bool,
+                             config: Config,
+                             kafka_bootstrap_servers: str,
+                             kafka_topics: typing.Tuple[str, str],
+                             kafka_consumer: "KafkaConsumer") -> None:
+    group_id = 'morpheus'
+
     data = [{'v': i} for i in range(num_records)]
     num_written = write_data_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, data)
     assert num_written == num_records
 
+    kafka_consumer.subscribe([kafka_topics.input_topic])
+    seek_to_beginning(kafka_consumer)
+    partitions = kafka_consumer.assignment()
+
+    # This method does not advance the consumer, and even if it did, this consumer has a different group_id than the
+    # source stage
+    expected_offsets = kafka_consumer.end_offsets(partitions)
+
     pipe = LinearPipeline(config)
     pipe.set_source(
         KafkaSourceStage(config,
                          bootstrap_servers=kafka_bootstrap_servers,
                          input_topic=kafka_topics.input_topic,
                          auto_offset_reset="earliest",
                          poll_interval="1seconds",
-                         group_id='morpheus',
+                         group_id=group_id,
                          client_id='morpheus_kafka_source_commit',
                          stop_after=num_records,
-                         async_commits=False))
-
-    pipe.add_stage(OffsetChecker(config, bootstrap_servers=kafka_bootstrap_servers, group_id='morpheus'))
+                         async_commits=async_commits))
     pipe.add_stage(TriggerStage(config))
 
     pipe.add_stage(DeserializeStage(config))
```
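The pre-run bookkeeping above leans on KafkaConsumer.end_offsets(), which asks the broker for the current log-end offset of each partition without moving the consumer's position. A standalone sketch of the call's shape, assuming kafka-python with a hypothetical broker and topic:

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')  # hypothetical broker
tp = TopicPartition('input_topic', 0)                         # hypothetical topic

# Returns {TopicPartition: int}; a pure metadata lookup, so the consumer's own
# position and committed offsets are left untouched.
end_offsets = consumer.end_offsets([tp])
print(end_offsets[tp])  # e.g. 10 after writing 10 records to an empty partition
```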
```diff
@@ -187,6 +138,17 @@ def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,
     assert_results(comp_stage.get_results())
 
+    from kafka import KafkaAdminClient
+    admin_client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_servers, client_id='offset_checker')
+    offsets = admin_client.list_consumer_group_offsets(group_id)
+
+    # The broker may have created additional partitions, offsets should be a superset of expected_offsets
+    for (topic_partition, expected_offset) in expected_offsets.items():
+        # The value of the offsets dict being returned is a tuple of (offset, metadata), while the value of the
+        # expected_offsets is just the offset.
+        actual_offset = offsets[topic_partition][0]
+        assert actual_offset == expected_offset
+
 
 @pytest.mark.kafka
 @pytest.mark.parametrize('num_records', [1000])
```
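The final comparison mixes two shapes, which is why the [0] index appears: expected_offsets maps TopicPartition to a plain integer, while KafkaAdminClient.list_consumer_group_offsets() maps TopicPartition to an OffsetAndMetadata namedtuple. A sketch of the verification in isolation, with hypothetical broker, topic, and counts:

```python
from kafka import KafkaAdminClient, TopicPartition

admin = KafkaAdminClient(bootstrap_servers='localhost:9092',  # hypothetical broker
                         client_id='offset_checker')
committed = admin.list_consumer_group_offsets('morpheus')

tp = TopicPartition('input_topic', 0)  # hypothetical topic/partition
meta = committed[tp]                   # e.g. OffsetAndMetadata(offset=10, metadata='')
assert meta[0] == meta.offset          # namedtuple: index 0 and .offset are the same

# After the source stage has consumed and committed everything it read, the
# committed offset should have caught up to the partition's end offset.
expected_offset = 10                   # stand-in for kafka_consumer.end_offsets(...)[tp]
assert meta.offset == expected_offset
```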