diff --git a/morpheus/stages/postprocess/timeseries_stage.py b/morpheus/stages/postprocess/timeseries_stage.py
index d84a7730e2..b724e2d308 100644
--- a/morpheus/stages/postprocess/timeseries_stage.py
+++ b/morpheus/stages/postprocess/timeseries_stage.py
@@ -28,8 +28,8 @@
 from morpheus.cli.register_stage import register_stage
 from morpheus.config import Config
 from morpheus.config import PipelineModes
-from morpheus.messages import MultiResponseAEMessage
 from morpheus.messages import MultiResponseMessage
+from morpheus.messages.multi_ae_message import MultiMessage
 from morpheus.pipeline.single_port_stage import SinglePortStage
 from morpheus.pipeline.stream_pair import StreamPair
 
@@ -174,6 +174,7 @@ class _UserTimeSeries:
 
     def __init__(self,
                  user_id: str,
+                 timestamp_col: str,
                  resolution: str,
                  min_window: str,
                  hot_start: bool,
@@ -183,6 +184,7 @@ def __init__(self,
         super().__init__()
 
         self._user_id = user_id
+        self._timestamp_col = timestamp_col
 
         # Size of bins
         self._resolution_sec = int(round(pd.Timedelta(resolution).total_seconds()))
@@ -206,7 +208,8 @@ def __init__(self,
 
         # Stateful members
         self._pending_messages: deque[MultiResponseMessage] = deque()  # Holds the existing messages pending
-        self._timeseries_data: pd.DataFrame = pd.DataFrame(columns=["event_dt"])  # Holds all available timeseries data
+        self._timeseries_data: pd.DataFrame = pd.DataFrame(columns=[self._timestamp_col
+                                                                    ])  # Holds all available timeseries data
 
         self._t0_epoch: float = None
 
@@ -268,22 +271,22 @@ def _determine_action(self, is_complete: bool) -> typing.Optional[_TimeSeriesAct
         x: MultiResponseMessage = self._pending_messages[0]
 
         # Get the first message timestamp
-        message_start = calc_bin(x.get_meta("event_dt").iloc[0], self._t0_epoch, self._resolution_sec)
-        message_end = calc_bin(x.get_meta("event_dt").iloc[-1], self._t0_epoch, self._resolution_sec)
+        message_start = calc_bin(x.get_meta(self._timestamp_col).iloc[0], self._t0_epoch, self._resolution_sec)
+        message_end = calc_bin(x.get_meta(self._timestamp_col).iloc[-1], self._t0_epoch, self._resolution_sec)
 
         window_start = message_start - self._half_window_bins
         window_end = message_end + self._half_window_bins
 
         # Check left buffer
         if (timeseries_start > window_start):
-            # logger.debug("Warming up. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
-            #              timeseries_start._repr_base,
-            #              window_start._repr_base,
-            #              message_start._repr_base,
-            #              message_end._repr_base,
-            #              window_end._repr_base,
-            #              timeseries_end._repr_base,
-            #              timeseries_start - window_start)
+            logger.debug("Warming up. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
+                         timeseries_start,
+                         window_start,
+                         message_start,
+                         message_end,
+                         window_end,
+                         timeseries_end,
+                         timeseries_start - window_start)
 
             # Not shutting down and we arent warm, send through
             if (not self._is_warm and not is_complete):
@@ -293,34 +296,35 @@ def _determine_action(self, is_complete: bool) -> typing.Optional[_TimeSeriesAct
 
         # Check the right buffer
         if (timeseries_end < window_end):
-            # logger.debug("Filling front. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
-            #              timeseries_start._repr_base,
-            #              window_start._repr_base,
-            #              message_start._repr_base,
-            #              message_end._repr_base,
-            #              window_end._repr_base,
-            #              timeseries_end._repr_base,
-            #              window_end - timeseries_end)
-
-            if (not is_complete):
-                # Not shutting down, so hold message
+            logger.debug("Filling front. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s. Delta: %s",
+                         timeseries_start,
+                         window_start,
+                         message_start,
+                         message_end,
+                         window_end,
+                         timeseries_end,
+                         window_end - timeseries_end)
+
+            if (not is_complete and len(self._pending_messages) == 1):
+                # Last message, so stop processing
+                logger.debug("not is_complete, no pending")
                 return None
-
             if (is_complete and self._cold_end):
                 # Shutting down and we have a cold ending, just empty the message
+                logger.debug("is_complete and self._cold_end")
                 return _TimeSeriesAction(send_message=True, message=self._pending_messages.popleft())
 
             # Shutting down and hot end
             # logger.debug("Hot End. Processing. TS: %s", timeseries_start._repr_base)
 
         # By this point we have both a front and back buffer. So get ready for a calculation
-        # logger.debug("Perform Calc. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s.",
-        #              timeseries_start._repr_base,
-        #              window_start._repr_base,
-        #              message_start._repr_base,
-        #              message_end._repr_base,
-        #              window_end._repr_base,
-        #              timeseries_end._repr_base)
+        logger.debug("Perform Calc. TS: %s, WS: %s, MS: %s, ME: %s, WE: %s, TE: %s.",
+                     timeseries_start,
+                     window_start,
+                     message_start,
+                     message_end,
+                     window_end,
+                     timeseries_end)
 
         # First, remove elements in the front queue that are too old
         self._timeseries_data.drop(self._timeseries_data[self._timeseries_data["event_bin"] < window_start].index,
@@ -347,7 +351,7 @@ def _calc_timeseries(self, x: MultiResponseMessage, is_complete: bool):
         # Save this message in the pending queue
         self._pending_messages.append(x)
 
-        new_timedata = x.get_meta(["event_dt"])
+        new_timedata = x.get_meta([self._timestamp_col])
 
         # Save this message event times in the event list. Ensure the values are always sorted
         self._timeseries_data = pd.concat([self._timeseries_data, new_timedata]).sort_index()
@@ -363,13 +367,13 @@ def _calc_timeseries(self, x: MultiResponseMessage, is_complete: bool):
 
         # If this is our first time data, set the t0 time
         if (self._t0_epoch is None):
-            self._t0_epoch = self._timeseries_data["event_dt"].iloc[0]
+            self._t0_epoch = self._timeseries_data[self._timestamp_col].iloc[0]
 
             # TODO(MDD): Floor to the day to unsure all buckets are always aligned with val data
             self._t0_epoch = self._t0_epoch.floor(freq="D")
 
         # Calc the bins for the timeseries data
-        self._timeseries_data["event_bin"] = self._calc_bin_series(self._timeseries_data["event_dt"])
+        self._timeseries_data["event_bin"] = self._calc_bin_series(self._timeseries_data[self._timestamp_col])
 
         # At this point there are 3 things that can happen
         # 1. We are warming up to build a front buffer. Save the current message times and send the message on
@@ -441,6 +445,8 @@ def __init__(self,
                  zscore_threshold: float = 8.0):
         super().__init__(c)
 
+        self._timestamp_col = c.ae.timestamp_column_name
+
         self._feature_length = c.feature_length
 
         self._resolution = resolution
@@ -470,15 +476,16 @@ def accepted_types(self) -> typing.Tuple:
            Accepted input types.
 
         """
-        return (MultiResponseMessage, )
+        return (MultiMessage, )
 
     def supports_cpp_node(self):
         return False
 
-    def _call_timeseries_user(self, x: MultiResponseAEMessage):
+    def _call_timeseries_user(self, x: MultiMessage):
 
         if (x.user_id not in self._timeseries_per_user):
             self._timeseries_per_user[x.user_id] = _UserTimeSeries(user_id=x.user_id,
+                                                                   timestamp_col=self._timestamp_col,
                                                                    resolution=self._resolution,
                                                                    min_window=self._min_window,
                                                                    hot_start=self._hot_start,
@@ -493,7 +500,7 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea
         stream = input_stream[0]
         out_type = input_stream[1]
 
-        def on_next(x: MultiResponseAEMessage):
+        def on_next(x: MultiMessage):
 
             message_list: typing.List[MultiResponseMessage] = self._call_timeseries_user(x)
 
@@ -503,8 +510,8 @@ def on_completed():
 
             to_send = []
 
-            for ts in self._timeseries_per_user.values():
-                message_list: typing.List[MultiResponseMessage] = ts._calc_timeseries(None, True)
+            for timestamp in self._timeseries_per_user.values():
+                message_list: typing.List[MultiResponseMessage] = timestamp._calc_timeseries(None, True)
 
                 to_send = to_send + message_list
 
diff --git a/tests/benchmarks/test_bench_e2e_pipelines.py b/tests/benchmarks/test_bench_e2e_pipelines.py
index 36bb63eb83..76637614f8 100644
--- a/tests/benchmarks/test_bench_e2e_pipelines.py
+++ b/tests/benchmarks/test_bench_e2e_pipelines.py
@@ -35,7 +35,6 @@
 from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage
 from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
 from morpheus.stages.postprocess.serialize_stage import SerializeStage
-from morpheus.stages.postprocess.timeseries_stage import TimeSeriesStage
 from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
 from morpheus.stages.preprocess.preprocess_ae_stage import PreprocessAEStage
 from morpheus.stages.preprocess.preprocess_fil_stage import PreprocessFILStage
@@ -112,14 +111,6 @@ def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file
     pipeline.add_stage(PreprocessAEStage(config))
     pipeline.add_stage(AutoEncoderInferenceStage(config))
     pipeline.add_stage(AddScoresStage(config))
-    pipeline.add_stage(
-        TimeSeriesStage(config,
-                        resolution="1m",
-                        min_window=" 12 h",
-                        hot_start=True,
-                        cold_end=False,
-                        filter_percent=90.0,
-                        zscore_threshold=8.0))
     pipeline.add_stage(MonitorStage(config))
     pipeline.add_stage(SerializeStage(config))
     pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
diff --git a/tests/test_dfp.py b/tests/test_dfp.py
index 16d31e7eaf..c602a76fbc 100755
--- a/tests/test_dfp.py
+++ b/tests/test_dfp.py
@@ -75,6 +75,7 @@ def test_dfp_roleg(mock_ae, config, tmp_path):
     config.ae = ConfigAutoEncoder()
     config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
     config.ae.userid_filter = "role-g"
+    config.ae.timestamp_column_name = "event_dt"
 
     with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
         config.ae.feature_columns = [x.strip() for x in fh.readlines()]
@@ -113,7 +114,8 @@ def test_dfp_roleg(mock_ae, config, tmp_path):
                            results_file_name=results_file_name,
                            index_col="_index_",
                            exclude=("event_dt", "zscore"),
-                           rel_tol=0.15))
+                           rel_tol=0.1))
+
     pipe.add_stage(SerializeStage(config, include=[]))
     pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))
 
@@ -157,6 +159,7 @@ def test_dfp_user123(mock_ae, config, tmp_path):
     config.ae = ConfigAutoEncoder()
     config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
     config.ae.userid_filter = "user123"
+    config.ae.timestamp_column_name = "event_dt"
 
     with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
         config.ae.feature_columns = [x.strip() for x in fh.readlines()]
@@ -238,6 +241,7 @@ def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
     config.ae = ConfigAutoEncoder()
     config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
     config.ae.userid_filter = "user123"
+    config.ae.timestamp_column_name = "event_dt"
 
     with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
         config.ae.feature_columns = [x.strip() for x in fh.readlines()]
@@ -284,7 +288,7 @@ def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
                            rel_tol=0.1))
     pipe.add_segment_boundary(MultiResponseMessage)  # Boundary 7
     pipe.add_stage(SerializeStage(config, include=[]))
-    pipe.add_segment_boundary(MessageMeta)  # Boundary 9
+    pipe.add_segment_boundary(MessageMeta)  # Boundary 8
     pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))
 
     pipe.run()
diff --git a/tests/test_dfp_kafka.py b/tests/test_dfp_kafka.py
index 88045df684..ee0dc1f3bc 100755
--- a/tests/test_dfp_kafka.py
+++ b/tests/test_dfp_kafka.py
@@ -89,6 +89,7 @@ def test_dfp_roleg(mock_ae: mock.MagicMock,
     config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
     config.ae.userid_filter = "role-g"
     config.ae.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'))
+    config.ae.timestamp_column_name = "event_dt"
 
     input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
     train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
@@ -189,6 +190,7 @@ def test_dfp_user123(mock_ae: mock.MagicMock,
     config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
     config.ae.userid_filter = "user123"
     config.ae.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'))
+    config.ae.timestamp_column_name = "event_dt"
 
     input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
    train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")