Skip to content

Commit

Permalink
Table locking & column preallocation (#586)
Browse files Browse the repository at this point in the history
* Builds on changes in #427
* Adds a `PreallocatorMixin` which when added to a stage performs pre-allocation. This should be added to the first stage in a pipeline which emits a DataFrame or MessageMeta in a pipeline.
* Morpheus' TypeId enum exposed to the Python API, allowing stages to define types for columns needing pre-allocation
* `MutableTableInfo` exposed to Python via a context manager to be used in `with` blocks
* `type_util` (`Dtype`) and `type_util_detail` (`DataType`) merged into a new compilation unit `dtype` fixes #490 


fixes #456

Authors:
  - David Gardner (https://github.com/dagardner-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #586
  • Loading branch information
dagardner-nv authored Feb 6, 2023
1 parent ebcfcc1 commit 778eba2
Show file tree
Hide file tree
Showing 85 changed files with 2,159 additions and 1,023 deletions.
1 change: 1 addition & 0 deletions ci/scripts/copyright.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,4 +462,5 @@ def checkCopyright_main():
}

if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
sys.exit(checkCopyright_main())
180 changes: 133 additions & 47 deletions docs/source/developer_guide/guides/2_real_world_phishing.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/source/developer_guide/guides/4_source_cpp_stage.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ RabbitMQSourceStage(const std::string& host,
const std::string& exchange_type = "fanout"s,
const std::string& queue_name = ""s,
std::chrono::milliseconds poll_interval = 100ms);
~RabbitMQSourceStage() override = default;
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import mrc

from morpheus._lib.common import TypeId
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import PipelineModes
Expand Down Expand Up @@ -48,6 +49,17 @@ def __init__(self, config: Config, sep_token: str = '[SEP]'):
else:
raise ValueError("sep_token cannot be an empty string")

# This stage adds new columns to the DataFrame, as an optimization we define the columns that are needed,
# ensuring that these columns are pre-allocated with null values. This action is performed by Morpheus for any
# stage defining this attribute.
self._needed_columns.update({
'to_count': TypeId.INT32,
'bcc_count': TypeId.INT32,
'cc_count': TypeId.INT32,
'total_recipients': TypeId.INT32,
'data': TypeId.STRING
})

@property
def name(self) -> str:
return "recipient-features"
Expand All @@ -59,18 +71,17 @@ def supports_cpp_node(self) -> bool:
return False

def on_data(self, message: MessageMeta) -> MessageMeta:
# Get the DataFrame from the incoming message
df = message.df

df['to_count'] = df['To'].str.count('@')
df['bcc_count'] = df['BCC'].str.count('@')
df['cc_count'] = df['CC'].str.count('@')
df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count']

# Attach features to string data
df['data'] = (df['to_count'].astype(str) + '[SEP]' + df['bcc_count'].astype(str) + '[SEP]' +
df['cc_count'].astype(str) + '[SEP]' + df['total_recipients'].astype(str) + '[SEP]' +
df['Message'])
# Open the DataFrame from the incoming message for in-place modification
with message.mutable_dataframe() as df:
df['to_count'] = df['To'].str.count('@')
df['bcc_count'] = df['BCC'].str.count('@')
df['cc_count'] = df['CC'].str.count('@')
df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count']

# Attach features to string data
df['data'] = (df['to_count'].astype(str) + self._sep_token + df['bcc_count'].astype(str) + self._sep_token +
df['cc_count'].astype(str) + self._sep_token + df['total_recipients'].astype(str) +
self._sep_token + df['Message'])

# Return the message for the next stage
return message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair

logger = logging.getLogger(__name__)


@register_stage("from-rabbitmq")
class RabbitMQSourceStage(SingleOutputSource):
class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource):
"""
Source stage used to load messages from a RabbitMQ queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair

logger = logging.getLogger(__name__)


@register_stage("from-rabbitmq")
class RabbitMQSourceStage(SingleOutputSource):
class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource):
"""
Source stage used to load messages from a RabbitMQ queue.
Expand Down
2 changes: 2 additions & 0 deletions examples/digital_fingerprinting/production/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ services:
dockerfile: ./Dockerfile
target: jupyter
args:
- MORPHEUS_CONTAINER=${MORPHEUS_CONTAINER:-nvcr.io/nvidia/morpheus/morpheus}
- MORPHEUS_CONTAINER_VERSION=${MORPHEUS_CONTAINER_VERSION:-v23.01.00-runtime}
deploy:
resources:
Expand Down Expand Up @@ -71,6 +72,7 @@ services:
dockerfile: ./Dockerfile
target: runtime
args:
- MORPHEUS_CONTAINER=${MORPHEUS_CONTAINER:-nvcr.io/nvidia/morpheus/morpheus}
- MORPHEUS_CONTAINER_VERSION=${MORPHEUS_CONTAINER_VERSION:-v23.01.00-runtime}
image: dfp_morpheus
container_name: morpheus_pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import logging
import typing

import pandas as pd

from morpheus.messages.message_meta import MessageMeta
from morpheus.messages.multi_message import MultiMessage

logger = logging.getLogger(__name__)


@dataclasses.dataclass
@dataclasses.dataclass(init=False)
class DFPMessageMeta(MessageMeta, cpp_class=None):
"""
This class extends MessageMeta to also hold userid corresponding to batched metadata.
Expand All @@ -37,11 +39,15 @@ class DFPMessageMeta(MessageMeta, cpp_class=None):
"""
user_id: str

def __init__(self, df: pd.DataFrame, user_id: str) -> None:
super().__init__(df)
self.user_id = user_id

def get_df(self):
return self.df

def set_df(self, df):
self.df = df
self._df = df


@dataclasses.dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from morpheus._lib.common import FileTypes
from morpheus.config import Config
from morpheus.io.deserializers import read_file_to_df
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.column_info import DataFrameInputSchema
Expand Down Expand Up @@ -78,7 +79,7 @@ def _single_object_to_dataframe(file_object: fsspec.core.OpenFile,
return s3_df


class DFPFileToDataFrameStage(SinglePortStage):
class DFPFileToDataFrameStage(PreallocatorMixin, SinglePortStage):

def __init__(self,
c: Config,
Expand Down
3 changes: 2 additions & 1 deletion examples/sid_visualization/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from morpheus.config import PipelineModes
from morpheus.io.deserializers import read_file_to_df
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.stages.general.monitor_stage import MonitorStage
Expand All @@ -39,7 +40,7 @@
from morpheus.utils.logger import configure_logging


class NLPVizFileSource(SingleOutputSource):
class NLPVizFileSource(PreallocatorMixin, SingleOutputSource):
"""
Source stage is used to load messages from a file and dumping the contents into the pipeline immediately. Useful for
testing performance and accuracy of a pipeline.
Expand Down
6 changes: 3 additions & 3 deletions morpheus/_lib/cmake/libraries/cuda_utils.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ find_package(pybind11 REQUIRED)
# Place the two cuda sources in their own target and disable IWYU for that target.
add_library(cuda_utils_objs
OBJECT
${MORPHEUS_LIB_ROOT}/src/utilities/matx_util.cu
${MORPHEUS_LIB_ROOT}/src/utilities/type_util.cu
${MORPHEUS_LIB_ROOT}/src/utilities/matx_util.cu
)

set_target_properties(
Expand Down Expand Up @@ -51,11 +50,12 @@ target_link_libraries(cuda_utils_objs
add_library(cuda_utils
SHARED
$<TARGET_OBJECTS:cuda_utils_objs>
${MORPHEUS_LIB_ROOT}/src/objects/data_table.cpp
${MORPHEUS_LIB_ROOT}/src/objects/dev_mem_info.cpp
${MORPHEUS_LIB_ROOT}/src/objects/dtype.cpp
${MORPHEUS_LIB_ROOT}/src/objects/table_info.cpp
${MORPHEUS_LIB_ROOT}/src/objects/tensor_object.cpp
${MORPHEUS_LIB_ROOT}/src/utilities/tensor_util.cpp
${MORPHEUS_LIB_ROOT}/src/utilities/type_util_detail.cpp
)

target_include_directories(cuda_utils
Expand Down
2 changes: 2 additions & 0 deletions morpheus/_lib/cmake/libraries/morpheus.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ add_library(morpheus
${MORPHEUS_LIB_ROOT}/src/messages/multi_tensor.cpp
${MORPHEUS_LIB_ROOT}/src/objects/fiber_queue.cpp
${MORPHEUS_LIB_ROOT}/src/objects/file_types.cpp
${MORPHEUS_LIB_ROOT}/src/objects/mutable_table_ctx_mgr.cpp
${MORPHEUS_LIB_ROOT}/src/objects/wrapped_tensor.cpp
${MORPHEUS_LIB_ROOT}/src/objects/python_data_table.cpp
${MORPHEUS_LIB_ROOT}/src/objects/rmm_tensor.cpp
Expand All @@ -44,6 +45,7 @@ add_library(morpheus
${MORPHEUS_LIB_ROOT}/src/stages/file_source.cpp
${MORPHEUS_LIB_ROOT}/src/stages/filter_detection.cpp
${MORPHEUS_LIB_ROOT}/src/stages/kafka_source.cpp
${MORPHEUS_LIB_ROOT}/src/stages/preallocate.cpp
${MORPHEUS_LIB_ROOT}/src/stages/preprocess_fil.cpp
${MORPHEUS_LIB_ROOT}/src/stages/preprocess_nlp.cpp
${MORPHEUS_LIB_ROOT}/src/stages/serialize.cpp
Expand Down
34 changes: 0 additions & 34 deletions morpheus/_lib/cmake/python_modules/file_types.cmake

This file was deleted.

37 changes: 10 additions & 27 deletions morpheus/_lib/cudf_helpers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,17 @@ from cudf._lib.utils cimport table_view_from_table

cdef extern from "morpheus/objects/table_info.hpp" namespace "morpheus" nogil:

cdef cppclass IDataTable:
IDataTable()

cdef cppclass TableInfo:
TableInfo()
TableInfo(shared_ptr[const IDataTable] parent,
table_view view,
vector[string] index_names,
vector[string] column_names)
cdef cppclass TableInfoData:
TableInfoData()
TableInfoData(table_view view,
vector[string] indices,
vector[string] columns)

table_view get_view() const
vector[string] get_index_names()
vector[string] get_column_names() const
table_view table_view
vector[string] index_names
vector[string] column_names

int num_indices() const
int num_columns() const
int num_rows() const

cdef public api:
object make_table_from_table_with_metadata(table_with_metadata table, int index_col_count):
Expand Down Expand Up @@ -83,19 +77,8 @@ cdef public api:

return cudf.DataFrame._from_data(data, index)

object make_table_from_table_info(TableInfo info, object owner):

i_names = info.get_index_names()
c_names = info.get_column_names()

index_names = [x.decode() for x in i_names]
column_names = [x.decode() for x in c_names]

data, index = data_from_table_view(info.get_view(), owner, column_names=column_names, index_names=index_names)

return cudf.DataFrame._from_data(data, index)

TableInfo make_table_info_from_table(object table, shared_ptr[const IDataTable] parent):
TableInfoData make_table_info_data_from_table(object table):

cdef vector[string] temp_col_names = get_column_names(table, True)

Expand Down Expand Up @@ -123,4 +106,4 @@ cdef public api:

column_names.push_back(str.encode(name))

return TableInfo(parent, input_table_view, index_names, column_names)
return TableInfoData(input_table_view, index_names, column_names)
12 changes: 8 additions & 4 deletions morpheus/_lib/include/morpheus/io/serializers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,23 @@ void df_to_csv(const TableInfo& tbl,
* @param tbl : A wrapper around data in the dataframe
* @param include_index_col : Determines whether or not to include the dataframe index
* @return std::string
*
* Note the include_index_col is currently being ignored in both versions of `df_to_json` due to a known issue in
* Pandas: https://github.com/pandas-dev/pandas/issues/37600
* Requires MutableTableInfo since there is no C++ implementation of the JSON writer
*/
// Note the include_index_col is currently being ignored in both versions of `df_to_json` due to a known issue in
// Pandas: https://github.com/pandas-dev/pandas/issues/37600
std::string df_to_json(const TableInfo& tbl, bool include_index_col = true);
std::string df_to_json(MutableTableInfo& tbl, bool include_index_col = true);

/**
* @brief Serialize a dataframe into a JSON formatted string
* @param tbl : A wrapper around data in the dataframe
* @param out_stream : Output stream to write the results to a destination
* @param include_index_col : Determines whether or not to include the dataframe index
* @param flush : When `true` flush `out_stream`.
*
* Requires MutableTableInfo since there is no C++ implementation of the JSON writer
*/
void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col = true, bool flush = false);
void df_to_json(MutableTableInfo& tbl, std::ostream& out_stream, bool include_index_col = true, bool flush = false);

/** @} */ // end of group
} // namespace morpheus
Loading

0 comments on commit 778eba2

Please sign in to comment.