Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for a CPU-only Mode #1851

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
323 commits
Select commit Hold shift + click to select a range
ded3472
WIP
dagardner-nv Aug 21, 2024
c94ae2e
Limit cudf imports
dagardner-nv Aug 21, 2024
4bc97d7
avoid cudf imports
dagardner-nv Aug 22, 2024
8d5600c
Remove top-level cudf import
dagardner-nv Aug 22, 2024
b3a0d85
Remove top-level cudf import
dagardner-nv Aug 22, 2024
118846c
Add load_cudf_helper to morpheus.common
dagardner-nv Aug 22, 2024
54d4dce
Add a mutex to prevent concurrent loads, although the GIL prevents co…
dagardner-nv Aug 22, 2024
6f802a2
Invoke CudfHelper::load
dagardner-nv Aug 22, 2024
72577b5
Move get_df_class and get_df_pkg to type_utils.py, add new is_cudf_ty…
dagardner-nv Aug 22, 2024
069bc89
WIP
dagardner-nv Aug 22, 2024
ffcbdf8
Add type for df literal
dagardner-nv Aug 22, 2024
090860a
Add new df_type_str_to_pkg helper method
dagardner-nv Aug 22, 2024
434d3af
Type utils cleanups
dagardner-nv Aug 22, 2024
8ac41a4
Add exec_mode_to_df_type_str
dagardner-nv Aug 22, 2024
6e155a9
WIP - avoid cudf imports
dagardner-nv Aug 22, 2024
6fbc2d9
Avoid cudf import, add type hints, cache the suffix df on a per-df-ty…
dagardner-nv Aug 22, 2024
58a0e3b
Avoid top-level cudf import, support pandas, consolidate netmask and …
dagardner-nv Aug 23, 2024
cfbf272
Avoid cudf imports
dagardner-nv Aug 23, 2024
f87c108
Actually check if GPU libs were loaded
dagardner-nv Aug 23, 2024
96ee381
WIP
dagardner-nv Aug 23, 2024
cd655b4
Don't import AutoEncoder as this imports torch, and we only need this…
dagardner-nv Aug 23, 2024
6ec4569
Avoid cudf import
dagardner-nv Aug 23, 2024
3a0695e
Avoid top level cudf imports in modules and controllers imported by m…
dagardner-nv Aug 23, 2024
8586464
Add a helper method to determine if a given object is a dataframe
dagardner-nv Aug 27, 2024
d63940c
Allow MonitorStage and TriggerStage to run in CPU mode
dagardner-nv Aug 27, 2024
360130b
Optionally avoid GPU settings in manual_seed
dagardner-nv Aug 27, 2024
ff45b02
Update to run in CPU only mode:
dagardner-nv Aug 27, 2024
9c71826
Add helper method for returning cupy/numpy
dagardner-nv Aug 27, 2024
0ca0be5
Mark InMemorySinkStage (and thus CompareDataFrameStage and Validation…
dagardner-nv Aug 27, 2024
b89d978
Misc cleanups
dagardner-nv Aug 27, 2024
396ac6e
Misc Cleanups
dagardner-nv Aug 27, 2024
e4ce7e1
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Aug 27, 2024
cbc59c2
Optionally run the container without access to GPUs, useful for testing
dagardner-nv Aug 27, 2024
1457d55
Disable default iteration on the use_cpp fixture, the primary reason …
dagardner-nv Aug 27, 2024
88057d7
Remove the execution_modes argument from the register_stage decorator…
dagardner-nv Aug 27, 2024
27384f0
WIP
dagardner-nv Aug 27, 2024
a7fbaf5
When in doubt default to C++ mode
dagardner-nv Aug 27, 2024
2803295
Mark supporting gpu and cpu
dagardner-nv Aug 27, 2024
cd3c24a
Fix python only test
dagardner-nv Aug 27, 2024
7fbc058
WIP
dagardner-nv Aug 27, 2024
f6b6175
WIP
dagardner-nv Aug 27, 2024
f6f814d
WIP
dagardner-nv Aug 28, 2024
c2a8746
Add return type-hint
dagardner-nv Sep 3, 2024
587d567
wip
dagardner-nv Sep 3, 2024
86a571d
Remove support for inferring execution mode based upon the test name,…
dagardner-nv Sep 3, 2024
525ef2a
Revert change
dagardner-nv Sep 3, 2024
6c75451
Remove mock of open, this broke cudf in a weird way
dagardner-nv Sep 3, 2024
d412c89
Mark GNN Fraud tests as xfail, need to determine what to do about GPU…
dagardner-nv Sep 3, 2024
f5c70fd
Update comment and add a dvlog
dagardner-nv Sep 3, 2024
4b9ba96
Support comparing series
dagardner-nv Sep 3, 2024
39b18f8
Set log level to debug if GLOG_v is defined, add a fixture to ensure …
dagardner-nv Sep 3, 2024
ab8045b
Replace the source file to test with, works-around an issue where pan…
dagardner-nv Sep 3, 2024
c650528
Update python tests to use pandas/numpy
dagardner-nv Sep 3, 2024
92b97bb
Remove tests for removed Python impls
dagardner-nv Sep 3, 2024
c43b95c
Misc cleanups
dagardner-nv Sep 3, 2024
7e7a9f5
Reload type_utils along with config, allowing the enum in config to m…
dagardner-nv Sep 3, 2024
688279a
Remove test that requires reloading the morpheus.config module, this …
dagardner-nv Sep 4, 2024
2a0c5ea
Remove python-mode tests for GPU based tests
dagardner-nv Sep 4, 2024
de78d22
Remove python-mode tests for GPU based tests
dagardner-nv Sep 4, 2024
b44ba66
Remove the use_python marker from tests where it doesn't make sense
dagardner-nv Sep 4, 2024
a3107e1
Rename use_python to cpu_mode and rename use_cpp to gpu_mode
dagardner-nv Sep 4, 2024
f128ae4
Add debug target for debugging unittests
dagardner-nv Sep 4, 2024
6c6dc01
Replace use_cpp and use_python markers with gpu_mode and cpu_mode res…
dagardner-nv Sep 4, 2024
794e52e
Support CPU mode in in-mem
dagardner-nv Sep 4, 2024
2962f0a
Fix filter_probs_df fixture
dagardner-nv Sep 4, 2024
1c6bf90
Mark tests for both CPU & GPU modes
dagardner-nv Sep 4, 2024
58cd456
Mark tests for both CPU & GPU modes
dagardner-nv Sep 4, 2024
0356c9c
Mark test stages as supporting both CPU & GPU modes
dagardner-nv Sep 4, 2024
aa6bea9
Cleanup type-hint
dagardner-nv Sep 4, 2024
b7ff46e
Set execution_mode fixture to autouse this allows tests to use the gp…
dagardner-nv Sep 4, 2024
8cb873d
Ensure that the df_type fixture implies setting the execution mode
dagardner-nv Sep 5, 2024
7f8fedc
Update tests, gpu_mode is a marker not a fixture
dagardner-nv Sep 5, 2024
696f8a9
Update markers to indicate that these tests need to execute in gpu an…
dagardner-nv Sep 5, 2024
b0216c3
Update markers to indicate that these tests need to execute in gpu an…
dagardner-nv Sep 5, 2024
d81a1de
Fix assumption that ConvMsg is GPU-only
dagardner-nv Sep 5, 2024
d0d3d2b
Update docstring
dagardner-nv Sep 5, 2024
9684d74
Remove the gpu_mode fixture
dagardner-nv Sep 5, 2024
85e08a7
Update test
dagardner-nv Sep 5, 2024
362376a
WIP
dagardner-nv Sep 5, 2024
80e3440
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Sep 5, 2024
0ef9ba5
Fix test
dagardner-nv Sep 5, 2024
c064833
Remove unnescesary cpu_mode mark
dagardner-nv Sep 5, 2024
622f962
Remove test_write_to_file_stage.py there is an identical test in test…
dagardner-nv Sep 5, 2024
52282ee
Cleanup usage of cpu_mode mark
dagardner-nv Sep 5, 2024
87a0f88
WIP
dagardner-nv Sep 5, 2024
9760cdc
Mark ransomeware tests as xfail, the pipeline depends on the Appshiel…
dagardner-nv Sep 5, 2024
2f4b33f
Mark the http client and kafka sink stages as supporting both GPU & C…
dagardner-nv Sep 5, 2024
e586501
Switch to using the array_pkg fixture
dagardner-nv Sep 5, 2024
f859ca0
Lint fixes
dagardner-nv Sep 5, 2024
bc6ac3d
Add a few vocabulary entries
dagardner-nv Sep 6, 2024
e898922
WIP
dagardner-nv Sep 6, 2024
16930df
WIP
dagardner-nv Sep 6, 2024
68c5f83
Passthru stage should support GPU & CPU modes
dagardner-nv Sep 6, 2024
4f92ccb
First pass at updating AppShieldSourceStage to emit ControlMessages
dagardner-nv Sep 6, 2024
1bc26a0
update tests
dagardner-nv Sep 6, 2024
0b4b7ba
WIP
dagardner-nv Sep 6, 2024
76c16ce
Set the http port to be the default since we are running in C++ mode
dagardner-nv Sep 9, 2024
a16bc90
Fix spelling error
dagardner-nv Sep 9, 2024
be15217
Remove final decorator as this conflicts with using the preallocator …
dagardner-nv Sep 9, 2024
36b0190
Deprecate AppShieldMessageMeta
dagardner-nv Sep 9, 2024
5bd82f2
Explicitly convert source dataframes to cudf
dagardner-nv Sep 9, 2024
9835e5f
Refactor to use control messages, and limit pandas<->cudf conversions
dagardner-nv Sep 9, 2024
2ad1c99
wip
dagardner-nv Sep 10, 2024
8d2f424
Use PreallocatorMixin in create features since its emitting new dataf…
dagardner-nv Sep 10, 2024
206bc4e
Use C++ mode
dagardner-nv Sep 10, 2024
45291ea
Relocate test and update
dagardner-nv Sep 10, 2024
cf4aa84
Update tests
dagardner-nv Sep 10, 2024
fc9e66b
Config.freeze enforces a compatible value for CppConfig on the first …
dagardner-nv Sep 10, 2024
f6d5513
Ensure the empty config created for boundary stages have a matching e…
dagardner-nv Sep 10, 2024
45dc802
Boundary stages should support CPU mode
dagardner-nv Sep 10, 2024
ca81a8d
Remove setting CppConfig as the Config class does this now
dagardner-nv Sep 10, 2024
89e2add
Add type hints
dagardner-nv Sep 10, 2024
51b4833
First pass at deprecating UserMessageMeta and updating AutoencoderSou…
dagardner-nv Sep 10, 2024
3370623
AppShieldSourceStage should support both cpu and gpu modes
dagardner-nv Sep 10, 2024
8baf123
Cleanup comments
dagardner-nv Sep 10, 2024
8bf6bc4
Update TrainAE to accept incoming ControlMessages
dagardner-nv Sep 10, 2024
58ab86e
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Sep 10, 2024
2949820
typehint cleanups
dagardner-nv Sep 10, 2024
642dcd9
Fix type-o in exception string
dagardner-nv Sep 10, 2024
e3b3f86
Minor type hint cleanups
dagardner-nv Sep 10, 2024
ac8ef0d
Replace abstractmethod decorators with NotImplementedError exceptions…
dagardner-nv Sep 10, 2024
630adf9
Add CPU support
dagardner-nv Sep 10, 2024
7c010ca
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Sep 11, 2024
0853542
WIP
dagardner-nv Sep 11, 2024
7852184
Fix merge error
dagardner-nv Sep 11, 2024
88c08fe
merge cleanups
dagardner-nv Sep 11, 2024
25df4e1
merge cleanups
dagardner-nv Sep 11, 2024
d6beeaf
merge cleanups
dagardner-nv Sep 11, 2024
f12b564
merge cleanups
dagardner-nv Sep 11, 2024
021189f
WIP
dagardner-nv Sep 11, 2024
100e3ae
Fix broken tests
dagardner-nv Sep 11, 2024
5628e48
Fix tests
dagardner-nv Sep 11, 2024
2d3a0d0
Fix broken test
dagardner-nv Sep 11, 2024
6a6f5eb
AE pipelines no longer need Python mode
dagardner-nv Sep 11, 2024
a001440
Enable AE tests
dagardner-nv Sep 11, 2024
aff5dc8
Fix AE tests
dagardner-nv Sep 11, 2024
bf28221
Remove redundant xfail
dagardner-nv Sep 11, 2024
832b8d0
Remove broken test, because it lives inside a directory named 'dfenco…
dagardner-nv Sep 11, 2024
25c8b88
Enable gnn tests - not currently passing
dagardner-nv Sep 11, 2024
b7f0ab6
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Sep 16, 2024
b63bd87
WIP
dagardner-nv Sep 16, 2024
8ed491a
Set pipeline to run in GPU mode
dagardner-nv Sep 16, 2024
6a837af
Fix mode marker
dagardner-nv Sep 16, 2024
364fb62
Remove unused import
dagardner-nv Sep 16, 2024
faab0c5
WIP
dagardner-nv Sep 16, 2024
1a5f027
First pass at replacing DFPMessageMeta with MessageMeta+ControlMessage
dagardner-nv Sep 16, 2024
f18a81e
Set DFP pipelines to run in GPU mode
dagardner-nv Sep 17, 2024
b83f342
fix dfp_training
yczhang-nv Sep 17, 2024
c3b3012
Merge branch 'fix-dfp-example-error' of github.com:yczhang-nv/Morpheu…
dagardner-nv Sep 17, 2024
169639d
Updating DFP tests
dagardner-nv Sep 17, 2024
b0c65e1
Refactor to avoid costly list comprehension
dagardner-nv Sep 17, 2024
7b01c2f
Fix test
dagardner-nv Sep 17, 2024
e8600d5
Fix test
dagardner-nv Sep 17, 2024
511dbdb
Move timezone truncation from the TrainAEStage to AutoencoderSourceStage
dagardner-nv Sep 17, 2024
c7cf687
Fix dfp tests
dagardner-nv Sep 17, 2024
379457e
Remove message_type
dagardner-nv Sep 17, 2024
53cbbef
Lint cleanups
dagardner-nv Sep 17, 2024
56ac198
WIP
dagardner-nv Sep 17, 2024
073cac1
Lint fixes
dagardner-nv Sep 17, 2024
64ab1a7
lint fixes
dagardner-nv Sep 17, 2024
acbd057
Disable the too-many-ancestors check
dagardner-nv Sep 17, 2024
db57b0f
disably cyclic check for control message
dagardner-nv Sep 17, 2024
0730407
Use 'pandas' instead of 'Pandas'
dagardner-nv Sep 18, 2024
547db64
Fix? Can't find 'action.yml' error
dagardner-nv Sep 18, 2024
2d780bb
Fix? Can't find 'action.yml' error
dagardner-nv Sep 18, 2024
e22b01a
Fix? Can't find 'action.yml' error
dagardner-nv Sep 18, 2024
66baad0
Fix? Can't find 'action.yml' error
dagardner-nv Sep 18, 2024
822c2fb
Fix? Can't find 'action.yml' error
dagardner-nv Sep 18, 2024
ba38528
Cleanups
dagardner-nv Sep 18, 2024
90792c6
Add cpu only support for testing the release container
dagardner-nv Sep 18, 2024
e779d76
Fix type-o in variable name, remove warning log
dagardner-nv Sep 18, 2024
c50c717
Warn users regarding the use of the --use_cpp flag
dagardner-nv Sep 18, 2024
c20a956
Change default triton port to the one used by the C++ impl
dagardner-nv Sep 18, 2024
3b8a0d4
Support CPU only mode in WriteToRabbitMQStage, add --use_cpu_only fla…
dagardner-nv Sep 18, 2024
e6d1935
write_simple.py now requires MORPHEUS_ROOT
dagardner-nv Sep 18, 2024
1fb03a4
Update code snippets to match source code changes
dagardner-nv Sep 18, 2024
b6ce583
Update python impls to match those from the 2_2 example
dagardner-nv Sep 18, 2024
d740423
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Sep 18, 2024
6fad250
Convert to pandas prior to invoking the model
dagardner-nv Sep 18, 2024
877ae49
Set columns file to be package relative, fix the probs type
dagardner-nv Sep 18, 2024
23c34c9
Update help output, remove usage of --use_cpp=False, use package rela…
dagardner-nv Sep 18, 2024
2095ebd
Change default triton port as we will only be using the C++ impl
dagardner-nv Sep 18, 2024
c87173d
Fix inferring the dataframe type
dagardner-nv Sep 18, 2024
60b0fb2
Fix inferring the dataframe type
dagardner-nv Sep 18, 2024
a22ccc9
Use the default port of the c++ triton impl
dagardner-nv Sep 18, 2024
d3eec63
Use triton http port as this is used by the C++ impl, fix path to voc…
dagardner-nv Sep 18, 2024
e224ab9
Remove unused imports
dagardner-nv Sep 18, 2024
b47ef47
update read_simple and write_simple scripts
dagardner-nv Sep 18, 2024
f70787b
Mark the LLMEngineStage as supporting CPU & GPU execution modes
dagardner-nv Sep 19, 2024
51ab625
Add type hint
dagardner-nv Sep 19, 2024
5bcd44d
Revert "Add type hint"
dagardner-nv Sep 19, 2024
cf17c9f
Revert "Mark the LLMEngineStage as supporting CPU & GPU execution modes"
dagardner-nv Sep 19, 2024
1f43837
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Sep 19, 2024
3632e80
Add work-around for issue #1891
dagardner-nv Sep 23, 2024
0a9d48c
Expose thread_count as a constructor parameter to TritonInferenceStag…
dagardner-nv Sep 23, 2024
19f432d
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Sep 25, 2024
36f5189
Replace manually invoking CudfHelper::load(), with re-inmplementing C…
dagardner-nv Sep 25, 2024
36f441d
remove unused import
dagardner-nv Sep 25, 2024
8922dcf
Mark --use_cpp flag as deprecated, disallow combining --use_cpp with …
dagardner-nv Sep 25, 2024
20b26db
IWYU fixes
dagardner-nv Sep 26, 2024
a6b75d7
Remove explicit usage of CppConfig
dagardner-nv Sep 26, 2024
9fa5c3c
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Sep 30, 2024
e1c8796
Remove use_cpp arg
dagardner-nv Sep 30, 2024
b7348b1
Default to CPP mode
dagardner-nv Sep 30, 2024
f1879de
Remove setting cppconfig in the grafana pipeline
dagardner-nv Sep 30, 2024
b19469d
Remove setting cppconfig in the viz pipelines
dagardner-nv Sep 30, 2024
38968de
Remove setting CppConfig
dagardner-nv Sep 30, 2024
20e355e
Remove uneeded pip install, use conda with the libmamba solver, remov…
dagardner-nv Sep 30, 2024
ad5f92a
Remove use_cpp
dagardner-nv Sep 30, 2024
d55996a
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Sep 30, 2024
84d6ff1
Merge branch 'david-cpu-only-mode-1846' of github.com:dagardner-nv/Mo…
dagardner-nv Sep 30, 2024
5c2542a
Merge branch 'david-cpu-only-mode-1846' of github.com:dagardner-nv/Mo…
dagardner-nv Sep 30, 2024
12c593f
Merge branch 'david-cpu-only-mode-1846-cpp-config' into david-cpu-onl…
dagardner-nv Sep 30, 2024
1a25a8e
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 4, 2024
c13826b
Fix merge error
dagardner-nv Oct 4, 2024
e6a46a6
Consolidate common code in the docker run scripts
dagardner-nv Oct 7, 2024
5d26520
Cleanup handling of DOCKER_ARGS, remove unused WORKSPACE_VOLUME env var
dagardner-nv Oct 7, 2024
ab49477
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 9, 2024
43c2534
Switch to using a forkserver, docs state that using fork is problemat…
dagardner-nv Oct 10, 2024
dd54ce1
Support CPU & GPU execution mode in MultiProcessingStage
dagardner-nv Oct 10, 2024
55a5b97
Test CPU and GPU execution modes
dagardner-nv Oct 10, 2024
5e50dcd
IWYU fixes
dagardner-nv Oct 10, 2024
60e6d95
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 10, 2024
dd46f71
Remove unused include
dagardner-nv Oct 10, 2024
986dbc4
Specify a cuda build of pytorch, install pytorch via conda rather tha…
dagardner-nv Oct 14, 2024
bec754a
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 14, 2024
27cfba0
Fix the config fixture to always run in gpu mode
dagardner-nv Oct 14, 2024
14e6cdc
Revert "Specify a cuda build of pytorch, install pytorch via conda ra…
dagardner-nv Oct 14, 2024
dca1933
Merge branch 'branch-24.10' into david-cpu-only-mode-1846
dagardner-nv Oct 14, 2024
24ee561
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Oct 17, 2024
7500d37
Fix merge error
dagardner-nv Oct 17, 2024
ac2eae6
Move SupportedTypes to a class property of MonitorController
dagardner-nv Oct 17, 2024
d243108
Rename DataFrameTypeStr to DataFrameModule
dagardner-nv Oct 17, 2024
f7d4bf3
Fix overloads for get_json_reader
dagardner-nv Oct 17, 2024
1b89232
Add property methods to the mixin to wrap common type_utils uses
dagardner-nv Oct 17, 2024
9362562
Formatting fixes
dagardner-nv Oct 17, 2024
fb90067
make the get_df_pkg and get_df_class wrappers methods not properties …
dagardner-nv Oct 17, 2024
075b0fa
Update stages to use GpuAndCpuMixin helpers
dagardner-nv Oct 17, 2024
319595e
Add helper methods for getting a read_csv and read_parquet method
dagardner-nv Oct 17, 2024
ce8ea07
Use get_csv_reader
dagardner-nv Oct 17, 2024
7be0402
Fix imports
dagardner-nv Oct 17, 2024
fa67db6
Rase a ValueError instead of a KeyError to match behavior of the C++ …
dagardner-nv Oct 17, 2024
485764c
Remove inconsistent tensor reshaping issue #1955
dagardner-nv Oct 17, 2024
d903feb
Update ControlMemory tests to test the Python and C++ impls
dagardner-nv Oct 17, 2024
ef466ae
Test should be run in both cpu and gpu modes
dagardner-nv Oct 17, 2024
357a6e2
Remove unused config_only_cpp and config_no_cpp fixtures
dagardner-nv Oct 17, 2024
1998922
lint fixes
dagardner-nv Oct 17, 2024
a0f5e5b
Fix cpu-mode json tests
dagardner-nv Oct 17, 2024
80e817a
Fix misleading runtime erro messafe, remove unused inferring gpu mode…
dagardner-nv Oct 17, 2024
32fb3b8
Ignore code.visualstudio.com, these urls are being forbidden when run…
dagardner-nv Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP - avoid cudf imports
  • Loading branch information
dagardner-nv committed Aug 22, 2024
commit 6e155a99b3633129f9e6298ee1c2a2629cbe5c90
10 changes: 5 additions & 5 deletions python/morpheus/morpheus/controllers/rss_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
from datetime import timedelta
from urllib.parse import urlparse

import pandas as pd
import requests
import requests_cache

from morpheus.messages import MessageMeta
from morpheus.utils.type_aliases import DataFrameType
from morpheus.utils.type_aliases import DataFrameTypeStr
from morpheus.utils.type_utils import get_df_class

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -142,7 +142,7 @@ def __init__(self,
self._run_indefinitely = run_indefinitely
self._interval_secs = interval_secs
self._interval_td = timedelta(seconds=self._interval_secs)
self._df_type = df_type
self._df_class: type[DataFrameType] = get_df_class(df_type)

self._enable_cache = enable_cache

Expand Down Expand Up @@ -351,7 +351,7 @@ def fetch_dataframes(self):

Yeilds
------
cudf.DataFrame
DataFrameType
A DataFrame containing feed entry data.

Raises
Expand All @@ -376,14 +376,14 @@ def fetch_dataframes(self):
entry_accumulator.append(entry)

if self._batch_size > 0 and len(entry_accumulator) >= self._batch_size:
yield cudf.DataFrame(entry_accumulator)
yield self._df_class(entry_accumulator)
entry_accumulator.clear()

self._previous_entries = current_entries

# Yield any remaining entries.
if entry_accumulator:
yield cudf.DataFrame(entry_accumulator)
yield self._df_class(entry_accumulator)
else:
logger.debug("No new entries found.")

Expand Down
21 changes: 12 additions & 9 deletions python/morpheus/morpheus/parsers/windows_event_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import os
import typing

import cudf

import morpheus
from morpheus.parsers.event_parser import EventParser
from morpheus.utils.type_aliases import DataFrameType
from morpheus.utils.type_aliases import SeriesType
from morpheus.utils.type_utils import get_df_pkg_from_obj

log = logging.getLogger(__name__)

Expand All @@ -41,17 +42,17 @@ def __init__(self, interested_eventcodes=None):
self._event_regex = self._load_regex_yaml(regex_filepath)
EventParser.__init__(self, self.get_columns(), self.EVENT_NAME)

def parse(self, text: cudf.Series) -> cudf.Series:
def parse(self, text: SeriesType) -> DataFrameType:
"""Parses the Windows raw event.

Parameters
----------
text : cudf.Series
text : SeriesType
Raw event log text to be parsed

Returns
-------
cudf.DataFrame
DataFrameType
Parsed logs dataframe
"""
# Clean raw data to be consistent.
Expand All @@ -65,23 +66,25 @@ def parse(self, text: cudf.Series) -> cudf.Series:
temp = self.parse_raw_event(input_chunk, self._event_regex[eventcode])
if not temp.empty:
output_chunks.append(temp)
parsed_dataframe = cudf.concat(output_chunks)

df_pkg = get_df_pkg_from_obj(text)
parsed_dataframe = df_pkg.concat(output_chunks)
# Replace null values with empty.
parsed_dataframe = parsed_dataframe.fillna("")
return parsed_dataframe

def clean_raw_data(self, text: cudf.Series) -> cudf.Series:
def clean_raw_data(self, text: SeriesType) -> SeriesType:
"""
Lower casing and replacing escape characters.

Parameters
----------
text : cudf.Series
text : SeriesType
Raw event log text to be clean

Returns
-------
cudf.Series
SeriesType
Clean raw event log text
"""
text = (text.str.lower().str.replace("\\\\t", "").str.replace("\\\\r", "").str.replace("\\\\n", "|"))
Expand Down
15 changes: 10 additions & 5 deletions python/morpheus/morpheus/parsers/zeek.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import cudf
from morpheus.utils.type_aliases import DataFrameType
from morpheus.utils.type_aliases import DataFrameTypeStr
from morpheus.utils.type_utils import get_df_pkg

TYPE_DICT = {
"bool": "bool",
Expand All @@ -36,7 +38,7 @@
}


def parse(filepath: str) -> cudf.DataFrame:
def parse(filepath: str, df_type: DataFrameTypeStr = "cudf") -> DataFrameType:
"""
Parse Zeek log file and return cuDF dataframe. Uses header comments to get column names/types
and configure parser.
Expand All @@ -45,20 +47,23 @@ def parse(filepath: str) -> cudf.DataFrame:
----------
filepath : str
File path of Zeek log file
df_type : DataFrameTypeStr, default 'cudf'
Type of dataframe to return. Either 'cudf' or 'pandas'

Returns
-------
cudf.DataFrame
DataFrameType
Parsed Zeek log dataframe
"""
header_gdf = cudf.read_csv(filepath, names=["line"], nrows=8)
df_pkg = get_df_pkg(df_type)
header_gdf = df_pkg.read_csv(filepath, names=["line"], nrows=8)
lines_gdf = header_gdf["line"].str.split()

column_names = lines_gdf.iloc[6][1:]
column_types = lines_gdf.iloc[7][1:]
column_dtypes = list(map(lambda x: TYPE_DICT.get(x, "str"), column_types))

log_gdf = cudf.read_csv(
log_gdf = df_pkg.read_csv(
filepath,
delimiter="\t",
dtype=column_dtypes,
Expand Down
22 changes: 7 additions & 15 deletions python/morpheus/morpheus/service/vdb/faiss_vdb_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
import time
import typing

import pandas as pd

import cudf

from morpheus.service.vdb.vector_db_service import VectorDBResourceService
from morpheus.service.vdb.vector_db_service import VectorDBService
from morpheus.utils.type_aliases import DataFrameType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -81,13 +78,13 @@ def insert(self, data: list[list] | list[dict], **kwargs) -> dict:
"""
raise NotImplementedError("Insert operation is not supported in FAISS")

def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs) -> dict:
def insert_dataframe(self, df: DataFrameType, **kwargs) -> dict:
"""
Insert a dataframe entires into the vector database.

Parameters
----------
df : typing.Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
Dataframe to be inserted into the collection.
**kwargs
Extra keyword arguments specific to the vector database implementation.
Expand Down Expand Up @@ -368,19 +365,15 @@ def create(self, name: str, overwrite: bool = False, **kwargs):
"""
raise NotImplementedError("create operation is not supported in FAISS")

def create_from_dataframe(self,
name: str,
df: typing.Union[cudf.DataFrame, pd.DataFrame],
overwrite: bool = False,
**kwargs) -> None:
def create_from_dataframe(self, name: str, df: DataFrameType, overwrite: bool = False, **kwargs) -> None:
"""
Create collections in the vector database.

Parameters
----------
name : str
Name of the collection.
df : Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
The dataframe to create the collection from.
overwrite : bool, optional
Whether to overwrite the collection if it already exists. Default is False.
Expand Down Expand Up @@ -416,16 +409,15 @@ def insert(self, name: str, data: list[list] | list[dict], **kwargs) -> dict[str

raise NotImplementedError("create_from_dataframe operation is not supported in FAISS")

def insert_dataframe(self, name: str, df: typing.Union[cudf.DataFrame, pd.DataFrame],
**kwargs) -> dict[str, typing.Any]:
def insert_dataframe(self, name: str, df: DataFrameType, **kwargs) -> dict[str, typing.Any]:
"""
Converts dataframe to rows and insert to the vector database.

Parameters
----------
name : str
Name of the collection to be inserted.
df : typing.Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
Dataframe to be inserted in the collection.
**kwargs
Additional keyword arguments containing collection configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
import typing
from functools import wraps

import cudf

from morpheus.io.utils import cudf_string_cols_exceed_max_bytes
from morpheus.io.utils import truncate_string_cols_by_bytes
from morpheus.service.vdb.vector_db_service import VectorDBResourceService
from morpheus.service.vdb.vector_db_service import VectorDBService
from morpheus.utils.type_aliases import DataFrameType
from morpheus.utils.type_utils import is_cudf_type

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -327,7 +326,7 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) -
logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype)

needs_truncate = self._truncate_long_strings
if needs_truncate and isinstance(df, cudf.DataFrame):
if needs_truncate and is_cudf_type(df):
# Cudf specific optimization, we can avoid a costly call to truncate_string_cols_by_bytes if all of the
# string columns are already below the max length
needs_truncate = cudf_string_cols_exceed_max_bytes(df, self._fields_max_length)
Expand All @@ -336,7 +335,7 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) -
column_names = [field.name for field in self._fields if not field.auto_id]

collection_df = df[column_names]
if isinstance(collection_df, cudf.DataFrame):
if is_cudf_type(collection_df):
collection_df = collection_df.to_pandas()

if needs_truncate:
Expand Down Expand Up @@ -728,7 +727,7 @@ def _build_schema_conf(self, df: DataFrameType) -> list[dict]:
# Always add a primary key
fields.append({"name": "pk", "dtype": pymilvus.DataType.INT64, "is_primary": True, "auto_id": True})

if isinstance(df, cudf.DataFrame):
if is_cudf_type(df):
df = df.to_pandas()

# Loop over all of the columns of the first row and build the schema
Expand Down
19 changes: 7 additions & 12 deletions python/morpheus/morpheus/service/vdb/vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
from abc import ABC
from abc import abstractmethod

import pandas as pd

import cudf
from morpheus.utils.type_aliases import DataFrameType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,13 +48,13 @@ def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any])
pass

@abstractmethod
def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs: dict[str, typing.Any]) -> dict:
def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict:
"""
Insert a dataframe into the vector database.

Parameters
----------
df : typing.Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
Dataframe to be inserted into the resource.
**kwargs : dict[str, typing.Any]
Extra keyword arguments specific to the vector database implementation.
Expand Down Expand Up @@ -241,18 +239,15 @@ def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str, t
pass

@abstractmethod
def insert_dataframe(self,
name: str,
df: typing.Union[cudf.DataFrame, pd.DataFrame],
**kwargs: dict[str, typing.Any]) -> dict:
def insert_dataframe(self, name: str, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict:
"""
Converts dataframe to rows and insert into the vector database resource.

Parameters
----------
name : str
Name of the resource to be inserted.
df : typing.Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
Dataframe to be inserted.
**kwargs : dict[str, typing.Any]
Additional keyword arguments containing collection configuration.
Expand Down Expand Up @@ -391,7 +386,7 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.
@abstractmethod
def create_from_dataframe(self,
name: str,
df: typing.Union[cudf.DataFrame, pd.DataFrame],
df: DataFrameType,
overwrite: bool = False,
**kwargs: dict[str, typing.Any]) -> None:
"""
Expand All @@ -401,7 +396,7 @@ def create_from_dataframe(self,
----------
name : str
Name of the resource.
df : Union[cudf.DataFrame, pd.DataFrame]
df : DataFrameType
The dataframe to create the resource from.
overwrite : bool, optional
Whether to overwrite the resource if it already exists. Default is False.
Expand Down
4 changes: 3 additions & 1 deletion python/morpheus/morpheus/stages/input/rss_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.type_utils import exec_mode_to_df_type_str

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -82,7 +83,8 @@ def __init__(self,
strip_markup=strip_markup,
stop_after=stop_after,
interval_secs=interval_secs,
should_stop_fn=self.is_stop_requested)
should_stop_fn=self.is_stop_requested,
df_type=exec_mode_to_df_type_str(c.execution_mode))

@property
def name(self) -> str:
Expand Down
11 changes: 11 additions & 0 deletions python/morpheus/morpheus/utils/type_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,14 @@ def is_cudf_type(obj: typing.Any) -> bool:
Check if a given object (DataFrame, Series, RangeIndex etc...) is a cuDF type.
"""
return "cudf" in str(type(obj))


def get_df_pkg_from_obj(obj: typing.Any) -> types.ModuleType:
"""
Return the appropriate DataFrame package based on the DataFrame object.
"""
if is_cudf_type(obj):
import cudf
return cudf

return pd
1 change: 1 addition & 0 deletions tests/test_windows_event_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ def test_windows_event_parser():
test_logs = fh.readlines()
test_input = cudf.Series(test_logs)
test_output_df = wep.parse(test_input)

for parsed_rec in test_output_df.to_records():
eventcode = parsed_rec["eventcode"]
validate_func = VALIDATE_DICT.get(eventcode, unknown_record_type)
Expand Down