Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Kafka offset checking test #1212

Merged
Prev Previous commit
Next Next commit
Parametrize on async_commits
  • Loading branch information
dagardner-nv committed Sep 22, 2023
commit 56332b189b8b82ae53f9f4c798d9568f1b563ed1
14 changes: 8 additions & 6 deletions tests/test_kafka_source_stage_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
import os
import typing

import mrc
import pandas as pd
import pytest
from mrc.core import operators as ops

from _utils import TEST_DIRS
from _utils import assert_results
Expand All @@ -30,13 +28,15 @@
from _utils.stages.dfp_length_checker import DFPLengthChecker
from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.stages.general.trigger_stage import TriggerStage
from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage

if (typing.TYPE_CHECKING):
from kafka import KafkaConsumer


@pytest.mark.kafka
def test_kafka_source_stage_pipe(config, kafka_bootstrap_servers: str, kafka_topics: typing.Tuple[str, str]) -> None:
Expand Down Expand Up @@ -95,9 +95,11 @@ def test_multi_topic_kafka_source_stage_pipe(config, kafka_bootstrap_servers: st


@pytest.mark.kafka
@pytest.mark.parametrize('async_commits', [True, False])
@pytest.mark.parametrize('num_records', [10, 100, 1000])
def test_kafka_source_commit(num_records,
config,
def test_kafka_source_commit(num_records: int,
async_commits: bool,
config: Config,
kafka_bootstrap_servers: str,
kafka_topics: typing.Tuple[str, str],
kafka_consumer: "KafkaConsumer") -> None:
Expand Down Expand Up @@ -125,7 +127,7 @@ def test_kafka_source_commit(num_records,
group_id=group_id,
client_id='morpheus_kafka_source_commit',
stop_after=num_records,
async_commits=False))
async_commits=async_commits))
pipe.add_stage(TriggerStage(config))

pipe.add_stage(DeserializeStage(config))
Expand Down
Loading