Replace deprecated usage of make_node and make_node_full (#839)
* `make_node_full` has been deprecated in MRC, as has passing an on-next method directly to `make_node`
* Removed a few debug warnings from examples
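
The migration described in the first bullet can be pictured with a small plain-Python model. This is only a sketch and not MRC code: `Observable`, `map_op`, `make_node_full`, and `make_node` below are hypothetical stand-ins that mimic the two calling styles, so that the old `node_fn` form and the new operator-list form can be compared side by side.

```python
from typing import Any, Callable, Iterable, Iterator

# Hypothetical stand-ins for illustration only; none of this is the MRC API.
Operator = Callable[[Iterable[Any]], Iterator[Any]]


class Observable:
    """Minimal model of a stream that can be piped through operators."""

    def __init__(self, items: Iterable[Any]):
        self._items = items

    def pipe(self, *operators: Operator) -> "Observable":
        stream = iter(self._items)
        for op in operators:  # apply operators left to right
            stream = op(stream)
        return Observable(stream)

    def subscribe(self, on_next: Callable[[Any], None]) -> None:
        for item in self._items:
            on_next(item)


def map_op(fn: Callable[[Any], Any]) -> Operator:
    """Models the role of a map operator: apply fn to each item."""
    return lambda items: (fn(item) for item in items)


def make_node_full(name: str, node_fn):
    """Old style: the caller wires the observable to the subscriber."""
    def run(items):
        out = []
        node_fn(Observable(items), out.append)
        return out
    return run


def make_node(name: str, *operators: Operator):
    """New style: the caller only supplies the operator chain."""
    def run(items):
        out = []
        Observable(items).pipe(*operators).subscribe(out.append)
        return out
    return run


def on_data(message: int) -> int:
    return message + 1


def node_fn(obs: Observable, sub) -> None:
    obs.pipe(map_op(on_data)).subscribe(sub)


old_style = make_node_full("old", node_fn)
new_style = make_node("new", map_op(on_data))
print(old_style([1, 2, 3]), new_style([1, 2, 3]))
```

In this model both nodes produce the same output; the new form simply removes the boilerplate of writing `node_fn` by hand.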

fixes #782

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #839
dagardner-nv authored Apr 13, 2023
1 parent a09d462 commit 93b81c1
Showing 60 changed files with 238 additions and 276 deletions.
10 changes: 6 additions & 4 deletions docs/source/developer_guide/guides/1_simple_python_stage.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ We start our class definition with a few basic imports:
import typing

import mrc
from mrc.core import operators as ops

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
Expand Down Expand Up @@ -76,15 +77,15 @@ Our `on_data` method accepts an incoming message and returns a message. The retu
Finally, the `_build_single` method will be used at stage build time to construct our node and wire it into the pipeline. `_build_single` receives an instance of an MRC segment builder (`mrc.Builder`) along with a `StreamPair` instance, which is a tuple consisting of our parent node and its output type. We will be using the builder instance to construct a node from our stage and connecting it to the Morpheus pipeline. The return type of `_build_single` is also a `StreamPair` which will be comprised of our node along with its data type.
```python
 def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-    node = builder.make_node(self.unique_name, self.on_data)
+    node = builder.make_node(self.unique_name, ops.map(self.on_data))
     builder.make_edge(input_stream[0], node)
 
     return node, input_stream[1]
```

-For our purposes, a Morpheus _stage_ defines the input data type the stage will accept, the unit of work to be performed on that data, and the output data type. In contrast each individual node or nodes comprising a _stage_'s unit of work are wired into the underlying MRC execution graph. To build the node, we will call the `make_node` method of the builder instance, passing it our `unique_name` and `on_data` methods. We used the `unique_name` property, which will take the `name` property which we already defined and append a unique id to it.
+For our purposes, a Morpheus _stage_ defines the input data type the stage will accept, the unit of work to be performed on that data, and the output data type. In contrast each individual node or nodes comprising a _stage_'s unit of work are wired into the underlying MRC execution pipeline. To build the node, we will call the `make_node` method of the builder instance, passing it the `unique_name` property method and applying MRC's map operator to the `on_data` method. We used the `unique_name` property, which will take the `name` property which we already defined and append a unique id to it.
```python
-node = builder.make_node(self.unique_name, self.on_data)
+node = builder.make_node(self.unique_name, ops.map(self.on_data))
```
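
The `unique_name` behavior described above (take `name` and append a unique id) can be sketched in isolation. The class below is a hypothetical stand-in rather than the Morpheus base class, and the `"<name>-<id>"` format and the shared counter are assumptions made only for illustration.

```python
import itertools


class ExampleStage:
    """Hypothetical stand-in for a stage; not the Morpheus base class."""

    # Shared counter that hands out one id per instance (an assumption).
    _ids = itertools.count()

    def __init__(self):
        self._id = next(ExampleStage._ids)

    @property
    def name(self) -> str:
        return "pass-thru"

    @property
    def unique_name(self) -> str:
        # Assumed format: the stage name followed by the instance id.
        return f"{self.name}-{self._id}"


first, second = ExampleStage(), ExampleStage()
print(first.unique_name, second.unique_name)
```

Two instances of the same stage share a `name` but get different `unique_name` values, which is what allows each one to be registered as a separate node.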

Next, we will define an edge connecting our new node to our parent node:
Expand All @@ -102,6 +103,7 @@ return node, input_stream[1]
import typing

import mrc
from mrc.core import operators as ops

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
Expand Down Expand Up @@ -129,7 +131,7 @@ class PassThruStage(SinglePortStage):
return message

     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-        node = builder.make_node(self.unique_name, self.on_data)
+        node = builder.make_node(self.unique_name, ops.map(self.on_data))
         builder.make_edge(input_stream[0], node)
 
         return node, input_stream[1]
9 changes: 5 additions & 4 deletions docs/source/developer_guide/guides/2_real_world_phishing.md
Expand Up @@ -108,9 +108,10 @@ Our `_build_single` method remains unchanged from the previous example; even tho
import typing

import mrc
from mrc.core import operators as ops

from morpheus.common import TypeId
from morpheus.cli.register_stage import register_stage
from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages.message_meta import MessageMeta
Expand Down Expand Up @@ -169,7 +170,7 @@ class RecipientFeaturesStage(SinglePortStage):
return message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
node = builder.make_node(self.unique_name, self.on_data)
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_stream[0], node)

return node, input_stream[1]
Expand Down Expand Up @@ -548,7 +549,7 @@ morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phis
preprocess --vocab_hash_file=data/bert-base-uncased-hash.txt --truncation=true --do_lower_case=true --add_special_tokens=false \
inf-triton --model_name=phishing-bert-onnx --server_url=localhost:8001 --force_convert_inputs=true \
monitor --description="Inference Rate" --smoothing=0.001 --unit=inf \
filter --threshold=0.9 \
filter --threshold=0.9 --filter_source=TENSOR \
serialize \
to-file --filename=/tmp/detections.jsonlines --overwrite
```
Expand Down Expand Up @@ -625,7 +626,7 @@ class RecipientFeaturesStage(SinglePortStage):
return message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
node = builder.make_node(self.unique_name, self.on_data)
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_stream[0], node)

return node, input_stream[1]
2 changes: 1 addition & 1 deletion docs/source/developer_guide/guides/3_simple_cpp_stage.md
Expand Up @@ -333,7 +333,7 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea
if self._build_cpp_node():
node = morpheus_example_cpp.PassThruStage(builder, self.unique_name)
else:
node = builder.make_node(self.unique_name, self.on_data)
node = builder.make_node(self.unique_name, ops.map(self.on_data))

builder.make_edge(input_stream[0], node)
return node, input_stream[1]
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,11 @@ Let's first look at the module implementation structure before diving deeper int
> Note: Modules can be used for more than just creating middle nodes to connect sources and sinks. They can also be used to construct Source and Sink nodes.
```py
import mrc
import typing

import mrc
from mrc.core import operators as ops

from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import register_module

Expand All @@ -395,11 +397,8 @@ def module_init(builder: mrc.Builder):

# Your implementation goes here...

-    def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
-        obs.pipe(ops.map(on_data), ops.filter(lambda x: x is not None)).subscribe(sub)
-
     # Here we are creating a node.
-    node = builder.make_node_full(module_id, node_fn)
+    node = builder.make_node(module_id, ops.map(on_data), ops.filter(lambda x: x is not None))

# Register input and output port name for a module.
builder.register_module_input("<input port name>", node)
2 changes: 1 addition & 1 deletion examples/abp_pcap_detection/README.md
Expand Up @@ -109,7 +109,7 @@ Alternately the Morpheus command line could have been used to accomplish the sam
From the root of the Morpheus repo run:
```bash
morpheus --log_level INFO --plugin "examples/abp_pcap_detection/abp_pcap_preprocessing.py" \
run --use_cpp False --pipeline_batch_size 50000 --model_max_batch_size 40000 \
run --use_cpp False --pipeline_batch_size 100000 --model_max_batch_size 100000 \
pipeline-fil --model_fea_length 13 --label=probs \
from-file --filename examples/data/abp_pcap_dump.jsonlines --filter_null False \
deserialize \
4 changes: 2 additions & 2 deletions examples/abp_pcap_detection/run.py
Expand Up @@ -43,14 +43,14 @@
)
@click.option(
"--pipeline_batch_size",
default=50000,
default=100000,
type=click.IntRange(min=1),
help=("Internal batch size for the pipeline. Can be much larger than the model batch size. "
"Also used for Kafka consumers."),
)
@click.option(
"--model_max_batch_size",
default=40000,
default=100000,
type=click.IntRange(min=1),
help="Max batch size to use for the model.",
)
3 changes: 2 additions & 1 deletion examples/developer_guide/1_simple_python_stage/pass_thru.py
Expand Up @@ -16,6 +16,7 @@
import typing

import mrc
from mrc.core import operators as ops

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
Expand Down Expand Up @@ -43,7 +44,7 @@ def on_data(self, message: typing.Any):
return message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
node = builder.make_node(self.unique_name, self.on_data)
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_stream[0], node)

return node, input_stream[1]
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import typing

import mrc
from mrc.core import operators as ops

from morpheus.cli.register_stage import register_stage
from morpheus.common import TypeId
Expand Down Expand Up @@ -87,7 +88,7 @@ def on_data(self, message: MessageMeta) -> MessageMeta:
return message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
node = builder.make_node(self.unique_name, self.on_data)
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_stream[0], node)

return node, input_stream[1]
3 changes: 2 additions & 1 deletion examples/developer_guide/2_1_real_world_phishing/run.py
Expand Up @@ -20,6 +20,7 @@
from recipient_features_stage import RecipientFeaturesStage

import morpheus
from morpheus.common import FilterSource
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.pipeline import LinearPipeline
Expand Down Expand Up @@ -92,7 +93,7 @@ def run_pipeline():
pipeline.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))

# Filter values lower than 0.9
pipeline.add_stage(FilterDetectionsStage(config, threshold=0.9))
pipeline.add_stage(FilterDetectionsStage(config, threshold=0.9, filter_source=FilterSource.TENSOR))

# Write to the output file
pipeline.add_stage(SerializeStage(config))
3 changes: 2 additions & 1 deletion examples/developer_guide/3_simple_cpp_stage/pass_thru.py
Expand Up @@ -16,6 +16,7 @@
import typing

import mrc
from mrc.core import operators as ops

from _lib import morpheus_example as morpheus_example_cpp
from morpheus.cli.register_stage import register_stage
Expand Down Expand Up @@ -44,7 +45,7 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea
if self._build_cpp_node():
node = morpheus_example_cpp.PassThruStage(builder, self.unique_name)
else:
node = builder.make_node(self.unique_name, self.on_data)
node = builder.make_node(self.unique_name, ops.map(self.on_data))

builder.make_edge(input_stream[0], node)
return node, input_stream[1]
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,7 @@ def on_data(self, file_objects: fsspec.core.OpenFiles):
return output_batches

     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-
-        def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
-            obs.pipe(ops.map(self.on_data), ops.flatten()).subscribe(sub)
-
-        stream = builder.make_node_full(self.unique_name, node_fn)
+        stream = builder.make_node(self.unique_name, ops.map(self.on_data), ops.flatten())
         builder.make_edge(input_stream[0], stream)
 
         return stream, typing.List[fsspec.core.OpenFiles]
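
The map-then-flatten pairing in the hunk above applies when `on_data` returns a list of batches and each batch should travel downstream as its own message. A minimal plain-Python sketch of that idea follows; `map_then_flatten` and this `on_data` are hypothetical stand-ins, not the MRC operators or the stage's real method.

```python
from typing import Any, Callable, Iterable, Iterator, List


def map_then_flatten(fn: Callable[[Any], List[Any]], items: Iterable[Any]) -> Iterator[Any]:
    """Apply fn to each item, then emit every element of the result separately."""
    for item in items:
        for element in fn(item):
            yield element


def on_data(file_names: List[str]) -> List[List[str]]:
    """Hypothetical handler: split the incoming list into batches of at most two."""
    return [file_names[i:i + 2] for i in range(0, len(file_names), 2)]


batches = list(map_then_flatten(on_data, [["a", "b", "c"], ["d"]]))
print(batches)  # [['a', 'b'], ['c'], ['d']]
```

Without the flatten step, the downstream node in this model would receive two messages (one list of batches per input) instead of three individual batches.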
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,9 @@ def convert_to_dataframe(self, s3_object_batch: typing.Tuple[fsspec.core.OpenFil
raise

     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-
-        def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
-            obs.pipe(ops.map(self.convert_to_dataframe), ops.on_completed(self._close_dask_cluster)).subscribe(sub)
-
-        stream = builder.make_node_full(self.unique_name, node_fn)
+        stream = builder.make_node(self.unique_name,
+                                   ops.map(self.convert_to_dataframe),
+                                   ops.on_completed(self._close_dask_cluster))
         builder.make_edge(input_stream[0], stream)
 
         return stream, cudf.DataFrame
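
The hunk above pairs a map step with a completion hook that releases a resource once the stream ends. The sketch below models only that ordering in plain Python; `run_with_on_completed`, `convert`, and `close_cluster` are hypothetical names, and any further behavior of the real `ops.on_completed` operator is not modeled here.

```python
from typing import Any, Callable, Iterable, Iterator, List

events: List[str] = []


def run_with_on_completed(fn: Callable[[Any], Any],
                          on_completed: Callable[[], None],
                          items: Iterable[Any]) -> Iterator[Any]:
    """Map fn over items, then call on_completed once the input is exhausted."""
    for item in items:
        yield fn(item)
    on_completed()


def convert(batch: str) -> str:
    events.append(f"convert {batch}")
    return batch.upper()


def close_cluster() -> None:
    # Stand-in for releasing a shared resource after the last message.
    events.append("close")


results = list(run_with_on_completed(convert, close_cluster, ["a", "b"]))
print(results)  # ['A', 'B']
print(events)   # ['convert a', 'convert b', 'close']
```

The cleanup runs exactly once, after every message has been converted, which is the property the stage relies on when shutting down its cluster.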
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import mrc
from mlflow.tracking.client import MlflowClient
from mrc.core import operators as ops

from morpheus.config import Config
from morpheus.messages.multi_ae_message import MultiAEMessage
Expand Down Expand Up @@ -109,7 +110,7 @@ def on_data(self, message: MultiDFPMessage):
return output_message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
node = builder.make_node(self.unique_name, self.on_data)
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_stream[0], node)

# node.launch_options.pe_count = self._config.num_threads
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,7 @@ def on_data(self, message: MultiAEMessage):
return message

     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-
-        def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
-            obs.pipe(ops.map(self.on_data)).subscribe(sub)
-
-        stream = builder.make_node_full(self.unique_name, node_fn)
+        stream = builder.make_node(self.unique_name, ops.map(self.on_data))
         builder.make_edge(input_stream[0], stream)
 
         return stream, MultiAEMessage
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,7 @@ def on_data(self, message: MultiAEMessage):
return message

     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-
-        def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
-            obs.pipe(ops.map(self.on_data), ops.filter(lambda x: x is not None)).subscribe(sub)
-
-        stream = builder.make_node_full(self.unique_name, node_fn)
+        stream = builder.make_node(self.unique_name, ops.map(self.on_data), ops.filter(lambda x: x is not None))
         builder.make_edge(input_stream[0], stream)
 
         return stream, input_stream[1]
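
Several hunks in this commit follow the map step with a filter that drops `None`. One plausible reason, stated here as an assumption rather than taken from the source, is that a handler may return `None` when it has nothing to emit yet. The sketch below uses a hypothetical `Batcher` class in plain Python to show that pattern.

```python
from typing import Any, List, Optional


class Batcher:
    """Hypothetical handler that buffers messages and emits only full batches."""

    def __init__(self, size: int):
        self._size = size
        self._buffer: List[Any] = []

    def on_data(self, message: Any) -> Optional[List[Any]]:
        self._buffer.append(message)
        if len(self._buffer) < self._size:
            return None  # nothing to emit yet
        batch, self._buffer = self._buffer, []
        return batch


batcher = Batcher(size=2)
mapped = (batcher.on_data(m) for m in [1, 2, 3, 4, 5])  # models the map step
emitted = [m for m in mapped if m is not None]          # models the None filter
print(emitted)  # [[1, 2], [3, 4]]
```

The fifth message stays in the buffer, so downstream consumers in this model only ever see complete batches and never a `None` placeholder.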
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ def process_features(self, message: MultiDFPMessage):
return message

     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-
-        def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
-            obs.pipe(ops.map(self.process_features)).subscribe(sub)
-
-        node = builder.make_node_full(self.unique_name, node_fn)
+        node = builder.make_node(self.unique_name, ops.map(self.process_features))
         builder.make_edge(input_stream[0], node)
 
         # node.launch_options.pe_count = self._config.num_threads
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,7 @@ def on_data(self, message: DFPMessageMeta):
return result

     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-
-        def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
-            obs.pipe(ops.map(self.on_data), ops.filter(lambda x: x is not None)).subscribe(sub)
-
-        stream = builder.make_node_full(self.unique_name, node_fn)
+        stream = builder.make_node(self.unique_name, ops.map(self.on_data), ops.filter(lambda x: x is not None))
         builder.make_edge(input_stream[0], stream)
 
         return stream, MultiDFPMessage
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,7 @@ def extract_users(self, message: cudf.DataFrame):
return output_messages

     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-
-        def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
-            obs.pipe(ops.map(self.extract_users), ops.flatten()).subscribe(sub)
-
-        stream = builder.make_node_full(self.unique_name, node_fn)
+        stream = builder.make_node(self.unique_name, ops.map(self.extract_users), ops.flatten())
         builder.make_edge(input_stream[0], stream)
 
         return stream, DFPMessageMeta
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,7 @@ def on_data(self, message: MultiDFPMessage):
return output_message

     def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
-
-        def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
-            obs.pipe(ops.map(self.on_data), ops.filter(lambda x: x is not None)).subscribe(sub)
-
-        stream = builder.make_node_full(self.unique_name, node_fn)
+        stream = builder.make_node(self.unique_name, ops.map(self.on_data), ops.filter(lambda x: x is not None))
         builder.make_edge(input_stream[0], stream)
 
         return stream, MultiAEMessage
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import mrc
import pandas as pd
from mrc.core import operators as ops

from morpheus.config import Config
from morpheus.io import serializers
Expand Down Expand Up @@ -119,7 +120,7 @@ def write_to_files(x: MultiAEMessage):

return x

dfp_viz_postproc = builder.make_node(self.unique_name, write_to_files)
dfp_viz_postproc = builder.make_node(self.unique_name, ops.map(write_to_files))

builder.make_edge(stream, dfp_viz_postproc)
stream = dfp_viz_postproc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import typing

import mrc
from mrc.core import operators as ops

from morpheus.config import Config
from morpheus.pipeline.single_port_stage import SinglePortStage
Expand Down Expand Up @@ -61,7 +62,7 @@ def supports_cpp_node(self):
def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
stream = input_stream[0]

node = builder.make_node(self.unique_name, self._s3_writer)
node = builder.make_node(self.unique_name, ops.map(self._s3_writer))
builder.make_edge(stream, node)

stream = node
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import typing

import mrc
from mrc.core import operators as ops

import cuml

Expand Down Expand Up @@ -70,6 +71,6 @@ def _process_message(self, message: GraphSAGEMultiMessage):
return message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
node = builder.make_node(self.unique_name, self._process_message)
node = builder.make_node(self.unique_name, ops.map(self._process_message))
builder.make_edge(input_stream[0], node)
return node, MultiMessage