Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix broken developer guide code examples #333

Merged
5 commits merged into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ add_custom_target(${PROJECT_NAME}_style_checks
# Include the main morpheus code
add_subdirectory(morpheus)

if(MORPHEUS_BUILD_EXAMPLES)
add_subdirectory(examples)
endif()

# Get all of the python files to copy to the build directory
file(GLOB_RECURSE MORPHEUS_PYTHON_FILES
RELATIVE ${PROJECT_SOURCE_DIR}
Expand Down
6 changes: 6 additions & 0 deletions ci/scripts/jenkins/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ source ${WORKSPACE}/ci/scripts/jenkins/common.sh

gpuci_logger "Creating conda env"
rm -rf ${MORPHEUS_ROOT}/.cache/ ${MORPHEUS_ROOT}/build/

conda config --add pkgs_dirs /opt/conda/pkgs
conda config --env --add channels conda-forge
conda config --env --set channel_alias ${CONDA_CHANNEL_ALIAS:-"https://conda.anaconda.org"}
Expand Down Expand Up @@ -48,9 +49,14 @@ cmake --build build -j --parallel ${PARALLEL_LEVEL}

gpuci_logger "sccache usage for morpheus build:"
sccache --show-stats
sccache --zero-stats &> /dev/null

gpuci_logger "Installing Morpheus"
cmake -DCOMPONENT=Wheel -P ${MORPHEUS_ROOT}/build/cmake_install.cmake
pip install ${MORPHEUS_ROOT}/build/wheel

gpuci_logger "sccache usage for building C++ examples:"
sccache --show-stats

gpuci_logger "Archiving results"
mamba pack --quiet --force --ignore-missing-files --n-threads ${PARALLEL_LEVEL} -n morpheus -o ${WORKSPACE_TMP}/conda_env.tar.gz
Expand Down
52 changes: 46 additions & 6 deletions docs/source/developer_guide/guides/1_simple_python_stage.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,29 @@ Morpheus makes use of the SRF graph-execution framework. Morpheus pipelines are

To start, we will implement a single stage that could be included in a pipeline. For illustration, this stage will do nothing but take the input from the previous stage and forward it to the next stage. All Morpheus stages have several things in common, so while this doesn't do too much, it ends up being a good starting point for writing a new stage. From there, we can add our functionality as needed.

Defining this stage requires us to specify the stage type. Morpheus stages contain a single input and a single output inherited from `SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the `SingleOutputSource` base class. We start our class definition with a few basic imports:
Defining this stage requires us to specify the stage type. Morpheus stages contain a single input and a single output inherited from `SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the `SingleOutputSource` base class.

Optionally, stages can be registered as a command with the Morpheus CLI using the `register_stage` decorator. This allows for pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using [numpydoc](https://numpydoc.readthedocs.io/en/latest/) and exposed as command line flags. Similarly the class's docstrings will be exposed in the help string of the stage on the command line.

We start our class definition with a few basic imports:

```python
import typing

import srf

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("pass-thru")
class PassThruStage(SinglePortStage):
```

There are three methods that need to be defined in our new subclass to implement the stage interface: `name`, `accepted_types`, and `_build_single`. In practice, it is often necessary to define at least one more method which will perform the actual work of the stage; by convention, this method is typically named `on_data`, which we will define in our examples.
There are four methods that need to be defined in our new subclass to implement the stage interface: `name`, `accepted_types`, `supports_cpp_node`, and `_build_single`. In practice, it is often necessary to define at least one more method which will perform the actual work of the stage; by convention, this method is typically named `on_data`, which we will define in our examples.

`name` is a property method; it should return a user-friendly name for the stage. Currently, this property is only used for debugging purposes, and there are no requirements on the content or format of the name.
`name` is a property method; it should return a user-friendly name for the stage. Currently, this property is only used for debugging purposes, and there are no requirements on the content or format of the name. However by convention the string returned by this method should be the same as the string passed to the `register_stage` decorator.
```python
@property
def name(self) -> str:
Expand All @@ -52,6 +60,12 @@ The `accepted_types` method returns a tuple of message classes that this stage a
return (typing.Any,)
```

The `supports_cpp_node` method returns a boolean indicating if the stage has a C++ implementation. Since our example only contains a Python implementation we will return `False` here.
```python
def supports_cpp_node(self) -> bool:
return False
```

Our `on_data` method accepts the incoming message and returns a message. The returned message can be the same message instance that we received as our input or it could be a new message instance. The method is named `on_data` by convention; however, it is not part of the API. In the next section, we will register it as a callback in Morpheus.
```python
def on_data(self, message: typing.Any):
Expand All @@ -64,6 +78,8 @@ Finally, the `_build_single` method will be used at build time to wire our stage
def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
node = builder.make_node(self.unique_name, self.on_data)
builder.make_edge(input_stream[0], node)

return node, input_stream[1]
```

In most cases, a Morpheus stage will define and build a single SRF node. In some advanced cases, a stage can construct more than one node. For our purposes, a Morpheus _stage_ defines information about the type of node(s) it builds, while the _node_ is the instance of the stage that is wired into the SRF pipeline. To build the node, we will call the `make_node` method of the segment instance, passing to it our name and our `on_data` method. We used the `unique_name` property, which will take the name property which we already defined and append a unique id to it.
Expand All @@ -87,16 +103,26 @@ import typing

import srf

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("pass-thru")
class PassThruStage(SinglePortStage):
"""
A Simple Pass Through Stage
"""

@property
def name(self) -> str:
return "pass-thru"

def accepted_types(self) -> typing.Tuple:
return (typing.Any,)
return (typing.Any, )

def supports_cpp_node(self) -> bool:
return False

def on_data(self, message: typing.Any):
# Return the message for the next stage
Expand All @@ -120,13 +146,13 @@ First we will need to import a few things from Morpheus for this example to work
import logging
import os

from pass_thru import PassThruStage

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage
```

Before constructing the pipeline, we need to do a bit of environment configuration, starting with the Morpheus logger:
Expand Down Expand Up @@ -189,6 +215,7 @@ from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage


def run_pipeline():
# Enable the Morpheus logger
configure_logging(log_level=logging.DEBUG)
Expand Down Expand Up @@ -216,3 +243,16 @@ def run_pipeline():
if __name__ == "__main__":
run_pipeline()
```


### Alternate Morpheus CLI example
The above example makes use of the Morpheus Python API, alternately we could have constructed the same pipeline using the Morpheus command line tool. We will need to pass in the path to our stage via the `--plugin` argument so that it will be visible to the command line tool.

From the root of the Morpheus repo run:
```bash
morpheus --log_level=debug --plugin examples/developer_guide/1_simple_python_stage/pass_thru.py \
run pipeline-other \
from-file --filename=examples/data/email_with_addresses.jsonlines \
pass-thru \
monitor
```
Loading