Manual testing of Morpheus with Kafka & Validation improvements #290

Merged · 37 commits · Aug 8, 2022

Changes from all commits:
- `5763a82` Simple copy test (dagardner-nv, Jul 19, 2022)
- `2b47e46` Add instructions for partitioned test (dagardner-nv, Jul 19, 2022)
- `bf16140` Add Kafka ABP validation instructions (dagardner-nv, Jul 19, 2022)
- `81d4fd8` Test data (dagardner-nv, Jul 21, 2022)
- `8b4554c` Add a flag to optionally include or exclude the cudf ID column from C… (dagardner-nv, Jul 21, 2022)
- `91636af` Replace usage of older MORPHEUS_HOME env var with MORPHEUS_ROOT (dagardner-nv, Jul 21, 2022)
- `55f06c7` Update copy test using Kafka's console producer and consumer (dagardner-nv, Jul 21, 2022)
- `abc4864` Update partitioned testing: (dagardner-nv, Jul 21, 2022)
- `fafa8b7` Refactor ABP pipeline test (dagardner-nv, Jul 21, 2022)
- `abb3343` Compare with sorted keys (dagardner-nv, Jul 21, 2022)
- `ee9f2b1` wip (dagardner-nv, Jul 21, 2022)
- `d5dfb91` Skip python test, known bug (dagardner-nv, Jul 22, 2022)
- `ee6c403` Move comparison logic from validation stage to be usable outside of a… (dagardner-nv, Jul 22, 2022)
- `562e510` Rename morpheus.utils.logging to morpheus.utils.logger allowing other… (dagardner-nv, Jul 22, 2022)
- `69d881b` Make compare_df runnable from the command line (dagardner-nv, Jul 22, 2022)
- `ff913d1` Fix bug in compare_df (dagardner-nv, Jul 22, 2022)
- `521cb87` When rows don't match but are within the tolerances comparison.matche… (dagardner-nv, Jul 22, 2022)
- `d75b5d0` Add hammah user123 to test (dagardner-nv, Jul 22, 2022)
- `8a70db2` Add phishing pipeline to kafka test (dagardner-nv, Jul 22, 2022)
- `77a1f85` Add sid validation to kafka testing (dagardner-nv, Jul 22, 2022)
- `d914ddd` Split bash commands (dagardner-nv, Jul 22, 2022)
- `a79615a` Split bash commands (dagardner-nv, Jul 23, 2022)
- `d8d4027` wip (dagardner-nv, Jul 23, 2022)
- `f6fc77b` wip (dagardner-nv, Jul 23, 2022)
- `eab0051` wip (dagardner-nv, Jul 23, 2022)
- `ae5e089` wip (dagardner-nv, Jul 23, 2022)
- `eb407b7` wip (dagardner-nv, Jul 23, 2022)
- `ebcdda7` wip (dagardner-nv, Jul 23, 2022)
- `f0c0fc8` Script no longer needed (dagardner-nv, Jul 23, 2022)
- `8f0da01` Fix includes (dagardner-nv, Jul 25, 2022)
- `3e6d2bf` Fix imports (dagardner-nv, Jul 25, 2022)
- `3a6aaf9` Update import paths (dagardner-nv, Jul 25, 2022)
- `fbeb52b` Add instructions for setting ports to prevent forwarding of ip6 ports… (dagardner-nv, Jul 26, 2022)
- `f81e3a3` Merge branch 'branch-22.08' into david-test-kafka (dagardner-nv, Aug 1, 2022)
- `a279f2b` Move main and parse_arge to a stand-alone script (dagardner-nv, Aug 1, 2022)
- `000f860` Fix logger name (dagardner-nv, Aug 1, 2022)
- `3bda99d` Update instructions to use new compare_data_files.py script (dagardner-nv, Aug 1, 2022)
32 changes: 21 additions & 11 deletions CONTRIBUTING.md
@@ -232,17 +232,24 @@ Launching a full production Kafka cluster is outside the scope of this project.
```bash
export KAFKA_ADVERTISED_HOST_NAME=$(docker network inspect bridge | jq -r '.[0].IPAM.Config[0].Gateway')
```
-5. Update the `kafka-docker/docker-compose.yml` so the environment variable `KAFKA_ADVERTISED_HOST_NAME` matches the previous step. For example, the line should look like:
+5. Update the `kafka-docker/docker-compose.yml`, performing two changes:
+   1. Update the `ports` entry to:
+      ```yml
+      ports:
+        - "0.0.0.0::9092"
+      ```
+      This will prevent the containers from attempting to map IPv6 ports.
+   1. Change the value of `KAFKA_ADVERTISED_HOST_NAME` to match the value of the `KAFKA_ADVERTISED_HOST_NAME` environment variable from the previous step. For example, the line should look like:

   ```yml
   environment:
     KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
   ```
   Which should match the value of `$KAFKA_ADVERTISED_HOST_NAME` from the previous step:

   ```bash
   $ echo $KAFKA_ADVERTISED_HOST_NAME
   "172.17.0.1"
   ```
6. Launch Kafka with 3 instances:

@@ -252,11 +259,14 @@
In practice, three instances have been shown to work well. Use as many instances as required, but keep in mind that each instance takes about 1 GB of memory.
7. Launch the Kafka shell
1. To configure the cluster, you will need to launch into a container that has the Kafka shell.
-2. You can do this with `./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME`.
+2. You can do this with:
+```bash
+./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME
+```
3. However, this makes it difficult to load data into the cluster. Instead, you can manually launch the Kafka shell by running:
```bash
# Change to the morpheus root to make it easier for mounting volumes
-cd ${MORPHEUS_HOME}
+cd ${MORPHEUS_ROOT}

# Run the Kafka shell docker container
docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \
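Once the cluster from the steps above is up, a quick sanity check is to produce and consume a single message against the advertised listener. The sketch below uses the third-party `kafka-python` package and a throwaway topic name; neither is part of this PR:

```python
import os

from kafka import KafkaConsumer, KafkaProducer

# Assumes KAFKA_ADVERTISED_HOST_NAME was exported as in the steps above.
bootstrap = f"{os.environ['KAFKA_ADVERTISED_HOST_NAME']}:9092"

# Produce one test record to a scratch topic.
producer = KafkaProducer(bootstrap_servers=bootstrap)
producer.send("copy-test", b'{"hello": "morpheus"}')
producer.flush()

# Read the record back to confirm the advertised listener is reachable.
consumer = KafkaConsumer("copy-test",
                         bootstrap_servers=bootstrap,
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)
for record in consumer:
    print(record.value)
    break
```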
4 changes: 2 additions & 2 deletions docs/source/developer_guide/guides/1_simple_python_stage.md
@@ -124,7 +124,7 @@ from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
-from morpheus.utils.logging import configure_logging
+from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage
```
@@ -185,7 +185,7 @@ from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
-from morpheus.utils.logging import configure_logging
+from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage

@@ -302,7 +302,7 @@ from morpheus.stages.postprocess.filter_detections_stage import FilterDetections
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage
-from morpheus.utils.logging import configure_logging
+from morpheus.utils.logger import configure_logging

from recipient_features_stage import RecipientFeaturesStage

2 changes: 1 addition & 1 deletion examples/abp_pcap_detection/run.py
@@ -31,7 +31,7 @@
from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
-from morpheus.utils.logging import configure_logging
+from morpheus.utils.logger import configure_logging


@click.command()
2 changes: 1 addition & 1 deletion examples/gnn_fraud_detection_pipeline/run.py
@@ -26,7 +26,7 @@
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
-from morpheus.utils.logging import configure_logging
+from morpheus.utils.logger import configure_logging
from stages.classification_stage import ClassificationStage
from stages.graph_construction_stage import FraudGraphConstructionStage
from stages.graph_sage_stage import GraphSAGEStage
2 changes: 1 addition & 1 deletion examples/ransomware_detection/run.py
@@ -29,7 +29,7 @@
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
-from morpheus.utils.logging import configure_logging
+from morpheus.utils.logger import configure_logging
from stages.create_features import CreateFeaturesRWStage
from stages.preprocessing import PreprocessingRWStage

10 changes: 6 additions & 4 deletions morpheus/_lib/include/morpheus/io/serializers.hpp
@@ -25,12 +25,14 @@

namespace morpheus {

-std::string df_to_csv(const TableInfo& tbl, bool include_header);
+std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col = true);

-void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header);
+void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col = true);

-std::string df_to_json(const TableInfo& tbl);
+// Note the include_index_col is currently being ignored in both versions of `df_to_json` due to a known issue in
+// Pandas: https://github.com/pandas-dev/pandas/issues/37600
+std::string df_to_json(const TableInfo& tbl, bool include_index_col = true);

-void df_to_json(const TableInfo& tbl, std::ostream& out_stream);
+void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col = true);

} // namespace morpheus
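The pandas limitation referenced in the comment above is easy to demonstrate: with `orient="records"` the index is never emitted, so the new `include_index_col` argument cannot take effect for JSON output. A minimal illustration, independent of this PR:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2]}, index=[10, 20])

# Prints {"a":1} and {"a":2} on separate lines; the index values (10, 20)
# are absent, which is the behavior tracked in pandas issue #37600.
print(df.to_json(orient="records", lines=True))
```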
7 changes: 5 additions & 2 deletions morpheus/_lib/include/morpheus/stages/write_to_file.hpp
@@ -49,7 +49,8 @@ class WriteToFileStage : public srf::pysrf::PythonNode<std::shared_ptr<MessageMe
*/
WriteToFileStage(const std::string &filename,
std::ios::openmode mode = std::ios::out,
-FileTypes file_type = FileTypes::Auto);
+FileTypes file_type = FileTypes::Auto,
+bool include_index_col = true);

private:
/**
@@ -64,6 +65,7 @@
subscribe_fn_t build_operator();

bool m_is_first;
+bool m_include_index_col;
std::ofstream m_fstream;
std::function<void(sink_type_t &)> m_write_func;
};
@@ -81,7 +83,8 @@ struct WriteToFileStageInterfaceProxy
const std::string &name,
const std::string &filename,
const std::string &mode = "w",
-FileTypes file_type = FileTypes::Auto);
+FileTypes file_type = FileTypes::Auto,
+bool include_index_col = true);
};

#pragma GCC visibility pop
34 changes: 24 additions & 10 deletions morpheus/_lib/src/io/serializers.cpp
@@ -19,11 +19,15 @@

#include <cudf/io/csv.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <rmm/mr/device/per_device_resource.hpp>

#include <memory>
#include <numeric>
#include <ostream>
#include <sstream>

@@ -75,38 +79,48 @@ class OStreamSink : public cudf::io::data_sink
size_t m_bytest_written{0};
};

-std::string df_to_csv(const TableInfo& tbl, bool include_header)
+std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col)
{
// Create an ostringstream and use that with the overload accepting an ostream
std::ostringstream out_stream;

-df_to_csv(tbl, out_stream, include_header);
+df_to_csv(tbl, out_stream, include_header, include_index_col);

return out_stream.str();
}

-void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header)
+void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col)
{
+auto column_names = tbl.get_column_names();
+cudf::size_type start_col = 1;
+if (include_index_col)
+{
+start_col = 0;
+column_names.insert(column_names.begin(), ""s); // insert the id column
+}
+
+std::vector<cudf::size_type> col_idexes(column_names.size());
+std::iota(col_idexes.begin(), col_idexes.end(), start_col);
+auto tbl_view = tbl.get_view().select(col_idexes);

OStreamSink sink(out_stream);
auto destination = cudf::io::sink_info(&sink);
-auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl.get_view())
+auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl_view)
.include_header(include_header)
.true_value("True"s)
.false_value("False"s);

cudf::io::table_metadata metadata{};
if (include_header)
{
-auto column_names = tbl.get_column_names();
-column_names.insert(column_names.begin(), ""s); // insert the id column
metadata.column_names = column_names;
options_builder = options_builder.metadata(&metadata);
}

cudf::io::write_csv(options_builder.build(), rmm::mr::get_current_device_resource());
}

-std::string df_to_json(const TableInfo& tbl)
+std::string df_to_json(const TableInfo& tbl, bool include_index_col)
{
std::string results;
// no cpp impl for to_json, instead python module converts to pandas and calls to_json
@@ -116,7 +130,7 @@ std::string df_to_json(const TableInfo& tbl)

auto df = tbl.as_py_object();
auto buffer = StringIO();
-py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true);
+py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true, "index"_a = include_index_col);
df.attr("to_json")(buffer, **kwargs);
buffer.attr("seek")(0);

@@ -127,11 +141,11 @@ std::string df_to_json(const TableInfo& tbl)
return results;
}

-void df_to_json(const TableInfo& tbl, std::ostream& out_stream)
+void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col)
{
// Unlike df_to_csv, we use the ostream overload to call the string overload because there is no C++ implementation
// of to_json
-std::string output = df_to_json(tbl);
+std::string output = df_to_json(tbl, include_index_col);

// Now write the contents to the stream
out_stream.write(output.data(), output.size());
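The column-selection logic added to the C++ `df_to_csv` above has a compact Python analog. This sketch is illustrative only (it is not part of the PR and assumes a working cudf installation); it mirrors the `start_col` arithmetic and the `select()` call:

```python
import cudf

df = cudf.DataFrame({"a": [1, 2], "b": ["x", "y"]})
tbl = df.reset_index()  # column 0 now plays the role of the cudf id column

include_index_col = False
start_col = 0 if include_index_col else 1

# Equivalent of filling col_idexes with std::iota and calling select() on the view.
selected = tbl.iloc[:, start_col:]
print(selected.to_csv(index=False))
```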
5 changes: 3 additions & 2 deletions morpheus/_lib/src/python_modules/stages.cpp
@@ -167,8 +167,9 @@ PYBIND11_MODULE(stages, m)
py::arg("builder"),
py::arg("name"),
py::arg("filename"),
-py::arg("mode") = "w",
-py::arg("file_type") = 0); // Setting this to FileTypes::AUTO throws a conversion error at runtime
+py::arg("mode") = "w",
+py::arg("file_type") = 0, // Setting this to FileTypes::AUTO throws a conversion error at runtime
+py::arg("include_index_col") = true);

#ifdef VERSION_INFO
m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
17 changes: 11 additions & 6 deletions morpheus/_lib/src/stages/write_to_file.cpp
@@ -27,9 +27,13 @@
namespace morpheus {
// Component public implementations
// ************ WriteToFileStage **************************** //
-WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmode mode, FileTypes file_type) :
+WriteToFileStage::WriteToFileStage(const std::string &filename,
+std::ios::openmode mode,
+FileTypes file_type,
+bool include_index_col) :
PythonNode(base_t::op_factory_from_sub_fn(build_operator())),
-m_is_first(true)
+m_is_first(true),
+m_include_index_col(include_index_col)
{
if (file_type == FileTypes::Auto)
{
@@ -59,13 +63,13 @@ WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmo
void WriteToFileStage::write_json(WriteToFileStage::sink_type_t &msg)
{
// Call df_to_json passing our fstream
-df_to_json(msg->get_info(), m_fstream);
+df_to_json(msg->get_info(), m_fstream, m_include_index_col);
}

void WriteToFileStage::write_csv(WriteToFileStage::sink_type_t &msg)
{
// Call df_to_csv passing our fstream
-df_to_csv(msg->get_info(), m_fstream, m_is_first);
+df_to_csv(msg->get_info(), m_fstream, m_is_first, m_include_index_col);
}

void WriteToFileStage::close()
@@ -102,7 +106,8 @@ std::shared_ptr<srf::segment::Object<WriteToFileStage>> WriteToFileStageInterfac
const std::string &name,
const std::string &filename,
const std::string &mode,
-FileTypes file_type)
+FileTypes file_type,
+bool include_index_col)
{
std::ios::openmode fsmode = std::ios::out;

Expand Down Expand Up @@ -138,7 +143,7 @@ std::shared_ptr<srf::segment::Object<WriteToFileStage>> WriteToFileStageInterfac
throw std::runtime_error(std::string("Unsupported file mode. Must choose either 'w' or 'a'. Mode: ") + mode);
}

-auto stage = builder.construct_object<WriteToFileStage>(name, filename, fsmode, file_type);
+auto stage = builder.construct_object<WriteToFileStage>(name, filename, fsmode, file_type, include_index_col);

return stage;
}
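Taken together, these changes surface as a new keyword argument on the Python `WriteToFileStage`. A hypothetical pipeline fragment exercising it (the file names are placeholders and the `include_index_col` keyword is inferred from the binding added above, so treat this as a sketch rather than a tested example):

```python
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage

config = Config()
pipeline = LinearPipeline(config)
pipeline.set_source(FileSourceStage(config, filename="input.jsonlines"))

# Excluding the index column overrides the new include_index_col=true default.
pipeline.add_stage(WriteToFileStage(config, filename="out.csv", overwrite=True,
                                    include_index_col=False))
pipeline.run()
```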
8 changes: 7 additions & 1 deletion morpheus/cli.py
@@ -30,7 +30,7 @@
from morpheus.config import CppConfig
from morpheus.config import PipelineModes
from morpheus.config import auto_determine_bootstrap
-from morpheus.utils.logging import configure_logging
+from morpheus.utils.logger import configure_logging

# pylint: disable=line-too-long, import-outside-toplevel, invalid-name, global-at-module-level, unused-argument

@@ -1321,6 +1321,12 @@ def validate(ctx: click.Context, **kwargs):
@click.command(short_help="Write all messages to a file", **command_kwargs)
@click.option('--filename', type=click.Path(writable=True), required=True, help="The file to write to")
@click.option('--overwrite', is_flag=True, help="Whether or not to overwrite the target file")
+@click.option('--include-index-col',
+'include_index_col',
+default=True,
+type=bool,
+help=("Includes dataframe's index column in the output. "
+"Note: this currently only works for CSV file output"))
@prepare_command()
def to_file(ctx: click.Context, **kwargs):

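Because the option is declared with `type=bool` rather than `is_flag=True`, users pass an explicit value such as `--include-index-col=false`. A standalone click sketch of the same parsing behavior (a hypothetical demo script, not the actual morpheus CLI):

```python
import click

@click.command()
@click.option("--include-index-col", "include_index_col", default=True, type=bool,
              help="Include the dataframe's index column in the output.")
def to_file_demo(include_index_col: bool):
    # click's BOOL type accepts values such as true/false, 1/0 and yes/no.
    click.echo(f"include_index_col={include_index_col}")

if __name__ == "__main__":
    to_file_demo()  # e.g. `python demo.py --include-index-col=false`
```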
18 changes: 13 additions & 5 deletions morpheus/io/serializers.py
@@ -19,7 +19,10 @@
import cudf


-def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) -> typing.List[str]:
+def df_to_csv(df: cudf.DataFrame,
+include_header=False,
+strip_newline=False,
+include_index_col=True) -> typing.List[str]:
"""
Serializes a DataFrame into CSV and returns the serialized output separated by lines.

@@ -31,13 +34,15 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) ->
Whether or not to include the header, by default False.
strip_newline : bool, optional
Whether or not to strip the newline characters from each string, by default False.
+include_index_col: bool, optional
+Write out the index as a column, by default True.

Returns
-------
typing.List[str]
List of strings for each line
"""
-results = df.to_csv(header=include_header)
+results = df.to_csv(header=include_header, index=include_index_col)
if strip_newline:
results = results.split("\n")
else:
@@ -46,7 +51,7 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) ->
return results


-def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
+def df_to_json(df: cudf.DataFrame, strip_newlines=False, include_index_col=True) -> typing.List[str]:
"""
Serializes a DataFrame into JSON and returns the serialized output separated by lines.

Expand All @@ -56,7 +61,10 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
Input DataFrame to serialize.
strip_newline : bool, optional
Whether or not to strip the newline characters from each string, by default False.

+include_index_col: bool, optional
+Write out the index as a column, by default True.
+Note: This value is currently being ignored due to a known issue in Pandas:
+https://github.com/pandas-dev/pandas/issues/37600
Returns
-------
typing.List[str]
Expand All @@ -65,7 +73,7 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
str_buf = StringIO()

# Convert to list of json string objects
-df.to_json(str_buf, orient="records", lines=True)
+df.to_json(str_buf, orient="records", lines=True, index=include_index_col)

# Start from beginning
str_buf.seek(0)
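With the new parameter in place, the Python serializer can drop the cudf ID column on request. A short usage sketch (requires cudf, and the DataFrame contents are placeholders):

```python
import cudf

from morpheus.io.serializers import df_to_csv

df = cudf.DataFrame({"a": [1, 2], "b": ["x", "y"]})

# The default keeps the index column, matching include_index_col=True.
print(df_to_csv(df, include_header=True))

# Opting out drops the cudf ID column from the serialized output.
print(df_to_csv(df, include_header=True, include_index_col=False))
```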
2 changes: 1 addition & 1 deletion morpheus/stages/general/buffer_stage.py
@@ -20,7 +20,7 @@
from morpheus.config import Config
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
-from morpheus.utils.logging import deprecated_stage_warning
+from morpheus.utils.logger import deprecated_stage_warning

logger = logging.getLogger(__name__)
