Skip to content

Commit

Permalink
Fix dask error in DFP Integrated training pipeline (#1931)
Browse files Browse the repository at this point in the history
* `FileToDFController` now exposes a `download_method` constructor argument allowing the caller control over the `Downloader` class's download method.
* Since the `file_to_df_loader` module creates and closes a `FileToDFController` instance on a per-call basis, set `download_method=SINGLE_THREAD`. Fixes an issue where dask was being shutdown early after the first message.

Closes [#1916](#1916)

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1931
  • Loading branch information
dagardner-nv authored Oct 16, 2024
1 parent 642e994 commit c7c8b4a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
9 changes: 7 additions & 2 deletions python/morpheus/morpheus/controllers/file_to_df_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from morpheus.utils.column_info import PreparedDFInfo
from morpheus.utils.column_info import process_dataframe
from morpheus.utils.downloader import Downloader
from morpheus.utils.downloader import DownloadMethods

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -105,6 +106,9 @@ class FileToDFController:
Directory where cache will be stored.
timestamp_column_name : str
Name of the timestamp column.
download_method : typing.Union[DownloadMethods, str], optional, default = DownloadMethods.DASK_THREAD
The download method to use, if the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable is set, it takes
presedence.
"""

def __init__(self,
Expand All @@ -113,7 +117,8 @@ def __init__(self,
file_type: FileTypes,
parser_kwargs: dict,
cache_dir: str,
timestamp_column_name: str):
timestamp_column_name: str,
download_method: typing.Union[DownloadMethods, str] = DownloadMethods.DASK_THREAD):

self._schema = schema
self._file_type = file_type
Expand All @@ -122,7 +127,7 @@ def __init__(self,
self._cache_dir = os.path.join(cache_dir, "file_cache")
self._timestamp_column_name = timestamp_column_name

self._downloader = Downloader()
self._downloader = Downloader(download_method=download_method)

def _get_or_create_dataframe_from_batch(
self, file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]) -> typing.Tuple[cudf.DataFrame, bool]:
Expand Down
4 changes: 3 additions & 1 deletion python/morpheus/morpheus/loaders/file_to_df_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from morpheus.controllers.file_to_df_controller import FileToDFController
from morpheus.messages import ControlMessage
from morpheus.messages.message_meta import MessageMeta
from morpheus.utils.downloader import DownloadMethods
from morpheus.utils.loader_ids import FILE_TO_DF_LOADER
from morpheus.utils.loader_utils import register_loader

Expand Down Expand Up @@ -97,7 +98,8 @@ def file_to_df_loader(control_message: ControlMessage, task: dict):
file_type=file_type,
parser_kwargs=parser_kwargs,
cache_dir=cache_dir,
timestamp_column_name=timestamp_column_name)
timestamp_column_name=timestamp_column_name,
download_method=DownloadMethods.SINGLE_THREAD)
pdf = controller.convert_to_dataframe(file_object_batch=(fsspec.open_files(files), n_groups))
df = cudf.from_pandas(pdf)

Expand Down

0 comments on commit c7c8b4a

Please sign in to comment.