Improve DFP period functionality to allow for better sampling and ignoring period (#912)
Currently, the DFP pipeline simulates time by splitting incoming messages into buckets of a fixed period and processing each period independently. This makes it impossible to process all of the incoming data at once in batch mode with a single trained model. For a toy illustration of the two behaviors, see the sketch below.
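
A minimal pandas sketch (not the stage code itself; the filenames and timestamps are made up) of per-period batching versus a single batch, assuming a datetime-indexed DataFrame of file metadata:

```python
import pandas as pd

idx = pd.to_datetime(["2023-05-01 00:10", "2023-05-02 09:00", "2023-05-06 12:00"])
df = pd.DataFrame({"filename": ["a.json", "b.json", "c.json"]}, index=idx)

# Per-period behavior: one independent batch per day (period="D")
per_day_batches = [group for _, group in df.resample("D") if len(group) > 0]
assert len(per_day_batches) == 3

# Single-batch behavior (what this PR enables with period=None): all rows at once
single_batch = df
assert len(single_batch) == 3
```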

This PR adds a few things:

- If `DFPFileBatcherStage.period == None`, then all messages will be processed in a single batch instead of one batch per period
- Fixes how periods were handled so that counted frequencies work
   - Before, `"D"` would work as expected but `"5D"` would not. This was due to using `to_period`; grouping is now done with `resample` instead
- The `DFPFileBatcherStage.sampling_rate_s` property was deprecated in favor of a more general `sampling` property
   - This property supports several kinds of values (see the sketch after this list):
      - If it's a string, the value is interpreted as a frequency, and the first row in each frequency bin is taken
      - If it's a value in [0, 1), it's a fraction, and that fraction of rows is sampled at random
      - If it's >= 1, it's a count, and that many rows are sampled at random
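
A minimal standalone sketch of that dispatch, mirroring the logic added to `on_data` below (assumes a datetime-indexed pandas DataFrame; `apply_sampling` is an illustrative name, not part of this PR):

```python
import typing

import pandas as pd


def apply_sampling(df: pd.DataFrame, sampling: typing.Union[str, float, int, None]) -> pd.DataFrame:
    # Sketch of how the different `sampling` value types are interpreted
    if sampling is None:
        return df

    if isinstance(sampling, str):
        # Frequency string, e.g. "30S": keep the first row in each time bin
        return df.resample(sampling).first().dropna()

    if sampling < 1.0:
        # Fraction in [0, 1): keep a random fraction of the rows
        return df.sample(frac=sampling).sort_index()

    # Count >= 1: keep that many randomly chosen rows
    return df.sample(n=sampling).sort_index()
```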

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Devin Robison (https://github.com/drobison00)

URL: #912
mdemoret-nv authored May 10, 2023
1 parent 31b6748 commit 446f452
Showing 2 changed files with 54 additions and 46 deletions.
@@ -14,6 +14,7 @@
 
 import logging
 import typing
+import warnings
 from collections import namedtuple
 from datetime import datetime
 
@@ -37,17 +38,29 @@ def __init__(self,
                  c: Config,
                  date_conversion_func,
                  period="D",
-                 sampling_rate_s=0,
+                 sampling_rate_s: typing.Optional[int] = None,
                  start_time: datetime = None,
-                 end_time: datetime = None):
+                 end_time: datetime = None,
+                 sampling: typing.Union[str, float, int, None] = None):
         super().__init__(c)
 
         self._date_conversion_func = date_conversion_func
-        self._sampling_rate_s = sampling_rate_s
         self._period = period
         self._start_time = start_time
         self._end_time = end_time
 
+        if (sampling_rate_s is not None and sampling_rate_s > 0):
+            assert sampling is None, "Cannot set both sampling and sampling_rate_s at the same time"
+
+            # Show the deprecation message
+            warnings.warn(("The `sampling_rate_s` argument has been deprecated. "
+                           f"Please use `sampling={sampling_rate_s}S` instead"),
+                          DeprecationWarning)
+
+            sampling = f"{sampling_rate_s}S"
+
+        self._sampling = sampling
+
     @property
     def name(self) -> str:
         return "dfp-file-batcher"
@@ -60,8 +73,11 @@ def accepted_types(self) -> typing.Tuple:
 
     def on_data(self, file_objects: fsspec.core.OpenFiles):
 
-        ts_and_files = []
+        timestamps = []
+        full_names = []
+        file_objs = []
+
+        # Determine the date of the file, and apply the window filter if we have one
         for file_object in file_objects:
             ts = self._date_conversion_func(file_object)
 
@@ -70,60 +86,52 @@ def on_data(self, file_objects: fsspec.core.OpenFiles):
                     or (self._end_time is not None and ts > self._end_time)):
                 continue
 
-            ts_and_files.append(TimestampFileObj(ts, file_object))
-
-        # sort the incoming data by date
-        ts_and_files.sort(key=lambda x: x.timestamp)
-
-        if ((len(ts_and_files) > 1) and (self._sampling_rate_s > 0)):
-            file_sampled_list = []
-
-            ts_last = ts_and_files[0].timestamp
-
-            file_sampled_list.append(ts_and_files[0])
-
-            for idx in range(1, len(ts_and_files)):
-                ts = ts_and_files[idx].timestamp
-
-                if ((ts - ts_last).seconds >= self._sampling_rate_s):
-
-                    file_sampled_list.append(ts_and_files[idx])
-                    ts_last = ts
-            else:
-                ts_and_files = file_sampled_list
-
-        # Create a dataframe with the incoming metadata
-        df = pd.DataFrame()
-
-        timestamps = []
-        full_names = []
-        file_objs = []
-        for (ts, file_object) in ts_and_files:
-            timestamps.append(ts)
-            full_names.append(file_object.full_name)
-            file_objs.append(file_object)
-
-        df["dfp_timestamp"] = timestamps
-        df["key"] = full_names
-        df["objects"] = file_objs
-
-        output_batches = []
-
-        if len(df) > 0:
-            # Now split by the batching settings
-            df_period = df["dfp_timestamp"].dt.to_period(self._period)
-
-            period_gb = df.groupby(df_period)
-
-            n_groups = len(period_gb)
-            for group in period_gb.groups:
-                period_df = period_gb.get_group(group)
-
-                obj_list = fsspec.core.OpenFiles(period_df["objects"].to_list(),
-                                                 mode=file_objects.mode,
-                                                 fs=file_objects.fs)
-
-                output_batches.append((obj_list, n_groups))
+            timestamps.append(ts)
+            full_names.append(file_object.full_name)
+            file_objs.append(file_object)
+
+        # Build a dataframe with the incoming metadata
+        df = pd.DataFrame(index=pd.DatetimeIndex(timestamps), data={"filename": full_names, "objects": file_objs})
+
+        # Sort the incoming data by date
+        df.sort_index(inplace=True)
+
+        # If sampling was provided, perform that here
+        if (self._sampling is not None):
+
+            if (isinstance(self._sampling, str)):
+                # We have a frequency for sampling. Resample by the frequency, taking the first row per bin
+                df = df.resample(self._sampling).first().dropna()
+
+            elif (self._sampling < 1.0):
+                # Sample a fraction of the rows
+                df = df.sample(frac=self._sampling).sort_index()
+
+            else:
+                # Sample a fixed count of rows
+                df = df.sample(n=self._sampling).sort_index()
+
+        # Early exit if no files were found
+        if (len(df) == 0):
+            return []
+
+        if (self._period is None):
+            # No period was set, so group everything into one single batch
+            return [(fsspec.core.OpenFiles(df["objects"].to_list(), mode=file_objects.mode, fs=file_objects.fs),
+                     len(df))]
+
+        # Now group the rows by the period
+        resampled = df.resample(self._period)
+
+        output_batches = []
+
+        n_groups = len(resampled)
+
+        for _, period_df in resampled:
+
+            obj_list = fsspec.core.OpenFiles(period_df["objects"].to_list(), mode=file_objects.mode, fs=file_objects.fs)
+
+            output_batches.append((obj_list, n_groups))
 
         return output_batches

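For backwards compatibility, the deprecated `sampling_rate_s` argument is translated into the equivalent `sampling` frequency string. A hypothetical usage sketch, where `config` and `my_date_func` are stand-ins for a real `Config` object and a date-extraction callable, not names from this PR:

```python
import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    stage = DFPFileBatcherStage(config,
                                date_conversion_func=my_date_func,
                                sampling_rate_s=30)  # deprecated argument

# A DeprecationWarning is emitted, and the stage behaves as if sampling="30S"
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```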
@@ -111,7 +111,7 @@
     type=int,
     default=0,
     show_envvar=True,
-    help="Minimum time step, in milliseconds, between object logs.")
+    help="Samples the input data files, allowing only one file per bin defined by `sample_rate_s`.")
 @click.option(
     "--input_file",
     "-f",
@@ -248,8 +248,8 @@ def run_pipeline(train_users,
     # Batch files into buckets by time. Use the default ISO date extractor from the filename
     pipeline.add_stage(
         DFPFileBatcherStage(config,
-                            period="D",
-                            sampling_rate_s=sample_rate_s,
+                            period=None,
+                            sampling=f"{sample_rate_s}S" if sample_rate_s > 0 else None,
                             date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex),
                             start_time=start_time,
                             end_time=end_time))
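For users of the `--sample_rate_s` CLI flag, the wiring above maps the flag onto the new `sampling` API; roughly:

```python
# e.g. --sample_rate_s 60 now maps to a "60S" frequency sample,
# and period=None processes all sampled files as one batch
sample_rate_s = 60
sampling = f"{sample_rate_s}S" if sample_rate_s > 0 else None  # -> "60S"
```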
