Merge pull request #8 from bsuryadevara/bhargav_integrated_training_595
module docs updates
drobison00 authored Mar 29, 2023
2 parents f76fdd3 + 64635e8 commit 3d041f9
Showing 36 changed files with 558 additions and 429 deletions.
12 changes: 6 additions & 6 deletions docs/source/control_message_guide.md
The control message is a JSON object used in the Morpheus pipeline workflow.

## Components

The primary component, `inputs`, is used to group control messages together, treating them as inputs to a pipeline. The `inputs` component is an array of control message objects, each representing a distinct input to the pipeline. Each control message object has the following structure:

### Control Message
```
{
    "tasks": [
        { ... }
    ],
    "metadata": { ... }
}
```

#### Tasks

The tasks component of each control message object is an array of task objects, each of which represents a separate task to be executed on the input data. Each task object has the following structure:

```
{
    "type": "task_type",
    "properties": { ... }
}
```

- `loader_id` : The ID of the loader to be used for loading the input data. Currently, only the `fsspec` and `file_to_df` loaders are supported. Users can also register custom loaders in the dataloader registry and use them in the pipeline.
- `files` : An array of file paths or glob patterns specifying the input data to be loaded.

- For `training` and `inference` tasks, incorporate key and value updates to the `properties` object as required; there is no specified format, as the sketch below illustrates.
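
For example, a `training` task might carry arbitrary, application-defined properties. The property names in this hypothetical sketch are illustrative only:

```
{
    "type": "training",
    "properties": {
        "model_name": "example_model",
        "epochs": 5
    }
}
```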

### Metadata
The metadata component of each control message object is an object containing metadata information. Properties defined in this metadata component can be accessed anywhere across the stages that consume `ControlMessage` objects.

- `data_type` : A string indicating how the data should be processed. The supported data types are:
- `payload` : Arbitrary input data
- `Streaming` : Streaming data
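
Putting these pieces together, a complete control message might look like the following sketch. The structure is assembled from the components described above; the specific property values are illustrative only:

```
{
    "inputs": [
        {
            "tasks": [
                {
                    "type": "load",
                    "properties": {
                        "loader_id": "fsspec",
                        "files": ["/path/to/input/files"]
                    }
                }
            ],
            "metadata": {
                "data_type": "payload"
            }
        }
    ]
}
```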

58 changes: 54 additions & 4 deletions docs/source/loaders/core/file_to_df_loader.md

## File to DataFrame Loader

The [DataLoader](../../modules/core/data_loader.md) module uses this loader function to load the content of data files into a dataframe. The loader can be configured to use different processing methods (single-threaded, multiprocess, dask, or dask_thread), as determined by the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable. When the download method starts with "dask", a Dask client is created to process the files; otherwise, a single thread or multiprocess is used.
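
For example, setting the environment variable `MORPHEUS_FILE_DOWNLOAD_TYPE` to `dask` before running the pipeline would select the Dask-based download path; the accepted values are assumed to mirror the processing methods listed above.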

After processing, the resulting dataframe is cached using a hash of the file paths. In addition to loading data from disk, this loader can also load file content from S3 buckets.

### Example Loader Configuration

Using the configuration below when loading the DataLoader module specifies that the module should use the `file_to_df` loader when loading files into a dataframe.

```json
{
    "loaders": [{
        "id": "file_to_df"
    }]
}
```

**Note** : Loaders can receive configuration from the `load` task via a [control message](../../../source/control_message_guide.md) at runtime.

### Task Configurable Parameters

The following parameters can be configured for this loader at the load task level:

| Parameter        | Type       | Description                      | Example Value            | Default Value |
| ---------------- | ---------- | -------------------------------- | ------------------------ | ------------- |
| `batcher_config` | dictionary | Options for batching             | See below                | `[Required]`  |
| `files`          | array      | List of files to load            | ["/path/to/input/files"] | `[]`          |
| `loader_id`      | string     | Unique identifier for the loader | "file_to_df"             | `[Required]`  |


### `batcher_config`

| Key | Type | Description | Example Value | Default Value |
|-------------------------|------------|--------------------------------------------|----------------------|---------------|
| `cache_dir` | string | Directory to cache the rolling window data | "/path/to/cache" | `-` |
| `file_type` | string | Type of the input file | "csv" | `"JSON"` |
| `filter_null` | boolean | Whether to filter out null values | true | `false` |
| `parser_kwargs` | dictionary | Keyword arguments to pass to the parser | {"delimiter": ","} | `-` |
| `schema` | dictionary | Schema of the input data | See Below | `-` |
| `timestamp_column_name` | string | Name of the timestamp column | "timestamp" | `-` |

### Example Load Task Configuration

The JSON configuration below shows how to pass additional configuration to the loader through a control message task at runtime.

```json
{
    "type": "load",
    "properties": {
        "loader_id": "file_to_df",
        "files": ["/path/to/input/files"],
        "batcher_config": {
            "timestamp_column_name": "timestamp_column_name",
            "schema": {
                "schema_str": "string",
                "encoding": "latin1"
            },
            "file_type": "JSON",
            "filter_null": false,
            "parser_kwargs": {
                "delimiter": ","
            },
            "cache_dir": "/path/to/cache"
        }
    }
}
```

**Note** : The [file_batcher](../../../../morpheus/modules/file_batcher.py) module currently generates tasks internally, assigns them to control messages, and sends them to the [DataLoader](../../modules/core/data_loader.md) module, which uses [file_to_df_loader](../../../../morpheus/loaders/file_to_df_loader.py). As a result, this loader configuration is obtained from the [File Batcher](../../modules/core/file_batcher.md) module configuration.
35 changes: 30 additions & 5 deletions docs/source/loaders/core/fsspec_loader.md

## Filesystem Spec Loader

The [DataLoader](../../modules/core/data_loader.md) module is configured to use this loader function, which loads data from external sources using the fsspec library and returns the updated `ControlMessage` object with a `MessageMeta` payload containing a dataframe of filenames.

### Example Loader Configuration

```json
{
    "loaders": [{
        "id": "fsspec"
    }]
}
```

**Note** : Loaders can receive configuration from the `load` task via a [control message](../../../source/control_message_guide.md) at runtime.

### Task Configurable Parameters

The following parameters can be configured for this loader at the load task level:

| Parameter   | Type   | Description                      | Example Value            | Default Value |
| ----------- | ------ | -------------------------------- | ------------------------ | ------------- |
| `files`     | array  | List of files to load            | ["/your/input/filepath"] | `[]`          |
| `loader_id` | string | Unique identifier for the loader | "fsspec"                 | `[Required]`  |

### Example Load Task Configuration

The JSON configuration below shows how to pass additional configuration to the loader through a control message task at runtime.

```json
{
    "type": "load",
    "properties": {
        "loader_id": "fsspec",
        "files": ["/your/input/filepath"]
    }
}
```
2 changes: 2 additions & 0 deletions docs/source/loaders/morpheus_loaders.md

Custom functions called "loaders" can be utilized by the DataLoader module to load data into the pipeline. Users can register their own customized loader functions in a dataloader registry, which then become accessible to the DataLoader module during module loading; the configuration sketch below shows how a registered loader is referenced.

**Note** : Loaders receive configuration from the `load` task via a [control message](../../source/control_message_guide.md) at runtime.
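
For example, once a custom loader has been registered, it can be referenced from the DataLoader module configuration in the same way as the core loaders. The id `my_custom_loader` in this sketch is hypothetical and stands in for whatever id the loader was registered under:

```json
{
    "loaders": [{
        "id": "my_custom_loader"
    }]
}
```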

## Core Loaders

- [File to DataFrame Loader](./core/file_to_df_loader.md)
12 changes: 6 additions & 6 deletions docs/source/modules/core/data_loader.md
… are specified in the module configuration file at the time of object construction.

### Configurable Parameters

| Parameter | Type  | Description                                        | Example Value | Default Value |
|-----------|-------|----------------------------------------------------|---------------|---------------|
| `loaders` | array | An array containing information on loaders to use  | See Below     | `[]`          |

### `loaders`

| Parameter | Type | Description | Example Value | Default Value |
|--------------|------------|------------------------------------------|----------------------------------------|---------------|
| `id`         | string     | Unique identifier for the loader          | "loader1"                            | `-`           |
| `properties` | dictionary | Dictionary of properties for that loader  | {... loader specific parameters ...} | `{}`          |

### Example JSON Configuration

```json
{
    "loaders": [
        {
            "id": "loader1",
            "properties": {... loader specific parameters ...}
        }
    ]
}
```
30 changes: 15 additions & 15 deletions docs/source/modules/core/file_batcher.md
…remaining files by period that fall inside the window.

### Configurable Parameters

| Parameter               | Type       | Description                   | Example Value          | Default Value |
|-------------------------|------------|-------------------------------|------------------------|---------------|
| `batching_options`      | dictionary | Options for batching          | See below              | `-`           |
| `cache_dir`             | string     | Cache directory               | "./file_batcher_cache" | `None`        |
| `file_type`             | string     | File type                     | "JSON"                 | `"JSON"`      |
| `filter_nulls`          | boolean    | Whether to filter null values | false                  | `false`       |
| `schema`                | dictionary | Data schema                   | See below              | `[Required]`  |
| `timestamp_column_name` | string     | Name of the timestamp column  | "timestamp"            | `"timestamp"` |

### `batching_options`

| Key                      | Type            | Description                         | Example Value                               | Default Value              |
|--------------------------|-----------------|-------------------------------------|---------------------------------------------|----------------------------|
| `end_time`               | datetime/string | End time of the time window         | "2023-03-14T23:59:59"                       | `None`                     |
| `iso_date_regex_pattern` | string          | Regex pattern for ISO date matching | "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}" | `<iso_date_regex_pattern>` |
| `parser_kwargs`          | dictionary      | Additional arguments for the parser | {}                                          | `{}`                       |
| `period`                 | string          | Time period for grouping files      | "1d"                                        | `"D"`                      |
| `sampling_rate_s`        | integer         | Sampling rate in seconds            | 60                                          | `60`                       |
| `start_time`             | datetime/string | Start time of the time window       | "2023-03-01T00:00:00"                       | `None`                     |
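
As an illustration, a `batching_options` object restricting the window to the two-week range from the example column might look like the following sketch; all values are illustrative:

```json
{
    "start_time": "2023-03-01T00:00:00",
    "end_time": "2023-03-14T23:59:59",
    "period": "1d",
    "sampling_rate_s": 60,
    "parser_kwargs": {}
}
```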

### `schema`

| Key | Type | Description | Example Value | Default Value |
|--------------|--------|---------------|---------------|---------------|
| `encoding`   | string | Encoding      | "latin1"      | `"latin1"`    |
| `schema_str` | string | Schema string | "string" | `[Required]` |

### Example JSON Configuration
```json
{
    "batching_options": {
        ...
    },
    "cache_dir": "./file_batcher_cache",
    "file_type": "JSON",
    "filter_nulls": false,
    "schema": {
        "schema_str": "string",
        "encoding": "latin1"
    },
    "timestamp_column_name": "timestamp"
}
```
15 changes: 8 additions & 7 deletions docs/source/modules/core/file_to_df.md
…In addition to loading data from the disk, it has the ability to load the file content from S3 buckets.

### Configurable Parameters

| Parameter               | Type       | Description                                | Example Value        | Default Value |
|-------------------------|------------|--------------------------------------------|----------------------|---------------|
| `cache_dir`             | string     | Directory to cache the rolling window data | "/path/to/cache"     | `-`           |
| `file_type`             | string     | Type of the input file                     | "csv"                | `"JSON"`      |
| `filter_null`           | boolean    | Whether to filter out null values          | true                 | `false`       |
| `parser_kwargs`         | dictionary | Keyword arguments to pass to the parser    | {"delimiter": ","}   | `-`           |
| `schema`                | dictionary | Schema of the input data                   | See Below            | `-`           |
| `timestamp_column_name` | string     | Name of the timestamp column               | "timestamp"          | `-`           |

### Example JSON Configuration

```json
{
    "cache_dir": "/path/to/cache",
    "file_type": "JSON",
    "filter_null": false,
    "parser_kwargs": {
        "delimiter": ","
    },
    "schema": {
        "schema_str": "string",
        "encoding": "latin1"
    },
    "timestamp_column_name": "timestamp"
}
```
11 changes: 6 additions & 5 deletions docs/source/modules/core/filter_control_message.md
When the requirements are met, this module discards the control messages.

### Configurable Parameters

| Parameter | Type | Description | Example Value | Default Value |
|------------------------------|---------|--------------------------------------|---------------------|---------------|
| `enable_data_type_filtering` | boolean | Enables filtering based on data type | true                | `false`       |
| `enable_task_filtering`      | boolean | Enables filtering based on task type | true                | `false`       |
| `filter_data_type`           | string  | The data type to be used as a filter | "desired_data_type" | `None`        |
| `filter_task_type`           | string  | The task type to be used as a filter | "specific_task"     | `None`        |

### Example JSON Configuration

```json
{
    "enable_task_filtering": true,
    "enable_data_type_filtering": true,
    "filter_task_type": "specific_task",
    "filter_data_type": "desired_data_type"
}
```