Add source & stage decorators #1364

Merged
merged 116 commits into from
Nov 29, 2023
7fd1c4e
Remove unused method
dagardner-nv Nov 3, 2023
fcac874
Update out of date comments and variable names
dagardner-nv Nov 3, 2023
19acdf0
Don't run milvus tests by default, running 'pytest --run_slow' now ta…
dagardner-nv Oct 17, 2023
8e82ba3
Consolidate milvus fixtures
dagardner-nv Oct 17, 2023
5c9379b
Set an explicit storage dir for milvus, ensuring that we always get a…
dagardner-nv Oct 17, 2023
5513dad
Defining MORPHEUS_MILVUS_URI env var avoids milvus startup, milvus_se…
dagardner-nv Oct 18, 2023
e849600
Add _get_random_port method
dagardner-nv Nov 3, 2023
2c48a07
Use new tuple type-hint
dagardner-nv Nov 4, 2023
f79475a
Update code snippets, todo: Updating text
dagardner-nv Nov 4, 2023
bbde0a8
Update to reflect recent changes [no ci]
dagardner-nv Nov 6, 2023
e906235
Use newer tuple type hint
dagardner-nv Nov 6, 2023
42e1388
Update docs for RecipientFeaturesStage
dagardner-nv Nov 6, 2023
4c49102
wip [no ci]
dagardner-nv Nov 6, 2023
65ae66e
wip [no ci]
dagardner-nv Nov 6, 2023
e289018
wip [no ci]
dagardner-nv Nov 6, 2023
9d71d75
Set log level to INFO to ensure monitor output [no ci]
dagardner-nv Nov 6, 2023
efedc35
wip [no ci]
dagardner-nv Nov 6, 2023
077af27
Fix long-standing bug in pass-thru example, the C++ impl only works w…
dagardner-nv Nov 6, 2023
04f2396
Update the example to include two instances of the PassThruStage, the…
dagardner-nv Nov 6, 2023
a154a04
WIP [no ci]
dagardner-nv Nov 6, 2023
f65d8b4
Remove uneeded return statement
dagardner-nv Nov 6, 2023
fa7738c
WIP [no ci]
dagardner-nv Nov 6, 2023
e776419
WIP [no ci]
dagardner-nv Nov 6, 2023
ff76387
No need to rename the stage
dagardner-nv Nov 6, 2023
d2ee5a7
WIP [no ci]
dagardner-nv Nov 6, 2023
e9b7d44
Remove pylint directive from docs [no ci]
dagardner-nv Nov 6, 2023
70d16a5
Remove unused imports and formatting [no ci]
dagardner-nv Nov 6, 2023
480c86c
formatting [no ci]
dagardner-nv Nov 6, 2023
3e91fb7
WIP [no ci]
dagardner-nv Nov 6, 2023
27135a9
Formatting [no ci]
dagardner-nv Nov 7, 2023
442da9b
WIP [no ci]
dagardner-nv Nov 7, 2023
73b3797
WIP
dagardner-nv Nov 7, 2023
4d74e25
WIP [no ci]
dagardner-nv Nov 7, 2023
e090091
Ensure the module instance always receives a unique name by prependin…
dagardner-nv Nov 7, 2023
fcc8844
Add missing module config to builder.load_module calls, update module…
dagardner-nv Nov 7, 2023
affa261
Update to use a simple df of floats
dagardner-nv Nov 7, 2023
e3546ff
wip: [no ci]
dagardner-nv Nov 7, 2023
5231c81
Formatting [no ci]
dagardner-nv Nov 7, 2023
15fad5a
formatting [no ci]
dagardner-nv Nov 7, 2023
b6bd48f
formatting [no ci]
dagardner-nv Nov 7, 2023
d5f9201
Update code snippets to match api changes [no ci]
dagardner-nv Nov 7, 2023
50293db
Add end-to-end test for the code snippets used in the python_modules …
dagardner-nv Nov 7, 2023
262800f
Formatting [no ci]
dagardner-nv Nov 7, 2023
9bf6508
Make task names lower-case to be consistent [no ci]
dagardner-nv Nov 7, 2023
8d3336d
Rename out_streams to out_nodes
dagardner-nv Nov 7, 2023
b9290af
Fix path to renamed script
dagardner-nv Nov 7, 2023
57ecb18
Fix import order
dagardner-nv Nov 7, 2023
e1d75a7
First pass at a stage decorator
dagardner-nv Nov 14, 2023
7dab9f0
Add source decorator
dagardner-nv Nov 15, 2023
aef5ece
Sort imports
dagardner-nv Nov 15, 2023
5d5bcfc
WIP
dagardner-nv Nov 15, 2023
5ef9326
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Nov 15, 2023
250c330
Docstring cleanups [no ci]
dagardner-nv Nov 15, 2023
1ecaa96
WIP [no ci]
dagardner-nv Nov 16, 2023
37d7926
WIP docstrings [no ci]
dagardner-nv Nov 16, 2023
d3699c2
WIP docstrings [no ci]
dagardner-nv Nov 16, 2023
5155dcf
First pass at source decorator [no ci]
dagardner-nv Nov 16, 2023
83f290a
lint cleanups [no ci]
dagardner-nv Nov 16, 2023
45725b5
Fix annotations [no ci]
dagardner-nv Nov 16, 2023
78c3ec3
Disable pylint warnings [no ci]
dagardner-nv Nov 16, 2023
d15f7dc
Fix handling of *args, and more tests [no ci]
dagardner-nv Nov 17, 2023
a430eff
wip [no ci]
dagardner-nv Nov 17, 2023
a7e35db
WIP: [no ci]
dagardner-nv Nov 17, 2023
ab16c49
wip [no ci]
dagardner-nv Nov 17, 2023
113e642
wip [no ci]
dagardner-nv Nov 17, 2023
618444a
wip [no ci]
dagardner-nv Nov 17, 2023
97f7dcd
wip [no ci]
dagardner-nv Nov 17, 2023
6e5cd52
Formatting fixes
dagardner-nv Nov 17, 2023
943e8b7
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Nov 17, 2023
b1f783f
Expose the needed_columns functionality to the stage decorator
dagardner-nv Nov 20, 2023
054a150
wip
dagardner-nv Nov 20, 2023
9a08075
wip
dagardner-nv Nov 20, 2023
44a2c70
Expose the needed_columns functionality to the stage decorator
dagardner-nv Nov 20, 2023
a317299
Add decorated version of recipient_features_stage
dagardner-nv Nov 21, 2023
90acd43
Optionally use the decorator version of the recipient features stage
dagardner-nv Nov 21, 2023
749fa09
Bind default values for keyword arguments to method
dagardner-nv Nov 21, 2023
ee3ae31
Remove binding the keyword argument
dagardner-nv Nov 21, 2023
46c69f5
Test default values for keyword args
dagardner-nv Nov 21, 2023
5763504
Merge branch 'david-stage-decorator' into david-stage-decorator-docs
dagardner-nv Nov 21, 2023
af67576
Make sep_token an explicit keyword only argument
dagardner-nv Nov 21, 2023
f92d44b
Add test for the decorated version of the pass-thru stage
dagardner-nv Nov 21, 2023
a0bbdac
Expand tests to test the decorated version of the recipient features …
dagardner-nv Nov 21, 2023
6e5d83a
Document stage decorator
dagardner-nv Nov 21, 2023
d1dd3ae
Expose the other options as command line flags
dagardner-nv Nov 21, 2023
d08b9d0
WIP
dagardner-nv Nov 21, 2023
824a59a
Rename use_decorator to use_stage_function
dagardner-nv Nov 21, 2023
ce4286b
Remove unused import
dagardner-nv Nov 22, 2023
d321771
Merge branch 'branch-23.11' into david-stage-decorator
dagardner-nv Nov 22, 2023
7163dba
Merge branch 'branch-23.11' into david-23.11-docs-1252
dagardner-nv Nov 22, 2023
c708b4c
Merge branch 'branch-23.11' into david-stage-decorator
dagardner-nv Nov 22, 2023
bbaaa20
Merge branch 'david-stage-decorator' into david-stage-decorator-docs
dagardner-nv Nov 22, 2023
979cc03
Document that C++ impls are available to classes only
dagardner-nv Nov 22, 2023
0c356d6
Merge branch 'david-23.11-docs-1252' into david-stage-decorator-docs
dagardner-nv Nov 22, 2023
6b577b1
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Nov 22, 2023
1b6ea45
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Nov 27, 2023
7b72d03
Merge branch 'david-stage-decorator' into david-stage-decorator-docs …
dagardner-nv Nov 27, 2023
3420efa
Make type annotations a requirement [no ci]
dagardner-nv Nov 27, 2023
9989a79
wip [no ci]
dagardner-nv Nov 27, 2023
b16a25e
wip [no ci]
dagardner-nv Nov 27, 2023
48a22fb
wip [no ci]
dagardner-nv Nov 27, 2023
fe0f7a4
Handle return type of typing.Any [no ci]
dagardner-nv Nov 27, 2023
4d11732
Cleanups [no ci]
dagardner-nv Nov 27, 2023
b4b4a20
Allow config to be specified positionally [no ci]
dagardner-nv Nov 27, 2023
762501f
Allow config to be a positional argument
dagardner-nv Nov 27, 2023
6907139
Update tests [no ci]
dagardner-nv Nov 27, 2023
73ba7a0
WIP: Updating tests [no ci]
dagardner-nv Nov 28, 2023
fac9826
Fill out tests
dagardner-nv Nov 28, 2023
77f67c5
linting fixes
dagardner-nv Nov 28, 2023
02983e8
linting fixes
dagardner-nv Nov 28, 2023
ee10ad0
Merge branch 'branch-23.11' into david-stage-decorator
dagardner-nv Nov 28, 2023
650cee0
Updates for developer guide
dagardner-nv Nov 28, 2023
d18ea97
misc cleanups
dagardner-nv Nov 28, 2023
e6c1407
Document usage of the source decorator [no ci]
dagardner-nv Nov 28, 2023
0787b9b
document the name argument
dagardner-nv Nov 28, 2023
739f887
Remove explicit name, so that when we run the pipeline with both pass…
dagardner-nv Nov 28, 2023
cc7cb00
lint fix
dagardner-nv Nov 28, 2023
125 changes: 103 additions & 22 deletions docs/source/developer_guide/guides/1_simple_python_stage.md
@@ -25,13 +25,62 @@ Morpheus makes use of the MRC graph-execution framework. Morpheus pipelines are

## The Pass Through Stage

To start, we will implement a single stage that could be included in a pipeline. For illustration, this stage will do nothing but take the input from the previous stage and forward it to the next stage. All Morpheus stages have several things in common, so while this doesn't do too much, it ends up being a good starting point for writing a new stage. From there, we can add our functionality as needed.
To start, we will implement a single stage that could be included in a pipeline. For illustration, this stage will do nothing but take the input from the previous stage and forward it to the next stage. All Morpheus stages have several things in common, so while this doesn't do too much, it ends up being a good starting point for writing a new stage. From there, we can add our functionality as needed. Morpheus provides two ways of defining a stage, as a stand-alone function or as a class.

Defining this stage requires us to specify the stage type. Morpheus stages which contain a single input and a single output typically inherit from `SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the `SingleOutputSource` base class.
### Stand-alone Function

The stand-alone function approach is the simplest way to define a stage. The function should accept a single argument, which will be the input message, and return a single value, which will be the output message. The function should be decorated with the `morpheus.pipeline.stage_decorator.stage` decorator.

```python
import typing

from morpheus.pipeline.stage_decorator import stage


@stage
def pass_thru_stage(message: typing.Any) -> typing.Any:
    # Return the message for the next stage
    return message
```

When using the `stage` decorator, type annotations are required for the function's parameters and return type, because the stage uses them as its accept and output types. In the above example, the stage decorator uses the name of the function as the name of the stage. To specify a different name for the stage, pass the name to the decorator as follows:

```python
@stage(name="pass-thru")
def pass_thru_stage(message: typing.Any) -> typing.Any:
    # Return the message for the next stage
    return message
```
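
To make the role of the annotations and the `name` argument more concrete, here is a minimal standalone sketch of a decorator that reads a function's type hints and defaults the stage name to the function name. It is not the Morpheus implementation: `toy_stage` and `ToyStage` are hypothetical names invented for illustration, and unlike Morpheus (where the decorated function is called with a config to build the stage) this toy returns the stage object directly.

```python
import functools
import inspect
import typing


class ToyStage:
    """Hypothetical stand-in for a stage object; not part of Morpheus."""

    def __init__(self, fn, name, accept_type, return_type):
        self.fn = fn
        self.name = name
        self.accept_type = accept_type
        self.return_type = return_type

    def on_data(self, message):
        # Forward the message to the wrapped function
        return self.fn(message)


def toy_stage(fn=None, *, name=None):
    """Illustrative decorator usable as @toy_stage or @toy_stage(name=...)."""
    if fn is None:
        # Called with keyword arguments only: return the real decorator
        return functools.partial(toy_stage, name=name)

    hints = typing.get_type_hints(fn)
    params = list(inspect.signature(fn).parameters.values())

    # Annotations are mandatory, since they become the accept/output types
    if not params or params[0].name not in hints:
        raise ValueError("the first parameter needs a type annotation")
    if "return" not in hints:
        raise ValueError("the return type needs an annotation")

    # Fall back to the function name when no explicit name is given
    return ToyStage(fn, name or fn.__name__, hints[params[0].name], hints["return"])


@toy_stage
def pass_thru(message: typing.Any) -> typing.Any:
    return message


@toy_stage(name="pass-thru")
def named_pass_thru(message: typing.Any) -> typing.Any:
    return message
```

In this sketch, `pass_thru.name` is taken from the function, while `named_pass_thru.name` comes from the decorator argument; a function without annotations is rejected at decoration time rather than when the pipeline runs.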

We can then add our stage to a pipeline as follows:
```python
config = Config()
pipeline = LinearPipeline(config)
# ... rest of pipeline setup
pipeline.add_stage(pass_thru_stage(config))
```

It is possible to provide additional keyword arguments to the function. Consider the following example:
```python
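from morpheus.messages import MessageMeta
from morpheus.pipeline.stage_decorator import stage


@stage
def multiplier(message: MessageMeta, *, column: str, value: int | float = 2.0) -> MessageMeta:
    # Multiply the requested column in place, then forward the message
    with message.mutable_dataframe() as df:
        df[column] = df[column] * value

    return message

# ...

pipeline.add_stage(multiplier(config, column='probs', value=5))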
```
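
One way to picture a call such as `multiplier(config, column='probs', value=5)` is that the extra keyword arguments are captured once, when the stage is constructed, and then supplied on every message. The following sketch shows that idea with `functools.partial`, assuming a plain dict of lists in place of a `MessageMeta`. The helper `make_stage` is hypothetical, and this is not a description of how Morpheus binds the arguments internally.

```python
import functools


def multiplier(message: dict, *, column: str, value: float = 2.0) -> dict:
    # A plain dict of lists stands in for a DataFrame-backed message
    message[column] = [x * value for x in message[column]]
    return message


def make_stage(fn, **kwargs):
    """Hypothetical helper: bind keyword arguments once, at construction time."""
    return functools.partial(fn, **kwargs)


# Explicit value for `value`
times_five = make_stage(multiplier, column="probs", value=5)

# `value` is omitted, so the default from the signature (2.0) applies
default_doubler = make_stage(multiplier, column="probs")

print(times_five({"probs": [1, 2]}))
print(default_doubler({"probs": [1.0, 3.0]}))
```

Because `column` and `value` are keyword-only in the function signature, they cannot be confused with the message, which remains the single positional argument.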

### Stage Class

The class-based approach to defining a stage offers a bit more flexibility, specifically the ability to validate constructor arguments and perform any needed setup prior to being invoked in a pipeline. Defining this stage requires us to specify the stage type. Morpheus stages that contain a single input and a single output typically inherit from `SinglePortStage`. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external source, will need to inherit from the `SingleOutputSource` base class.

Stages in Morpheus define what types of data they accept and what type of data they emit. In this example we emit messages of the same type that is received. This is quite common, and Morpheus provides a mixin class, `PassThruTypeMixin`, to simplify it.

Optionally, stages can be registered as a command with the Morpheus CLI using the `register_stage` decorator. This allows for pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using [numpydoc](https://numpydoc.readthedocs.io/en/latest/) and exposed as command line flags. Similarly, the class's docstrings will be exposed in the help string of the stage on the command line.

We start our class definition with a few basic imports:

@@ -155,17 +204,19 @@ class PassThruStage(PassThruTypeMixin, SinglePortStage):
```

## Testing our new Stage
To start testing our new pass through stage, we are going to construct a simple pipeline and add our new stage to it. This pipeline will do the minimum work necessary to verify our pass through stage. Data will flow through our simple pipeline as follows:
To start testing both our new function-based and class-based stages, we are going to construct a simple pipeline and add both stages to it. This pipeline will do the minimum work necessary to verify our pass through stages. Data will flow through our simple pipeline as follows:
1. A source stage will produce data and emit it into the pipeline.
1. This data will be read and processed by our pass through stage, in this case simply forwarding on the data.
1. A monitoring stage will record the messages from our pass through stage and terminate the pipeline.
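
The three steps above can be pictured with a toy, pure-Python pipeline built from generators. This is only a sketch of the data flow and does not use or imitate the Morpheus API: the function names are hypothetical, and here the pipeline simply ends when the source is exhausted rather than being terminated by the monitor.

```python
import time
from typing import Any, Iterable, Iterator


def source(records: Iterable[Any]) -> Iterator[Any]:
    """Step 1: emit each record into the pipeline."""
    yield from records


def pass_thru(messages: Iterable[Any]) -> Iterator[Any]:
    """Step 2: forward every message unchanged."""
    for message in messages:
        yield message


def monitor(messages: Iterable[Any], counts: dict) -> Iterator[Any]:
    """Step 3: count messages as they pass through."""
    for message in messages:
        counts["messages"] = counts.get("messages", 0) + 1
        yield message


def run_toy_pipeline(records: Iterable[Any]):
    counts: dict = {}
    start = time.perf_counter()
    # Chain the stages; nothing runs until the final list() pulls data through
    out = list(monitor(pass_thru(source(records)), counts))
    elapsed = time.perf_counter() - start
    return out, counts.get("messages", 0), elapsed


if __name__ == "__main__":
    out, n_messages, elapsed = run_toy_pipeline(range(100))
    print(f"{n_messages} messages in {elapsed:.6f}s")
```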

First we will need to import a few things from Morpheus for this example to work. Note that this test script, which we will name "run.py", assumes that we saved the code for the `PassThruStage`` in a file named "pass_thru.py" in the same directory.
First we will need to import a few things from Morpheus for this example to work. Note that this test script, which we will name "run.py", assumes that we saved the code for the class-based `PassThruStage` in a file named "pass_thru.py" and the function-based `pass_thru_stage` in a file named "pass_thru_deco.py", both in the same directory.

```python
import logging
import os

from pass_thru import PassThruStage
from pass_thru_deco import pass_thru_stage

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
@@ -174,6 +225,7 @@ from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging
```


Before constructing the pipeline, we need to do a bit of environment configuration, starting with the Morpheus logger:
```python
configure_logging(log_level=logging.DEBUG)
@@ -189,10 +241,19 @@ In this example, we will use the `FileSourceStage` class to read a large file in
pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))
```

Next, we will add our new stage to the pipeline as well as a `MonitorStage` which will measure the throughput of our pass through stage:
Next, we will add both versions of our new stage to the pipeline as well as a `MonitorStage` instance for each to measure the throughput of our stages:

```python
# Add the decorated function stage
pipeline.add_stage(pass_thru_stage(config))

# Add monitor to record the performance of the function based stage
pipeline.add_stage(MonitorStage(config))

# Add the class based stage
pipeline.add_stage(PassThruStage(config))

# Add monitor to record the performance of the class based stage
pipeline.add_stage(MonitorStage(config))
```

@@ -203,20 +264,30 @@ pipeline.run()

The output should display:
```
====Registering Pipeline====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Building Pipeline====
Added source: <from-file-0; FileSourceStage(filename=examples/data/email_with_addresses.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True)>
====Pipeline Pre-build====
====Pre-Building Segment: linear_segment_0====
====Pre-Building Segment Complete!====
====Pipeline Pre-build Complete!====
====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-file-0; FileSourceStage(filename=examples/data/email_with_addresses.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, parser_kwargs=None)>
└─> morpheus.MessageMeta
Added stage: <pass-thru-1; PassThruStage(args=(), kwargs={})>
Added stage: <pass_thru_stage-1; WrappedFunctionStage(on_data_fn=<function pass_thru_stage at 0x7f001f72bd00>, on_data_args=(), accept_type=None, return_type=None, needed_columns=None, on_data_kwargs={})>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <monitor-2; MonitorStage(description=Progress, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <monitor-2; MonitorStage(description=Progress, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
Added stage: <pass-thru-3; PassThruStage()>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
Starting! Time: 1648834587.3092508
====Pipeline Started====
Progress[Complete]: 25229messages [00:00, 57695.02messages/s]
Added stage: <monitor-4; MonitorStage(description=Progress, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
Progress[Complete]: 100 messages [00:01, 69.97 messages/s]
Progress[Complete]: 100 messages [00:01, 69.76 messages/s]
====Pipeline Complete====
```

@@ -226,14 +297,15 @@ Note that this code assumes the `MORPHEUS_ROOT` environment variable is set to t
import logging
import os

from pass_thru import PassThruStage
from pass_thru_deco import pass_thru_stage

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage


def run_pipeline():
    # Enable the Morpheus logger
@@ -250,22 +322,31 @@ def run_pipeline():
    # Set source stage
    pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))

    # Add our own stage
    # Add the decorated function stage
    pipeline.add_stage(pass_thru_stage(config))

    # Add monitor to record the performance of the function based stage
    pipeline.add_stage(MonitorStage(config))

    # Add the class based stage
    pipeline.add_stage(PassThruStage(config))

    # Add monitor to record the performance of our new stage
    # Add monitor to record the performance of the class based stage
    pipeline.add_stage(MonitorStage(config))

    # Run the pipeline
    pipeline.run()


if __name__ == "__main__":
    run_pipeline()
```


### Alternate Morpheus CLI example
The above example makes use of the Morpheus Python API. Alternately, we could have constructed the same pipeline using the Morpheus command line tool. We will need to pass in the path to our stage via the `--plugin` argument so that it will be visible to the command line tool.
The above example makes use of the Morpheus Python API. Alternatively, we could test the class-based stage in a pipeline constructed using the Morpheus command line tool. We will need to pass in the path to our stage via the `--plugin` argument so that it will be visible to the command line tool.

> **Note**: Registering a stage with the CLI tool is currently only available to class-based stages.

From the root of the Morpheus repo, run:
```bash
Expand Down