Add support for a CPU-only Mode (#1851)
* Adds a new enum `morpheus.config.ExecutionMode` with members `GPU` & `CPU` along with a new `morpheus.config.Config.execution_mode` attribute.
* For backwards compatibility, `Config.execution_mode` defaults to `GPU`.
* Add a new `supported_execution_modes` method to `StageBase`, which returns `ExecutionMode.GPU` by default. This ensures that building a pipeline with a stage that does not match the execution mode raises a clear error for the user.
* Add `CpuOnlyMixin` and `GpuAndCpuMixin` mixins to automate overriding this, and to make it easier for users to determine at a glance which execution modes a given stage supports.
* Since the C++ stage/message implementations can only support cuDF DataFrames and RMM tensors, this PR re-purposes the existing Python stage/message implementations to serve as the CPU-only mode.
* CPU-only mode will center around pandas DataFrames and NumPy arrays for tensors, since the current Python code which expects cuDF/CuPy is already 99% compatible with pandas/NumPy.
* Avoid importing `cudf`, or any other GPU-based package whose import fails on a system without a GPU, at the top level of a module. This is important for stages, messages and modules which are automatically imported by the Morpheus CLI tool.
* Add new utility methods to `morpheus.utils.type_utils` (e.g. `get_df_pkg`, `is_cudf_type`) to help avoid importing `cudf` directly.
* Add a new `Config.freeze` method which makes a config object immutable. This is called the first time a config object is used to construct a pipeline or stage object, and prevents config parameters from being changed in the middle of pipeline construction.
* `CudfHelper::load` is no longer called automatically on import, instead it is called manually on pipeline build when execution mode is GPU.
* Add Python implementation of `ControlMessage`
* To simulate a system without a GPU for testing CPU-only mode, `docker/run_container_dev.sh` launches the container using the `runc` runtime when the `CPU_ONLY` environment variable is defined.
* Remove automatic test parameterization of C++/Python mode, since supporting CPU-only mode will become the exception, not the rule. Add a new `gpu_and_cpu_mode` test marker to explicitly indicate a test intended to be parameterized over execution modes.
* Fix copy constructor for `ControlMessage`
* `AppShieldSourceStage` now emits `ControlMessage`s, `AppShieldMessageMeta` is now deprecated
* `AutoencoderSourceStage`, and thus `AzureSourceStage`, `CloudTrailSourceStage`, and `DuoSourceStage`, now emit `ControlMessage`s; `UserMessageMeta` is now deprecated.
* DFP production pipeline updated to remove `DFPMessageMeta`, pipeline now executes in C++ mode.
* Consolidate common logic in `docker/run_container_dev.sh` & `docker/run_container_release.sh` into `docker/run_container.sh`
* Remove inconsistent behavior in the Python impl of `TensorMemory.set_tensor` (#1955)
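The execution-mode plumbing described above can be sketched roughly as follows. This is an illustrative toy, not the actual Morpheus code: the class and method names mirror the description, but the real signatures in `morpheus.config` and `morpheus.pipeline` may differ.

```python
from enum import Enum


class ExecutionMode(Enum):
    GPU = "GPU"
    CPU = "CPU"


class Config:
    """Toy config: defaults to GPU mode and becomes immutable once frozen."""

    def __init__(self):
        # GPU by default for backwards compatibility
        self.execution_mode = ExecutionMode.GPU
        self._frozen = False

    def freeze(self):
        # Bypass __setattr__ so freezing an already-frozen config is a no-op
        object.__setattr__(self, "_frozen", True)

    def __setattr__(self, name, value):
        # Reject writes after the config has been frozen
        if getattr(self, "_frozen", False):
            raise AttributeError(f"Config is frozen; cannot set '{name}'")
        super().__setattr__(name, value)


class StageBase:
    def supported_execution_modes(self) -> tuple:
        # GPU-only unless a mixin overrides this
        return (ExecutionMode.GPU,)


class CpuOnlyMixin:
    def supported_execution_modes(self) -> tuple:
        return (ExecutionMode.CPU,)


class GpuAndCpuMixin:
    def supported_execution_modes(self) -> tuple:
        return (ExecutionMode.GPU, ExecutionMode.CPU)


def build_stage(config: Config, stage: StageBase) -> StageBase:
    config.freeze()  # first use of the config freezes it
    if config.execution_mode not in stage.supported_execution_modes():
        raise RuntimeError(
            f"{type(stage).__name__} does not support {config.execution_mode}")
    return stage
```

Because the mixins sit earlier in the MRO than `StageBase`, a stage declares its supported modes simply by inheriting from one of them.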

Closes #1646
Closes #1846
Closes #1852
Closes #1955
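The deferred-import pattern behind the `type_utils` helpers might look something like this sketch. The names `get_df_pkg` and `is_cudf_type` come from the description above, but the bodies and signatures here are assumptions, not the actual Morpheus implementations.

```python
def get_df_pkg(use_gpu: bool = False):
    """Return the DataFrame package for the current mode.

    cudf is imported lazily, inside the function, so that CPU-only systems
    (where the import would fail) never touch it at module import time.
    """
    if use_gpu:
        import cudf
        return cudf
    import pandas
    return pandas


def is_cudf_type(obj) -> bool:
    """Check whether obj is a cudf object without requiring cudf to be installed."""
    # Inspect the type's module name instead of importing cudf for isinstance()
    return "cudf" in type(obj).__module__
```

Stages can then write `df_pkg = get_df_pkg(use_gpu)` once and use `df_pkg.DataFrame(...)` uniformly in both modes.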

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)
  - Yuchen Zhang (https://github.com/yczhang-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1851
dagardner-nv authored Oct 18, 2024
1 parent 5692639 commit e13e345
Showing 266 changed files with 3,257 additions and 3,145 deletions.
5 changes: 5 additions & 0 deletions ci/vale/styles/config/vocabularies/morpheus/accept.txt
@@ -18,6 +18,9 @@ CMake
Conda
CPython
[Cc]ryptocurrenc[y|ies]
cuDF
cuML
CuPy
[Cc]yber
[Cc]ybersecurity
Cython
@@ -51,7 +54,9 @@ NeMo
nginx
NIC
NIM(s?)
NumPy
NVIDIA
pandas
[Pp]arallelization
[Pp]arsable
PCIe
57 changes: 57 additions & 0 deletions docker/run_container.sh
@@ -0,0 +1,57 @@
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Color variables
b="\033[0;36m"
g="\033[0;32m"
r="\033[0;31m"
e="\033[0;90m"
y="\033[0;33m"
x="\033[0m"

_UNDEF_VAR_ERROR_MSG="Use the dev/release scripts to set these automatically"

DOCKER_IMAGE_NAME=${DOCKER_IMAGE_NAME:?"Must set \$DOCKER_IMAGE_NAME. ${_UNDEF_VAR_ERROR_MSG}"}
DOCKER_IMAGE_TAG=${DOCKER_IMAGE_TAG:?"Must set \$DOCKER_IMAGE_TAG. ${_UNDEF_VAR_ERROR_MSG}"}

# DOCKER_ARGS are set by the dev/release scripts
# DOCKER_EXTRA_ARGS are optionally set by the user
DOCKER_ARGS=${DOCKER_ARGS:-""}
DOCKER_ARGS="${DOCKER_ARGS} --net=host --cap-add=sys_nice ${DOCKER_EXTRA_ARGS}"
DOCKER_EXTRA_ARGS=${DOCKER_EXTRA_ARGS:-""}

if [[ -n "${CPU_ONLY}" ]]; then
echo -e "${b}Executing in CPU only mode${x}"
DOCKER_ARGS="${DOCKER_ARGS} --runtime=runc"
else
echo -e "${b}Executing in GPU mode${x}"
DOCKER_ARGS="${DOCKER_ARGS} --runtime=nvidia --gpus=all"
fi

if [[ -n "${SSH_AUTH_SOCK}" ]]; then
echo -e "${b}Setting up ssh-agent auth socket${x}"
DOCKER_ARGS="${DOCKER_ARGS} -v $(readlink -f $SSH_AUTH_SOCK):/ssh-agent:ro -e SSH_AUTH_SOCK=/ssh-agent"
fi

echo -e "${g}Launching ${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG}...${x}"

# Enable command logging to show what is being executed
set -x
docker run ${DOCA_EXTRA_ARGS} --rm -ti ${DOCKER_ARGS} ${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG} "${@:-bash}"

{ EXIT_CODE=$?; set +x; } 2>/dev/null

exit $EXIT_CODE
38 changes: 6 additions & 32 deletions docker/run_container_dev.sh
@@ -14,38 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# set -x
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"

# Color variables
b="\033[0;36m"
g="\033[0;32m"
r="\033[0;31m"
e="\033[0;90m"
y="\033[0;33m"
x="\033[0m"
export DOCKER_IMAGE_NAME=${DOCKER_IMAGE_NAME:-"morpheus"}
export DOCKER_IMAGE_TAG=${DOCKER_IMAGE_TAG:-"dev-$(date +'%y%m%d')"}

DOCKER_IMAGE_NAME=${DOCKER_IMAGE_NAME:-"morpheus"}
DOCKER_IMAGE_TAG=${DOCKER_IMAGE_TAG:-"dev-$(date +'%y%m%d')"}
DOCKER_EXTRA_ARGS=${DOCKER_EXTRA_ARGS:-""}
export DOCKER_ARGS="-v $PWD:/workspace -v /dev/hugepages:/dev/hugepages --privileged"

DOCKER_ARGS="--runtime=nvidia --env WORKSPACE_VOLUME=${PWD} -v $PWD:/workspace --net=host --gpus=all --cap-add=sys_nice"

if [[ -n "${SSH_AUTH_SOCK}" ]]; then
echo -e "${b}Setting up ssh-agent auth socket${x}"
DOCKER_ARGS="${DOCKER_ARGS} -v $(readlink -f $SSH_AUTH_SOCK):/ssh-agent:ro -e SSH_AUTH_SOCK=/ssh-agent"
fi

echo -e "${g}Launching ${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG}...${x}"

set -x
docker run \
-v /dev/hugepages:/dev/hugepages \
--privileged \
--rm \
-ti \
${DOCKER_ARGS} ${DOCKER_EXTRA_ARGS} \
${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG} "${@:-bash}"

{ EXIT_CODE=$?; set +x; } 2>/dev/null

exit $EXIT_CODE
# Call the general run script
${SCRIPT_DIR}/run_container.sh
39 changes: 7 additions & 32 deletions docker/run_container_release.sh
@@ -16,48 +16,23 @@

SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"

# Color variables
b="\033[0;36m"
g="\033[0;32m"
r="\033[0;31m"
e="\033[0;90m"
y="\033[0;33m"
x="\033[0m"

# Change to the script file to ensure we are in the correct repo (in case were in a submodule)
pushd ${SCRIPT_DIR} &> /dev/null

MORPHEUS_SUPPORT_DOCA=${MORPHEUS_SUPPORT_DOCA:-OFF}
MORPHEUS_BUILD_MORPHEUS_LLM=${MORPHEUS_BUILD_MORPHEUS_LLM:-ON}
MORPHEUS_BUILD_MORPHEUS_DFP=${MORPHEUS_BUILD_MORPHEUS_DFP:-ON}

DOCKER_IMAGE_NAME=${DOCKER_IMAGE_NAME:-"nvcr.io/nvidia/morpheus/morpheus"}
DOCKER_IMAGE_TAG=${DOCKER_IMAGE_TAG:-"$(git describe --tags --abbrev=0)-runtime"}

# This variable is used for passing extra arguments to the docker run command. Do not use DOCKER_ARGS for this purpose.
DOCKER_EXTRA_ARGS=${DOCKER_EXTRA_ARGS:-""}
export DOCKER_IMAGE_NAME=${DOCKER_IMAGE_NAME:-"nvcr.io/nvidia/morpheus/morpheus"}
export DOCKER_IMAGE_TAG=${DOCKER_IMAGE_TAG:-"$(git describe --tags --abbrev=0)-runtime"}

popd &> /dev/null

DOCKER_ARGS="--runtime=nvidia --env WORKSPACE_VOLUME=${PWD} --net=host --gpus=all --cap-add=sys_nice ${DOCKER_EXTRA_ARGS}"

if [[ -n "${SSH_AUTH_SOCK}" ]]; then
echo -e "${b}Setting up ssh-agent auth socket${x}"
DOCKER_ARGS="${DOCKER_ARGS} -v $(readlink -f $SSH_AUTH_SOCK):/ssh-agent:ro -e SSH_AUTH_SOCK=/ssh-agent"
fi

# DPDK requires hugepage and privileged container
DOCA_EXTRA_ARGS=""
# DPDK (and thus DOCA) requires hugepage and privileged container
export DOCKER_ARGS=""
if [[ ${MORPHEUS_SUPPORT_DOCA} == @(TRUE|ON) ]]; then
echo -e "${b}Enabling DOCA Support. Mounting /dev/hugepages and running in privileged mode${x}"
echo -e "Enabling DOCA Support. Mounting /dev/hugepages and running in privileged mode"

DOCKER_ARGS="${DOCKER_ARGS} -v /dev/hugepages:/dev/hugepages --privileged"
fi


echo -e "${g}Launching ${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG}...${x}"

# Enable command logging to show what is being executed
set -x
docker run ${DOCA_EXTRA_ARGS} --rm -ti ${DOCKER_ARGS} ${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_TAG} "${@:-bash}"
set +x
# Call the general run script
${SCRIPT_DIR}/run_container.sh
1 change: 1 addition & 0 deletions docs/source/conf.py
@@ -199,6 +199,7 @@
r'^http://$',
r'^https://$',
r'https://(platform\.)?openai.com',
r'https://code.visualstudio.com'
]

# Add any paths that contain templates here, relative to this directory.
5 changes: 3 additions & 2 deletions docs/source/developer_guide/guides/2_real_world_phishing.md
@@ -980,7 +980,7 @@ The code for our sink will be similar to other stages with a few changes. First,

```python
@register_stage("to-rabbitmq")
class WriteToRabbitMQStage(PassThruTypeMixin, SinglePortStage):
class WriteToRabbitMQStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage):
```

Our sink will function as a pass-through allowing the possibility of other sinks to be added to the pipeline. We could, hypothetically, have a pipeline where we emit the results to both RabbitMQ and a file. For this reason we will also be using the `PassThruTypeMixin`.
@@ -1032,14 +1032,15 @@ import pika
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage

logger = logging.getLogger(__name__)


@register_stage("to-rabbitmq")
class WriteToRabbitMQStage(PassThruTypeMixin, SinglePortStage):
class WriteToRabbitMQStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage):
"""
Source stage used to load messages from a RabbitMQ queue.
@@ -88,15 +88,15 @@ Defines a single column and type-cast.
| Argument | Type | Description |
| -------- | ---- | ----------- |
| `name` | `str` | Name of the column |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [Pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |

#### Custom Column (`CustomColumn`)
Subclass of `ColumnInfo`, defines a column to be computed by a user-defined function `process_column_fn`.

| Argument | Type | Description |
| -------- | ---- | ----------- |
| `name` | `str` | Name of the column |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [Pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |
| `process_column_fn` | `function` | Function which receives the entire `DataFrame` as its only input, returning a new [`pandas.Series`](https://pandas.pydata.org/docs/reference/api/pandas.Series.html) object to be stored in column `name`. |
| `input_column_types` | `dict[str, str]` | The input columns and the expected [`dtype` strings](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) that are needed for this Column to successfully process. Setting this as `None` will pass all columns. Specifying which columns are needed improves performance. |

@@ -139,7 +139,7 @@ Subclass of `RenameColumn`, specific to casting UTC localized `datetime` values.
| Argument | Type | Description |
| -------- | ---- | ----------- |
| `name` | `str` | Name of the destination column |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [Pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |
| `input_name` | `str` | Original column name |

#### String-Join Column (`StringJoinColumn`)
@@ -148,7 +148,7 @@ Subclass of `RenameColumn`, converts incoming `list` values to string by joining
| Argument | Type | Description |
| -------- | ---- | ----------- |
| `name` | `str` | Name of the destination column |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [Pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |
| `input_name` | `str` | Original column name |
| `sep` | `str` | Separator string to use for the join |

@@ -158,7 +158,7 @@ Subclass of `ColumnInfo`, concatenates values from multiple columns into a new string column.
| Argument | Type | Description |
| -------- | ---- | ----------- |
| `name` | `str` | Name of the destination column |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [Pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |
| `dtype` | `str` or Python type | Any type string or Python class recognized by [pandas](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) |
| `input_columns` | `List[str]` | List of columns to concatenate |
| `sep` | `str` | Separator string |

8 changes: 2 additions & 6 deletions examples/abp_pcap_detection/abp_pcap_preprocessing.py
@@ -16,17 +16,16 @@
from functools import partial

import cupy as cp
import mrc
import numpy as np

import cudf

import morpheus._lib.messages as _messages
from morpheus.cli.register_stage import register_stage
from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import ControlMessage
from morpheus.messages import InferenceMemoryFIL
from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage


@@ -184,7 +183,7 @@ def round_time_kernel(timestamp, rollup_time, secs):
seq_ids[:, 2] = fea_len - 1

# Create the inference memory. Keep in mind count here could be > than input count
memory = _messages.InferenceMemoryFIL(count=count, input__0=data, seq_ids=seq_ids)
memory = InferenceMemoryFIL(count=count, input__0=data, seq_ids=seq_ids)

infer_message = ControlMessage(msg)
infer_message.payload(meta)
@@ -197,6 +196,3 @@ def _get_preprocess_fn(self) -> typing.Callable[[ControlMessage], ControlMessage]:
fea_len=self._fea_length,
fea_cols=self.features,
req_cols=self.req_cols)

def _get_preprocess_node(self, builder: mrc.Builder):
raise NotImplementedError("C++ node not implemented for this stage")