DFP pipeline module #510

Merged · 64 commits · Jan 24, 2023
Commits
1427674
dfp modules testing
bsuryadevara Nov 14, 2022
3d769ae
Merge branch 'nv-morpheus:branch-22.11' into dfp-training-module
bsuryadevara Nov 14, 2022
df5066b
modules stage work
bsuryadevara Nov 18, 2022
ebb2db8
Added module stage implementation
bsuryadevara Nov 22, 2022
c736828
morpheus modules integration
bsuryadevara Nov 22, 2022
1d12aee
trivial changes
bsuryadevara Nov 22, 2022
3376ca7
renamed configuration file
bsuryadevara Nov 23, 2022
4253d34
added tests
bsuryadevara Nov 28, 2022
b5ded8b
update module factory
bsuryadevara Nov 28, 2022
4b13b99
update module factory
bsuryadevara Nov 28, 2022
4789d23
Updated linear modules stage
bsuryadevara Nov 29, 2022
5cdfd68
Updated linear modules stage
bsuryadevara Nov 29, 2022
d15322a
Merge branch 'nv-morpheus:branch-23.01' into dfp-pipeline-module
bsuryadevara Nov 29, 2022
bea98a7
Updated linear modules stage
bsuryadevara Nov 29, 2022
5a4aa4a
Merge branch 'dfp-pipeline-module' of github.com:bsuryadevara/Morpheu…
bsuryadevara Nov 29, 2022
f0268b6
renamed mlflow model writer module
bsuryadevara Nov 29, 2022
25d1c5a
added functools wraps to a decorator func
bsuryadevara Nov 29, 2022
e4a73f2
created dfp pipeline preprocessing and training modules
bsuryadevara Nov 30, 2022
e5b770f
created dfp pipeline preprocessing and training modules
bsuryadevara Nov 30, 2022
1575fcd
created dfp pipeline preprocessing and training modules
bsuryadevara Nov 30, 2022
70fc4cf
created dfp pipeline preprocessing and training modules
bsuryadevara Nov 30, 2022
5159b0b
used dill to persist source and preprocess schema
bsuryadevara Nov 30, 2022
9619c0e
used dill to persist source and preprocess schema
bsuryadevara Nov 30, 2022
88d03b1
renamed files
bsuryadevara Dec 1, 2022
899df5c
Updated dfp pipelines with modules
bsuryadevara Dec 7, 2022
6a0bcdd
Added tests
bsuryadevara Dec 7, 2022
e85ee77
resolved merge conflicts
bsuryadevara Dec 7, 2022
dd9f2d2
created dfp (azure, duo) training pipelines with modules
bsuryadevara Dec 7, 2022
0bacf6e
style correction
bsuryadevara Dec 7, 2022
d9983dd
style correction
bsuryadevara Dec 7, 2022
2185886
style correction
bsuryadevara Dec 8, 2022
e50d1d1
added dask and distributed packages to requirements.txt
bsuryadevara Dec 8, 2022
4fdd4c9
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Dec 8, 2022
0e6d5cf
Resolved conflicts
bsuryadevara Dec 8, 2022
9158130
added missing tests file
bsuryadevara Dec 9, 2022
e0e7309
Merge remote-tracking branch 'upstream/branch-23.01' into dfp-pipelin…
bsuryadevara Dec 9, 2022
c91aa7f
fix to failing test
bsuryadevara Dec 10, 2022
a9d3e9e
fix to failing test
bsuryadevara Dec 10, 2022
516e746
fix tests
bsuryadevara Dec 10, 2022
bdf384a
addressed feedback comments
bsuryadevara Dec 13, 2022
c6c49a9
Update examples/digital_fingerprinting/production/morpheus/dfp_azure_…
bsuryadevara Dec 13, 2022
c93ac48
Update examples/digital_fingerprinting/production/morpheus/dfp_duo_mo…
bsuryadevara Dec 13, 2022
7e30dea
input and output port names from a module as params
bsuryadevara Dec 13, 2022
8aa84f5
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Dec 13, 2022
2fb7a5e
Update morpheus/stages/general/linear_modules_stage.py
bsuryadevara Dec 14, 2022
a4ce99e
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Dec 14, 2022
6e35679
updated readme, filenames
bsuryadevara Dec 15, 2022
7da7189
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Dec 15, 2022
ae9393b
Updated DFP readme.md
bsuryadevara Dec 15, 2022
4059466
Merge remote-tracking branch 'upstream/branch-23.01' into dfp-pipelin…
bsuryadevara Dec 20, 2022
2fc2916
moved from srf to mrc
bsuryadevara Dec 20, 2022
62149a7
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Jan 4, 2023
e0867c4
changes related to feedback
bsuryadevara Jan 18, 2023
c1af70a
Merge remote-tracking branch 'origin/branch-23.01' into dfp-pipeline-…
bsuryadevara Jan 18, 2023
0a98249
updated header and logging format
bsuryadevara Jan 18, 2023
dd8d69a
fix incomplete headers
bsuryadevara Jan 18, 2023
3d5e592
Removed commented code
bsuryadevara Jan 19, 2023
6d30b13
changed module function name to align with filename
bsuryadevara Jan 23, 2023
31538f0
Merge branch 'dfp-pipeline-module' of https://github.com/bsuryadevara…
bsuryadevara Jan 23, 2023
b0a42b8
added distributed dependency to dfp example
bsuryadevara Jan 23, 2023
904a92e
Merge remote-tracking branch 'upstream/branch-23.01' into dfp-pipelin…
bsuryadevara Jan 23, 2023
ba56921
Merge branch-23.01 to dfp-pipeline-module
bsuryadevara Jan 23, 2023
6d6e030
Merge branch 'branch-23.01' into dfp-pipeline-module
bsuryadevara Jan 23, 2023
5af5813
resolved merge conflicts
bsuryadevara Jan 23, 2023
Updated linear modules stage
bsuryadevara committed Nov 29, 2022
commit 4789d232efb6f7ac36bdaffe31416e43a0c926fd
2 changes: 1 addition & 1 deletion cmake/deps/Configure_srf.cmake
@@ -27,7 +27,7 @@ function(find_and_configure_srf version)
     ${PROJECT_NAME}-exports
     CPM_ARGS
       GIT_REPOSITORY https://github.com/bsuryadevara/SRF.git
-      GIT_TAG morpheus_module_testing
+      GIT_TAG morpheus_modules_test
       GIT_SHALLOW TRUE
       OPTIONS "SRF_BUILD_EXAMPLES OFF"
               "SRF_BUILD_TESTS OFF"
@@ -0,0 +1,325 @@
import hashlib
import logging
import os
import random
import typing
import urllib.parse

import mlflow
import requests
import srf
from dfencoder import AutoEncoder
from dfp.utils.model_cache import user_to_model_name
from mlflow.exceptions import MlflowException
from mlflow.models.signature import ModelSignature
from mlflow.protos.databricks_pb2 import RESOURCE_ALREADY_EXISTS
from mlflow.protos.databricks_pb2 import ErrorCode
from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository
from mlflow.tracking import MlflowClient
from mlflow.types import ColSpec
from mlflow.types import Schema
from mlflow.types.utils import _infer_pandas_column
from mlflow.types.utils import _infer_schema
from srf.core import operators as ops

from morpheus.messages.multi_ae_message import MultiAEMessage
from morpheus.utils.decorator_utils import is_module_registered

from ..messages.multi_dfp_message import MultiDFPMessage

logger = logging.getLogger(f"morpheus.{__name__}")


class DFPModuleRegisterUtil:

    def __init__(self):
        self._registry = srf.ModuleRegistry
        self._release_version = self.get_release_version()

    def get_release_version(self) -> typing.List[int]:
        ver_list = srf.__version__.split('.')
        ver_list = [int(i) for i in ver_list]
        return ver_list

    def register_training_module(self, module_id, namespace):

        def training_module(builder: srf.Builder):

            config = builder.get_current_module_config()[module_id]

            def on_data(message: MultiDFPMessage):
                if (message is None or message.mess_count == 0):
                    return None

                user_id = message.user_id

                model = AutoEncoder(**config["model_kwargs"])

                final_df = message.get_meta_dataframe()

                # Only train on the feature columns
                final_df = final_df[final_df.columns.intersection(config["feature_columns"])]

                logger.debug("Training AE model for user: '%s'...", user_id)
                model.fit(final_df, epochs=30)
                logger.debug("Training AE model for user: '%s'... Complete.", user_id)

                output_message = MultiAEMessage(message.meta,
                                                mess_offset=message.mess_offset,
                                                mess_count=message.mess_count,
                                                model=model)

                return output_message

            def node_fn(obs: srf.Observable, sub: srf.Subscriber):
                obs.pipe(ops.map(on_data), ops.filter(lambda x: x is not None)).subscribe(sub)

            node = builder.make_node_full(self.get_unique_name(module_id), node_fn)

            builder.register_module_input("input", node)
            builder.register_module_output("output", node)

        if not self._registry.contains(module_id, namespace):
            self._registry.register_module(module_id, namespace, self._release_version, training_module)

    def register_mlflow_writer_module(self, module_id, namespace):

        def mlflow_writer_module(builder: srf.Builder):

            config = builder.get_current_module_config()[module_id]

            def user_id_to_model(user_id: str):

                return user_to_model_name(user_id=user_id, model_name_formatter=config["model_name_formatter"])

            def user_id_to_experiment(user_id: str):
                kwargs = {
                    "user_id": user_id,
                    "user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(),
                    "reg_model_name": user_id_to_model(user_id=user_id)
                }

                return config["experiment_name_formatter"].format(**kwargs)

            def _apply_model_permissions(reg_model_name: str):

                # Check the required variables
                databricks_host = os.environ.get("DATABRICKS_HOST", None)
                databricks_token = os.environ.get("DATABRICKS_TOKEN", None)

                if (databricks_host is None or databricks_token is None):
                    raise RuntimeError("Cannot set Databricks model permissions. "
                                       "Environment variables `DATABRICKS_HOST` and `DATABRICKS_TOKEN` must be set")

                headers = {"Authorization": f"Bearer {databricks_token}"}

                url_base = f"{databricks_host}"

                try:
                    # First get the registered model ID
                    get_registered_model_url = urllib.parse.urljoin(url_base,
                                                                    "/api/2.0/mlflow/databricks/registered-models/get")

                    get_registered_model_response = requests.get(url=get_registered_model_url,
                                                                 headers=headers,
                                                                 params={"name": reg_model_name})

                    registered_model_response = get_registered_model_response.json()

                    reg_model_id = registered_model_response["registered_model_databricks"]["id"]

                    # Now apply the permissions. If it exists already, it will be overwritten or it is a no-op
                    patch_registered_model_permissions_url = urllib.parse.urljoin(
                        url_base, f"/api/2.0/preview/permissions/registered-models/{reg_model_id}")

                    patch_registered_model_permissions_body = {
                        "access_control_list": [{
                            "group_name": group, "permission_level": permission
                        } for group, permission in config["databricks_permissions"].items()]
                    }

                    requests.patch(url=patch_registered_model_permissions_url,
                                   headers=headers,
                                   json=patch_registered_model_permissions_body)

                except Exception:
                    logger.exception("Error occurred trying to apply model permissions to model: %s",
                                     reg_model_name,
                                     exc_info=True)

            def on_data(message: MultiAEMessage):

                user = message.meta.user_id

                model: AutoEncoder = message.model

                model_path = "dfencoder"
                reg_model_name = user_id_to_model(user_id=user)

                # Write to MLflow
                try:
                    mlflow.end_run()

                    experiment_name = user_id_to_experiment(user_id=user)

                    # Creates a new experiment if it doesn't exist
                    experiment = mlflow.set_experiment(experiment_name)

                    with mlflow.start_run(run_name="Duo autoencoder model training run",
                                          experiment_id=experiment.experiment_id) as run:

                        model_path = f"{model_path}-{run.info.run_uuid}"

                        # Log all params in one dict to avoid round trips
                        mlflow.log_params({
                            "Algorithm": "Denoising Autoencoder",
                            "Epochs": model.lr_decay.state_dict().get("last_epoch", "unknown"),
                            "Learning rate": model.lr,
                            "Batch size": model.batch_size,
                            "Start Epoch": message.get_meta("timestamp").min(),
                            "End Epoch": message.get_meta("timestamp").max(),
                            "Log Count": message.mess_count,
                        })

                        metrics_dict: typing.Dict[str, float] = {}

                        # Add info on the embeddings
                        for k, v in model.categorical_fts.items():
                            embedding = v.get("embedding", None)

                            if (embedding is None):
                                continue

                            metrics_dict[f"embedding-{k}-num_embeddings"] = embedding.num_embeddings
                            metrics_dict[f"embedding-{k}-embedding_dim"] = embedding.embedding_dim

                        # Add metrics for all of the loss stats
                        if (hasattr(model, "feature_loss_stats")):
                            for k, v in model.feature_loss_stats.items():
                                metrics_dict[f"loss-{k}-mean"] = v.get("mean", "unknown")
                                metrics_dict[f"loss-{k}-std"] = v.get("std", "unknown")

                        mlflow.log_metrics(metrics_dict)

                        # Use the prepare_df function to setup the direct inputs to the model. Only include features
                        # returned by prepare_df to show the actual inputs to the model (any extra are discarded)
                        input_df = message.get_meta().iloc[0:1]
                        prepared_df = model.prepare_df(input_df)
                        output_values = model.get_anomaly_score(input_df)

                        input_schema = Schema([
                            ColSpec(type=_infer_pandas_column(input_df[col_name]), name=col_name)
                            for col_name in list(prepared_df.columns)
                        ])
                        output_schema = _infer_schema(output_values)

                        model_sig = ModelSignature(inputs=input_schema, outputs=output_schema)

                        model_info = mlflow.pytorch.log_model(
                            pytorch_model=model,
                            artifact_path=model_path,
                            conda_env=config["conda_env"],
                            signature=model_sig,
                        )

                        client = MlflowClient()

                        # First ensure a registered model has been created
                        try:
                            create_model_response = client.create_registered_model(reg_model_name)
                            logger.debug("Successfully registered model '%s'.", create_model_response.name)
                        except MlflowException as e:
                            if e.error_code == ErrorCode.Name(RESOURCE_ALREADY_EXISTS):
                                pass
                            else:
                                raise e

                        # If we are using databricks, make sure we set the correct permissions
                        if (config["databricks_permissions"] is not None and mlflow.get_tracking_uri() == "databricks"):
                            # Need to apply permissions
                            _apply_model_permissions(reg_model_name=reg_model_name)

                        model_src = RunsArtifactRepository.get_underlying_uri(model_info.model_uri)

                        tags = {
                            "start": message.get_meta(config["timestamp_column_name"]).min(),
                            "end": message.get_meta(config["timestamp_column_name"]).max(),
                            "count": message.get_meta(config["timestamp_column_name"]).count()
                        }

                        # Now create the model version
                        mv = client.create_model_version(name=reg_model_name,
                                                         source=model_src,
                                                         run_id=run.info.run_id,
                                                         tags=tags)

                        logger.debug("MLflow model upload complete: %s:%s:%s", user, reg_model_name, mv.version)

                except Exception:
                    logger.exception("Error uploading model to MLflow", exc_info=True)

                return message

            def node_fn(obs: srf.Observable, sub: srf.Subscriber):
                obs.pipe(ops.map(on_data), ops.filter(lambda x: x is not None)).subscribe(sub)

            node = builder.make_node_full("dfp_mlflow_writer", node_fn)

            builder.register_module_input("input", node)
            builder.register_module_output("output", node)

        if not self._registry.contains(module_id, namespace):
            self._registry.register_module(module_id, namespace, self._release_version, mlflow_writer_module)

    @is_module_registered
    def is_module_registered(self, module_id=None, namespace=None):
        # No-op body; the decorator performs the actual registry check.
        return

    def register_training_pipeline_module(self, module_id, namespace, ordered_module_meta):

        def training_pipeline_module(builder: srf.Builder):

            config = builder.get_current_module_config()

            prev_module = None
            head_module = None

            for item in ordered_module_meta:
                self.is_module_registered(module_id=item["module_id"], namespace=item["namespace"])

                curr_module = builder.load_module(item["module_id"], item["namespace"], "dfp_pipeline", config)

                if prev_module:
                    builder.make_edge(prev_module.output_port("output"), curr_module.input_port("input"))
                else:
                    head_module = curr_module

                prev_module = curr_module

            builder.register_module_input("input", head_module.input_port("input"))
            builder.register_module_output("output", prev_module.output_port("output"))

        if not self._registry.contains(module_id, namespace):
            self._registry.register_module(module_id, namespace, self._release_version, training_pipeline_module)

    def get_unique_name(self, id) -> str:
        unique_name = id + "-" + str(random.randint(0, 1000))
        return unique_name


dfp_util = DFPModuleRegisterUtil()

# Register DFP training module
dfp_util.register_training_module("DFPTraining", "morpheus_modules")

# Register DFP MLFlow writer module
dfp_util.register_mlflow_writer_module("DFPMLFlowWriter", "morpheus_modules")

ordered_module_meta = [{
    "module_id": "DFPTraining", "namespace": "morpheus_modules"
}, {
    "module_id": "DFPMLFlowWriter", "namespace": "morpheus_modules"
}]
# Register DFP training pipeline module
dfp_util.register_training_pipeline_module("DFPTrainingPipeline", "morpheus_modules", ordered_module_meta)
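The `register_training_pipeline_module` method above composes an ordered list of already-registered modules into one pipeline module by connecting each module's `output` port to the next module's `input` port, then exposing the head's input and the tail's output. The chaining logic itself is independent of SRF and can be sketched in plain Python. Note that `Module` and `wire_chain` below are illustrative stand-ins, not Morpheus or SRF APIs:

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Module:
    """Illustrative stand-in for a loaded module with one input and one output port."""
    module_id: str
    downstream: Optional["Module"] = None


def wire_chain(modules: List[Module]) -> Tuple[Module, Module]:
    """Chain modules head-to-tail, mirroring the prev_module/head_module loop above."""
    prev_module = None
    head_module = None

    for curr_module in modules:
        if prev_module:
            # Stand-in for builder.make_edge(prev.output_port("output"), curr.input_port("input"))
            prev_module.downstream = curr_module
        else:
            head_module = curr_module

        prev_module = curr_module

    # The composite module exposes the head's input and the tail's output
    return head_module, prev_module


head, tail = wire_chain([Module("DFPTraining"), Module("DFPMLFlowWriter")])
```

After wiring, `head` is `DFPTraining` and `tail` is `DFPMLFlowWriter`, matching the order of `ordered_module_meta` above.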
@@ -58,19 +58,12 @@ def __init__(self,
                  c: Config,
                  model_name_formatter: str = "dfp-{user_id}",
                  experiment_name_formatter: str = "/dfp-models/{reg_model_name}",
-                 databricks_permissions: dict = None,
-                 version: typing.List = [22, 11, 0],
-                 module_name: str = "DFPMLFlowWriterModule",
-                 module_namespace: str = "DFP"):
+                 databricks_permissions: dict = None):
         super().__init__(c)

         self._model_name_formatter = model_name_formatter
         self._experiment_name_formatter = experiment_name_formatter
         self._databricks_permissions = databricks_permissions
-        self._registry = srf.ModuleRegistry()
-        self._version = version
-        self._module_name = module_name
-        self._module_namespace = module_namespace

     @property
     def name(self) -> str:
@@ -23,7 +23,7 @@
 from morpheus.pipeline.single_port_stage import SinglePortStage
 from morpheus.pipeline.stream_pair import StreamPair

-from morpheus.messages.multi_dfp_message import MultiDFPMessage
+from ..messages.multi_dfp_message import MultiDFPMessage
 from ..utils.column_info import DataFrameInputSchema
 from ..utils.column_info import process_dataframe
@@ -30,8 +30,8 @@
 from morpheus.pipeline.single_port_stage import SinglePortStage
 from morpheus.pipeline.stream_pair import StreamPair

-from morpheus.messages.multi_dfp_message import DFPMessageMeta
-from morpheus.messages.multi_dfp_message import MultiDFPMessage
+from ..messages.multi_dfp_message import DFPMessageMeta
+from ..messages.multi_dfp_message import MultiDFPMessage
 from ..utils.logging_timer import log_time

 # Setup conda environment
@@ -25,7 +25,7 @@
 from morpheus.pipeline.single_port_stage import SinglePortStage
 from morpheus.pipeline.stream_pair import StreamPair

-from morpheus.messages.multi_dfp_message import DFPMessageMeta
+from ..messages.multi_dfp_message import DFPMessageMeta
 from ..utils.logging_timer import log_time

 logger = logging.getLogger("morpheus.{}".format(__name__))
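The new module file in this commit guards `is_module_registered` with a decorator of the same name imported from `morpheus.utils.decorator_utils`, and one of the PR's commits adds `functools.wraps` to that decorator. The decorator's implementation is not shown in this diff; the sketch below is a hypothetical reconstruction of the pattern, with `FakeRegistry` standing in for `srf.ModuleRegistry`:

```python
import functools


class FakeRegistry:
    """Illustrative stand-in for srf.ModuleRegistry; not the real API."""

    _modules = {("DFPTraining", "morpheus_modules")}

    @classmethod
    def contains(cls, module_id, namespace):
        return (module_id, namespace) in cls._modules


def is_module_registered(func):
    """Raise before calling the wrapped method if the module is not registered.

    functools.wraps preserves the wrapped function's __name__ and docstring,
    which is the fix the 'added functools wraps to a decorator func' commit makes.
    """

    @functools.wraps(func)
    def wrapper(self, module_id=None, namespace=None):
        if not FakeRegistry.contains(module_id, namespace):
            raise ValueError(f"Module '{module_id}' is not registered in namespace '{namespace}'")
        return func(self, module_id=module_id, namespace=namespace)

    return wrapper


class Checker:

    @is_module_registered
    def is_module_registered(self, module_id=None, namespace=None):
        # The decorator does the work; the body is a no-op, as in the module file above.
        return True
```

Without `functools.wraps`, the decorated method would report `__name__ == "wrapper"`, which confuses logging and debugging; with it, the original name survives.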