diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 760de05286..5ffe9764aa 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -232,17 +232,24 @@ Launching a full production Kafka cluster is outside the scope of this project. ```bash export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway') ``` -5. Update the `kafka-docker/docker-compose.yml` so the environment variable `KAFKA_ADVERTISED_HOST_NAME` matches the previous step. For example, the line should look like: +5. Update the `kafka-docker/docker-compose.yml`, performing two changes: + 1. Update the `ports` entry to: + ```yml + ports: + - "0.0.0.0::9092" + ``` + This will prevent the containers from attempting to map IPv6 ports. + 1. Change the value of `KAFKA_ADVERTISED_HOST_NAME` to match the value of the `KAFKA_ADVERTISED_HOST_NAME` environment variable from the previous step. For example, the line should look like: - ```yml - environment: - KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1 - ``` - Which should match the value of `$KAFKA_ADVERTISED_HOST_NAME` from the previous step: + ```yml + environment: + KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1 + ``` + Which should match the value of `$KAFKA_ADVERTISED_HOST_NAME` from the previous step: - ```bash - $ echo $KAFKA_ADVERTISED_HOST_NAME - "172.17.0.1" + ```bash + $ echo $KAFKA_ADVERTISED_HOST_NAME + "172.17.0.1" ``` 6. Launch kafka with 3 instances: @@ -252,11 +259,14 @@ Launching a full production Kafka cluster is outside the scope of this project. In practice, 3 instances has been shown to work well. Use as many instances as required. Keep in mind each instance takes about 1 Gb of memory. 7. Launch the Kafka shell 1. To configure the cluster, you will need to launch into a container that has the Kafka shell. - 2. You can do this with `./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME`. + 2. You can do this with: + ```bash + ./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME + ``` 3. However, this makes it difficult to load data into the cluster. 
Instead, you can manually launch the Kafka shell by running: ```bash # Change to the morpheus root to make it easier for mounting volumes - cd ${MORPHEUS_HOME} + cd ${MORPHEUS_ROOT} # Run the Kafka shell docker container docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ diff --git a/docs/source/developer_guide/guides/1_simple_python_stage.md b/docs/source/developer_guide/guides/1_simple_python_stage.md index 92e3385278..1d93c5835e 100644 --- a/docs/source/developer_guide/guides/1_simple_python_stage.md +++ b/docs/source/developer_guide/guides/1_simple_python_stage.md @@ -124,7 +124,7 @@ from morpheus.config import Config from morpheus.pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.input.file_source_stage import FileSourceStage -from morpheus.utils.logging import configure_logging +from morpheus.utils.logger import configure_logging from pass_thru import PassThruStage ``` @@ -185,7 +185,7 @@ from morpheus.config import Config from morpheus.pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.input.file_source_stage import FileSourceStage -from morpheus.utils.logging import configure_logging +from morpheus.utils.logger import configure_logging from pass_thru import PassThruStage diff --git a/docs/source/developer_guide/guides/2_real_world_phishing.md b/docs/source/developer_guide/guides/2_real_world_phishing.md index 06ddbe9fcd..c72b29d22d 100644 --- a/docs/source/developer_guide/guides/2_real_world_phishing.md +++ b/docs/source/developer_guide/guides/2_real_world_phishing.md @@ -302,7 +302,7 @@ from morpheus.stages.postprocess.filter_detections_stage import FilterDetections from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage -from morpheus.utils.logging import configure_logging +from morpheus.utils.logger import configure_logging from recipient_features_stage import RecipientFeaturesStage diff --git a/examples/abp_pcap_detection/run.py b/examples/abp_pcap_detection/run.py index 9c6def11f8..c7e65cd25d 100644 --- a/examples/abp_pcap_detection/run.py +++ b/examples/abp_pcap_detection/run.py @@ -31,7 +31,7 @@ from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.utils.logging import configure_logging +from morpheus.utils.logger import configure_logging @click.command() diff --git a/examples/gnn_fraud_detection_pipeline/run.py b/examples/gnn_fraud_detection_pipeline/run.py index 3e9b915aa5..8441a8eacf 100644 --- a/examples/gnn_fraud_detection_pipeline/run.py +++ b/examples/gnn_fraud_detection_pipeline/run.py @@ -26,7 +26,7 @@ from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.utils.logging import configure_logging +from morpheus.utils.logger import configure_logging from stages.classification_stage import ClassificationStage from stages.graph_construction_stage import FraudGraphConstructionStage from stages.graph_sage_stage import GraphSAGEStage diff --git a/examples/ransomware_detection/run.py 
b/examples/ransomware_detection/run.py index 723a7d10f0..c4f351fa74 100644 --- a/examples/ransomware_detection/run.py +++ b/examples/ransomware_detection/run.py @@ -29,7 +29,7 @@ from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.add_scores_stage import AddScoresStage from morpheus.stages.postprocess.serialize_stage import SerializeStage -from morpheus.utils.logging import configure_logging +from morpheus.utils.logger import configure_logging from stages.create_features import CreateFeaturesRWStage from stages.preprocessing import PreprocessingRWStage diff --git a/morpheus/_lib/include/morpheus/io/serializers.hpp b/morpheus/_lib/include/morpheus/io/serializers.hpp index de5c7cb83a..2a8f7ee265 100644 --- a/morpheus/_lib/include/morpheus/io/serializers.hpp +++ b/morpheus/_lib/include/morpheus/io/serializers.hpp @@ -25,12 +25,14 @@ namespace morpheus { -std::string df_to_csv(const TableInfo& tbl, bool include_header); +std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col = true); -void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header); +void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col = true); -std::string df_to_json(const TableInfo& tbl); +// Note the include_index_col is currently being ignored in both versions of `df_to_json` due to a known issue in +// Pandas: https://github.com/pandas-dev/pandas/issues/37600 +std::string df_to_json(const TableInfo& tbl, bool include_index_col = true); -void df_to_json(const TableInfo& tbl, std::ostream& out_stream); +void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col = true); } // namespace morpheus diff --git a/morpheus/_lib/include/morpheus/stages/write_to_file.hpp b/morpheus/_lib/include/morpheus/stages/write_to_file.hpp index e0bb5a8598..f966ea5242 100644 --- a/morpheus/_lib/include/morpheus/stages/write_to_file.hpp +++ b/morpheus/_lib/include/morpheus/stages/write_to_file.hpp @@ -49,7 +49,8 @@ class WriteToFileStage : public srf::pysrf::PythonNode m_write_func; }; @@ -81,7 +83,8 @@ struct WriteToFileStageInterfaceProxy const std::string &name, const std::string &filename, const std::string &mode = "w", - FileTypes file_type = FileTypes::Auto); + FileTypes file_type = FileTypes::Auto, + bool include_index_col = true); }; #pragma GCC visibility pop diff --git a/morpheus/_lib/src/io/serializers.cpp b/morpheus/_lib/src/io/serializers.cpp index 3a5ceeeb55..5bc4527555 100644 --- a/morpheus/_lib/src/io/serializers.cpp +++ b/morpheus/_lib/src/io/serializers.cpp @@ -19,11 +19,15 @@ #include #include +#include +#include #include #include #include #include +#include +#include #include #include @@ -75,21 +79,33 @@ class OStreamSink : public cudf::io::data_sink size_t m_bytest_written{0}; }; -std::string df_to_csv(const TableInfo& tbl, bool include_header) +std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col) { // Create an ostringstream and use that with the overload accepting an ostream std::ostringstream out_stream; - df_to_csv(tbl, out_stream, include_header); + df_to_csv(tbl, out_stream, include_header, include_index_col); return out_stream.str(); } -void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header) +void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col) { + auto column_names = tbl.get_column_names(); + cudf::size_type start_col 
= 1; + if (include_index_col) + { + start_col = 0; + column_names.insert(column_names.begin(), ""s); // insert the id column + } + + std::vector col_idexes(column_names.size()); + std::iota(col_idexes.begin(), col_idexes.end(), start_col); + auto tbl_view = tbl.get_view().select(col_idexes); + OStreamSink sink(out_stream); auto destination = cudf::io::sink_info(&sink); - auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl.get_view()) + auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl_view) .include_header(include_header) .true_value("True"s) .false_value("False"s); @@ -97,8 +113,6 @@ void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_head cudf::io::table_metadata metadata{}; if (include_header) { - auto column_names = tbl.get_column_names(); - column_names.insert(column_names.begin(), ""s); // insert the id column metadata.column_names = column_names; options_builder = options_builder.metadata(&metadata); } @@ -106,7 +120,7 @@ void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_head cudf::io::write_csv(options_builder.build(), rmm::mr::get_current_device_resource()); } -std::string df_to_json(const TableInfo& tbl) +std::string df_to_json(const TableInfo& tbl, bool include_index_col) { std::string results; // no cpp impl for to_json, instead python module converts to pandas and calls to_json @@ -116,7 +130,7 @@ std::string df_to_json(const TableInfo& tbl) auto df = tbl.as_py_object(); auto buffer = StringIO(); - py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true); + py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true, "index"_a = include_index_col); df.attr("to_json")(buffer, **kwargs); buffer.attr("seek")(0); @@ -127,11 +141,11 @@ std::string df_to_json(const TableInfo& tbl) return results; } -void df_to_json(const TableInfo& tbl, std::ostream& out_stream) +void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col) { // Unlike df_to_csv, we use the ostream overload to call the string overload because there is no C++ implementation // of to_json - std::string output = df_to_json(tbl); + std::string output = df_to_json(tbl, include_index_col); // Now write the contents to the stream out_stream.write(output.data(), output.size()); diff --git a/morpheus/_lib/src/python_modules/stages.cpp b/morpheus/_lib/src/python_modules/stages.cpp index 2b06145a16..87e527e2fe 100644 --- a/morpheus/_lib/src/python_modules/stages.cpp +++ b/morpheus/_lib/src/python_modules/stages.cpp @@ -168,8 +168,9 @@ PYBIND11_MODULE(stages, m) py::arg("builder"), py::arg("name"), py::arg("filename"), - py::arg("mode") = "w", - py::arg("file_type") = 0); // Setting this to FileTypes::AUTO throws a conversion error at runtime + py::arg("mode") = "w", + py::arg("file_type") = 0, // Setting this to FileTypes::AUTO throws a conversion error at runtime + py::arg("include_index_col") = true); #ifdef VERSION_INFO m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO); diff --git a/morpheus/_lib/src/stages/write_to_file.cpp b/morpheus/_lib/src/stages/write_to_file.cpp index 4ed12456db..cda3ed00f3 100644 --- a/morpheus/_lib/src/stages/write_to_file.cpp +++ b/morpheus/_lib/src/stages/write_to_file.cpp @@ -27,9 +27,13 @@ namespace morpheus { // Component public implementations // ************ WriteToFileStage **************************** // -WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmode mode, FileTypes file_type) : 
+WriteToFileStage::WriteToFileStage(const std::string &filename, + std::ios::openmode mode, + FileTypes file_type, + bool include_index_col) : PythonNode(base_t::op_factory_from_sub_fn(build_operator())), - m_is_first(true) + m_is_first(true), + m_include_index_col(include_index_col) { if (file_type == FileTypes::Auto) { @@ -59,13 +63,13 @@ WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmo void WriteToFileStage::write_json(WriteToFileStage::sink_type_t &msg) { // Call df_to_json passing our fstream - df_to_json(msg->get_info(), m_fstream); + df_to_json(msg->get_info(), m_fstream, m_include_index_col); } void WriteToFileStage::write_csv(WriteToFileStage::sink_type_t &msg) { // Call df_to_csv passing our fstream - df_to_csv(msg->get_info(), m_fstream, m_is_first); + df_to_csv(msg->get_info(), m_fstream, m_is_first, m_include_index_col); } void WriteToFileStage::close() @@ -102,7 +106,8 @@ std::shared_ptr> WriteToFileStageInterfac const std::string &name, const std::string &filename, const std::string &mode, - FileTypes file_type) + FileTypes file_type, + bool include_index_col) { std::ios::openmode fsmode = std::ios::out; @@ -138,7 +143,7 @@ std::shared_ptr> WriteToFileStageInterfac throw std::runtime_error(std::string("Unsupported file mode. Must choose either 'w' or 'a'. Mode: ") + mode); } - auto stage = builder.construct_object(name, filename, fsmode, file_type); + auto stage = builder.construct_object(name, filename, fsmode, file_type, include_index_col); return stage; } diff --git a/morpheus/cli.py b/morpheus/cli.py index 6a67b93820..aee3016589 100644 --- a/morpheus/cli.py +++ b/morpheus/cli.py @@ -30,7 +30,7 @@ from morpheus.config import CppConfig from morpheus.config import PipelineModes from morpheus.config import auto_determine_bootstrap -from morpheus.utils.logging import configure_logging +from morpheus.utils.logger import configure_logging # pylint: disable=line-too-long, import-outside-toplevel, invalid-name, global-at-module-level, unused-argument @@ -1325,6 +1325,12 @@ def validate(ctx: click.Context, **kwargs): @click.command(short_help="Write all messages to a file", **command_kwargs) @click.option('--filename', type=click.Path(writable=True), required=True, help="The file to write to") @click.option('--overwrite', is_flag=True, help="Whether or not to overwrite the target file") +@click.option('--include-index-col', + 'include_index_col', + default=True, + type=bool, + help=("Includes dataframe's index column in the output " + "Note: this currently only works for CSV file output")) @prepare_command() def to_file(ctx: click.Context, **kwargs): diff --git a/morpheus/io/serializers.py b/morpheus/io/serializers.py index 64bcd58bde..8bf8996cfd 100644 --- a/morpheus/io/serializers.py +++ b/morpheus/io/serializers.py @@ -19,7 +19,10 @@ import cudf -def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) -> typing.List[str]: +def df_to_csv(df: cudf.DataFrame, + include_header=False, + strip_newline=False, + include_index_col=True) -> typing.List[str]: """ Serializes a DataFrame into CSV and returns the serialized output seperated by lines. @@ -31,13 +34,15 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) -> Whether or not to include the header, by default False. strip_newline : bool, optional Whether or not to strip the newline characters from each string, by default False. + include_index_col: bool, optional + Write out the index as a column, by default True. 
Returns ------- typing.List[str] List of strings for each line """ - results = df.to_csv(header=include_header) + results = df.to_csv(header=include_header, index=include_index_col) if strip_newline: results = results.split("\n") else: @@ -46,7 +51,7 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) -> return results -def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]: +def df_to_json(df: cudf.DataFrame, strip_newlines=False, include_index_col=True) -> typing.List[str]: """ Serializes a DataFrame into JSON and returns the serialized output seperated by lines. @@ -56,7 +61,10 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]: Input DataFrame to serialize. strip_newline : bool, optional Whether or not to strip the newline characters from each string, by default False. - + include_index_col: bool, optional + Write out the index as a column, by default True. + Note: This value is currently being ignored due to a known issue in Pandas: + https://github.com/pandas-dev/pandas/issues/37600 Returns ------- typing.List[str] @@ -65,7 +73,7 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]: str_buf = StringIO() # Convert to list of json string objects - df.to_json(str_buf, orient="records", lines=True) + df.to_json(str_buf, orient="records", lines=True, index=include_index_col) # Start from beginning str_buf.seek(0) diff --git a/morpheus/stages/general/buffer_stage.py b/morpheus/stages/general/buffer_stage.py index 8d1a7d87e9..559f04b482 100644 --- a/morpheus/stages/general/buffer_stage.py +++ b/morpheus/stages/general/buffer_stage.py @@ -20,7 +20,7 @@ from morpheus.config import Config from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair -from morpheus.utils.logging import deprecated_stage_warning +from morpheus.utils.logger import deprecated_stage_warning logger = logging.getLogger(__name__) diff --git a/morpheus/stages/general/delay_stage.py b/morpheus/stages/general/delay_stage.py index 6b274714d9..5bedf308f6 100644 --- a/morpheus/stages/general/delay_stage.py +++ b/morpheus/stages/general/delay_stage.py @@ -20,7 +20,7 @@ from morpheus.config import Config from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair -from morpheus.utils.logging import deprecated_stage_warning +from morpheus.utils.logger import deprecated_stage_warning logger = logging.getLogger(__name__) diff --git a/morpheus/stages/output/write_to_file_stage.py b/morpheus/stages/output/write_to_file_stage.py index e3ea37ad30..c55a9fa03f 100644 --- a/morpheus/stages/output/write_to_file_stage.py +++ b/morpheus/stages/output/write_to_file_stage.py @@ -49,7 +49,12 @@ class WriteToFileStage(SinglePortStage): """ - def __init__(self, c: Config, filename: str, overwrite: bool, file_type: FileTypes = FileTypes.Auto): + def __init__(self, + c: Config, + filename: str, + overwrite: bool, + file_type: FileTypes = FileTypes.Auto, + include_index_col: bool = True): super().__init__(c) @@ -69,6 +74,7 @@ def __init__(self, c: Config, filename: str, overwrite: bool, file_type: FileTyp self._file_type = determine_file_type(self._output_file) self._is_first = True + self._include_index_col = include_index_col @property def name(self) -> str: @@ -91,9 +97,11 @@ def supports_cpp_node(self): def _convert_to_strings(self, df: typing.Union[pd.DataFrame, cudf.DataFrame]): if (self._file_type == FileTypes.JSON): - 
output_strs = serializers.df_to_json(df) + output_strs = serializers.df_to_json(df, include_index_col=self._include_index_col) elif (self._file_type == FileTypes.CSV): - output_strs = serializers.df_to_csv(df, include_header=self._is_first) + output_strs = serializers.df_to_csv(df, + include_header=self._is_first, + include_index_col=self._include_index_col) self._is_first = False else: raise NotImplementedError("Unknown file type: {}".format(self._file_type)) @@ -110,7 +118,12 @@ def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> Strea # Sink to file if (self._build_cpp_node()): - to_file = _stages.WriteToFileStage(builder, self.unique_name, self._output_file, "w", self._file_type) + to_file = _stages.WriteToFileStage(builder, + self.unique_name, + self._output_file, + "w", + self._file_type, + self._include_index_col) else: def node_fn(obs: srf.Observable, sub: srf.Subscriber): diff --git a/morpheus/stages/postprocess/validation_stage.py b/morpheus/stages/postprocess/validation_stage.py index c52df37701..b05547ed0d 100644 --- a/morpheus/stages/postprocess/validation_stage.py +++ b/morpheus/stages/postprocess/validation_stage.py @@ -16,7 +16,6 @@ import json import logging import os -import re import typing import pandas as pd @@ -31,6 +30,7 @@ from morpheus.messages import MultiMessage from morpheus.pipeline.multi_message_stage import MultiMessageStage from morpheus.pipeline.stream_pair import StreamPair +from morpheus.utils import compare_df logger = logging.getLogger(__name__) @@ -100,8 +100,6 @@ def __init__( "Cannot output validation results to '{}'. File exists and overwrite = False".format( self._results_file_name)) - self._val_df: pd.DataFrame = None - @property def name(self) -> str: return "validation" @@ -121,36 +119,11 @@ def accepted_types(self) -> typing.Tuple: def supports_cpp_node(self): return False - def _filter_df(self, df): - include_columns = None - - if (self._include_columns is not None and len(self._include_columns) > 0): - include_columns = re.compile("({})".format("|".join(self._include_columns))) - - exclude_columns = [re.compile(x) for x in self._exclude_columns] - - # Filter out any known good/bad columns we dont want to compare - columns: typing.List[str] = [] - - # First build up list of included. 
If no include regex is specified, select all - if (include_columns is None): - columns = list(df.columns) - else: - columns = [y for y in list(df.columns) if include_columns.match(y)] - - # Now remove by the ignore - for test in exclude_columns: - columns = [y for y in columns if not test.match(y)] - - return df[columns] - def _do_comparison(self, messages: typing.List[MultiMessage]): if (len(messages) == 0): return - import datacompy - # Get all of the meta data and combine into a single frame all_meta = [x.get_meta() for x in messages] @@ -159,68 +132,17 @@ def _do_comparison(self, messages: typing.List[MultiMessage]): combined_df = pd.concat(all_meta) - results_df = self._filter_df(combined_df) - - # if the index column is set, make that the index - if (self._index_col is not None): - results_df = results_df.set_index(self._index_col, drop=True) - - if (self._index_col.startswith("_index_")): - results_df.index.name = str(results_df.index.name).replace("_index_", "", 1) - - val_df = self._filter_df(read_file_to_df(self._val_file_name, FileTypes.Auto, df_type="pandas")) - - # Now start the comparison - missing_columns = val_df.columns.difference(results_df.columns) - extra_columns = results_df.columns.difference(val_df.columns) - same_columns = val_df.columns.intersection(results_df.columns) - - # Now get the results in the same order - results_df = results_df[same_columns] - - comparison = datacompy.Compare( - val_df, - results_df, - on_index=True, - abs_tol=self._abs_tol, - rel_tol=self._rel_tol, - df1_name="val", - df2_name="res", - cast_column_names_lower=False, - ) - - total_rows = len(val_df) - diff_rows = len(val_df) - int(comparison.count_matching_rows()) - - if (comparison.matches()): - logger.info("Results match validation dataset") - else: - match_columns = comparison.intersect_rows[same_columns + "_match"] - - mismatched_idx = match_columns[match_columns.apply(lambda r: not r.all(), axis=1)].index - - merged = pd.concat([val_df, results_df], keys=["val", "res"]).swaplevel().sort_index() - - mismatch_df = merged.loc[mismatched_idx] - - logger.debug("Results do not match. Diff %d/%d (%f %%). First 10 mismatched rows:", - diff_rows, - total_rows, - diff_rows / total_rows * 100.0) - logger.debug(mismatch_df[:20]) - - # Now build the output - output = { - "total_rows": total_rows, - "matching_rows": int(comparison.count_matching_rows()), - "diff_rows": diff_rows, - "matching_cols": list(same_columns), - "extra_cols": list(extra_columns), - "missing_cols": list(missing_columns), - } + val_df = read_file_to_df(self._val_file_name, FileTypes.Auto, df_type="pandas") + results = compare_df.compare_df(val_df, + combined_df, + self._include_columns, + self._exclude_columns, + replace_idx=self._index_col, + abs_tol=self._abs_tol, + rel_tol=self._rel_tol) with open(self._results_file_name, "w") as f: - json.dump(output, f, indent=2, sort_keys=True) + json.dump(results, f, indent=2, sort_keys=True) def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair: diff --git a/morpheus/utils/compare_df.py b/morpheus/utils/compare_df.py new file mode 100644 index 0000000000..5a661acda8 --- /dev/null +++ b/morpheus/utils/compare_df.py @@ -0,0 +1,137 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re +import typing + +import datacompy +import pandas as pd + +logger = logging.getLogger(__name__) + + +def filter_df(df: pd.DataFrame, + include_columns: typing.List[str], + exclude_columns: typing.List[str], + replace_idx: str = None): + + if (include_columns is not None and len(include_columns) > 0): + include_columns = re.compile("({})".format("|".join(include_columns))) + + if exclude_columns is not None: + exclude_columns = [re.compile(x) for x in exclude_columns] + else: + exclude_columns = [] + + # Filter out any known good/bad columns we dont want to compare + columns: typing.List[str] = [] + + # First build up list of included. If no include regex is specified, select all + if (include_columns is None or len(include_columns) == 0): + columns = list(df.columns) + else: + columns = [y for y in list(df.columns) if include_columns.match(y)] + + # Now remove by the ignore + for test in exclude_columns: + columns = [y for y in columns if not test.match(y)] + + filtered_df = df[columns] + + # if the index column is set, make that the index + if replace_idx is not None and replace_idx in filtered_df: + filtered_df = filtered_df.set_index(replace_idx, drop=True) + + if replace_idx.startswith("_index_"): + filtered_df.index.name = str(filtered_df.index.name).replace("_index_", "", 1) + + return filtered_df + + +def compare_df(df_a: pd.DataFrame, + df_b: pd.DataFrame, + include_columns: typing.List[str] = None, + exclude_columns: typing.List[str] = None, + replace_idx: str = None, + abs_tol: float = 0.001, + rel_tol: float = 0.005, + dfa_name: str = "val", + dfb_name: str = "res"): + """ + Compares two pandas Dataframe, returning a comparison summary as a dict in the form of: + ``` + { + "total_rows": , + "matching_rows": , + "diff_rows": , + "matching_cols": <[str]>, + "extra_cols": extra_cols: <[str]>, + "missing_cols": missing_cols: <[str]>, + } + ``` + """ + df_a_filtered = filter_df(df_a, include_columns, exclude_columns, replace_idx=replace_idx) + df_b_filtered = filter_df(df_b, include_columns, exclude_columns, replace_idx=replace_idx) + + missing_columns = df_a_filtered.columns.difference(df_b_filtered.columns) + extra_columns = df_b_filtered.columns.difference(df_a_filtered.columns) + same_columns = df_a_filtered.columns.intersection(df_b_filtered.columns) + + # Now get the results in the same order + df_b_filtered = df_b_filtered[same_columns] + + comparison = datacompy.Compare( + df_a_filtered, + df_b_filtered, + on_index=True, + abs_tol=abs_tol, + rel_tol=rel_tol, + df1_name=dfa_name, + df2_name=dfb_name, + cast_column_names_lower=False, + ) + + total_rows = len(df_a_filtered) + diff_rows = len(df_a_filtered) - int(comparison.count_matching_rows()) + + if (comparison.matches()): + logger.info("Results match validation dataset") + else: + match_columns = comparison.intersect_rows[same_columns + "_match"] + mismatched_idx = match_columns[match_columns.apply(lambda r: not r.all(), axis=1)].index + + merged = pd.concat([df_a_filtered, df_b_filtered], keys=[dfa_name, dfb_name]).swaplevel().sort_index() + + mismatch_df = merged.loc[mismatched_idx] + + 
if diff_rows > 0: + logger.debug("Results do not match. Diff %d/%d (%f %%). First 10 mismatched rows:", + diff_rows, + total_rows, + diff_rows / total_rows * 100.0) + logger.debug(mismatch_df[:20]) + else: + logger.info("Results match validation dataset") + + # Now build the output + return { + "total_rows": total_rows, + "matching_rows": int(comparison.count_matching_rows()), + "diff_rows": diff_rows, + "matching_cols": list(same_columns), + "extra_cols": list(extra_columns), + "missing_cols": list(missing_columns), + } diff --git a/morpheus/utils/logging.py b/morpheus/utils/logger.py similarity index 100% rename from morpheus/utils/logging.py rename to morpheus/utils/logger.py diff --git a/scripts/compare_data_files.py b/scripts/compare_data_files.py new file mode 100755 index 0000000000..b7bb65f87e --- /dev/null +++ b/scripts/compare_data_files.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import logging +import sys + +from morpheus._lib.file_types import FileTypes +from morpheus.io.deserializers import read_file_to_df +from morpheus.utils.compare_df import compare_df +from morpheus.utils.logger import configure_logging + + +def parse_args(): + argparser = argparse.ArgumentParser("Compares two data files which are parsable as Pandas dataframes") + argparser.add_argument("data_files", nargs=2, help="Files to compare") + argparser.add_argument('--include', + nargs='*', + help=("Which columns to include in the validation. " + "Resulting columns is the intersection of all regex. Include applied before exclude")) + argparser.add_argument( + '--exclude', + nargs='*', + default=[r'^ID$', r'^_ts_'], + help=("Which columns to exclude from the validation. " + "Resulting ignored columns is the intersection of all regex. 
Include applied before exclude")) + argparser.add_argument( + '--index_col', + help=("Specifies a column which will be used to align messages with rows in the validation dataset.")) + argparser.add_argument('--abs_tol', + type=float, + default=0.001, + help="Absolute tolerance to use when comparing float columns.") + argparser.add_argument('--rel_tol', + type=float, + default=0.05, + help="Relative tolerance to use when comparing float columns.") + args = argparser.parse_args() + return args + + +def main(): + args = parse_args() + configure_logging(log_level=logging.DEBUG) + + df_a = read_file_to_df(args.data_files[0], file_type=FileTypes.Auto, df_type='pandas') + df_b = read_file_to_df(args.data_files[1], file_type=FileTypes.Auto, df_type='pandas') + results = compare_df(df_a, + df_b, + include_columns=args.include, + exclude_columns=args.exclude, + replace_idx=args.index_col, + abs_tol=args.abs_tol, + rel_tol=args.rel_tol) + + if results['diff_rows'] > 0: + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/scripts/validation/kafka_testing.md b/scripts/validation/kafka_testing.md new file mode 100644 index 0000000000..4fbf17ee12 --- /dev/null +++ b/scripts/validation/kafka_testing.md @@ -0,0 +1,465 @@ +## Pre-reqs +1. Launch Kafka using instructions from the [Quick Launch Kafka Cluster](../../CONTRIBUTING.md#quick-launch-kafka-cluster) section of [CONTRIBUTING.md](../../CONTRIBUTING.md) +1. Populate an environment variable `BROKER_LIST` with the IP:Ports of the nodes in the Kafka cluster. Ensure this environment variable is set in all of the terminals where Morpheus is executed: + ```bash + export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway') + export BROKER_LIST=$(HOST_IP=$KAFKA_ADVERTISED_HOST_NAME ./broker-list.sh) + ``` + +## Simple Data Copying +### Checking KafkaSourceStage +#### Single Partition Topic Test +1. Open a new terminal and create a topic called "morpheus-src-copy-test" with only a single partition: + ```bash + docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ + -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash + ``` + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-src-copy-test --partitions 1 --bootstrap-server `broker-list.sh` + ``` + Keep this shell & container open; you will need it in later steps. + +1. Open a new terminal and launch a pipeline to listen to Kafka; from the root of the Morpheus repo run: + ```bash + morpheus --log_level=DEBUG run \ + pipeline-nlp \ + from-kafka --input_topic morpheus-src-copy-test --bootstrap_servers "${BROKER_LIST}" \ + monitor --description "Kafka Read" \ + deserialize \ + serialize \ + to-file --include-index-col=false --filename=${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test.csv --overwrite + ``` + +1. Return to the Kafka terminal and run: + ```bash + cat /workspace/tests/tests_data/filter_probs.jsonlines | \ + $KAFKA_HOME/bin/kafka-console-producer.sh \ + --topic=morpheus-src-copy-test --broker-list=`broker-list.sh` - + ``` + +1. Return to the Morpheus terminal, and once the monitor stage has recorded `read: 20 messages`, shut down the pipeline with Ctrl-C. + +1. If successful, the output file `.tmp/morpheus-src-copy-test.csv` should be identical to `tests/tests_data/filter_probs.csv`.
Verify: + ```bash + diff -q --ignore-all-space ${MORPHEUS_ROOT}/tests/tests_data/filter_probs.csv ${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test.csv + ``` + +#### Partitioned Topic Test +1. From the Kafka terminal create a new topic named "morpheus-src-copy-test-p" with three partitions: + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-src-copy-test-p --partitions 3 --bootstrap-server `broker-list.sh` + ``` + +1. Open a new terminal and launch a pipeline to listen to Kafka; from the root of the Morpheus repo run: + ```bash + morpheus --log_level=DEBUG run \ + pipeline-nlp \ + from-kafka --input_topic morpheus-src-copy-test-p --bootstrap_servers "${BROKER_LIST}" \ + deserialize \ + monitor --description "Kafka Read" \ + serialize \ + to-file --include-index-col=false --filename=${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test-p.csv --overwrite + ``` + +1. Return to the Kafka terminal and run: + ```bash + cat /workspace/tests/tests_data/filter_probs.jsonlines | \ + $KAFKA_HOME/bin/kafka-console-producer.sh \ + --topic=morpheus-src-copy-test-p --broker-list=`broker-list.sh` - + ``` + +1. Return to the Morpheus terminal, and once the monitor stage has recorded `read: 20 messages`, shut down the pipeline with Ctrl-C. + +1. If successful, the output file `.tmp/morpheus-src-copy-test-p.csv` should contain the same records as those in `tests/tests_data/filter_probs.csv`; however, they will most likely be out of order. To verify the output we will compare the sorted outputs: + ```bash + diff -q --ignore-all-space <(sort tests/tests_data/filter_probs.csv) <(sort .tmp/morpheus-src-copy-test-p.csv) + ``` + + +### Checking WriteToKafkaStage +#### Single Partition Topic Test +1. Open a new terminal and create a topic called "morpheus-sink-copy-test" with only a single partition, and start a consumer on that topic: + ```bash + docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ + -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash + ``` + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sink-copy-test --partitions 1 --bootstrap-server `broker-list.sh` + + $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-sink-copy-test \ + --bootstrap-server `broker-list.sh` > /workspace/.tmp/morpheus-sink-copy-test.jsonlines + ``` + +1. Open a new terminal and from the Morpheus root run: + ```bash + morpheus --log_level=DEBUG run \ + pipeline-nlp \ + from-file --filename=${MORPHEUS_ROOT}/tests/tests_data/filter_probs.csv \ + deserialize \ + serialize \ + to-kafka --output_topic morpheus-sink-copy-test --bootstrap_servers "${BROKER_LIST}" + ``` + The `tests/tests_data/filter_probs.csv` file contains 20 lines of data and the pipeline should complete rather quickly (less than 5 seconds). + +1. The Kafka consumer we started in step #1 won't give us any sort of indication as to how many records have been consumed, so we will indirectly check the progress by counting the rows in the output file. Once the Morpheus pipeline completes, check the number of lines in the output: + ```bash + wc -l ${MORPHEUS_ROOT}/.tmp/morpheus-sink-copy-test.jsonlines + ``` + +1. 
Once all 20 lines have been written to the output file, verify the contents with: + ```bash + diff -q --ignore-all-space <(cat ${MORPHEUS_ROOT}/.tmp/morpheus-sink-copy-test.jsonlines | jq --sort-keys) <(cat ${MORPHEUS_ROOT}/tests/tests_data/filter_probs.jsonlines | jq --sort-keys) + ``` + Note the usage of `jq --sort-keys`, which will reformat the JSON output, sorting the keys; this ensures that `{"a": 5, "b": 6}` and `{"b": 6, "a": 5}` are considered equivalent. + +1. Stop the consumer in the Kafka terminal. + +#### Partitioned Topic Test +1. From the Kafka terminal create a new topic named "morpheus-sink-copy-test-p" with three partitions, and start a consumer on that topic: + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sink-copy-test-p --partitions 3 --bootstrap-server `broker-list.sh` + + $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-sink-copy-test-p \ + --bootstrap-server `broker-list.sh` > /workspace/.tmp/morpheus-sink-copy-test-p.jsonlines + ``` + +1. Open a new terminal and from the Morpheus root run: + ```bash + morpheus --log_level=DEBUG run \ + pipeline-nlp \ + from-file --filename=${MORPHEUS_ROOT}/tests/tests_data/filter_probs.csv \ + deserialize \ + serialize \ + to-kafka --output_topic morpheus-sink-copy-test-p --bootstrap_servers "${BROKER_LIST}" + ``` + The `tests/tests_data/filter_probs.csv` file contains 20 lines of data and the pipeline should complete rather quickly (less than 5 seconds). + +1. The Kafka consumer we started in step #1 won't give us any sort of indication that it has concluded, so we will indirectly check the progress by counting the rows in the output file. Once the Morpheus pipeline completes, check the number of lines in the output: + ```bash + wc -l ${MORPHEUS_ROOT}/.tmp/morpheus-sink-copy-test-p.jsonlines + ``` + +1. Once all 20 lines have been written to the output file, verify the contents with: + ```bash + diff -q --ignore-all-space <(sort ${MORPHEUS_ROOT}/.tmp/morpheus-sink-copy-test-p.jsonlines | jq --sort-keys) <(sort ${MORPHEUS_ROOT}/tests/tests_data/filter_probs.jsonlines | jq --sort-keys) + ``` + Note that due to the multiple partitions, the consumer most likely received records out of order, so we are comparing the sorted output of both files. + +1. Stop the consumer in the Kafka terminal. + + +## ABP Validation Pipeline +For this test we are going to replace the from & to file stages from the ABP validation pipeline with Kafka stages, reading input data from a Kafka topic named "morpheus-abp-pre" and writing results to a topic named "morpheus-abp-post". + +1. Create two Kafka topics both with only a single partition, and launch a consumer listening to the morpheus-abp-post topic. + ```bash + docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ + -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash + ``` + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-abp-pre --partitions 1 --bootstrap-server `broker-list.sh` + + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-abp-post --partitions 1 --bootstrap-server `broker-list.sh` + + $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-abp-post \ + --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_abp-nvsmi-xgb.jsonlines + ``` + +1. 
In a new terminal launch Triton: + ```bash + docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v ${MORPHEUS_ROOT}/models:/models \ + nvcr.io/nvidia/tritonserver:22.02-py3 \ + tritonserver --model-repository=/models/triton-model-repo \ + --exit-on-error=false \ + --model-control-mode=explicit \ + --load-model abp-nvsmi-xgb + ``` + +1. Open a new terminal and launch the inference pipeline which will both listen and write to Kafka: + ```bash + morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 \ + pipeline-fil \ + from-kafka --input_topic morpheus-abp-pre --bootstrap_servers "${BROKER_LIST}" \ + monitor --description "Kafka Read" \ + deserialize \ + preprocess \ + inf-triton --model_name=abp-nvsmi-xgb --server_url="localhost:8000" --force_convert_inputs=True \ + monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ + add-class \ + serialize \ + to-kafka --output_topic morpheus-abp-post --bootstrap_servers "${BROKER_LIST}" \ + monitor --description "Kafka Write" + ``` + +1. Open a new terminal and launch a Kafka producer to feed the morpheus-abp-pre topic with the input data: + ```bash + export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway') + docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ + -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash + ``` + ```bash + cat /workspace/models/datasets/validation-data/abp-validation-data.jsonlines | \ + $KAFKA_HOME/bin/kafka-console-producer.sh \ + --topic=morpheus-abp-pre --broker-list=`broker-list.sh` - + ``` + This command should execute quickly, writing `1242` records, and should complete in less than 5 seconds. + +1. Return to the Morpheus terminal. Once the `Kafka Write` monitor has reported that `1242` messages have been written, shut down Morpheus with Ctrl-C. We can check the number of lines in the output file: + ```bash + wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_abp-nvsmi-xgb.jsonlines + ``` + +1. Once all `1242` lines have been written to the output file, verify the contents with: + ```bash + diff -q --ignore-all-space <(cat ${MORPHEUS_ROOT}/models/datasets/validation-data/abp-validation-data.jsonlines | jq --sort-keys) <(cat ${MORPHEUS_ROOT}/.tmp/val_kafka_abp-nvsmi-xgb.jsonlines | jq --sort-keys) + ``` + +1. Stop the consumer in the first Kafka terminal, and stop Triton. + +## Hammah Validation Pipeline +### User123 +For this test we are going to replace the to-file stage from the Hammah validation pipeline with the to-kafka stage using a topic named "morpheus-hammah-user123". Note: this pipeline requires a custom `UserMessageMeta` class which the from-kafka stage is currently unable to generate; for that reason, the `CloudTrailSourceStage` remains in place. + +1. Create the Kafka topic, and launch a consumer listening to it. + ```bash + docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ + -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash + ``` + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-hammah-user123 --partitions 1 --bootstrap-server `broker-list.sh` + + $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-hammah-user123 \ + --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_hammah-user123-pytorch.jsonlines + ``` + +1. 
Open a new terminal and launch the pipeline which will write results to Kafka: + ```bash + morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=false \ + pipeline-ae --userid_filter="user123" --userid_column_name="userIdentitysessionContextsessionIssueruserName" \ + from-cloudtrail --input_glob="${MORPHEUS_ROOT}/models/datasets/validation-data/hammah-*.csv" \ + train-ae --train_data_glob="${MORPHEUS_ROOT}/models/datasets/training-data/hammah-*.csv" --seed 42 \ + preprocess \ + inf-pytorch \ + add-scores \ + timeseries --resolution=1m --zscore_threshold=8.0 --hot_start \ + monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ + serialize --exclude='event_dt|tlsDetailsclientProvidedHostHeader' \ + to-kafka --output_topic morpheus-hammah-user123 --bootstrap_servers "${BROKER_LIST}" \ + monitor --description "Kafka Write" + ``` + + This pipeline should complete in approximately 10 seconds, with the Kafka monitor stage recording `847` messages written to Kafka. + +1. The Kafka consumer we started in step #1 won't give us any sort of indication as to how many records have been consumed, so we will indirectly check the progress by counting the rows in the output file. Once the Morpheus pipeline completes, check the number of lines in the output: + ```bash + wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-user123-pytorch.jsonlines + ``` + +1. Once all `847` rows have been written, return to the Kafka terminal and stop the consumer with Ctrl-C. + +1. Verify the output; expect to see `38` unmatched rows: + ```bash + ${MORPHEUS_ROOT}/scripts/compare_data_files.py \ + ${MORPHEUS_ROOT}/models/datasets/validation-data/hammah-user123-validation-data.csv \ + ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-user123-pytorch.jsonlines \ + --index_col="_index_" --exclude "event_dt" --rel_tol=0.1 + ``` + +### Role-g +Similar to the Hammah User123 test, we are going to replace the to-file stage from the Hammah validation pipeline with the to-kafka stage using a topic named "morpheus-hammah-role-g". + +1. Create the Kafka topic, and launch a consumer listening to it. + ```bash + docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ + -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash + ``` + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-hammah-role-g --partitions 1 --bootstrap-server `broker-list.sh` + + $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-hammah-role-g \ + --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_hammah-role-g-pytorch.jsonlines + ``` + +1. 
Open a new terminal and launch the pipeline which will write results to Kafka: + ```bash + morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=false \ + pipeline-ae --userid_filter="role-g" --userid_column_name="userIdentitysessionContextsessionIssueruserName" \ + from-cloudtrail --input_glob="${MORPHEUS_ROOT}/models/datasets/validation-data/hammah-*.csv" \ + train-ae --train_data_glob="${MORPHEUS_ROOT}/models/datasets/training-data/hammah-*.csv" --seed 42 \ + preprocess \ + inf-pytorch \ + add-scores \ + timeseries --resolution=10m --zscore_threshold=8.0 \ + monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ + serialize --exclude='event_dt|tlsDetailsclientProvidedHostHeader' \ + to-kafka --output_topic morpheus-hammah-role-g --bootstrap_servers "${BROKER_LIST}" \ + monitor --description "Kafka Write" + ``` + + This pipeline should complete in approximately 10 seconds, with the Kafka monitor stage recording `314` messages written to Kafka. + +1. The Kafka consumer we started in step #1 won't give us any sort of indication as to how many records have been consumed, so we will indirectly check the progress by counting the rows in the output file. Once the Morpheus pipeline completes, check the number of lines in the output: + ```bash + wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-role-g-pytorch.jsonlines + ``` + +1. Once all `314` rows have been written, return to the Kafka terminal and stop the consumer with Ctrl-C. + +1. Verify the output; all rows should match: + ```bash + ${MORPHEUS_ROOT}/scripts/compare_data_files.py \ + ${MORPHEUS_ROOT}/models/datasets/validation-data/hammah-role-g-validation-data.csv \ + ${MORPHEUS_ROOT}/.tmp/val_kafka_hammah-role-g-pytorch.jsonlines \ + --index_col="_index_" --exclude "event_dt" --rel_tol=0.15 + ``` + +## Phishing Validation Pipeline +For this test we are going to replace the from & to file stages from the Phishing validation pipeline with Kafka stages, reading input data from a Kafka topic named "morpheus-phishing-pre" and writing results to a topic named "morpheus-phishing-post". + +1. Create two Kafka topics both with only a single partition, and launch a consumer listening to the morpheus-phishing-post topic. + ```bash + docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ + -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash + ``` + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-phishing-pre --partitions 1 --bootstrap-server `broker-list.sh` + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-phishing-post --partitions 1 --bootstrap-server `broker-list.sh` + $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-phishing-post \ + --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_phishing.jsonlines + ``` + +1. In a new terminal launch Triton: + ```bash + docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v ${MORPHEUS_ROOT}/models:/models \ + nvcr.io/nvidia/tritonserver:22.02-py3 \ + tritonserver --model-repository=/models/triton-model-repo \ + --exit-on-error=false \ + --model-control-mode=explicit \ + --load-model phishing-bert-onnx + ``` + +1. 
Open a new terminal and launch the inference pipeline which will both listen and write to Kafka: + ```bash + morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 \ + pipeline-nlp --model_seq_length=128 --labels_file=${MORPHEUS_ROOT}/morpheus/data/labels_phishing.txt \ + from-kafka --input_topic morpheus-phishing-pre --bootstrap_servers "${BROKER_LIST}" \ + monitor --description "Kafka Read" \ + deserialize \ + preprocess --vocab_hash_file=${MORPHEUS_ROOT}/morpheus/data/bert-base-uncased-hash.txt \ + --truncation=True --do_lower_case=True --add_special_tokens=False \ + inf-triton --model_name=phishing-bert-onnx --server_url="localhost:8000" --force_convert_inputs=True \ + monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ + add-class --label=pred --threshold=0.7 \ + serialize \ + to-kafka --output_topic morpheus-phishing-post --bootstrap_servers "${BROKER_LIST}" \ + monitor --description "Kafka Write" + ``` + +1. Open a new terminal and launch a Kafka producer to feed the morpheus-phishing-pre topic with the input data: + ```bash + export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway') + docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ + -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash + ``` + ```bash + cat /workspace/models/datasets/validation-data/phishing-email-validation-data.jsonlines | \ + $KAFKA_HOME/bin/kafka-console-producer.sh \ + --topic=morpheus-phishing-pre --broker-list=`broker-list.sh` - + ``` + This command should execute quickly, writing `1010` records, and should complete in less than 5 seconds. + +1. Return to the Morpheus terminal. The pipeline will take anywhere from 2 to 5 minutes to complete. Once the `Kafka Write` monitor has reported that `1010` messages have been written, shut down Morpheus with Ctrl-C. We can check the number of lines in the output file: + ```bash + wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_phishing.jsonlines + ``` + +1. Once all `1010` rows have been written, return to the Kafka terminal and stop the consumer with Ctrl-C. + +1. Verify the output; expect to see `43` unmatched rows: + ```bash + ${MORPHEUS_ROOT}/scripts/compare_data_files.py \ + ${MORPHEUS_ROOT}/models/datasets/validation-data/phishing-email-validation-data.jsonlines \ + ${MORPHEUS_ROOT}/.tmp/val_kafka_phishing.jsonlines + ``` + +1. Stop Triton. + +## Sid Validation Pipeline +For this test we are going to replace the to-file stage from the Sid validation pipeline with the to-kafka stage, writing results to a topic named "morpheus-sid-post". +Note: Due to the complexity of the input data and a limitation of the cudf reader, we will need to keep the from-file source reading data as CSV. + +1. Create a Kafka topic and launch a consumer listening to the morpheus-sid-post topic. + ```bash + docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \ + -v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash + ``` + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sid-post --partitions 1 --bootstrap-server `broker-list.sh` + $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-sid-post \ + --bootstrap-server `broker-list.sh` > /workspace/.tmp/val_kafka_sid.jsonlines + ``` + +1. 
In a new terminal launch Triton: + ```bash + docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v ${MORPHEUS_ROOT}/models:/models \ + nvcr.io/nvidia/tritonserver:22.02-py3 \ + tritonserver --model-repository=/models/triton-model-repo \ + --exit-on-error=false \ + --model-control-mode=explicit \ + --load-model sid-minibert-onnx + ``` + +1. Open a new terminal and launch the inference pipeline which will write results to Kafka: + ```bash + morpheus --log_level=DEBUG run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 \ + pipeline-nlp --model_seq_length=256 \ + from-file --filename=${MORPHEUS_ROOT}/models/datasets/validation-data/sid-validation-data.csv \ + deserialize \ + preprocess --vocab_hash_file=${MORPHEUS_ROOT}/morpheus/data/bert-base-uncased-hash.txt \ + --truncation=True --do_lower_case=True --add_special_tokens=False \ + inf-triton --model_name=sid-minibert-onnx --server_url="localhost:8000" --force_convert_inputs=True \ + monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ + add-class --prefix="si_" \ + serialize --exclude "id" --exclude "^_ts_" \ + to-kafka --output_topic morpheus-sid-post --bootstrap_servers "${BROKER_LIST}" \ + monitor --description "Kafka Write" + ``` + +1. The pipeline will take approximately 2 minutes to complete. We can check the number of lines in the output file: + ```bash + wc -l ${MORPHEUS_ROOT}/.tmp/val_kafka_sid.jsonlines + ``` + +1. Once all `2000` rows have been written, return to the Kafka terminal and stop the consumer with Ctrl-C. + +1. Verify the output; expect to see `25` unmatched rows: + ```bash + ${MORPHEUS_ROOT}/scripts/compare_data_files.py \ + ${MORPHEUS_ROOT}/models/datasets/validation-data/sid-validation-data.csv \ + ${MORPHEUS_ROOT}/.tmp/val_kafka_sid.jsonlines + ``` + +1. Stop Triton. + +## Optional Cleanup +### Delete all topics +1. Return to the Kafka terminal and run: + ```bash + $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh` | xargs -I'{}' $KAFKA_HOME/bin/kafka-topics.sh --delete --bootstrap-server `broker-list.sh` --topic='{}' + ``` + +### Shutdown Kafka +1. 
From the root of the `kafka-docker` repo run (in the host OS not inside a container): + ```bash + docker-compose stop + docker-compose rm + ``` diff --git a/tests/benchmarks/test_bench_monitor_stage.py b/tests/benchmarks/test_bench_monitor_stage.py index 5ea624b0a0..60a81099d7 100644 --- a/tests/benchmarks/test_bench_monitor_stage.py +++ b/tests/benchmarks/test_bench_monitor_stage.py @@ -26,7 +26,7 @@ from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.utils.logging import configure_logging +from morpheus.utils.logger import configure_logging def build_and_run_pipeline(config: Config, df: cudf.DataFrame): diff --git a/tests/benchmarks/test_bench_serialize_stage.py b/tests/benchmarks/test_bench_serialize_stage.py index 5fe018713f..ab4e060dbb 100644 --- a/tests/benchmarks/test_bench_serialize_stage.py +++ b/tests/benchmarks/test_bench_serialize_stage.py @@ -27,7 +27,7 @@ from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.utils.logging import configure_logging +from morpheus.utils.logger import configure_logging def build_and_run_pipeline(config: Config, diff --git a/tests/tests_data/filter_probs.jsonlines b/tests/tests_data/filter_probs.jsonlines new file mode 100644 index 0000000000..0292b14e5f --- /dev/null +++ b/tests/tests_data/filter_probs.jsonlines @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:861c23f69585ef22e2db7273d3c2fe4ba2823802d004f69e47169fc69d0c2cae +size 760