Skip to content

Commit

Permalink
Resolves #350 -- Adds support for segmenting LinearPipelines (#374)
Browse files Browse the repository at this point in the history
1. Adds a new API call to `LinearPipeline`, `add_segment_boundary([data_type], [is_shared_pointer])`, which will create an egress stage and connected to the most recent linear stage, and an ingress source stage in a new segment. At `build`, these stages will be connected.
Known issue here: nv-morpheus/MRC#176 : Currently specifying the data type has no effect and all underlying objects will utilize the default PythonObject path. This is due to the method we were using to match the `cpptype` of a wrapped python object to the cpptype of an Ingress/EgressPort adapter. Once we're able to add an out of band path for type detection/storage this will be resolve. 
2. Updates `Pipeline` to be segment aware, and makes the necessary updates to Pipeline internals to handle the changes.
3. Updates the `visualize(..)`

Authors:
  - Devin Robison (https://github.com/drobison00)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)
  - Christopher Harris (https://github.com/cwharris)

URL: #374
  • Loading branch information
drobison00 authored Sep 28, 2022
1 parent 85c05fd commit 94235c6
Show file tree
Hide file tree
Showing 16 changed files with 866 additions and 114 deletions.
1 change: 1 addition & 0 deletions examples/abp_pcap_detection/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def run_pipeline(

# Add a monitor stage
pipeline.add_stage(MonitorStage(config, description="Write to file rate", unit="to-file"))

# Build the pipeline here to see types in the vizualization
pipeline.build()

Expand Down
14 changes: 13 additions & 1 deletion morpheus/_lib/src/python_modules/messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@
#include <pybind11/functional.h> // IWYU pragma: keep
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pybind11/stl.h> // IWYU pragma: keep
#include <pybind11/stl.h> // IWYU pragma: keep
#include <pysrf/edge_adapter.hpp>
#include <pysrf/node.hpp>
#include <pysrf/port_builders.hpp>
#include <pysrf/utils.hpp> // for pysrf::import
#include <srf/channel/status.hpp> // for Status
#include <srf/node/edge_connector.hpp>
#include <srf/node/port_registry.hpp>

#include <cstddef>
#include <filesystem>
Expand Down Expand Up @@ -73,6 +77,14 @@ PYBIND11_MODULE(messages, m)
// Allows python objects to keep DataTable objects alive
py::class_<IDataTable, std::shared_ptr<IDataTable>>(m, "DataTable");

srf::pysrf::PortBuilderUtil::register_port_util<std::shared_ptr<MessageMeta>>();
srf::pysrf::PortBuilderUtil::register_port_util<std::shared_ptr<MultiMessage>>();
srf::pysrf::PortBuilderUtil::register_port_util<std::shared_ptr<MultiInferenceMessage>>();
srf::pysrf::PortBuilderUtil::register_port_util<std::shared_ptr<MultiInferenceFILMessage>>();
srf::pysrf::PortBuilderUtil::register_port_util<std::shared_ptr<MultiInferenceNLPMessage>>();
srf::pysrf::PortBuilderUtil::register_port_util<std::shared_ptr<MultiResponseMessage>>();
srf::pysrf::PortBuilderUtil::register_port_util<std::shared_ptr<MultiResponseProbsMessage>>();

// EdgeConnectors for derived classes of MultiMessage to MultiMessage
srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiInferenceMessage>,
std::shared_ptr<morpheus::MultiMessage>>::register_converter();
Expand Down
54 changes: 49 additions & 5 deletions morpheus/pipeline/linear_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import morpheus.pipeline as _pipeline
from morpheus.config import Config
from morpheus.stages.boundary.linear_boundary_stage import LinearBoundaryEgressStage
from morpheus.stages.boundary.linear_boundary_stage import LinearBoundaryIngressStage

logger = logging.getLogger(__name__)

Expand All @@ -35,8 +37,17 @@ class LinearPipeline(_pipeline.Pipeline):
def __init__(self, c: Config):
super().__init__(c)

self._current_segment_id = ""
self._next_segment_index = 0
self.increment_segment_id()

self._linear_stages: typing.List[_pipeline.StreamWrapper] = []

def increment_segment_id(self):
self._linear_stages = []
self._current_segment_id = f"linear_segment_{self._next_segment_index}"
self._next_segment_index += 1

def set_source(self, source: _pipeline.SourceStage):
"""
Set a pipeline's source stage to consume messages before it begins executing stages. This must be
Expand All @@ -55,15 +66,14 @@ def set_source(self, source: _pipeline.SourceStage):

self._sources.clear()

# Store the source in sources
self._sources.add(source)

if (len(self._linear_stages) > 0):
# TODO(devin): This doesn't seem right, if we add another source, our underlying Pipeline could still have
# any number of dangling nodes.
logger.warning("Clearing %d stages from pipeline", len(self._linear_stages))
self._linear_stages.clear()

# Need to store the source in the pipeline
super().add_node(source)
super().add_node(source, self._current_segment_id)

# Store this as the first one in the linear stages. Must be index 0
self._linear_stages.append(source)
Expand All @@ -84,7 +94,41 @@ def add_stage(self, stage: _pipeline.SinglePortStage):
assert isinstance(stage, _pipeline.SinglePortStage), ("Only `SinglePortStage` stages are accepted in "
"`add_stage()`")

# Add this stage to the segment graph
super().add_node(stage, self._current_segment_id)

# Make an edge between the last node and this one
self.add_edge(self._linear_stages[-1], stage)
super().add_edge(self._linear_stages[-1], stage, self._current_segment_id)

self._linear_stages.append(stage)

def add_segment_boundary(self, data_type=None, as_shared_pointer=False):
if (len(self._linear_stages) == 0):
raise RuntimeError("Cannot create a segment boundary, current segment is empty.")

empty_config = Config()
boundary_egress = LinearBoundaryEgressStage(empty_config,
boundary_port_id=self._current_segment_id,
data_type=data_type)
boundary_ingress = LinearBoundaryIngressStage(empty_config,
boundary_port_id=self._current_segment_id,
data_type=data_type)

# TODO: update to use data_type once typeid is attached to registered objects out of band:
# https://github.com/nv-morpheus/SRF/issues/176
port_id_tuple = (self._current_segment_id, object, False) if data_type else self._current_segment_id

self.add_stage(boundary_egress)
egress_segment_id = self._current_segment_id

self.increment_segment_id()
ingress_segment_id = self._current_segment_id

self._linear_stages.append(boundary_ingress)

super().add_node(boundary_ingress, self._current_segment_id)
super().add_segment_edge(boundary_egress,
egress_segment_id,
boundary_ingress,
ingress_segment_id,
port_id_tuple)
Loading

0 comments on commit 94235c6

Please sign in to comment.