Integrated training: Framework updates and Modular DFP pipeline implementation (nv-morpheus#760)

Resolves nv-morpheus#595

Authors:
  - Devin Robison (https://github.com/drobison00)
  - Bhargav Suryadevara (https://github.com/bsuryadevara)
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#760
drobison00 authored Mar 30, 2023
1 parent 71c9366 commit c29aa16
Showing 213 changed files with 12,294 additions and 1,632 deletions.
5 changes: 1 addition & 4 deletions CMakeLists.txt
@@ -98,7 +98,7 @@ project(morpheus
VERSION 23.03.00
LANGUAGES C CXX CUDA)

-rapids_cmake_write_version_file(${CMAKE_BINARY_DIR}/autogenerated/include/mrc/version.hpp)
+rapids_cmake_write_version_file(${CMAKE_BINARY_DIR}/autogenerated/include/morpheus/version.hpp)

# Ccache configuration
include(environment/init_ccache)
@@ -169,11 +169,8 @@ if(MORPHEUS_ENABLE_DEBUG_INFO)

morpheus_utils_print_target_properties(
TARGETS
common
messages
morpheus
morpheus_utils
stages
WRITE_TO_FILE
)

1 change: 1 addition & 0 deletions docs/source/_templates/custom-module-template.rst
@@ -4,6 +4,7 @@

.. automodule:: {{ fullname }}
:members:
:undoc-members:
:exclude-members: {{ classes | join(", ") }}

{% block attributes %}
124 changes: 124 additions & 0 deletions docs/source/control_message_guide.md
@@ -0,0 +1,124 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Control Message Documentation

The control message is a JSON object used in the Morpheus pipeline workflow. It is wrapped in a `ControlMessage` object and passed between Morpheus stages.

## Components

The primary component, `inputs`, groups control messages together, treating them as inputs to a pipeline. The `inputs` component is an array of individual control message objects, each representing a distinct input to the pipeline. Each control message object has the following structure:

### Control Message
```
{
"tasks": [
// Array of task objects
],
"metadata": {
// Metadata object
}
}
```

#### Tasks

The tasks component of each control message object is an array of task objects, each of which represents a separate task to be executed on the input data. Each task object has the following structure:

```
{
"type": "string",
"properties": {
// Properties object
}
}
```


- `type` : The type field of the task object is a string indicating the type of task to be executed. Currently, the following task types are supported:

- `load` : Load input data from a specified file or files
- `training` : Train a machine learning model on the input data
- `inference` : Perform inference using a trained machine learning model on the input data

- `properties` : The properties field of the task object is an object containing task-specific properties. The specific properties required for each task type are described below.

- The properties object for a `load` task has the following structure:
```
{
"loader_id": "string",
"files": [
"string"
]
}
```
- `loader_id` : The ID of the loader to be used for loading the input data. Currently, only the `fsspec` and `file_to_df` loaders are supported. The user has the option to register custom loaders in the dataloader registry and utilize them in the pipeline.
- `files` : An array of file paths or glob patterns specifying the input data to be loaded.
- For `training` and `inference` tasks, add key-value pairs to the `properties` object as needed; there is no fixed format.

### Metadata

The metadata component of each input object contains metadata information. Properties defined in this metadata component can be accessed anywhere across the stages that consume `ControlMessage` objects.

- `data_type` : a string indicating how the data should be processed. The supported data types are:
    - `payload` : Arbitrary input data
    - `streaming` : Streaming data

## Example

This example demonstrates how to add various parameters to the control message JSON. The message below contains an array of three task objects: a `load` task, a `training` task, and an `inference` task. The `load` task loads the input data from the two files specified in the `files` array into a dataframe using the `fsspec` loader. The `training` task trains a neural network model with three layers and ReLU activation. The `inference` task performs inference using the trained model with ID `model_001`. The metadata component of the input object indicates that the input data type is `payload`.
```json
{
"inputs": [
{
"tasks": [
{
"type": "load",
"properties": {
"loader_id": "fsspec",
"files": [
"/path/to/file1",
"/path/to/file2"
]
}
},
{
"type": "training",
"properties": {
"model_type": "neural_network",
"model_params": {
"num_layers": 3,
"activation": "relu"
}
}
},
{
"type": "inference",
"properties": {
"model_id": "model_001"
}
}
],
"metadata": {
"data_type": "payload"
}
}
]
}
```
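
To make the structure concrete, here is a minimal Python sketch that builds the example message above out of plain dictionaries and filters its tasks by type. It uses only the standard library; the `tasks_of_type` helper is hypothetical and only illustrates how a consuming stage might pick out its work:

```python
import json

# Build the control message as plain Python dictionaries, mirroring the
# documented structure: an "inputs" array of control message objects,
# each with "tasks" and "metadata" components.
control_message = {
    "inputs": [
        {
            "tasks": [
                {
                    "type": "load",
                    "properties": {
                        "loader_id": "fsspec",
                        "files": ["/path/to/file1", "/path/to/file2"],
                    },
                },
                {
                    "type": "inference",
                    "properties": {"model_id": "model_001"},
                },
            ],
            "metadata": {"data_type": "payload"},
        }
    ]
}

def tasks_of_type(message: dict, task_type: str) -> list:
    """Hypothetical helper: collect the properties of every task of the
    given type across all inputs, as a consuming stage might."""
    return [
        task["properties"]
        for message_input in message["inputs"]
        for task in message_input["tasks"]
        if task["type"] == task_type
    ]

print(tasks_of_type(control_message, "load"))
print(json.dumps(control_message, indent=2))
```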
96 changes: 73 additions & 23 deletions docs/source/developer_guide/architecture.md
@@ -18,50 +18,100 @@ limitations under the License.
# Morpheus Architecture

## Overview

The organization of Morpheus can be broken down into five different layers. Working from the top down:

* Orchestration Layer
* Responsible for coordinating pipelines and facilitating communication.
* That is, monitoring pipelines, transferring messages between pipelines, starting and stopping pipelines,
assigning resources to pipelines, and so on.
* Plays a large role in multi-machine pipelines but works out of the box for single-machine pipelines.
* Pipeline Layer
* Composed of one or more stages connected by edges.
* Data moves between stages using buffered channels, and the pipeline will automatically handle backpressure by
monitoring the amount of data in each edge buffer.
* Stage Layer
* Main building blocks in Morpheus.
* Responsible for executing a specific function on incoming data from previous stages in the pipeline.
* Isolated from each other and can be thought of as black boxes.
* Composed of one or more nodes, connected by edges.
* All nodes are guaranteed to operate on the same machine, in the same process space.
* Module Layer
* Introduced in the 23.03 release; modules define compact, composable, nestable, and fully reusable units of work (see Morpheus Modules below).
* Node Layer
* Smallest building block in Morpheus.
* Each node operates on the same thread.
* Composed of one or more operators in the reactive programming style.

## Pipeline Details

Pipelines are a collection of one or more stages that are connected via edges. Data flows from one stage to the next
across these edges using buffers. We utilize these buffers to allow stages to process messages at different rates. Once
each stage is done processing a message, the pipeline will move it onto the next stage's buffer for processing. This
process continues until the message has made it through the entire pipeline.

The main goal of the pipeline is to maximize throughput via parallel execution of the stages. So we can utilize hardware
optimally and avoid processing individual messages sequentially. Given a multi-stage pipeline consisting of stages 1 and
2. Stage 1 collects its first message from its data source and begins processing it. Once Stage 1 is done with its first
message, the resulting output message will be forwarded to Stage 2. At this point, Stage 1 immediately begins processing
the next input to the pipeline, while Stage 2 begins work on the output of Stage 1. This allows for multiple messages to
be in flight in the pipeline at a time, increasing parallelization.

Utilizing buffers between stages in this way does come at a cost. Increasing the size of the buffers helps improve
parallelization by ensuring all stages have some work to do. But this also increases latency since messages can sit in a
buffer waiting to be processed. The inverse is also true. Decreasing the buffer sizes improves latency, but can starve
some stages of work to do, decreasing parallelization. The pipeline has to walk a fine line of keeping all stages
supplied with data with the smallest buffers possible.
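
The effect of buffer sizing is easy to demonstrate outside of Morpheus. The following self-contained Python sketch (not the Morpheus API; the stage functions and buffer size are illustrative assumptions) connects two stages with a bounded queue, so a slow downstream stage naturally applies backpressure to the upstream producer:

```python
import queue
import threading
import time

# A bounded buffer between two stages: when it fills up, the producer's
# put() blocks, which is exactly the backpressure behavior described above.
edge_buffer: queue.Queue = queue.Queue(maxsize=4)

def fast_source(count: int) -> None:
    """Stage 1: produces messages faster than stage 2 can consume them."""
    for i in range(count):
        edge_buffer.put(i)  # Blocks once the buffer holds 4 messages.
        print(f"source: emitted {i}")
    edge_buffer.put(None)  # Sentinel to signal completion.

def slow_sink() -> None:
    """Stage 2: simulates an expensive per-message operation."""
    while (msg := edge_buffer.get()) is not None:
        time.sleep(0.1)  # Processing is slower than production.
        print(f"sink: processed {msg}")

producer = threading.Thread(target=fast_source, args=(16,))
consumer = threading.Thread(target=slow_sink)
producer.start()
consumer.start()
producer.join()
consumer.join()
```

Increasing `maxsize` lets the source run further ahead (more parallelism, more latency); decreasing it keeps latency low but can leave the sink idle, mirroring the tradeoff described above.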

## Stage Details

A stage is the fundamental building block in Morpheus and is responsible for performing all of the work in a pipeline. A
stage can encapsulate any piece of functionality and is capable of integrating with any service or external library.
This freedom allows stages to range from very small Python map functions up to very complex inference stages, which
connect to services and work in multiple threads. For example, Morpheus has simple stages for actions like reading and
writing to a file and more complex stages like the Triton inference stage, which can send many asynchronous inference
requests using shared device memory.

While stages are very flexible, they all comprise three main pieces: identification, type inference, and node creation.

### Identification

The stage identifier is a unique string used in both logging and creating the stage from the CLI.

### Type Inference

To perform work, each stage needs to know what type of data it will be operating on. Since Morpheus can pass any type of
data from stage to stage, the pipeline must ensure compatible types at every edge connection between stages. This
process is called stage type inference and is performed during the pipeline build phase.

Stage type inference is necessary because the output type of some stages may depend on the output type of the previous
stage. For example, consider a simple pass through stage that passes the input message to the next stage unmodified. If
our pass through stage is preceded by a stage generating a string, its output type will be a string. Instead, if it's
preceded by a stage generating an integer, its output type will be an integer.

Due to the dynamic nature of the output type of a stage, stages must specify a type inference function that accepts an
input type and returns the output type. Starting at the source stages, the pipeline will use this function to determine
the output type of the source stages. This result will then be passed to the type inference function of the next stage,
and so on until the input and output types of every stage in the pipeline have been determined.

After the build phase, the output types of stages cannot be changed. Returning a different type than specified during
the build phase will result in undefined behavior.
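
As an illustration of this propagation (a sketch, not the Morpheus implementation; the `infer_type` attribute and the stage names are hypothetical), each stage carries a function from input type to output type, and the build phase folds those functions forward from the source:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Stage:
    """A stage described only by its type inference function."""
    name: str
    infer_type: Callable[[type], type]

# A source emitting strings, a pass-through stage, and a stage that
# converts any input to its length (an int).
pipeline = [
    Stage("source", lambda _: str),      # Always emits str.
    Stage("pass_through", lambda t: t),  # Output type equals input type.
    Stage("to_length", lambda t: int),   # Always emits int.
]

def build(stages: list) -> list:
    """Propagate types from the source forward, as the build phase does."""
    resolved = []
    current: type = type(None)  # Sources ignore their input type.
    for stage in stages:
        current = stage.infer_type(current)
        resolved.append((stage.name, current))
    return resolved

for name, out_type in build(pipeline):
    print(f"{name}: outputs {out_type.__name__}")
```

Note how `pass_through` resolves to `str` here only because a string source precedes it; swap in an integer source and the same stage resolves to `int`.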

### Node Creation

The most important piece of a stage is node creation. The node creation function is responsible for creating the
instances of the nodes which will make up a stage. Like a pipeline, stages can be built up of one or more smaller nodes
connected by edges.

The difference between stages and nodes is that stages guarantee that the same machine will run all nodes in the same
process space. This allows nodes to optimize the information they pass between themselves to ensure maximum performance.
For example, two nodes could pass a raw GPU device pointer between them, allowing maximum performance with minimum
overhead. Without this guarantee that both nodes are running in the same process space, passing such a low-level piece
of information would be unsafe.
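
The zero-copy hand-off this guarantee enables can be sketched in plain Python (illustrative only; real Morpheus nodes use MRC operators and may exchange GPU device pointers rather than host arrays): because both nodes live in one process, the second node receives a reference to the first node's buffer, never a serialized copy.

```python
import numpy as np

# Two "nodes" of one stage, running in the same process space. The edge
# between them passes an object reference, never a serialized copy.

def node_a() -> np.ndarray:
    """First node: produces a large buffer (stand-in for GPU memory)."""
    return np.arange(1_000_000, dtype=np.float32)

def node_b(buffer: np.ndarray) -> float:
    """Second node: operates directly on the producer's buffer."""
    buffer *= 2.0  # In-place: no copy was made crossing the edge.
    return float(buffer.sum())

data = node_a()
result = node_b(data)

# The same memory was mutated in place, proving nothing was copied.
assert data[1] == 2.0
print(result)
```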

## Morpheus Modules

Modules, introduced in the 23.03 release, provide a new method for defining units of work which are compact, composable, nestable, and fully reusable. Once a module has been defined and registered, it can be used in new and existing pipelines, either as a new `ModuleStage` or loaded directly within the context of an existing stage using `builder.load_module(...)`.
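
As a sketch of that workflow (assuming the `register_module` decorator from `morpheus.utils.module_utils` and an MRC builder that accepts `ops.map(...)` in `make_node`; API details vary across releases, and the module id, namespace, and `on_data` logic here are made up for illustration):

```python
import mrc
from mrc.core import operators as ops

from morpheus.utils.module_utils import register_module

@register_module("multiply_by_two", "examples")
def multiply_by_two(builder: mrc.Builder):
    """A minimal module: one node that doubles every message it receives."""

    def on_data(value: int) -> int:
        return value * 2

    node = builder.make_node("multiply_by_two", ops.map(on_data))

    # Expose the node as the module's input and output ports so it can be
    # wired into a pipeline with builder.load_module(...).
    builder.register_module_input("input", node)
    builder.register_module_output("output", node)
```

Once registered, a pipeline can reuse the module anywhere, either through a `ModuleStage` or by loading it directly inside another stage or module, which is what makes modules composable and nestable.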
