Skip to content

Commit

Permalink
Separate Pipeline type inference/checking & MRC pipeline construction (
Browse files Browse the repository at this point in the history
…#1233)

* Builds upon changes in PR #1175
* Type inference/checking moved to `Pipeline.pre_build` and doesn't require MRC
* Wraps input/output types between stages into a `StageSchema` class which can later be expanded to describe tensors and dataframe columns.
* `StreamWrapper` renamed to `BaseStage`.
* `BaseStage` defines a new abstract method `compute_schema`
* Fix unrelated bug in `generate_viz_frames_stage.py` where the `overwrite` argument was ignored leading to a failed assert.
* Calling `add_edge`, `add_segment_edge`, or `add_stage` after calling `build` will trigger a failed assert.

fixes #229
fixes #230 


## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1233
  • Loading branch information
dagardner-nv authored Oct 19, 2023
1 parent ec6f12c commit a0084e3
Show file tree
Hide file tree
Showing 114 changed files with 2,047 additions and 1,141 deletions.
10 changes: 5 additions & 5 deletions examples/developer_guide/1_simple_python_stage/pass_thru.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
from mrc.core import operators as ops

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("pass-thru")
class PassThruStage(SinglePortStage):
class PassThruStage(PassThruTypeMixin, SinglePortStage):
"""
A Simple Pass Through Stage
"""
Expand All @@ -43,8 +43,8 @@ def on_data(self, message: typing.Any):
# Return the message for the next stage
return message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_stream[0], node)
builder.make_edge(input_node, node)

return node, input_stream[1]
return node
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("recipient-features", modes=[PipelineModes.NLP])
class RecipientFeaturesStage(SinglePortStage):
class RecipientFeaturesStage(PassThruTypeMixin, SinglePortStage):
"""
Pre-processing stage which counts the number of recipients in an email's metadata.
Expand Down Expand Up @@ -87,8 +87,8 @@ def on_data(self, message: MessageMeta) -> MessageMeta:
# Return the message for the next stage
return message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_stream[0], node)
builder.make_edge(input_node, node)

return node, input_stream[1]
return node
14 changes: 8 additions & 6 deletions examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.pipeline.stage_schema import StageSchema

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -86,27 +86,29 @@ def name(self) -> str:
def supports_cpp_node(self) -> bool:
return False

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)

def stop(self):
# Indicate we need to stop
self._stop_requested = True

return super().stop()

def _build_source(self, builder: mrc.Builder) -> StreamPair:
node = builder.make_source(self.unique_name, self.source_generator)
return node, MessageMeta
def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
return builder.make_source(self.unique_name, self.source_generator)

def source_generator(self):
try:
while not self._stop_requested:
(method_frame, header_frame, body) = self._channel.basic_get(self._queue_name)
(method_frame, _, body) = self._channel.basic_get(self._queue_name)
if method_frame is not None:
try:
buffer = StringIO(body.decode("utf-8"))
df = cudf.io.read_json(buffer, orient='records', lines=True)
yield MessageMeta(df=df)
except Exception as ex:
logger.exception("Error occurred converting RabbitMQ message to Dataframe: {}".format(ex))
logger.exception("Error occurred converting RabbitMQ message to Dataframe: %s", ex)
finally:
self._channel.basic_ack(method_frame.delivery_tag)
else:
Expand Down
12 changes: 6 additions & 6 deletions examples/developer_guide/2_2_rabbitmq/write_to_rabbitmq_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair

logger = logging.getLogger(__name__)


@register_stage("to-rabbitmq")
class WriteToRabbitMQStage(SinglePortStage):
class WriteToRabbitMQStage(PassThruTypeMixin, SinglePortStage):
"""
Source stage used to load messages from a RabbitMQ queue.
Expand Down Expand Up @@ -68,10 +68,10 @@ def accepted_types(self) -> typing.Tuple:
def supports_cpp_node(self) -> bool:
return False

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_sink(self.unique_name, self.on_data, self.on_error, self.on_complete)
builder.make_edge(input_stream[0], node)
return (node, input_stream[1])
builder.make_edge(input_node, node)
return node

def on_data(self, message: MessageMeta) -> MessageMeta:
df = message.df
Expand All @@ -85,7 +85,7 @@ def on_data(self, message: MessageMeta) -> MessageMeta:
return message

def on_error(self, ex: Exception):
logger.exception("Error occurred : {}".format(ex))
logger.exception("Error occurred : %s", ex)
self._connection.close()

def on_complete(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
from mrc.core import operators as ops

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("pass-thru-cpp")
class PassThruStage(SinglePortStage):
class PassThruStage(PassThruTypeMixin, SinglePortStage):

@property
def name(self) -> str:
Expand All @@ -40,14 +40,14 @@ def on_data(self, message: typing.Any):
# Return the message for the next stage
return message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
if self._build_cpp_node():
from . import morpheus_example as morpheus_example_cpp
from _lib import morpheus_example as morpheus_example_cpp

# pylint: disable=c-extension-no-member
node = morpheus_example_cpp.PassThruStage(builder, self.unique_name)
else:
node = builder.make_node(self.unique_name, ops.map(self.on_data))

builder.make_edge(input_stream[0], node)
return node, input_stream[1]
builder.make_edge(input_node, node)
return node
3 changes: 2 additions & 1 deletion examples/developer_guide/3_simple_cpp_stage/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import logging
import os

from _lib.pass_thru import PassThruStage
from pass_thru import PassThruStage

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@

import cudf

from _lib import morpheus_rabbit as morpheus_rabbit_cpp
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.pipeline.stage_schema import StageSchema

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -80,12 +79,17 @@ def __init__(self,
def name(self) -> str:
return "from-rabbitmq"

@classmethod
def supports_cpp_node(cls) -> bool:
def supports_cpp_node(self) -> bool:
return True

def _build_source(self, builder: mrc.Builder) -> StreamPair:
def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)

def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
if self._build_cpp_node():
# pylint: disable=c-extension-no-member,no-name-in-module
from _lib import morpheus_rabbit as morpheus_rabbit_cpp

node = morpheus_rabbit_cpp.RabbitMQSourceStage(builder,
self.unique_name,
self._host,
Expand All @@ -96,7 +100,8 @@ def _build_source(self, builder: mrc.Builder) -> StreamPair:
else:
self.connect()
node = builder.make_source(self.unique_name, self.source_generator)
return node, MessageMeta

return node

def connect(self):
self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=self._host))
Expand All @@ -114,14 +119,14 @@ def connect(self):
def source_generator(self):
try:
while not self._stop_requested:
(method_frame, header_frame, body) = self._channel.basic_get(self._queue_name)
(method_frame, _, body) = self._channel.basic_get(self._queue_name)
if method_frame is not None:
try:
buffer = StringIO(body.decode("utf-8"))
df = cudf.io.read_json(buffer, orient='records', lines=True)
yield MessageMeta(df=df)
except Exception as ex:
logger.exception("Error occurred converting RabbitMQ message to Dataframe: {}".format(ex))
logger.exception("Error occurred converting RabbitMQ message to Dataframe: %s", ex)
finally:
self._channel.basic_ack(method_frame.delivery_tag)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair

logger = logging.getLogger(__name__)


@register_stage("to-rabbitmq")
class WriteToRabbitMQStage(SinglePortStage):
class WriteToRabbitMQStage(PassThruTypeMixin, SinglePortStage):
"""
Source stage used to load messages from a RabbitMQ queue.
Expand Down Expand Up @@ -68,10 +68,10 @@ def accepted_types(self) -> typing.Tuple:
def supports_cpp_node(self) -> bool:
return False

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_sink(self.unique_name, self.on_data, self.on_error, self.on_complete)
builder.make_edge(input_stream[0], node)
return (node, input_stream[1])
builder.make_edge(input_node, node)
return node

def on_data(self, message: MessageMeta) -> MessageMeta:
df = message.df
Expand All @@ -85,7 +85,7 @@ def on_data(self, message: MessageMeta) -> MessageMeta:
return message

def on_error(self, ex: Exception):
logger.exception("Error occurred : {}".format(ex))
logger.exception("Error occurred : %s", ex)
self._connection.close()

def on_complete(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from morpheus.config import Config
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.pipeline.stage_schema import StageSchema

logger = logging.getLogger(f"morpheus.{__name__}")

Expand Down Expand Up @@ -108,6 +108,9 @@ def accepted_types(self) -> typing.Tuple:
"""Accepted incoming types for this stage"""
return (fsspec.core.OpenFiles, )

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(typing.Tuple[fsspec.core.OpenFiles, int])

def on_data(self, file_objects: fsspec.core.OpenFiles) -> typing.List[typing.Tuple[fsspec.core.OpenFiles, int]]:
"""
Batches incoming data according to date, period and sampling, potentially filtering data based on file dates.
Expand Down Expand Up @@ -174,8 +177,8 @@ def on_data(self, file_objects: fsspec.core.OpenFiles) -> typing.List[typing.Tup

return output_batches

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
stream = builder.make_node(self.unique_name, ops.map(self.on_data), ops.flatten())
builder.make_edge(input_stream[0], stream)
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.unique_name, ops.map(self.on_data), ops.flatten())
builder.make_edge(input_node, node)

return stream, typing.Tuple[fsspec.core.OpenFiles, int]
return node
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from morpheus.controllers.file_to_df_controller import FileToDFController
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.column_info import DataFrameInputSchema

logger = logging.getLogger(f"morpheus.{__name__}")
Expand Down Expand Up @@ -84,10 +84,13 @@ def accepted_types(self) -> typing.Tuple:
"""Accepted input types."""
return (typing.Any, )

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
stream = builder.make_node(self.unique_name,
ops.map(self._controller.convert_to_dataframe),
ops.on_completed(self._controller.close))
builder.make_edge(input_stream[0], stream)
def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(pd.DataFrame)

return stream, pd.DataFrame
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.unique_name,
ops.map(self._controller.convert_to_dataframe),
ops.on_completed(self._controller.close))
builder.make_edge(input_node, node)

return node
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from morpheus.config import Config
from morpheus.messages.multi_ae_message import MultiAEMessage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.pipeline.stage_schema import StageSchema

from ..messages.multi_dfp_message import MultiDFPMessage
from ..utils.model_cache import ModelCache
Expand Down Expand Up @@ -72,6 +72,9 @@ def accepted_types(self) -> typing.Tuple:
"""Accepted input types."""
return (MultiDFPMessage, )

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MultiAEMessage)

def get_model(self, user: str) -> ModelCache:
"""
Return the model for the given user. If a model doesn't exist for the given user, the model for the generic
Expand Down Expand Up @@ -127,10 +130,10 @@ def on_data(self, message: MultiDFPMessage) -> MultiDFPMessage:

return output_message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_stream[0], node)
builder.make_edge(input_node, node)

# node.launch_options.pe_count = self._config.num_threads

return node, MultiAEMessage
return node
Loading

0 comments on commit a0084e3

Please sign in to comment.