Manual testing of Morpheus with Kafka & Validation improvements (nv-morpheus#290)

* Adds instructions for manually testing Morpheus using Kafka, along with a Kafka version of each of the four validation scripts in `scripts/validation`
* The CSV & JSON serializers now support an `include_index_col` flag to control exporting the DataFrame's index column. Note: due to a limitation of cuDF & pandas, this currently has no effect on JSON output:
  + pandas-dev/pandas#37600 
  + rapidsai/cudf#11317
* `morpheus.utils.logging` renamed to `morpheus.utils.logger` so that other modules in `morpheus.utils` can import the standard library `logging` module.
* Comparison logic in the `ValidationStage` has been moved to its own module, `morpheus.utils.compare_df`, so that the functionality can be used outside of the stage (see the sketch below).
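
A minimal sketch of using the relocated comparison logic outside of `ValidationStage`. The `compare_df.compare_df` name and its return value are assumptions based on the commit description; the module's contents are not shown in this excerpt:

```python
import pandas as pd

from morpheus.utils import compare_df

# In ValidationStage these would be pipeline output vs. validation data from disk
results = pd.DataFrame({"score": [0.9, 0.1]})
expected = pd.DataFrame({"score": [0.9, 0.2]})

# Assumed to return a summary of matching/mismatching rows; the exact
# signature and return shape are not shown in this diff
report = compare_df.compare_df(results, expected)
print(report)
```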


fixes nv-morpheus#265

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Pete MacKinnon (https://github.com/pdmack)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#290
dagardner-nv authored Aug 8, 2022
1 parent a47cc65 commit bb200ab
Showing 24 changed files with 806 additions and 143 deletions.
32 changes: 21 additions & 11 deletions CONTRIBUTING.md
@@ -232,17 +232,24 @@ Launching a full production Kafka cluster is outside the scope of this project.
```bash
export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway')
```
5. Update the `kafka-docker/docker-compose.yml`, performing two changes:
1. Update the `ports` entry to:
```yml
ports:
- "0.0.0.0::9092"
```
This will prevent the containers from attempting to map IPv6 ports. (The empty middle field in `0.0.0.0::9092` tells Docker to assign a random host port to each broker container, which is what allows multiple instances to run on one host.)
1. Change the value of `KAFKA_ADVERTISED_HOST_NAME` to match the value of the `KAFKA_ADVERTISED_HOST_NAME` environment variable from the previous step. For example, the line should look like:

```yml
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
```
Which should match the value of `$KAFKA_ADVERTISED_HOST_NAME` from the previous step:

```bash
$ echo $KAFKA_ADVERTISED_HOST_NAME
"172.17.0.1"
```
6. Launch Kafka with 3 instances:

@@ -252,11 +259,14 @@
In practice, 3 instances have been shown to work well. Use as many instances as required, keeping in mind that each instance takes about 1 GB of memory. A typical launch sequence is sketched below.
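
The launch command itself is collapsed in the diff above; with the `kafka-docker` compose scripts it is typically along these lines (the `scale` invocation follows the upstream `kafka-docker` README and is an assumption, not part of this diff):

```bash
cd kafka-docker

# Start the cluster, then scale the broker service to 3 instances
docker-compose up -d
docker-compose scale kafka=3
```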
7. Launch the Kafka shell
1. To configure the cluster, you will need to launch into a container that has the Kafka shell.
2. You can do this with:
```bash
./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME
```
3. However, this makes it difficult to load data into the cluster. Instead, you can manually launch the Kafka shell by running:
```bash
# Change to the morpheus root to make it easier for mounting volumes
cd ${MORPHEUS_ROOT}
# Run the Kafka shell docker container
docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \
4 changes: 2 additions & 2 deletions docs/source/developer_guide/guides/1_simple_python_stage.md
@@ -124,7 +124,7 @@ from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage
```
@@ -185,7 +185,7 @@ from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage

@@ -302,7 +302,7 @@ from morpheus.stages.postprocess.filter_detections_stage import FilterDetections
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage
from morpheus.utils.logger import configure_logging

from recipient_features_stage import RecipientFeaturesStage

2 changes: 1 addition & 1 deletion examples/abp_pcap_detection/run.py
@@ -31,7 +31,7 @@
from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.logger import configure_logging


@click.command()
2 changes: 1 addition & 1 deletion examples/gnn_fraud_detection_pipeline/run.py
@@ -26,7 +26,7 @@
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.logger import configure_logging
from stages.classification_stage import ClassificationStage
from stages.graph_construction_stage import FraudGraphConstructionStage
from stages.graph_sage_stage import GraphSAGEStage
2 changes: 1 addition & 1 deletion examples/ransomware_detection/run.py
@@ -29,7 +29,7 @@
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.utils.logger import configure_logging
from stages.create_features import CreateFeaturesRWStage
from stages.preprocessing import PreprocessingRWStage

10 changes: 6 additions & 4 deletions morpheus/_lib/include/morpheus/io/serializers.hpp
@@ -25,12 +25,14 @@

namespace morpheus {

std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col = true);

void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col = true);

// Note the include_index_col is currently being ignored in both versions of `df_to_json` due to a known issue in
// Pandas: https://github.com/pandas-dev/pandas/issues/37600
std::string df_to_json(const TableInfo& tbl, bool include_index_col = true);

void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col = true);

} // namespace morpheus
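
A brief sketch of calling the updated C++ serializers, based on the declarations above. The `TableInfo` argument would normally come from a `MessageMeta` (e.g. via `msg->get_info()`, as in `WriteToFileStage` later in this diff); the file names are illustrative:

```cpp
#include "morpheus/io/serializers.hpp"

#include <fstream>

void dump_table(const morpheus::TableInfo& tbl)
{
    // CSV with a header row, dropping cudf's index column via the new flag
    std::ofstream csv_out("out.csv");
    morpheus::df_to_csv(tbl, csv_out, /*include_header=*/true, /*include_index_col=*/false);

    // JSON lines; include_index_col is accepted but currently ignored (pandas#37600)
    std::ofstream json_out("out.jsonlines");
    morpheus::df_to_json(tbl, json_out, /*include_index_col=*/false);
}
```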
7 changes: 5 additions & 2 deletions morpheus/_lib/include/morpheus/stages/write_to_file.hpp
@@ -49,7 +49,7 @@ class WriteToFileStage : public srf::pysrf::PythonNode<std::shared_ptr<MessageMe
*/
WriteToFileStage(const std::string &filename,
std::ios::openmode mode = std::ios::out,
FileTypes file_type = FileTypes::Auto,
bool include_index_col = true);

private:
/**
@@ -64,6 +65,7 @@
subscribe_fn_t build_operator();

bool m_is_first;
bool m_include_index_col;
std::ofstream m_fstream;
std::function<void(sink_type_t &)> m_write_func;
};
@@ -81,7 +83,8 @@ struct WriteToFileStageInterfaceProxy
const std::string &name,
const std::string &filename,
const std::string &mode = "w",
FileTypes file_type = FileTypes::Auto,
bool include_index_col = true);
};

#pragma GCC visibility pop
34 changes: 24 additions & 10 deletions morpheus/_lib/src/io/serializers.cpp
@@ -19,11 +19,15 @@

#include <cudf/io/csv.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <rmm/mr/device/per_device_resource.hpp>

#include <memory>
#include <numeric>
#include <ostream>
#include <sstream>

@@ -75,38 +79,48 @@ class OStreamSink : public cudf::io::data_sink
size_t m_bytest_written{0};
};

std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col)
{
// Create an ostringstream and use that with the overload accepting an ostream
std::ostringstream out_stream;

df_to_csv(tbl, out_stream, include_header, include_index_col);

return out_stream.str();
}

void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col)
{
auto column_names = tbl.get_column_names();
cudf::size_type start_col = 1;
if (include_index_col)
{
start_col = 0;
column_names.insert(column_names.begin(), ""s); // insert the id column
}

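// Select the columns to write: starting the iota at 1 skips cudf's leading
// index column, while starting at 0 keeps it in the output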
std::vector<cudf::size_type> col_idexes(column_names.size());
std::iota(col_idexes.begin(), col_idexes.end(), start_col);
auto tbl_view = tbl.get_view().select(col_idexes);

OStreamSink sink(out_stream);
auto destination = cudf::io::sink_info(&sink);
auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl_view)
.include_header(include_header)
.true_value("True"s)
.false_value("False"s);

cudf::io::table_metadata metadata{};
if (include_header)
{
metadata.column_names = column_names;
options_builder = options_builder.metadata(&metadata);
}

cudf::io::write_csv(options_builder.build(), rmm::mr::get_current_device_resource());
}

std::string df_to_json(const TableInfo& tbl, bool include_index_col)
{
std::string results;
// no cpp impl for to_json, instead python module converts to pandas and calls to_json
@@ -116,7 +130,7 @@

auto df = tbl.as_py_object();
auto buffer = StringIO();
py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true);
py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true, "index"_a = include_index_col);
df.attr("to_json")(buffer, **kwargs);
buffer.attr("seek")(0);

@@ -127,11 +141,11 @@
return results;
}

void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col)
{
// Unlike df_to_csv, we use the ostream overload to call the string overload because there is no C++ implementation
// of to_json
std::string output = df_to_json(tbl, include_index_col);

// Now write the contents to the stream
out_stream.write(output.data(), output.size());
5 changes: 3 additions & 2 deletions morpheus/_lib/src/python_modules/stages.cpp
@@ -168,8 +168,9 @@ PYBIND11_MODULE(stages, m)
py::arg("builder"),
py::arg("name"),
py::arg("filename"),
py::arg("mode") = "w",
py::arg("file_type") = 0); // Setting this to FileTypes::AUTO throws a conversion error at runtime
py::arg("mode") = "w",
py::arg("file_type") = 0, // Setting this to FileTypes::AUTO throws a conversion error at runtime
py::arg("include_index_col") = true);

#ifdef VERSION_INFO
m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
17 changes: 11 additions & 6 deletions morpheus/_lib/src/stages/write_to_file.cpp
@@ -27,9 +27,13 @@
namespace morpheus {
// Component public implementations
// ************ WriteToFileStage **************************** //
WriteToFileStage::WriteToFileStage(const std::string &filename,
std::ios::openmode mode,
FileTypes file_type,
bool include_index_col) :
PythonNode(base_t::op_factory_from_sub_fn(build_operator())),
m_is_first(true),
m_include_index_col(include_index_col)
{
if (file_type == FileTypes::Auto)
{
@@ -59,13 +63,13 @@ WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmo
void WriteToFileStage::write_json(WriteToFileStage::sink_type_t &msg)
{
// Call df_to_json passing our fstream
df_to_json(msg->get_info(), m_fstream, m_include_index_col);
}

void WriteToFileStage::write_csv(WriteToFileStage::sink_type_t &msg)
{
// Call df_to_csv passing our fstream
df_to_csv(msg->get_info(), m_fstream, m_is_first, m_include_index_col);
}

void WriteToFileStage::close()
@@ -102,7 +106,8 @@ std::shared_ptr<srf::segment::Object<WriteToFileStage>> WriteToFileStageInterfac
const std::string &name,
const std::string &filename,
const std::string &mode,
FileTypes file_type,
bool include_index_col)
{
std::ios::openmode fsmode = std::ios::out;

@@ -138,7 +143,7 @@ std::shared_ptr<srf::segment::Object<WriteToFileStage>> WriteToFileStageInterfac
throw std::runtime_error(std::string("Unsupported file mode. Must choose either 'w' or 'a'. Mode: ") + mode);
}

auto stage = builder.construct_object<WriteToFileStage>(name, filename, fsmode, file_type, include_index_col);

return stage;
}
8 changes: 7 additions & 1 deletion morpheus/cli.py
@@ -30,7 +30,7 @@
from morpheus.config import CppConfig
from morpheus.config import PipelineModes
from morpheus.config import auto_determine_bootstrap
from morpheus.utils.logger import configure_logging

# pylint: disable=line-too-long, import-outside-toplevel, invalid-name, global-at-module-level, unused-argument

@@ -1325,6 +1325,12 @@ def validate(ctx: click.Context, **kwargs):
@click.command(short_help="Write all messages to a file", **command_kwargs)
@click.option('--filename', type=click.Path(writable=True), required=True, help="The file to write to")
@click.option('--overwrite', is_flag=True, help="Whether or not to overwrite the target file")
@click.option('--include-index-col',
'include_index_col',
default=True,
type=bool,
help=("Includes dataframe's index column in the output "
"Note: this currently only works for CSV file output"))
@prepare_command()
def to_file(ctx: click.Context, **kwargs):

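
With the new option wired into the CLI, disabling the index column for CSV output looks roughly like the following. The surrounding pipeline stages and file paths are illustrative, not taken from this diff:

```bash
morpheus run pipeline-nlp \
  from-file --filename=examples/data/email.jsonlines \
  deserialize \
  serialize \
  to-file --filename=.tmp/output.csv --overwrite --include-index-col=false
```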
18 changes: 13 additions & 5 deletions morpheus/io/serializers.py
@@ -19,7 +19,10 @@
import cudf


def df_to_csv(df: cudf.DataFrame,
include_header=False,
strip_newline=False,
include_index_col=True) -> typing.List[str]:
"""
Serializes a DataFrame into CSV and returns the serialized output separated by lines.
@@ -31,13 +34,15 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) ->
Whether or not to include the header, by default False.
strip_newline : bool, optional
Whether or not to strip the newline characters from each string, by default False.
include_index_col: bool, optional
Write out the index as a column, by default True.
Returns
-------
typing.List[str]
List of strings for each line
"""
results = df.to_csv(header=include_header, index=include_index_col)
if strip_newline:
results = results.split("\n")
else:
@@ -46,7 +51,7 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) ->
return results


def df_to_json(df: cudf.DataFrame, strip_newlines=False, include_index_col=True) -> typing.List[str]:
"""
Serializes a DataFrame into JSON and returns the serialized output separated by lines.
@@ -56,7 +61,10 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
Input DataFrame to serialize.
strip_newlines : bool, optional
Whether or not to strip the newline characters from each string, by default False.
include_index_col: bool, optional
Write out the index as a column, by default True.
Note: This value is currently being ignored due to a known issue in Pandas:
https://github.com/pandas-dev/pandas/issues/37600
Returns
-------
typing.List[str]
@@ -65,7 +73,7 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
str_buf = StringIO()

# Convert to list of json string objects
df.to_json(str_buf, orient="records", lines=True)
df.to_json(str_buf, orient="records", lines=True, index=include_index_col)

# Start from beginning
str_buf.seek(0)
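
A short sketch of the Python serializers with the new flag, using the signatures shown above (the sample frame is illustrative):

```python
import cudf

from morpheus.io.serializers import df_to_csv
from morpheus.io.serializers import df_to_json

df = cudf.DataFrame({"name": ["a", "b"], "score": [0.9, 0.1]})

# CSV lines without the index column
csv_lines = df_to_csv(df, include_header=True, include_index_col=False)

# JSON lines; include_index_col is accepted but currently ignored
# (pandas-dev/pandas#37600, rapidsai/cudf#11317)
json_lines = df_to_json(df, include_index_col=False)
```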
2 changes: 1 addition & 1 deletion morpheus/stages/general/buffer_stage.py
@@ -20,7 +20,7 @@
from morpheus.config import Config
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.logger import deprecated_stage_warning

logger = logging.getLogger(__name__)
