Unified file source stage #1184

Open · wants to merge 20 commits into base: branch-23.11
Changes from 8 of 20 commits
2 changes: 2 additions & 0 deletions docker/conda/environments/cuda11.8_dev.yml
@@ -26,6 +26,7 @@ dependencies:
- automake=1.16.5
- benchmark=1.6.1
- boost-cpp=1.74
- boto3
- cachetools=5.0.0
- ccache>=3.7
- clangdev=14
@@ -91,6 +92,7 @@ dependencies:
- pytorch=2.0.1
- rapidjson=1.1.0
- requests=2.31
- s3fs>=2023.6
- scikit-build=0.17.1
- scikit-learn=1.2.2
- sphinx
224 changes: 89 additions & 135 deletions morpheus/stages/input/file_source.py
@@ -21,140 +21,126 @@

import fsspec
import mrc
import s3fs
from mrc.core import operators as ops

from morpheus.cli import register_stage
from morpheus.common import FileTypes
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.io.deserializers import read_file_to_df
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.directory_watcher import DirectoryWatcher

logger = logging.getLogger(__name__)


@register_stage("file-source", modes=[PipelineModes.FIL, PipelineModes.NLP, PipelineModes.OTHER])
@register_stage("file-source")
class FileSource(PreallocatorMixin, SingleOutputSource):
"""
Load messages from a file.

Source stage is used to load messages from a file and dumping the contents into the pipeline immediately. Useful for
testing performance and accuracy of a pipeline.
FileSource is used to produce messages loaded from a file. Useful for testing performance and
accuracy of a pipeline.

Parameters
----------
config : `morpheus.config.Config`
config : morpheus.config.Config
Pipeline configuration instance.
files : List[str]
List of paths to be read from, can be a list of S3 URLs (`s3://path`) and can include wildcard characters `*`
as defined by `fsspec`:
https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files
watch : bool, default = False
When True, will check `files` for new files and emit them as they appear. (Note: `watch_interval` is
applicable when `watch` is True and there are no remote paths in `files`.)
When True, will check `files` for new files and emit them as they appear.
watch_interval : float, default = 1.0
When `watch` is True, this is the time in seconds between polling the paths in `files` for new files.
(Note: Applicable when path in `files` are remote and when `watch` is True)
sort_glob : bool, default = False
If true, the list of files matching `input_glob` will be processed in sorted order.
(Note: Applicable when all paths in `files` are local.)
recursive : bool, default = True
If true, events will be emitted for the files in subdirectories matching `input_glob`.
(Note: Applicable when all paths in `files` are local.)
queue_max_size : int, default = 128
Maximum queue size to hold the file paths to be processed that match `input_glob`.
(Note: Applicable when all paths in `files` are local.)
batch_timeout : float, default = 5.0
Timeout to retrieve batch messages from the queue.
(Note: Applicable when all paths in `files` are local.)
file_type : `morpheus.common.FileTypes`, optional, case_sensitive = False
sort : bool, default = False
If true, the list of files will be processed in sorted order.
file_type : morpheus.common.FileTypes, optional, case_sensitive = False
Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension.
Supported extensions: 'csv', 'json', 'jsonlines' and 'parquet'.
repeat : int, default = 1, min = 1
Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
filter_null : bool, default = True
Whether or not to filter rows with a null 'data' column. Null values in the 'data' column can cause issues down
the line with processing. Setting this to True is recommended.
parser_kwargs : dict, default = {}
parser_kwargs : dict, default = None
Extra options to pass to the file parser.
max_files: int, default = -1
Max number of files to read. Useful for debugging to limit startup time. Default value of -1 is unlimited.
"""

def __init__(self,
config: Config,
files: typing.List[str],
watch: bool = False,
watch_interval: float = 1.0,
sort_glob: bool = False,
recursive: bool = True,
queue_max_size: int = 128,
batch_timeout: float = 5.0,
sort: bool = False,
file_type: FileTypes = FileTypes.Auto,
repeat: int = 1,
filter_null: bool = True,
parser_kwargs: dict = None):
parser_kwargs: dict = None,
Contributor

Remark: Adding this argument makes me uneasy, since it will be difficult to deprecate in the future if necessary.

Question: Is this being added as a new feature, or is this something that existed on any of the other file source implementations?

max_files: int = -1):

super().__init__(config)

if not files:
if not files or len(files) == 0:
raise ValueError("The 'files' cannot be empty.")

if watch and len(files) != 1:
raise ValueError("When 'watch' is True, the 'files' should contain exactly one file path.")

self._files = list(files)
self._protocols = self._extract_unique_protocols()

if len(self._protocols) > 1:
raise ValueError("Accepts same protocol input files, but it received multiple protocols.")

Contributor

Remark: It appears that == 0 and < -1 are invalid values for max_files.

Important: Check that max_files is in a valid range (if you decide to keep -1 as the default, adjust accordingly).

Suggested change
if max_files and max_files <= 0:
    raise ValueError(...)

Contributor Author

Raising an error if self._files is None or []. We will get at least one value in self._protocols, so I didn't add an extra check.

Contributor

That's true, but what about max_files? If max_files == 0 or max_files < -1, then this stage won't produce any files. In that case we should either warn or raise an exception.

Contributor Author (@bsuryadevara, Sep 18, 2023)

The max_files flag takes effect only when set to a value greater than zero; otherwise, it is treated as continuous polling without any imposed limit. The default value is -1, so I thought raising an error or warning would not be needed. Let me know if you still want to add the warning message.
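For illustration, one possible shape of the guard discussed above, assuming the -1 sentinel stays as the default; the helper name is hypothetical and this is a sketch, not code from this PR:

def _validate_max_files(max_files: int) -> None:
    # Hypothetical guard: keep -1 as "unlimited", reject 0 and anything below -1.
    if max_files == 0 or max_files < -1:
        raise ValueError(f"'max_files' must be a positive integer or -1 (unlimited), got {max_files}.")

_validate_max_files(100)   # ok
_validate_max_files(-1)    # ok, unlimited polling
_validate_max_files(0)     # raises ValueError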

self._watch = watch
self._sort_glob = sort_glob
self._recursive = recursive
self._queue_max_size = queue_max_size
self._batch_timeout = batch_timeout
self._sort = sort
self._file_type = file_type
self._filter_null = filter_null
self._parser_kwargs = parser_kwargs or {}
self._watch_interval = watch_interval
self._repeat_count = repeat
self._max_files = max_files
self._stop_requested = False

@property
def name(self) -> str:
"""Return the name of the stage"""
"""Return the name of the stage."""
return "file-source"

def supports_cpp_node(self) -> bool:
"""Indicates whether or not this stage supports a C++ node"""
"""Indicates whether or not this stage supports a C++ node."""
return False

def _has_remote_paths(self) -> bool:
return any(urlsplit(file).scheme for file in self._files if "://" in file)
def stop(self):
"""Performs cleanup steps when pipeline is stopped."""

# Indicate we need to stop
self._stop_requested = True

return super().stop()

def _extract_unique_protocols(self) -> set:
"""Extracts unique protocols from the given file paths."""
protocols = set()

for file in self._files:
scheme = urlsplit(file).scheme
if scheme:
protocols.add(scheme.lower())
else:
protocols.add("file")

return protocols

def _build_source(self, builder: mrc.Builder) -> StreamPair:

if self._build_cpp_node():
raise RuntimeError("Does not support C++ nodes")

if self._watch and not self._has_remote_paths():
input_glob = self._files[0]
watcher = DirectoryWatcher(
input_glob=input_glob,
watch_directory=self._watch,
max_files=None, # This is not being used in the latest version.
sort_glob=self._sort_glob,
recursive=self._recursive,
queue_max_size=self._queue_max_size,
batch_timeout=self._batch_timeout)

out_stream = watcher.build_node(self.unique_name, builder)
out_type = list[str]
raise RuntimeError("Does not support C++ nodes.")

if self._watch:
generator_function = self._polling_generate_frames_fsspec
else:
if self._watch:
generator_function = self._polling_generate_frames_fsspec
else:
generator_function = self._generate_frames_fsspec
generator_function = self._generate_frames_fsspec
Contributor

Remark: Nice! This looks cleaner now that DirectoryWatcher is gone!


out_stream = builder.make_source(self.unique_name, generator_function())
out_type = fsspec.core.OpenFiles
out_stream = builder.make_source(self.unique_name, generator_function())
out_type = fsspec.core.OpenFiles

# Supposed to just return a source here
return out_stream, out_type
@@ -165,19 +151,24 @@ def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:

if (len(files) == 0):
raise RuntimeError(f"No files matched input strings: '{self._files}'. "
"Check your input pattern and ensure any credentials are correct")
"Check your input pattern and ensure any credentials are correct.")

if self._sort_glob:
if self._sort:
files = sorted(files, key=lambda f: f.full_name)

if self._max_files > 0:
files = files[:self._max_files]

yield files

def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:
files_seen = set()
curr_time = time.monotonic()
next_update_epoch = curr_time
processed_files_count = 0
has_s3_protocol = "s3" in self._protocols

while (True):
while (not self._stop_requested):
# Before doing any work, find the next update epoch after the current time
while (next_update_epoch <= curr_time):
# Only ever add `self._watch_interval` to next_update_epoch so all updates are at repeating intervals
@@ -186,7 +177,12 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:
file_set = set()
filtered_files = []

# Clear cached instance, otherwise we don't receive newly touched files.
if has_s3_protocol:
s3fs.S3FileSystem.clear_instance_cache()

files = fsspec.open_files(self._files)

for file in files:
file_set.add(file.full_name)
if file.full_name not in files_seen:
Expand All @@ -198,10 +194,17 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil
# need to re-ingest that new file.
files_seen = file_set

if len(filtered_files) > 0:
if self._sort_glob:
filtered_files_count = len(filtered_files)

if filtered_files_count > 0:

if self._sort:
filtered_files = sorted(filtered_files, key=lambda f: f.full_name)

if self._max_files > 0:
filtered_files = filtered_files[:self._max_files - processed_files_count]
Contributor (@cwharris, Sep 13, 2023)

Remark: If processed_files_count > self._max_files we get filtered_files[:n] where n < 0, meaning the slice is taken relative to the end of the list (everything except the last |n| files), which doesn't sound like what we want.

Important: make sure we don't accidentally slice relative to the end of filtered_files.

Contributor

Oh, I see: we won't get a negative number because processed_files_count is calculated based on _max_files. My bad. No change needed.
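As a side note, a standalone sketch of the slicing behaviour under discussion, with made-up values independent of the stage code:

filtered_files = ["a", "b", "c", "d", "e"]
max_files = 3
processed_files_count = 1
budget = max_files - processed_files_count   # 2
print(filtered_files[:budget])               # ['a', 'b'] -- limits output to the remaining budget
print(filtered_files[:-2])                   # ['a', 'b', 'c'] -- a negative stop drops the tail instead
print(filtered_files[:max(0, -2)])           # [] -- clamping the bound at zero avoids the surprise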

processed_files_count += len(filtered_files)

yield fsspec.core.OpenFiles(filtered_files, fs=files.fs)

curr_time = time.monotonic()
@@ -213,14 +216,14 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:
time.sleep(sleep_duration)
curr_time = time.monotonic()

if self._max_files > 0 and self._max_files <= processed_files_count:
logger.debug("Maximum file limit reached. Exiting polling service...")
self._stop_requested = True
Contributor

Remark: Oh, I see. This is how we are stopping the source when we reach the max file limit. This is fine, but in general cancellation tokens are reserved for flagging from outside of the function that checks them. I think we can move this logic up into the previous if self._max_files > 0 condition and use break or return there rather than flagging the cancellation token. Up to you.

Contributor Author

I have updated as suggested. I tried to avoid break and multiple yield statements, which is the reason I chose this approach.
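For comparison, a small standalone sketch of the break/return-style exit described above; the names mirror the stage but this is illustrative, not the merged code:

import time

def poll(batches, max_files=-1, interval=0.0):
    # Hypothetical polling generator that exits directly once the file limit is hit.
    processed = 0
    for filtered_files in batches:  # each item stands in for one polling pass
        if filtered_files:
            processed += len(filtered_files)
            yield filtered_files
            if max_files > 0 and processed >= max_files:
                return  # leave the generator instead of flagging a cancellation token
        time.sleep(interval)

print(list(poll([["a"], ["b", "c"], ["d"]], max_files=2)))  # [['a'], ['b', 'c']]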


@staticmethod
def generate_frames(file: fsspec.core.OpenFile,
file_type: FileTypes,
filter_null: bool,
parser_kwargs: dict,
repeat_count: int) -> list[MessageMeta]:
def generate_frames(file: fsspec.core.OpenFile, file_type: FileTypes, parser_kwargs: dict) -> MessageMeta:
"""
Generate message frames from a file.
Generate message frame from a file.

This function reads data from a file and generates message frames (MessageMeta) based on the file's content.
It can be used to load and process messages from a file for testing and analysis within a Morpheus pipeline.
@@ -231,85 +234,36 @@ def generate_frames(file: fsspec.core.OpenFile,
An open file object using fsspec.
file_type : FileTypes
Indicates the type of the file to read. Supported types include 'csv', 'json', 'jsonlines', and 'parquet'.
filter_null : bool
Determines whether to filter out rows with null values in the 'data' column. Filtering null values is
recommended to prevent potential issues during processing.
parser_kwargs : dict
Additional keyword arguments to pass to the file parser.
repeat_count : int
The number of times to repeat the data reading process. Each repetition generates a new set of message
frames.

Returns
-------
List[MessageMeta]
MessageMeta objects, each containing a dataframe of messages from the file.
MessageMeta
MessageMeta object, each containing a dataframe of messages from the file.
"""
df = read_file_to_df(
file.full_name,
file_type=file_type,
filter_nulls=filter_null,
filter_nulls=False,
parser_kwargs=parser_kwargs,
df_type="cudf",
)

metas = []

for i in range(repeat_count):
meta = MessageMeta(df)

x = MessageMeta(df)

# If we are looping, copy the object. Do this before we push the object in case it changes
if (i + 1 < repeat_count):
df = df.copy()

# Shift the index to allow for unique indices without reading more data
df.index += len(df)

metas.append(x)

return metas

@staticmethod
def convert_to_fsspec_files(files: typing.Union[list[str], fsspec.core.OpenFiles]) -> fsspec.core.OpenFiles:
"""
Convert a list of file paths to fsspec OpenFiles.

This static method takes a list of file paths or an existing fsspec OpenFiles object and ensures that the
input is converted to an OpenFiles object for uniform handling in Morpheus pipeline stages.

Parameters
----------
files : Union[List[str], fsspec.core.OpenFiles]
A list of file paths or an existing fsspec OpenFiles object.

Returns
-------
fsspec.core.OpenFiles
An fsspec OpenFiles object representing the input files.
"""

# Convert fsspec open files
if not isinstance(files, fsspec.core.OpenFiles):
files = fsspec.open_files(files)

return files
return meta

def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair:

out_stream = out_pair[0]

post_node = builder.make_node(
self.unique_name + "-post",
ops.map(self.convert_to_fsspec_files),
ops.flatten(), # Flatten list of open fsspec files
ops.map(
partial(self.generate_frames,
file_type=self._file_type,
filter_null=self._filter_null,
parser_kwargs=self._parser_kwargs,
repeat_count=self._repeat_count)), # Generate dataframe for each file
ops.flatten())
ops.map(partial(self.generate_frames, file_type=self._file_type,
parser_kwargs=self._parser_kwargs)) # Generate dataframe for each file
)

builder.make_edge(out_stream, post_node)

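For orientation, a minimal sketch of how the unified stage could be wired into a pipeline. The surrounding stages, the S3 path, and the parameter values are assumptions for illustration, not part of this PR:

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source import FileSource

config = Config()
pipeline = LinearPipeline(config)

# Poll a hypothetical S3 prefix for new JSON-lines files, stopping after 100 files.
pipeline.set_source(
    FileSource(config,
               files=["s3://example-bucket/logs/*.jsonlines"],
               watch=True,
               watch_interval=5.0,
               max_files=100))
pipeline.add_stage(MonitorStage(config, description="Files read"))
pipeline.run()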