Closed
Description
Describe the bug
When a Python source yields a message, the message isn't always immediately visible to downstream nodes; it only becomes visible once one of two things happens:
- The source yields more messages
- An on-complete occurs
I ran into this with the RabbitMQ source stage, which consumes several thousand records and emits them as a single large MessageMeta.
That MessageMeta isn't seen by the pipeline until shutdown, or until more messages are published to the exchange.
Steps/Code to reproduce bug
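The following code snippet sleeps for 10 seconds after emitting every 5th message. However, while the source is sleeping, the monitor stage has recorded only n-1 of the n messages emitted so far; the most recently yielded message is not seen.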
import logging
import os
import time
import click
import srf
import cudf
from morpheus.config import Config
from morpheus.config import CppConfig
from morpheus.config import PipelineModes
from morpheus.messages import MessageMeta
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.logger import configure_logging
class SimpleSrc(SingleOutputSource):
    """Minimal pure-Python source emitting ``count`` single-row ``MessageMeta`` objects.

    Row values run from 1 to ``count``. After every fifth message the generator
    blocks for 10 seconds, leaving the most recently yielded message as the last
    thing the source produced before going idle.
    """

    def __init__(self, c: Config, count=20) -> None:
        super().__init__(c)
        self.count = count

    @property
    def name(self) -> str:
        return "simple"

    def supports_cpp_node(self):
        # There is no C++ implementation of this source.
        return False

    def _generate_frames(self):
        for value in range(1, self.count + 1):
            yield MessageMeta(cudf.DataFrame({"col": [value]}))

            # Go idle after each group of five messages so the pipeline sits with
            # the last-yielded message in flight.
            if not value % 5:
                time.sleep(10)

    def _build_source(self, builder: srf.Builder) -> StreamPair:
        source_node = builder.make_source(self.unique_name, self._generate_frames)
        return source_node, MessageMeta
@click.command()
@click.option('--use_cpp', default=False, is_flag=True)
@click.option(
    "--num_threads",
    # os.cpu_count() returns None when the count cannot be determined; fall back
    # to 1 so the IntRange(min=1) option always has a valid default.
    default=os.cpu_count() or 1,
    type=click.IntRange(min=1),
    help="Number of internal pipeline threads to use",
)
@click.option(
    "--num_messages",
    default=20,
    type=click.IntRange(min=1),
    help="Number of messages to emit",
)
def run_pipeline(use_cpp, num_threads, num_messages):
    """Build and run a linear pipeline fed by ``SimpleSrc``.

    The pipeline is: SimpleSrc -> Monitor -> Deserialize -> Monitor ->
    Serialize -> Monitor -> WriteToFile (``/tmp/test-source.csv``).

    Args:
        use_cpp: Whether to enable C++ stage implementations.
        num_threads: Number of internal pipeline threads.
        num_messages: Number of messages ``SimpleSrc`` emits.
    """
    configure_logging(log_level=logging.DEBUG)

    config = Config()
    CppConfig.set_should_use_cpp(use_cpp)
    config.mode = PipelineModes.OTHER
    config.num_threads = num_threads

    # print() does not perform %-interpolation on extra arguments, so format the
    # message explicitly; otherwise a literal "%s" would be printed.
    print("Config: \n{}".format(config.to_string()))
    print("CPP Enabled: {}".format(CppConfig.get_should_use_cpp()))

    pipeline = LinearPipeline(config)
    pipeline.set_source(SimpleSrc(config, count=num_messages))
    pipeline.add_stage(MonitorStage(config, description="Source rate"))
    pipeline.add_stage(DeserializeStage(config))
    pipeline.add_stage(MonitorStage(config, description="Deserialize rate"))
    pipeline.add_stage(SerializeStage(config))
    pipeline.add_stage(MonitorStage(config, description="Serialize rate"))
    pipeline.add_stage(WriteToFileStage(config, filename="/tmp/test-source.csv", overwrite=True))
    pipeline.run()


if __name__ == "__main__":
    run_pipeline()
Expected behavior
All yielded messages are immediately visible to the downstream stages of the pipeline.
Environment overview (please complete the following information)
- Environment location: Bare-metal
- Method of Morpheus install: from source
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Metadata
Assignees
Labels
Type
Projects
Status
Done