Skip to content

Commit

Permalink
Update TimeSeries stage to also work with Production DFP (nv-morpheus…
Browse files Browse the repository at this point in the history
…#1121)

- Update TimeSeries stage to also work with Production DFP pipeline instead of Starter DFP. Now accepts `MultiMessage` instead of `MultiResponseMessage`. Remove use of hard-coded `event_dt` and replace with `config.ae.timestamp_column_name`.
- Tested updated TimeSeries stage with Duo and Azure production DFP pipelines.
- As part of a Starter CloudTrail DFP pipeline, TimeSeries stage runs into issues when processing multiple batches, i.e. when `repeat` option in `CloudTrailSourceStage` is increased (increase input data size) or pipeline batch size is decreased. Pipeline either errors out or not all data is processed. Remove TimeSeries stage from AutoEncoder E2E benchmark for now and will create separate issue for this.

Fixes nv-morpheus#1107

Authors:
  - Eli Fajardo (https://github.com/efajardo-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#1121
  • Loading branch information
efajardo-nv authored Aug 30, 2023
1 parent 03e8abf commit 4d4a7ee
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 50 deletions.
85 changes: 46 additions & 39 deletions morpheus/stages/postprocess/timeseries_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import MultiResponseAEMessage
from morpheus.messages import MultiResponseMessage
from morpheus.messages.multi_ae_message import MultiMessage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair

Expand Down Expand Up @@ -174,6 +174,7 @@ class _UserTimeSeries:

def __init__(self,
user_id: str,
timestamp_col: str,
resolution: str,
min_window: str,
hot_start: bool,
Expand All @@ -183,6 +184,7 @@ def __init__(self,
super().__init__()

self._user_id = user_id
self._timestamp_col = timestamp_col

# Size of bins
self._resolution_sec = int(round(pd.Timedelta(resolution).total_seconds()))
Expand All @@ -206,7 +208,8 @@ def __init__(self,

# Stateful members
self._pending_messages: deque[MultiResponseMessage] = deque() # Holds the existing messages pending
self._timeseries_data: pd.DataFrame = pd.DataFrame(columns=["event_dt"]) # Holds all available timeseries data
self._timeseries_data: pd.DataFrame = pd.DataFrame(columns=[self._timestamp_col
]) # Holds all available timeseries data

self._t0_epoch: float = None

Expand Down Expand Up @@ -268,22 +271,22 @@ def _determine_action(self, is_complete: bool) -> typing.Optional[_TimeSeriesAct
x: MultiResponseMessage = self._pending_messages[0]

# Get the first message timestamp
message_start = calc_bin(x.get_meta("event_dt").iloc[0], self._t0_epoch, self._resolution_sec)
message_end = calc_bin(x.get_meta("event_dt").iloc[-1], self._t0_epoch, self._resolution_sec)
message_start = calc_bin(x.get_meta(self._timestamp_col).iloc[0], self._t0_epoch, self._resolution_sec)
message_end = calc_bin(x.get_meta(self._timestamp_col).iloc[-1], self._t0_epoch, self._resolution_sec)

window_start = message_start - self._half_window_bins
window_end = message_end + self._half_window_bins

# Check left buffer
if (timeseries_start > window_start):
# logger.debug("Warming up. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
# timeseries_start._repr_base,
# window_start._repr_base,
# message_start._repr_base,
# message_end._repr_base,
# window_end._repr_base,
# timeseries_end._repr_base,
# timeseries_start - window_start)
logger.debug("Warming up. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
timeseries_start,
window_start,
message_start,
message_end,
window_end,
timeseries_end,
timeseries_start - window_start)

# Not shutting down and we arent warm, send through
if (not self._is_warm and not is_complete):
Expand All @@ -293,34 +296,35 @@ def _determine_action(self, is_complete: bool) -> typing.Optional[_TimeSeriesAct

# Check the right buffer
if (timeseries_end < window_end):
# logger.debug("Filling front. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
# timeseries_start._repr_base,
# window_start._repr_base,
# message_start._repr_base,
# message_end._repr_base,
# window_end._repr_base,
# timeseries_end._repr_base,
# window_end - timeseries_end)

if (not is_complete):
# Not shutting down, so hold message
logger.debug("Filling front. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
timeseries_start,
window_start,
message_start,
message_end,
window_end,
timeseries_end,
window_end - timeseries_end)

if (not is_complete and len(self._pending_messages) == 1):
# Last message, so stop processing
logger.debug("not is_complete, no pending")
return None

if (is_complete and self._cold_end):
# Shutting down and we have a cold ending, just empty the message
logger.debug("is_complete and self._cold_end")
return _TimeSeriesAction(send_message=True, message=self._pending_messages.popleft())

# Shutting down and hot end
# logger.debug("Hot End. Processing. TS: %s", timeseries_start._repr_base)

# By this point we have both a front and back buffer. So get ready for a calculation
# logger.debug("Perform Calc. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s.",
# timeseries_start._repr_base,
# window_start._repr_base,
# message_start._repr_base,
# message_end._repr_base,
# window_end._repr_base,
# timeseries_end._repr_base)
logger.debug("Perform Calc. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s.",
timeseries_start,
window_start,
message_start,
message_end,
window_end,
timeseries_end)

# First, remove elements in the front queue that are too old
self._timeseries_data.drop(self._timeseries_data[self._timeseries_data["event_bin"] < window_start].index,
Expand All @@ -347,7 +351,7 @@ def _calc_timeseries(self, x: MultiResponseMessage, is_complete: bool):
# Save this message in the pending queue
self._pending_messages.append(x)

new_timedata = x.get_meta(["event_dt"])
new_timedata = x.get_meta([self._timestamp_col])

# Save this message event times in the event list. Ensure the values are always sorted
self._timeseries_data = pd.concat([self._timeseries_data, new_timedata]).sort_index()
Expand All @@ -363,13 +367,13 @@ def _calc_timeseries(self, x: MultiResponseMessage, is_complete: bool):

# If this is our first time data, set the t0 time
if (self._t0_epoch is None):
self._t0_epoch = self._timeseries_data["event_dt"].iloc[0]
self._t0_epoch = self._timeseries_data[self._timestamp_col].iloc[0]

# TODO(MDD): Floor to the day to unsure all buckets are always aligned with val data
self._t0_epoch = self._t0_epoch.floor(freq="D")

# Calc the bins for the timeseries data
self._timeseries_data["event_bin"] = self._calc_bin_series(self._timeseries_data["event_dt"])
self._timeseries_data["event_bin"] = self._calc_bin_series(self._timeseries_data[self._timestamp_col])

# At this point there are 3 things that can happen
# 1. We are warming up to build a front buffer. Save the current message times and send the message on
Expand Down Expand Up @@ -441,6 +445,8 @@ def __init__(self,
zscore_threshold: float = 8.0):
super().__init__(c)

self._timestamp_col = c.ae.timestamp_column_name

self._feature_length = c.feature_length

self._resolution = resolution
Expand Down Expand Up @@ -470,15 +476,16 @@ def accepted_types(self) -> typing.Tuple:
Accepted input types.
"""
return (MultiResponseMessage, )
return (MultiMessage, )

def supports_cpp_node(self):
return False

def _call_timeseries_user(self, x: MultiResponseAEMessage):
def _call_timeseries_user(self, x: MultiMessage):

if (x.user_id not in self._timeseries_per_user):
self._timeseries_per_user[x.user_id] = _UserTimeSeries(user_id=x.user_id,
timestamp_col=self._timestamp_col,
resolution=self._resolution,
min_window=self._min_window,
hot_start=self._hot_start,
Expand All @@ -493,7 +500,7 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea
stream = input_stream[0]
out_type = input_stream[1]

def on_next(x: MultiResponseAEMessage):
def on_next(x: MultiMessage):

message_list: typing.List[MultiResponseMessage] = self._call_timeseries_user(x)

Expand All @@ -503,8 +510,8 @@ def on_completed():

to_send = []

for ts in self._timeseries_per_user.values():
message_list: typing.List[MultiResponseMessage] = ts._calc_timeseries(None, True)
for timestamp in self._timeseries_per_user.values():
message_list: typing.List[MultiResponseMessage] = timestamp._calc_timeseries(None, True)

to_send = to_send + message_list

Expand Down
9 changes: 0 additions & 9 deletions tests/benchmarks/test_bench_e2e_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage
from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.postprocess.timeseries_stage import TimeSeriesStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_ae_stage import PreprocessAEStage
from morpheus.stages.preprocess.preprocess_fil_stage import PreprocessFILStage
Expand Down Expand Up @@ -112,14 +111,6 @@ def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file
pipeline.add_stage(PreprocessAEStage(config))
pipeline.add_stage(AutoEncoderInferenceStage(config))
pipeline.add_stage(AddScoresStage(config))
pipeline.add_stage(
TimeSeriesStage(config,
resolution="1m",
min_window=" 12 h",
hot_start=True,
cold_end=False,
filter_percent=90.0,
zscore_threshold=8.0))
pipeline.add_stage(MonitorStage(config))
pipeline.add_stage(SerializeStage(config))
pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
Expand Down
8 changes: 6 additions & 2 deletions tests/test_dfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def test_dfp_roleg(mock_ae, config, tmp_path):
config.ae = ConfigAutoEncoder()
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "role-g"
config.ae.timestamp_column_name = "event_dt"

with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
config.ae.feature_columns = [x.strip() for x in fh.readlines()]
Expand Down Expand Up @@ -113,7 +114,8 @@ def test_dfp_roleg(mock_ae, config, tmp_path):
results_file_name=results_file_name,
index_col="_index_",
exclude=("event_dt", "zscore"),
rel_tol=0.15))
rel_tol=0.1))

pipe.add_stage(SerializeStage(config, include=[]))
pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))

Expand Down Expand Up @@ -157,6 +159,7 @@ def test_dfp_user123(mock_ae, config, tmp_path):
config.ae = ConfigAutoEncoder()
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "user123"
config.ae.timestamp_column_name = "event_dt"

with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
config.ae.feature_columns = [x.strip() for x in fh.readlines()]
Expand Down Expand Up @@ -238,6 +241,7 @@ def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
config.ae = ConfigAutoEncoder()
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "user123"
config.ae.timestamp_column_name = "event_dt"

with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
config.ae.feature_columns = [x.strip() for x in fh.readlines()]
Expand Down Expand Up @@ -284,7 +288,7 @@ def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
rel_tol=0.1))
pipe.add_segment_boundary(MultiResponseMessage) # Boundary 7
pipe.add_stage(SerializeStage(config, include=[]))
pipe.add_segment_boundary(MessageMeta) # Boundary 9
pipe.add_segment_boundary(MessageMeta) # Boundary 8
pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))

pipe.run()
Expand Down
2 changes: 2 additions & 0 deletions tests/test_dfp_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def test_dfp_roleg(mock_ae: mock.MagicMock,
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "role-g"
config.ae.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'))
config.ae.timestamp_column_name = "event_dt"

input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
Expand Down Expand Up @@ -189,6 +190,7 @@ def test_dfp_user123(mock_ae: mock.MagicMock,
config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
config.ae.userid_filter = "user123"
config.ae.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'))
config.ae.timestamp_column_name = "event_dt"

input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
Expand Down

0 comments on commit 4d4a7ee

Please sign in to comment.