
[BUG] Not all messages immediately visible when yielded from a Python source #330

Closed

Description

Describe the bug
When a Python source yields a message, that message is not always immediately visible to downstream nodes. It only becomes visible once one of two things happens:

  1. The source yields more messages
  2. An on-complete event occurs
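One way to picture the symptom is a forwarder that releases each item one step late. The sketch below is a toy model in plain Python. It assumes nothing about how SRF actually schedules Python sources, and `lookahead` and `traced_source` are hypothetical names. In this model, a message becomes visible downstream only when the source produces the next one or completes, which matches the two conditions listed above.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def lookahead(source: Iterable[T]) -> Iterator[T]:
    """Forward items one step late: an item is released only after the next
    item has been pulled from the source, or the source is exhausted.
    Toy model of the reported symptom, not SRF's implementation."""
    it = iter(source)
    try:
        pending = next(it)
    except StopIteration:
        return
    for item in it:      # pulling the next item from the source...
        yield pending    # ...is what releases the previous one
        pending = item
    yield pending        # the final item is released only on completion


def traced_source(count: int, log: List[str]) -> Iterator[int]:
    """Yield 1..count, logging each production and the completion."""
    for i in range(1, count + 1):
        log.append(f"produced {i}")
        yield i
    log.append("complete")


log: List[str] = []
for msg in lookahead(traced_source(3, log)):
    log.append(f"seen {msg}")

print(log)
# ['produced 1', 'produced 2', 'seen 1', 'produced 3', 'seen 2', 'complete', 'seen 3']
```

If the source paused between yields, the item produced just before the pause would stay invisible for the whole pause, so a downstream count would lag by one message.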

I ran into this with the RabbitMQ source stage, which consumes several thousand records and emits them as a single large MessageMeta. The pipeline doesn't see that MessageMeta until shutdown, or until more messages are published to the exchange.

Steps/Code to reproduce bug
The following code snippet sleeps for 10 seconds after emitting every 5th message. However, the monitor stage records only n-1 of the n messages emitted before each sleep.

```python
import logging
import os
import time

import click
import srf

import cudf

from morpheus.config import Config
from morpheus.config import CppConfig
from morpheus.config import PipelineModes
from morpheus.messages import MessageMeta
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.logger import configure_logging


class SimpleSrc(SingleOutputSource):

    def __init__(self, c: Config, count=20) -> None:
        super().__init__(c)
        self.count = count

    @property
    def name(self) -> str:
        return "simple"

    def supports_cpp_node(self):
        return False

    def _generate_frames(self):
        for i in range(1, self.count + 1):
            df = cudf.DataFrame({"col": [i]})
            m = MessageMeta(df)

            yield m

            # Pause after every 5th message. While the source sleeps, the message
            # just yielded should already be visible to downstream stages.
            if i % 5 == 0:
                time.sleep(10)

    def _build_source(self, builder: srf.Builder) -> StreamPair:
        src = builder.make_source(self.unique_name, self._generate_frames)
        return src, MessageMeta


@click.command()
@click.option('--use_cpp', default=False, is_flag=True)
@click.option(
    "--num_threads",
    default=os.cpu_count(),
    type=click.IntRange(min=1),
    help="Number of internal pipeline threads to use",
)
@click.option(
    "--num_messages",
    default=20,
    type=click.IntRange(min=1),
    help="Number of messages to emit",
)
def run_pipeline(use_cpp, num_threads, num_messages):
    configure_logging(log_level=logging.DEBUG)
    config = Config()
    CppConfig.set_should_use_cpp(use_cpp)
    config.mode = PipelineModes.OTHER
    config.num_threads = num_threads

    # print() does not do %-style formatting, so format the string explicitly
    print("Config: \n{}".format(config.to_string()))
    print("CPP Enabled: {}".format(CppConfig.get_should_use_cpp()))

    # One monitor per step, so a missing message shows up as a lagging count
    pipeline = LinearPipeline(config)
    pipeline.set_source(SimpleSrc(config, count=num_messages))
    pipeline.add_stage(MonitorStage(config, description="Source rate"))
    pipeline.add_stage(DeserializeStage(config))
    pipeline.add_stage(MonitorStage(config, description="Deserialize rate"))
    pipeline.add_stage(SerializeStage(config))
    pipeline.add_stage(MonitorStage(config, description="Serialize rate"))
    pipeline.add_stage(WriteToFileStage(config, filename="/tmp/test-source.csv", overwrite=True))
    pipeline.run()


if __name__ == "__main__":
    run_pipeline()
```

Expected behavior
All yielded messages are immediately visible to the pipeline.
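The expected behavior can be phrased as a property that is checkable without Morpheus: at every pause in the source, the downstream count should equal the number of messages yielded so far. The sketch below is plain Python that mirrors the reproduction script's "pause after every 5th message" structure. Generators stand in for the SRF source and MonitorStage, and `counts_at_pauses` and `passthrough` are hypothetical names.

```python
from typing import Callable, Iterable, Iterator, List

# A forwarder takes the source's items and re-yields them downstream.
Forwarder = Callable[[Iterable[int]], Iterable[int]]


def passthrough(source: Iterable[int]) -> Iterator[int]:
    """Forward each item as soon as the source yields it (the expected behavior)."""
    yield from source


def counts_at_pauses(forward: Forwarder, count: int = 20, every: int = 5) -> List[int]:
    """Yield 1..count and 'pause' after every `every`-th item. At each pause,
    record how many items the downstream consumer has seen so far, a stand-in
    for what a monitor stage would report during the sleep."""
    seen: List[int] = []
    pauses: List[int] = []

    def source() -> Iterator[int]:
        for i in range(1, count + 1):
            yield i
            if i % every == 0:
                # The repro script calls time.sleep(10) here; no real sleep is needed.
                pauses.append(len(seen))

    for item in forward(source()):
        seen.append(item)
    return pauses


print(counts_at_pauses(passthrough))  # [5, 10, 15, 20]
```

With a forwarder that holds back the most recent item until the next one arrives (a one-item look-ahead), the same harness records `[4, 9, 14, 19]`, the n-1 pattern described in this report.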

Environment overview

  • Environment location: Bare-metal
  • Method of Morpheus install: from source

Labels: bug (Something isn't working)

Project status: Done
