Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI Relative Path Fallback #232

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 65 additions & 11 deletions morpheus/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import logging
import os
import typing
import warnings
from functools import update_wrapper

Expand Down Expand Up @@ -73,6 +74,48 @@ def get_command(self, ctx, cmd_name):
return super().get_command(ctx, cmd_name)


class MorpheusRelativePath(click.Path):
    """
    A specialization of the `click.Path` class that falls back to using package relative paths if the file cannot be
    found. Takes the exact same parameters as `click.Path`.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Append "data" to the name so it can be different than normal click.Path
        self.name = "data " + self.name

    def convert(self,
                value: typing.Any,
                param: typing.Optional["click.Parameter"],
                ctx: typing.Optional["click.Context"]) -> typing.Any:
        """
        Convert `value` like `click.Path.convert`, except that a relative path which
        does not exist in the current working directory is retried relative to the
        morpheus package root before normal conversion/validation takes place.
        """

        # Only attempt the fallback for relative paths that do not already exist as given.
        # CWD-relative files therefore always take precedence over package-relative ones.
        if (not os.path.isabs(value) and not os.path.exists(value)):

            # Try to make the path relative to the morpheus library root
            morpheus_root = os.path.dirname(morpheus.__file__)

            value_abs_to_root = os.path.join(morpheus_root, value)

            # If the file relative to our package exists, use that instead
            if (os.path.exists(value_abs_to_root)):
                # `param` is Optional per the click API; guard before dereferencing `.name`
                # (the original unconditionally accessed `param.name`, which would raise
                # AttributeError when convert() is called without a parameter).
                logger.debug(("Parameter, '%s', with relative path, '%s', does not exist. "
                              "Using package relative location: '%s'"),
                             param.name if param is not None else "<unknown>",
                             value,
                             value_abs_to_root)

                return super().convert(value_abs_to_root, param, ctx)

        return super().convert(value, param, ctx)


def _without_empty_args(passed_args):
return {k: v for k, v in passed_args.items() if v is not None}

Expand Down Expand Up @@ -312,6 +355,17 @@ def run(ctx: click.Context, **kwargs):
pass


def validate_rolls(ctx, param, value):
    """
    Click option callback that parses a dice specification string.

    Parameters
    ----------
    ctx : click.Context
        Unused; required by the click callback signature.
    param : click.Parameter
        Unused; required by the click callback signature.
    value : str or tuple
        Either an already-parsed tuple (returned unchanged) or a string of the
        form ``'NdM'``, meaning N rolls of an M-sided dice.

    Returns
    -------
    tuple
        ``(dice, rolls)`` as integers, e.g. ``'3d6'`` -> ``(6, 3)``.

    Raises
    ------
    click.BadParameter
        If `value` is a string that cannot be parsed as ``'NdM'``.
    """
    # A tuple means the value was already converted (e.g. a default); pass it through
    if isinstance(value, tuple):
        return value

    try:
        rolls, _, dice = value.partition("d")
        return int(dice), int(rolls)
    except ValueError:
        # Suppress the internal ValueError context; the message alone is what
        # the CLI user needs to see.
        raise click.BadParameter("format must be 'NdM'") from None


@click.group(chain=True,
short_help="Run the inference pipeline with a NLP model",
no_args_is_help=True,
Expand All @@ -325,8 +379,8 @@ def run(ctx: click.Context, **kwargs):
"do_truncate == False, there will be multiple returned sequences containing the "
"overflowing token-ids. Default value is 256"))
@click.option('--labels_file',
default=os.path.join(morpheus.DATA_DIR, "labels_nlp.txt"),
type=click.Path(dir_okay=False, exists=True, file_okay=True),
default="data/labels_nlp.txt",
type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True),
help=("Specifies a file to read labels from in order to convert class IDs into labels. "
"A label file is a simple text file where each line corresponds to a label"))
@click.option('--viz_file',
Expand Down Expand Up @@ -382,13 +436,13 @@ def pipeline_nlp(ctx: click.Context, **kwargs):
help="Number of features trained in the model")
@click.option('--labels_file',
default=None,
type=click.Path(dir_okay=False, exists=True, file_okay=True),
type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True),
help=("Specifies a file to read labels from in order to convert class IDs into labels. "
"A label file is a simple text file where each line corresponds to a label. "
"If unspecified, only a single output label is created for FIL"))
@click.option('--columns_file',
default=os.path.join(morpheus.DATA_DIR, "columns_fil.txt"),
type=click.Path(dir_okay=False, exists=True, file_okay=True),
default="data/columns_fil.txt",
type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True),
help=("Specifies a file to read column features."))
@click.option('--viz_file',
default=None,
Expand Down Expand Up @@ -450,12 +504,12 @@ def pipeline_fil(ctx: click.Context, **kwargs):
cls=AliasedGroup,
**command_kwargs)
@click.option('--columns_file',
default=os.path.join(morpheus.DATA_DIR, "columns_ae.txt"),
type=click.Path(dir_okay=False, exists=True, file_okay=True),
default="data/columns_ae.txt",
type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True),
help=(""))
@click.option('--labels_file',
default=None,
type=click.Path(dir_okay=False, exists=True, file_okay=True),
type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True),
help=("Specifies a file to read labels from in order to convert class IDs into labels. "
"A label file is a simple text file where each line corresponds to a label. "
"If unspecified, only a single output label is created for FIL"))
Expand Down Expand Up @@ -841,11 +895,11 @@ def train_ae(ctx: click.Context, **kwargs):

@click.command(name="preprocess", short_help="Convert messages to tokens", **command_kwargs)
@click.option('--vocab_hash_file',
default=os.path.join(morpheus.DATA_DIR, "bert-base-cased-hash.txt"),
type=click.Path(exists=True, dir_okay=False),
default="data/bert-base-cased-hash.txt",
type=MorpheusRelativePath(exists=True, dir_okay=False, resolve_path=True),
help=("Path to hash file containing vocabulary of words with token-ids. "
"This can be created from the raw vocabulary using the cudf.utils.hash_vocab_utils.hash_vocab "
"function. Default value is 'data/bert-base-cased-hash.txt'"))
"function."))
@click.option('--truncation',
default=False,
type=bool,
Expand Down
151 changes: 151 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
# limitations under the License.

import os
import shutil

import click
import mlflow
import pytest
from click.testing import CliRunner
from mlflow.tracking import fluent

import morpheus
from morpheus import cli
from morpheus.config import Config
from morpheus.config import ConfigAutoEncoder
from morpheus.config import CppConfig
from morpheus.config import PipelineModes
Expand Down Expand Up @@ -790,3 +793,151 @@ def test_pipeline_nlp_relative_paths(self, config, callback_values, tmp_path):
[file_source, deserialize, process_nlp, triton_inf, monitor, add_class, validation, serialize, to_file] = stages

assert process_nlp._vocab_hash_file == vocab_file_name

@pytest.mark.usefixtures("chdir_tmpdir")
@pytest.mark.replace_callback('pipeline_nlp')
def test_pipeline_nlp_relative_path_precedence(self, config, callback_values, tmp_path):
    """
    Ensure that relative paths are chosen over the morpheus data directory paths
    """

    morpheus_root = os.path.dirname(morpheus.__file__)

    # Paths exactly as they will be passed on the CLI; relative to the CWD,
    # which the chdir_tmpdir fixture has set to tmp_path
    vocab_file = "data/bert-base-cased-hash.txt"
    labels_file = "data/labels_nlp.txt"

    vocab_file_local = os.path.join(tmp_path, vocab_file)
    labels_file_local = os.path.join(tmp_path, labels_file)

    # Copy the packaged data directory so the CWD-relative files exist alongside the package ones
    shutil.copytree(os.path.join(morpheus_root, "data"), os.path.join(tmp_path, "data"), dirs_exist_ok=True)

    # Use different labels
    test_labels = ["label1", "label2", "label3"]

    # Overwrite the copied labels so we can tell which file was actually loaded
    with open(labels_file_local, mode="w") as f:
        f.writelines("\n".join(test_labels))

    args = (GENERAL_ARGS + ['pipeline-nlp', f"--labels_file={labels_file}"] + FILE_SRC_ARGS +
            ['deserialize', 'preprocess', f"--vocab_hash_file={vocab_file}"] + INF_TRITON_ARGS + MONITOR_ARGS +
            ['add-class'] + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS)

    obj = {}
    runner = CliRunner()
    result = runner.invoke(cli.cli, args, obj=obj)
    # NOTE(review): 47 is presumably the sentinel exit code set by the replaced
    # pipeline callback — confirm against the replace_callback fixture
    assert result.exit_code == 47, result.output

    # Ensure our config is populated correctly; matching test_labels proves the
    # CWD-relative labels file won over the packaged one
    config = obj["config"]
    assert config.class_labels == test_labels

    stages = callback_values['stages']
    # Verify the stages are as we expect them, if there is a size-mismatch python will raise a Value error
    [file_source, deserialize, process_nlp, triton_inf, monitor, add_class, validation, serialize, to_file] = stages

    # The vocab path must have resolved to the local (CWD-relative) copy
    assert process_nlp._vocab_hash_file == vocab_file_local

@pytest.mark.usefixtures("chdir_tmpdir")
@pytest.mark.replace_callback('pipeline_fil')
def test_pipeline_fil_relative_path_precedence(self, config: Config, callback_values, tmp_path):
    """
    Ensure that relative paths are chosen over the morpheus data directory paths
    """

    # Paths exactly as they will be passed on the CLI; relative to the CWD,
    # which the chdir_tmpdir fixture has set to tmp_path
    labels_file = "data/labels_fil.txt"
    columns_file = "data/columns_fil.txt"

    labels_file_local = os.path.join(tmp_path, labels_file)
    columns_file_local = os.path.join(tmp_path, columns_file)

    os.makedirs(os.path.join(tmp_path, "data"), exist_ok=True)

    # Use different labels
    test_labels = ["label1"]

    # Write the local labels file so we can tell which file was actually loaded
    with open(labels_file_local, mode="w") as f:
        f.writelines("\n".join(test_labels))

    # Use different columns
    test_columns = [f"column{i}" for i in range(29)]

    # Write the local columns file
    with open(columns_file_local, mode="w") as f:
        f.writelines("\n".join(test_columns))

    args = (GENERAL_ARGS + ['pipeline-fil', f"--labels_file={labels_file}", f"--columns_file={columns_file}"] +
            FILE_SRC_ARGS + ['deserialize', 'preprocess'] + INF_TRITON_ARGS + MONITOR_ARGS + ['add-class'] +
            VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS)

    obj = {}
    runner = CliRunner()
    result = runner.invoke(cli.cli, args, obj=obj)
    # NOTE(review): 47 is presumably the sentinel exit code set by the replaced
    # pipeline callback — confirm against the replace_callback fixture
    assert result.exit_code == 47, result.output

    # Ensure our config is populated correctly; matching the local file contents
    # proves the CWD-relative files won over the packaged ones
    config = obj["config"]
    assert config.class_labels == test_labels

    assert config.fil.feature_columns == test_columns

@pytest.mark.usefixtures("chdir_tmpdir")
@pytest.mark.replace_callback('pipeline_ae')
def test_pipeline_ae_relative_path_precedence(self, config: Config, callback_values, tmp_path):
    """
    Ensure that relative paths are chosen over the morpheus data directory paths
    """

    # Paths exactly as they will be passed on the CLI; relative to the CWD,
    # which the chdir_tmpdir fixture has set to tmp_path
    labels_file = "data/labels_ae.txt"
    columns_file = "data/columns_ae.txt"

    labels_file_local = os.path.join(tmp_path, labels_file)
    columns_file_local = os.path.join(tmp_path, columns_file)

    os.makedirs(os.path.join(tmp_path, "data"), exist_ok=True)

    # Use different labels
    test_labels = ["label1"]

    # Write the local labels file so we can tell which file was actually loaded
    with open(labels_file_local, mode="w") as f:
        f.writelines("\n".join(test_labels))

    # Use different columns
    test_columns = [f"column{i}" for i in range(33)]

    # Write the local columns file
    with open(columns_file_local, mode="w") as f:
        f.writelines("\n".join(test_columns))

    args = (GENERAL_ARGS + [
        'pipeline-ae',
        '--userid_filter=user321',
        '--userid_column_name=user_col',
        f"--labels_file={labels_file}",
        f"--columns_file={columns_file}",
        'from-cloudtrail',
        '--input_glob=input_glob*.csv',
        'train-ae',
        '--train_data_glob=train_glob*.csv',
        '--seed',
        '47',
        'preprocess',
        'inf-pytorch',
        'add-scores',
        'timeseries',
        '--resolution=1m',
        '--zscore_threshold=8.0',
        '--hot_start'
    ] + MONITOR_ARGS + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS)

    obj = {}
    runner = CliRunner()
    result = runner.invoke(cli.cli, args, obj=obj)
    # NOTE(review): 47 is presumably the sentinel exit code set by the replaced
    # pipeline callback — confirm against the replace_callback fixture
    assert result.exit_code == 47, result.output

    # Ensure our config is populated correctly; matching the local file contents
    # proves the CWD-relative files won over the packaged ones
    config = obj["config"]
    assert config.class_labels == test_labels

    assert config.ae.feature_columns == test_columns