Fix Kafka offset checking test #1212

Merged
Move the offset checking from a stage to the test body so that it executes after the pipeline completes. This is needed because the C++ implementation commits the offsets after calling on_next; checking after completion also makes it possible to verify async commits.
dagardner-nv committed Sep 22, 2023
commit 3b5d9b98fe35f3b1854b36f74999d5ce29e9a09d
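
The test below exercises the synchronous path (async_commits=False), where a single read of the committed offsets after pipe.run() suffices. For the async path the commit message mentions, commits may reach the broker only after the pipeline returns, so a check would have to poll. A minimal sketch of such a poll, assuming kafka-python's KafkaAdminClient; the wait_for_committed_offsets helper and its timeout are illustrative, not part of this commit:

import time

from kafka import KafkaAdminClient


def wait_for_committed_offsets(bootstrap_servers: str, group_id: str, expected_offsets: dict, timeout: float = 30.0):
    """Poll the broker until the group's committed offsets match the expected end offsets."""
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    deadline = time.time() + timeout
    while True:
        # list_consumer_group_offsets returns TopicPartition -> OffsetAndMetadata;
        # compare only the offset portion against the expected integer offsets
        offsets = admin_client.list_consumer_group_offsets(group_id)
        if all(tp in offsets and offsets[tp].offset == expected for (tp, expected) in expected_offsets.items()):
            return
        assert time.time() < deadline, "Committed offsets did not reach the expected values before the timeout"
        time.sleep(0.5)

A test enabling async_commits=True could call this helper after pipe.run() in place of the direct equality assertions used below.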
94 changes: 27 additions & 67 deletions tests/test_kafka_source_stage_pipe.py
@@ -24,6 +24,7 @@

 from _utils import TEST_DIRS
 from _utils import assert_results
+from _utils.kafka import seek_to_beginning
 from _utils.kafka import write_data_to_kafka
 from _utils.kafka import write_file_to_kafka
 from _utils.stages.dfp_length_checker import DFPLengthChecker
@@ -93,90 +94,38 @@ def test_multi_topic_kafka_source_stage_pipe(config, kafka_bootstrap_servers: str
     assert_results(comp_stage.get_results())


-class OffsetChecker(SinglePortStage):
-    """
-    Verifies that the kafka offsets are being updated as a way of verifying that the
-    consumer is performing a commit.
-    """
-
-    def __init__(self, c: Config, bootstrap_servers: str, group_id: str):
-        super().__init__(c)
-
-        # Importing here so that running without the --run_kafka flag won't fail due
-        # to not having the kafka libs installed
-        from kafka import KafkaAdminClient
-
-        self._client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
-        self._group_id = group_id
-        self._offsets = None
-
-    @property
-    def name(self) -> str:
-        return "morpheus_offset_checker"
-
-    def accepted_types(self) -> typing.Tuple:
-        """
-        Accepted input types for this stage are returned.
-
-        Returns
-        -------
-        typing.Tuple
-            Accepted input types.
-
-        """
-        return (typing.Any, )
-
-    def supports_cpp_node(self):
-        return False
-
-    def _offset_checker(self, x):
-        at_least_one_gt = False
-        new_offsets = self._client.list_consumer_group_offsets(self._group_id)
-
-        if self._offsets is not None:
-            for (topic_partition, prev_offset) in self._offsets.items():
-                new_offset = new_offsets[topic_partition]
-
-                assert new_offset.offset >= prev_offset.offset
-
-                if new_offset.offset > prev_offset.offset:
-                    at_least_one_gt = True
-
-            assert at_least_one_gt
-
-        self._offsets = new_offsets
-
-        return x
-
-    def _build_single(self, builder: mrc.Builder, input_stream):
-        node = builder.make_node(self.unique_name, ops.map(self._offset_checker))
-        builder.make_edge(input_stream[0], node)
-
-        return node, input_stream[1]
-
-
 @pytest.mark.kafka
 @pytest.mark.parametrize('num_records', [10, 100, 1000])
-def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,
-                             kafka_topics: typing.Tuple[str, str]) -> None:
+def test_kafka_source_commit(num_records,
+                             config,
+                             kafka_bootstrap_servers: str,
+                             kafka_topics: typing.Tuple[str, str],
+                             kafka_consumer: "KafkaConsumer") -> None:
+    group_id = 'morpheus'

     data = [{'v': i} for i in range(num_records)]
     num_written = write_data_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, data)
     assert num_written == num_records

+    kafka_consumer.subscribe([kafka_topics.input_topic])
+    seek_to_beginning(kafka_consumer)
+    partitions = kafka_consumer.assignment()
+
+    # This method does not advance the consumer, and even if it did, this consumer has a different group_id than the
+    # source stage
+    expected_offsets = kafka_consumer.end_offsets(partitions)
+
     pipe = LinearPipeline(config)
     pipe.set_source(
         KafkaSourceStage(config,
                          bootstrap_servers=kafka_bootstrap_servers,
                          input_topic=kafka_topics.input_topic,
                          auto_offset_reset="earliest",
                          poll_interval="1seconds",
-                         group_id='morpheus',
+                         group_id=group_id,
                          client_id='morpheus_kafka_source_commit',
                          stop_after=num_records,
                          async_commits=False))

-    pipe.add_stage(OffsetChecker(config, bootstrap_servers=kafka_bootstrap_servers, group_id='morpheus'))
     pipe.add_stage(TriggerStage(config))
 
     pipe.add_stage(DeserializeStage(config))
@@ -187,6 +136,17 @@ def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,

     assert_results(comp_stage.get_results())

+    from kafka import KafkaAdminClient
+    admin_client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_servers, client_id='offset_checker')
+    offsets = admin_client.list_consumer_group_offsets(group_id)
+
+    # The broker may have created additional partitions; offsets should be a superset of expected_offsets
+    for (topic_partition, expected_offset) in expected_offsets.items():
+        # The value of the offsets dict being returned is a tuple of (offset, metadata), while the value of the
+        # expected_offsets is just the offset.
+        actual_offset = offsets[topic_partition][0]
+        assert actual_offset == expected_offset
+

 @pytest.mark.kafka
 @pytest.mark.parametrize('num_records', [1000])
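
Note that these tests are gated behind the kafka pytest marker; as the comment in the removed OffsetChecker stage notes, the kafka client library is only imported when the suite runs with the --run_kafka flag. A typical invocation (assuming the repository's standard pytest setup) would be:

pytest --run_kafka tests/test_kafka_source_stage_pipe.py -k test_kafka_source_commit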