Skip to content

Commit

Permalink
Simplification of the streaming RAG ingest example to improve usabili…
Browse files Browse the repository at this point in the history
…ty (#1454)

Closes 

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Devin Robison (https://github.com/drobison00)
  - Bhargav Suryadevara (https://github.com/bsuryadevara)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1454
  • Loading branch information
drobison00 authored Feb 6, 2024
1 parent 4c95c1d commit 44825d9
Show file tree
Hide file tree
Showing 87 changed files with 5,014 additions and 1,055 deletions.
2 changes: 2 additions & 0 deletions conda/environments/examples_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies:
- pip
- pypdf=3.17.4
- python-confluent-kafka>=1.9.2,<1.10.0a0
- python-docx==1.1.0
- python-graphviz
- python=3.10
- pytorch-cuda
Expand All @@ -65,4 +66,5 @@ dependencies:
- milvus==2.3.5
- nemollm
- pymilvus==2.3.6
- PyMuPDF==1.23.21
name: examples_cuda-121_arch-x86_64
7 changes: 6 additions & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ channels:

dependencies:

build_cpp: # should be split in to `build_python` if/when converting to use scikit-build
build_cpp: # should be split into `build_python` if/when converting to use scikit-build
common:
- output_types: [conda]
packages:
Expand Down Expand Up @@ -291,8 +291,10 @@ dependencies:
- pytest-asyncio
- pytest-benchmark=4.0
- pytest-cov
- python-docx==1.1.0
- pip
- pip:
- PyMuPDF==1.23.21
- pytest-kafka==0.6.0

example-dfp-prod:
Expand Down Expand Up @@ -366,6 +368,9 @@ dependencies:
- *newspaper3k
- *pypdf
- onnx
- pip
- pip:
- PyMuPDF==1.23.21

model-training-tuning:
common:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from dfp.modules import dfp_deployment
from dfp.modules import dfp_inference
from dfp.modules import dfp_inference_pipe
from dfp.modules import dfp_monitor
from dfp.modules import dfp_postprocessing
from dfp.modules import dfp_preproc
from dfp.modules import dfp_rolling_window
Expand All @@ -30,7 +29,6 @@
from dfp.modules import dfp_training_pipe

__all__ = [
"dfp_monitor",
"dfp_split_users",
"dfp_data_prep",
"dfp_inference",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import logging

import mrc
from dfp.utils.module_ids import DFP_MONITOR

from morpheus.modules.general.monitor import MonitorLoaderFactory
from morpheus.utils.module_ids import FILTER_DETECTIONS
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_ids import SERIALIZE
Expand Down Expand Up @@ -164,7 +164,7 @@ def dfp_inference_pipe(builder: mrc.Builder):
# |
# v
# +-------------------------------------+
# | dfp_monitor_module |
# | monitor_module |
# +-------------------------------------+
# |
# v
Expand All @@ -174,7 +174,7 @@ def dfp_inference_pipe(builder: mrc.Builder):
# |
# v
# +-------------------------------------+
# | dfp_monitor_module |
# | monitor_module |
# +-------------------------------------+
# |
# v
Expand All @@ -199,7 +199,7 @@ def dfp_inference_pipe(builder: mrc.Builder):
# |
# v
# +-------------------------------------+
# | dfp_monitor_module |
# | monitor_module |
# +-------------------------------------+
# |
# v
Expand Down Expand Up @@ -282,7 +282,7 @@ def dfp_inference_pipe(builder: mrc.Builder):
write_to_file_conf = merge_dictionaries(write_to_file_options, write_to_file_defaults)

write_to_file_monitor_options = {"description": "Saved [inference_pipe]"}
write_to_file_monitor_module_conf = merge_dictionaries(write_to_file_monitor_options, monitor_options)
write_to_fm_conf = merge_dictionaries(write_to_file_monitor_options, monitor_options)

# Load modules
preproc_module = builder.load_module(DFP_PREPROC, "morpheus", "dfp_preproc", preproc_conf)
Expand All @@ -291,15 +291,16 @@ def dfp_inference_pipe(builder: mrc.Builder):
"dfp_rolling_window",
dfp_rolling_window_conf)
dfp_data_prep_module = builder.load_module(DFP_DATA_PREP, "morpheus", "dfp_data_prep", dfp_data_prep_conf)
dfp_data_prep_monitor_module = builder.load_module(DFP_MONITOR,
"morpheus",
"dfp_inference_data_prep_monitor",
data_prep_monitor_module_conf)

dfp_data_prep_loader = MonitorLoaderFactory.get_instance("dfp_inference_data_prep_monitor",
module_config=data_prep_monitor_module_conf)

dfp_data_prep_monitor_module = dfp_data_prep_loader.load(builder=builder)
dfp_inference_module = builder.load_module(DFP_INFERENCE, "morpheus", "dfp_inference", dfp_inference_conf)
dfp_inference_monitor_module = builder.load_module(DFP_MONITOR,
"morpheus",
"dfp_inference_monitor",
inference_monitor_module_conf)

dfp_inference_monitor_loader = MonitorLoaderFactory.get_instance("dfp_inference_monitor",
module_config=inference_monitor_module_conf)
dfp_inference_monitor_module = dfp_inference_monitor_loader.load(builder=builder)
filter_detections_module = builder.load_module(FILTER_DETECTIONS,
"morpheus",
"filter_detections",
Expand All @@ -310,10 +311,10 @@ def dfp_inference_pipe(builder: mrc.Builder):
dfp_post_proc_conf)
serialize_module = builder.load_module(SERIALIZE, "morpheus", "serialize", serialize_conf)
write_to_file_module = builder.load_module(WRITE_TO_FILE, "morpheus", "write_to_file", write_to_file_conf)
dfp_write_to_file_monitor_module = builder.load_module(DFP_MONITOR,
"morpheus",
"dfp_inference_write_to_file",
write_to_file_monitor_module_conf)

dfp_write_to_file_monitor_loader = MonitorLoaderFactory.get_instance("dfp_inference_write_to_file_monitor",
module_config=write_to_fm_conf)
dfp_write_to_file_monitor_module = dfp_write_to_file_monitor_loader.load(builder=builder)

# Make an edge between the modules.
builder.make_edge(preproc_module.output_port("output"), dfp_rolling_window_module.input_port("input"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import logging

import mrc
from dfp.utils.module_ids import DFP_MONITOR

from morpheus.modules.general.monitor import MonitorLoaderFactory
from morpheus.utils.loader_ids import FILE_TO_DF_LOADER
from morpheus.utils.module_ids import DATA_LOADER
from morpheus.utils.module_ids import FILE_BATCHER
Expand Down Expand Up @@ -73,7 +73,7 @@ def dfp_preproc(builder: mrc.Builder):
# |
# v
# +-------------------------------+
# | dfp_monitor_module |
# | monitor_module |
# +-------------------------------+
# |
# v
Expand All @@ -83,7 +83,7 @@ def dfp_preproc(builder: mrc.Builder):
# |
# v
# +-------------------------------+
# | dfp_monitor_module |
# | monitor_module |
# +-------------------------------+
# |
# v
Expand Down Expand Up @@ -146,15 +146,15 @@ def dfp_preproc(builder: mrc.Builder):
"morpheus",
"dfp_file_to_df_dataloader",
file_to_df_conf)
file_to_df_monitor_module = builder.load_module(DFP_MONITOR,
"morpheus",
"file_to_df_monitor",
file_to_df_monitor_conf)

file_to_df_monitor_loader = MonitorLoaderFactory.get_instance("file_to_df_monitor_loader",
module_config=file_to_df_monitor_conf)
file_to_df_monitor_module = file_to_df_monitor_loader.load(builder=builder)
dfp_split_users_module = builder.load_module(DFP_SPLIT_USERS, "morpheus", "dfp_split_users", dfp_split_users_conf)
dfp_split_users_monitor_module = builder.load_module(DFP_MONITOR,
"morpheus",
"dfp_training_ingested_monitor",
dfp_split_users_monitor_conf)

dfp_split_users_monitor_loader = MonitorLoaderFactory.get_instance("dfp_split_users_monitor_loader",
module_config=dfp_split_users_monitor_conf)
dfp_split_users_monitor_module = dfp_split_users_monitor_loader.load(builder=builder)

# Make an edge between the modules.
builder.make_edge(filter_control_message_module.output_port("output"), file_batcher_module.input_port("input"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import logging

import mrc
from dfp.utils.module_ids import DFP_MONITOR

from morpheus.modules.general.monitor import MonitorLoaderFactory
from morpheus.utils.module_ids import MLFLOW_MODEL_WRITER
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_utils import merge_dictionaries
Expand Down Expand Up @@ -150,7 +150,7 @@ def dfp_training_pipe(builder: mrc.Builder):
# |
# v
# +-------------------------------------+
# | dfp_monitor_module |
# | monitor_module |
# +-------------------------------------+
# |
# v
Expand All @@ -160,7 +160,7 @@ def dfp_training_pipe(builder: mrc.Builder):
# |
# v
# +-------------------------------------+
# | dfp_monitor_module |
# | monitor_module |
# +-------------------------------------+
# |
# v
Expand All @@ -170,7 +170,7 @@ def dfp_training_pipe(builder: mrc.Builder):
# |
# v
# +-------------------------------------+
# | dfp_monitor_module |
# | monitor_module |
# +-------------------------------------+
# |
# v
Expand Down Expand Up @@ -260,23 +260,24 @@ def dfp_training_pipe(builder: mrc.Builder):
"dfp_rolling_window",
dfp_rolling_window_conf)
dfp_data_prep_module = builder.load_module(DFP_DATA_PREP, "morpheus", "dfp_data_prep", dfp_data_prep_conf)
dfp_data_prep_monitor_module = builder.load_module(DFP_MONITOR,
"morpheus",
"dfp_training_data_prep_monitor",
data_prep_monitor_module_conf)
dfp_data_prep_loader = MonitorLoaderFactory.get_instance("data_prep_monitor",
module_config=data_prep_monitor_module_conf)
dfp_data_prep_monitor_module = dfp_data_prep_loader.load(builder=builder)

dfp_training_module = builder.load_module(DFP_TRAINING, "morpheus", "dfp_training", dfp_training_conf)
dfp_training_monitor_module = builder.load_module(DFP_MONITOR,
"morpheus",
"dfp_training_training_monitor",
training_monitor_module_conf)

dfp_training_monitor_loader = MonitorLoaderFactory.get_instance("training_monitor",
module_config=training_monitor_module_conf)
dfp_training_monitor_module = dfp_training_monitor_loader.load(builder=builder)

mlflow_model_writer_module = builder.load_module(MLFLOW_MODEL_WRITER,
"morpheus",
"mlflow_model_writer",
mlflow_model_writer_conf)
mlflow_model_writer_monitor_module = builder.load_module(DFP_MONITOR,
"morpheus",
"dfp_training_mlflow_model_writer_monitor",
mlflow_model_writer_module_conf)

mlflow_model_writer_loader = MonitorLoaderFactory.get_instance("mlflow_model_writer_monitor",
module_config=mlflow_model_writer_module_conf)
mlflow_model_writer_monitor_module = mlflow_model_writer_loader.load(builder=builder)

# Make an edge between the modules.
builder.make_edge(preproc_module.output_port("output"), dfp_rolling_window_module.input_port("input"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,3 @@
DFP_INFERENCE_PIPE = "DFPInferencePipe"
DFP_TRAINING_PIPE = "DFPTrainingPipe"
DFP_DEPLOYMENT = "DFPDeployment"
DFP_MONITOR = "DFPMonitor"
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def run_pipeline(source: str,
# | | | | | | | |
# | | v | | v | |
# | | +-------------------------------------+ | | + -------------------------------------+ | |
# | | | dfp_monitor_module | | | | dfp_monitor_module | | |
# | | | monitor_module | | | | monitor_module | | |
# | | +-------------------------------------+ | | + -------------------------------------+ | |
# | | | | | | | |
# | | v | | v | |
Expand All @@ -231,7 +231,7 @@ def run_pipeline(source: str,
# | | | | | | | |
# | | v | | v | |
# | | +-------------------------------------+ | | + -------------------------------------+ | |
# | | | dfp_monitor_module | | | | dfp_monitor_module | | |
# | | | monitor_module | | | | monitor_module | | |
# | | +-------------------------------------+ | | + -------------------------------------+ | |
# | | | | | | | |
# | | v | | v | |
Expand All @@ -241,7 +241,7 @@ def run_pipeline(source: str,
# | | | | | | | |
# | | v | | v | |
# | | +-------------------------------------+ | | + -------------------------------------+ | |
# | | | dfp_monitor_module | | | | dfp_post_proc_module | | |
# | | | monitor_module | | | | dfp_post_proc_module | | |
# | | +-------------------------------------+ | | + -------------------------------------+ | |
# | ------------------------------------------------ | | | |
# | | v | |
Expand All @@ -256,7 +256,7 @@ def run_pipeline(source: str,
# | | | | |
# | | v | |
# | | +-------------------------------------+ | |
# | | | dfp_monitor_module | | |
# | | | monitor_module | | |
# | | +-------------------------------------+ | |
# | ------------------------------------------------ |
# --------------------------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 44825d9

Please sign in to comment.