DFP pipeline module #510

Merged
64 commits merged into branch-23.01 from dfp-pipeline-module
Jan 24, 2023
Changes from 1 commit
Commits
64 commits
1427674
dfp modules testing
bsuryadevara Nov 14, 2022
3d769ae
Merge branch 'nv-morpheus:branch-22.11' into dfp-training-module
bsuryadevara Nov 14, 2022
df5066b
modules stage work
bsuryadevara Nov 18, 2022
ebb2db8
Added module stage implementation
bsuryadevara Nov 22, 2022
c736828
morpehus modules integration
bsuryadevara Nov 22, 2022
1d12aee
trivial changes
bsuryadevara Nov 22, 2022
3376ca7
renamed configuration file
bsuryadevara Nov 23, 2022
4253d34
added tests
bsuryadevara Nov 28, 2022
b5ded8b
update module factory
bsuryadevara Nov 28, 2022
4b13b99
update module factory
bsuryadevara Nov 28, 2022
4789d23
Updated linear modules stage
bsuryadevara Nov 29, 2022
5cdfd68
Updated linear modules stage
bsuryadevara Nov 29, 2022
d15322a
Merge branch 'nv-morpheus:branch-23.01' into dfp-pipeline-module
bsuryadevara Nov 29, 2022
bea98a7
Updated linear modules stage
bsuryadevara Nov 29, 2022
5a4aa4a
Merge branch 'dfp-pipeline-module' of github.com:bsuryadevara/Morpheu…
bsuryadevara Nov 29, 2022
f0268b6
renamed mlflow model writer module
bsuryadevara Nov 29, 2022
25d1c5a
added functiontools wraps to a decorator func
bsuryadevara Nov 29, 2022
e4a73f2
created dfp pipeline preprocessing and training modules
bsuryadevara Nov 30, 2022
e5b770f
created dfp pipeline preprocessing and training modules
bsuryadevara Nov 30, 2022
1575fcd
created dfp pipeline preprocessing and training modules
bsuryadevara Nov 30, 2022
70fc4cf
created dfp pipeline preprocessing and training modules
bsuryadevara Nov 30, 2022
5159b0b
used dill to persist source and preprocess schema
bsuryadevara Nov 30, 2022
9619c0e
used dill to persist source and preprocess schema
bsuryadevara Nov 30, 2022
88d03b1
renamed files
bsuryadevara Dec 1, 2022
899df5c
Updated dfp pipleines with modules
bsuryadevara Dec 7, 2022
6a0bcdd
Added tests
bsuryadevara Dec 7, 2022
e85ee77
resolved merge conflicts
bsuryadevara Dec 7, 2022
dd9f2d2
created dfp (azure, duo) training pipelines with modules
bsuryadevara Dec 7, 2022
0bacf6e
style correction
bsuryadevara Dec 7, 2022
d9983dd
style correction
bsuryadevara Dec 7, 2022
2185886
style correction
bsuryadevara Dec 8, 2022
e50d1d1
added dask and distributed packages to requirements.txt
bsuryadevara Dec 8, 2022
4fdd4c9
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Dec 8, 2022
0e6d5cf
Resloved conflicts
bsuryadevara Dec 8, 2022
9158130
addedd missing tests file
bsuryadevara Dec 9, 2022
e0e7309
Merge remote-tracking branch 'upstream/branch-23.01' into dfp-pipelin…
bsuryadevara Dec 9, 2022
c91aa7f
fix to failing test
bsuryadevara Dec 10, 2022
a9d3e9e
fix to failing test
bsuryadevara Dec 10, 2022
516e746
fix tests
bsuryadevara Dec 10, 2022
bdf384a
addressed feedback comments
bsuryadevara Dec 13, 2022
c6c49a9
Update examples/digital_fingerprinting/production/morpheus/dfp_azure_…
bsuryadevara Dec 13, 2022
c93ac48
Update examples/digital_fingerprinting/production/morpheus/dfp_duo_mo…
bsuryadevara Dec 13, 2022
7e30dea
input and output port names from a module as params
bsuryadevara Dec 13, 2022
8aa84f5
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Dec 13, 2022
2fb7a5e
Update morpheus/stages/general/linear_modules_stage.py
bsuryadevara Dec 14, 2022
a4ce99e
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Dec 14, 2022
6e35679
updated readme, filenames
bsuryadevara Dec 15, 2022
7da7189
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Dec 15, 2022
ae9393b
Updated DFP readme.md
bsuryadevara Dec 15, 2022
4059466
Merge remote-tracking branch 'upstream/branch-23.01' into dfp-pipelin…
bsuryadevara Dec 20, 2022
2fc2916
moved from srf to mrc
bsuryadevara Dec 20, 2022
62149a7
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Jan 4, 2023
e0867c4
changes related to feedback
bsuryadevara Jan 18, 2023
c1af70a
Merge remote-tracking branch 'origin/branch-23.01' into dfp-pipeline-…
bsuryadevara Jan 18, 2023
0a98249
updated header and logging format
bsuryadevara Jan 18, 2023
dd8d69a
fix incomplete headers
bsuryadevara Jan 18, 2023
3d5e592
Removed commented code
bsuryadevara Jan 19, 2023
6d30b13
changed module fucntion name to align with filename
bsuryadevara Jan 23, 2023
31538f0
Merge branch 'dfp-pipeline-module' of https://github.com/bsuryadevara…
bsuryadevara Jan 23, 2023
b0a42b8
added distributed dependency to dfp example
bsuryadevara Jan 23, 2023
904a92e
Merge remote-tracking branch 'upstream/branch-23.01' into dfp-pipelin…
bsuryadevara Jan 23, 2023
ba56921
Merge branch-23.01 to dfp-pipeline-module
bsuryadevara Jan 23, 2023
6d6e030
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Jan 23, 2023
5af5813
resolved merge conflicts
bsuryadevara Jan 23, 2023
updated readme, filenames
bsuryadevara committed Dec 15, 2022
commit 6e35679591e97281fc01ee4554b06145998e1838
1 change: 0 additions & 1 deletion ci/conda/recipes/morpheus/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ outputs:
- cudf_kafka {{ rapids_version }}
- cupy # Version determined from cudf
- datacompy 0.8
- dask
- distributed
- docker-py 5.0
- grpcio # Version determined from cudf
Expand Down
154 changes: 154 additions & 0 deletions docs/source/developer_guide/guides/5_digital_fingerprinting.md
Original file line number Diff line number Diff line change
Expand Up @@ -621,3 +621,157 @@ The `DFPPostprocessingStage` ([examples/digital_fingerprinting/production/morphe
| -------- | ---- | ----------- |
| `c` | `morpheus.config.Config` | Morpheus config object |
| `z_score_threshold` | `float` | Optional, sets the threshold that `mean_abs_z` must exceed for a row to be considered an anomaly; default is 2.0 |

## Morpheus Pipeline with Modules

A module is a reusable unit of work that can be used within a Morpheus stage and registered with the MRC segment module registry. Modules are beneficial when a unit of work is likely to be reused. A registered module can be loaded from the registry into multiple contexts without knowledge of its inner workings; all that is needed is to pass it an input and it returns an output.

Let's first look at the structure of a module implementation before diving deeper into the module-based DFP training pipeline.

> Note: Modules are not limited to creating intermediate nodes that connect sources and sinks; they can also be used to construct source and sink nodes (a minimal source-module sketch follows the example below).

```py
import typing

import srf
from srf.core import operators as ops

from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import register_module


@register_module("SimpleModule", "morpheus_modules")
def module_init(builder: srf.Builder):

    module_id = "SimpleModule"

    # Retrieve the module configuration from the builder.
    config: typing.Dict[str, str] = get_module_config(module_id, builder)

    sep = config.get("sep", ",")

    def on_data(message: str):
        # Your implementation goes here...
        pass

    def node_fn(obs: srf.Observable, sub: srf.Subscriber):
        obs.pipe(ops.map(on_data), ops.filter(lambda x: x is not None)).subscribe(sub)

    # Here we are creating a node.
    node = builder.make_node_full(module_id, node_fn)

    # Register the input and output port names for the module.
    builder.register_module_input("<input port name>", node)
    builder.register_module_output("<output port name>", node)
```

The `register_module` decorator on the module initializer function registers the module under the module ID `SimpleModule` and the namespace `morpheus_modules`.

> Note: When registering a module, the user chooses the input and output port names. Once the module has been registered, the same names must be used to obtain the input/output port connections.
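
As noted earlier, a module can also wrap a source node. Below is a minimal, hedged sketch of a source-style module; the module name, port name, and generator contents are placeholders for illustration, not part of the Morpheus codebase.

```py
import srf

from morpheus.utils.module_utils import register_module


@register_module("SimpleSourceModule", "morpheus_modules")
def source_module_init(builder: srf.Builder):

    def emit_messages():
        # Placeholder generator; a real source module would yield messages
        # read from an external system (files, Kafka, object storage, etc.).
        for i in range(3):
            yield f"message-{i}"

    # Create a source node and expose it as the module's output port.
    node = builder.make_source("SimpleSourceModule", emit_messages())

    builder.register_module_output("<output port name>", node)
```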


The required meta fields for a module configuration are shown below.
- `module_id` : Unique identifier for a module in the module registry.
- `module_name` : Specifies the module name.
- `namespace` : Used to virtually group modules.

```py
# Module configuration
module_config = {
    "module_id": "SimpleModule",
    "module_name": "simple_module",
    "namespace": "morpheus_modules",
    "sep": ":"
}
```

A module must be wrapped in a stage, as illustrated below, in order to be used in a Morpheus pipeline.

```py
from morpheus.config import Config
from morpheus.stages.general.linear_modules_stage import LinearModulesStage

# Morpheus configuration
c = Config()

module_stage = LinearModulesStage(c,
                                  module_config,
                                  input_port_name="<input port name>",
                                  output_port_name="<output port name>",
                                  output_type="<module output type>")
```
[LinearModulesStage](/morpheus/stages/general/linear_modules_stage.py) is a utility stage that loads an existing, registered MRC SegmentModule and wraps it as a Morpheus SinglePortStage.

| Argument | Type | Description |
| -------- | ---- | ----------- |
| `c` | `morpheus.config.Config` | Morpheus config object |
| `module_config` | `dict` or `None` | Module configuration dictionary |
| `input_port_name` | `str` | Name of a module input port, as used during registration |
| `output_port_name` | `str` | Name of a module output port, as used during registration |
| `output_type` | `type`, default `typing.Any` | Module output data type |
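
As a rough usage sketch (assuming the `module_config` defined earlier and a source stage that produces the module's expected input type), the wrapped module is added to a pipeline like any other stage:

```py
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.linear_modules_stage import LinearModulesStage

c = Config()

pipeline = LinearPipeline(c)

# Set a source stage appropriate for the module's input type, e.g.:
# pipeline.set_source(...)

# Wrap the registered module and add it to the pipeline like any other stage.
pipeline.add_stage(
    LinearModulesStage(c,
                       module_config,
                       input_port_name="<input port name>",
                       output_port_name="<output port name>"))

pipeline.run()
```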


A module can also serve as a wrapper around a chain of child modules. The example below demonstrates how to build a chained module, assuming `module_1` through `module_n` are already registered.

```py
import typing

import srf

from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import load_module
from morpheus.utils.module_utils import register_module


@register_module("ChainModule", "morpheus_modules")
def chain_module_init(builder: srf.Builder):

    module_id = "ChainModule"

    config: typing.Dict[str, str] = get_module_config(module_id, builder)

    # Get the child module configurations
    module_1_config = config.get("module_1", None)
    ...
    module_n_config = config.get("module_n", None)

    # Load the child modules from the registry
    module_1 = load_module(module_1_config, builder=builder)
    ...
    module_n = load_module(module_n_config, builder=builder)

    # Make edges between the modules, using the input/output port names supplied to
    # 'builder.register_module_input' and 'builder.register_module_output' during registration.
    builder.make_edge(module_1.output_port("<output port name>"), module_2.input_port("<input port name>"))
    ...
    builder.make_edge(module_n_minus_1.output_port("<output port name>"), module_n.input_port("<input port name>"))

    # Register input and output ports for the chained module.
    builder.register_module_input("<your input port name>", module_1.input_port("<input port name>"))
    builder.register_module_output("<your output port name>", module_n.output_port("<output port name>"))
```
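
The configuration passed to such a chained module typically nests each child module's configuration under the keys read above (`module_1` through `module_n`). A hypothetical sketch follows; the exact keys depend on how each child module reads its configuration.

```py
chain_module_config = {
    "module_id": "ChainModule",
    "module_name": "chain_module",
    "namespace": "morpheus_modules",
    "module_1": {
        "module_id": "SimpleModule",
        "module_name": "simple_module",
        "namespace": "morpheus_modules",
        "sep": ":"
    },
    # ...
    "module_n": {
        "module_id": "<child module id>",
        "module_name": "<child module name>",
        "namespace": "morpheus_modules"
    }
}
```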

Let's look at the module-based DFP training process. At a high level, the complete DFP training process is divided into two modules:

* [**DFPPipelinePreprocessing**](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_pipeline_preprocessing.py)

This module constructs a chained module by combining the separate modules listed below into a single module that contains all of the internal components for preprocessing Azure Active Directory and Duo Authentication logs:
- [FileBatcher](/morpheus/modules/file_batcher.py)
- [FileToDataFrame](/morpheus/modules/file_to_df.py)
- [DFPSplitUsers](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_split_users.py)
- [DFPRollingWindow](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_rolling_window.py)
- [DFPPreprocessing](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_preprocessing.py)
* [**DFPPipelineTraining**](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_pipeline_training.py)

This module creates a chained module by integrating the individual modules described below into a single module that consumes preprocessed data, trains the model, and writes the trained model to an MLflow server to serve inference requests:
- [DFPTraining](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py)
- [MLFlowModelWriter](/morpheus/modules/mlflow_model_writer.py)


### Run DFP Training Pipeline with Modules
To run the DFP training pipelines using modules with the example datasets, run the following within the container:

* Duo Training Pipeline
```bash
python dfp_duo_modules_pipeline.py --train_users=all --start_time="2022-08-01" --input_file="/workspace/examples/data/dfp/duo-training-data/*.json"
```
* Azure Training Pipeline
```bash
python dfp_azure_modules_pipeline.py --train_users=all --start_time="2022-08-01" --input_file="/workspace/examples/data/dfp/azure-training-data/*.json"
```
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import dfp.modules.dfp_training # noqa: F401
import srf

import morpheus.modules.file_batcher_module # noqa: F401
import morpheus.modules.file_to_df_module # noqa: F401
import morpheus.modules.file_batcher # noqa: F401
import morpheus.modules.file_to_df # noqa: F401
from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import load_module
from morpheus.utils.module_utils import register_module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import dfp.modules.dfp_training # noqa: F401
import srf

import morpheus.modules.mlflow_model_writer_module # noqa: F401
import morpheus.modules.mlflow_model_writer # noqa: F401
from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import load_module
from morpheus.utils.module_utils import register_module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@
import srf
from srf.core import operators as ops

import dask
from dask.distributed import Client
from dask.distributed import LocalCluster

import cudf

from morpheus._lib.file_types import FileTypes
from morpheus.cli.utils import str_to_file_type
from morpheus.io.deserializers import read_file_to_df
from morpheus.utils.column_info import process_dataframe
from morpheus.utils.module_utils import get_module_config
Expand Down Expand Up @@ -68,20 +65,25 @@ def file_to_dataframe(builder: srf.Builder):
cache_dir = config.get("cache_dir", None)

download_method: typing.Literal["single_thread", "multiprocess", "dask",
"dask_thread"] = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", "dask_thread")
"dask_thread"] = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", "multiprocess")
cache_dir = os.path.join(cache_dir, "file_cache")

# Load input schema
schema = pickle.loads(bytes(schema_str, encoding))

available_file_types = {"csv": FileTypes.CSV, "json": FileTypes.JSON}

try:
file_type = available_file_types[file_type.lower()]
file_type = str_to_file_type(file_type.lower())
except Exception:
raise ValueError("Invalid input file type '{}'. Available file types are: CSV, JSON".format(file_type))

def get_dask_cluster():

try:
import dask
from dask.distributed import LocalCluster
except ModuleNotFoundError:
raise Exception("Install 'dask' and 'distributed' to allow file downloads using dask mode.")

logger.debug("Creating dask cluster...")

# Up the heartbeat interval which can get violated with long download times
Expand All @@ -93,6 +95,16 @@ def get_dask_cluster():

return dask_cluster

def get_dask_client(dask_cluster):

from dask.distributed import Client

logger.debug("Creating dask client...")
dask_client = Client(dask_cluster)
logger.debug("Creating dask client %s ... Done.", dask_client)

return dask_client

def close_dask_cluster():
if (dask_cluster is not None):
logger.debug("Stopping dask cluster...")
Expand Down Expand Up @@ -175,7 +187,7 @@ def get_or_create_dataframe_from_s3_batch(
if (download_method.startswith("dask")):
# Create the client each time to ensure all connections to the cluster are
# closed (they can time out)
with Client(dask_cluster) as client:
with get_dask_client(dask_cluster) as client:
dfs = client.map(download_method_func, download_buckets)

dfs = client.gather(dfs)
Expand Down Expand Up @@ -240,8 +252,6 @@ def convert_to_dataframe(s3_object_batch: typing.Tuple[fsspec.core.OpenFiles, in
def node_fn(obs: srf.Observable, sub: srf.Subscriber):
obs.pipe(ops.map(convert_to_dataframe), ops.on_completed(close_dask_cluster)).subscribe(sub)

dask_cluster = None

if (download_method.startswith("dask")):
dask_cluster = get_dask_cluster()

Expand Down
3 changes: 2 additions & 1 deletion morpheus/stages/general/linear_modules_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@

class LinearModulesStage(SinglePortStage):
"""
Loads an existing, registered, MRC SegmentModule and wraps it as a Morpheus SinglePortStage.
Loads an existing, registered, MRC SegmentModule and wraps it as a Morpheus SinglePortStage.

Parameters
----------
c : `morpheus.config.Config`
Expand Down