From beb37693268595a451d7762ebef36fd90804283d Mon Sep 17 00:00:00 2001 From: Robin Holzinger Date: Thu, 20 Jun 2024 21:34:14 +0200 Subject: [PATCH] fix: Adjust evaluation business logic (#535) # Motivation - We want to use the `real_last_timestamp` (start of next training interval - 1 --> marking end of current training interval) only for plotting the boxes in the heatmap plot. For decisions w.r.t. currently active models this doesn't work. E.g. if the next year in a dataset has no data at the year start, our training interval would extend into this next year and therefore its model won't be considered for the current evaluation interval. - timetriggers should allow starting at a statically defined start point (and not with the first sample), otherwise the whole schedule is off by a couple of days. # Note Independently of the bug we fix with the `start_timestamp` in `timetrigger`, this setting allows us to effectively do `pre-training` with the first trigger (e.g. in the continuous arxiv dataset we have sparse data from 1988 - ~2005). We could simply start the schedule in year 2005, then the first trigger trains on all previous data. 
--- modyn/config/schema/pipeline/trigger.py | 7 ++++ modyn/supervisor/internal/eval/handler.py | 13 ++----- .../internal/triggers/timetrigger.py | 19 +++++----- modyn/supervisor/internal/utils/time_tools.py | 18 ++++++++++ .../internal/eval/test_eval_handler.py | 36 +++++++++---------- .../internal/triggers/test_timetrigger.py | 2 +- 6 files changed, 57 insertions(+), 38 deletions(-) create mode 100644 modyn/supervisor/internal/utils/time_tools.py diff --git a/modyn/config/schema/pipeline/trigger.py b/modyn/config/schema/pipeline/trigger.py index ebdec5383..6d251b3f3 100644 --- a/modyn/config/schema/pipeline/trigger.py +++ b/modyn/config/schema/pipeline/trigger.py @@ -13,6 +13,13 @@ class TimeTriggerConfig(ModynBaseModel): description="Interval length for the trigger as an integer followed by a time unit: s, m, h, d, w, y", pattern=rf"^\d+{REGEX_TIME_UNIT}$", ) + start_timestamp: int | None = Field( + None, + description=( + "The timestamp at which the triggering schedule starts. First trigger will be at start_timestamp + every." + "Use None to start at the first timestamp of the data." + ), + ) @cached_property def every_seconds(self) -> int: diff --git a/modyn/supervisor/internal/eval/handler.py b/modyn/supervisor/internal/eval/handler.py index 99982b658..0ecb25cb2 100644 --- a/modyn/supervisor/internal/eval/handler.py +++ b/modyn/supervisor/internal/eval/handler.py @@ -87,17 +87,8 @@ def get_eval_requests_after_pipeline(self, df_trainings: pd.DataFrame) -> list[E ) == set() df_trainings = df_trainings.copy() - # for sparse datasets we want to use next_training_start-1 as training interval end instead of last_timestamp - # as there could be a long gap between the max(sample_time) in one training batch and the min(sample_time) in - # the next training batch. - # e.g. if we want to train for 1.1.2020-31.12.2020 but only have timestamps on 1.1.2020, last_timestamp - # would be 1.1.2020, but the next training would start on 1.1.2021. 
- df_trainings["real_last_timestamp"] = ( - df_trainings["first_timestamp"].shift(-1, fill_value=df_trainings.iloc[-1]["last_timestamp"] + 1) - 1 - ) - training_intervals: list[tuple[int, int]] = [ - (row["first_timestamp"], row["real_last_timestamp"]) for _, row in df_trainings.iterrows() + (row["first_timestamp"], row["last_timestamp"]) for _, row in df_trainings.iterrows() ] eval_intervals = self.eval_strategy.get_eval_intervals(training_intervals) df_eval_intervals = pd.DataFrame( @@ -114,7 +105,7 @@ def get_eval_requests_after_pipeline(self, df_trainings: pd.DataFrame) -> list[E # Check if a combination is the active. We first compute if model was trained before # the usage starts last_timestamp (df_trainings) defines the end of the training data; # active_model_trained_before (df_eval_intervals): defines center of an eval intervals. - df_cross["active_candidate"] = df_cross["real_last_timestamp"] < df_cross["active_model_trained_before"] + df_cross["active_candidate"] = df_cross["last_timestamp"] < df_cross["active_model_trained_before"] # find the maximum model for every EvalCandidate that doesn't violate that constraint max_model_id = ( diff --git a/modyn/supervisor/internal/triggers/timetrigger.py b/modyn/supervisor/internal/triggers/timetrigger.py index 2aba9e318..a581b5d7b 100644 --- a/modyn/supervisor/internal/triggers/timetrigger.py +++ b/modyn/supervisor/internal/triggers/timetrigger.py @@ -12,20 +12,23 @@ class TimeTrigger(Trigger): Clock starts with the first observed datapoint""" def __init__(self, config: TimeTriggerConfig): - self.trigger_every_s: int = config.every_seconds + self.config = config self.next_trigger_at: int | None = None - if self.trigger_every_s < 1: - raise ValueError(f"trigger_every must be > 0, but is {self.trigger_every_s}") + if self.config.every_seconds < 1: + raise ValueError(f"trigger_every must be > 0, but is {self.config.every_seconds}") super().__init__() def inform(self, new_data: list[tuple[int, int, int]]) -> 
Generator[int, None, None]: if self.next_trigger_at is None: - if len(new_data) > 0: - self.next_trigger_at = new_data[0][1] + self.trigger_every_s # new_data is sorted + if self.config.start_timestamp is not None: + self.next_trigger_at = self.config.start_timestamp + self.config.every_seconds else: - return + if len(new_data) > 0: + self.next_trigger_at = new_data[0][1] + self.config.every_seconds # new_data is sorted + else: + return max_timestamp = new_data[-1][1] # new_data is sorted triggering_indices = [] @@ -42,9 +45,9 @@ def inform(self, new_data: list[tuple[int, int, int]]) -> Generator[int, None, N # This means that there was a trigger before the first item that we got informed about # However, there might have been multiple triggers, e.g., if there is one trigger every second # and 5 seconds have passed since the last item came through - # This is caught by our while loop which increases step by step for `trigger_every_s`. + # This is caught by our while loop which increases step by step for `config.every_seconds`. triggering_indices.append(idx - 1) - self.next_trigger_at += self.trigger_every_s + self.next_trigger_at += self.config.every_seconds yield from triggering_indices diff --git a/modyn/supervisor/internal/utils/time_tools.py b/modyn/supervisor/internal/utils/time_tools.py new file mode 100644 index 000000000..2518bf56b --- /dev/null +++ b/modyn/supervisor/internal/utils/time_tools.py @@ -0,0 +1,18 @@ +import pandas as pd + + +def generate_real_training_end_timestamp(df_trainings: pd.DataFrame) -> pd.Series: + """ + For sparse datasets we want to use next_training_start-1 as training interval end instead of last_timestamp + as there could be a long gap between the max(sample_time) in one training batch and the min(sample_time) in + the next training batch. + e.g. if we want to train for 1.1.2020-31.12.2020 but only have timestamps on 1.1.2020, last_timestamp + would be 1.1.2020, but the next training would start on 1.1.2021. 
+ + Args: + df_trainings: The pipeline stage execution tracking information including training and model infos. + + Returns: + The real last timestamp series. + """ + return df_trainings["first_timestamp"].shift(-1, fill_value=df_trainings.iloc[-1]["last_timestamp"] + 1) - 1 diff --git a/modyn/tests/supervisor/internal/eval/test_eval_handler.py b/modyn/tests/supervisor/internal/eval/test_eval_handler.py index 117d3bbcf..c1f1663b4 100644 --- a/modyn/tests/supervisor/internal/eval/test_eval_handler.py +++ b/modyn/tests/supervisor/internal/eval/test_eval_handler.py @@ -95,9 +95,9 @@ def test_get_eval_requests_after_pipeline() -> None: # checking the data above if looks like model 3 is the biggest model with # last_timestamp < active_model_trained_before; # however, we use next trainings start - 1 as training interval end. - (22, True, False, 8, 12), - (23, False, True, 8, 12), - (26, False, False, 8, 12), + (22, False, False, 8, 12), + (23, True, False, 8, 12), + (26, False, True, 8, 12), (27, False, False, 8, 12), (28, False, False, 8, 12), # interval 2: for models/triggers 1-8 @@ -111,9 +111,9 @@ def test_get_eval_requests_after_pipeline() -> None: (21, False, False, 23, 27), (22, False, False, 23, 27), (23, False, False, 23, 27), - (26, True, False, 23, 27), - (27, False, True, 23, 27), - (28, False, False, 23, 27), + (26, False, False, 23, 27), + (27, True, False, 23, 27), + (28, False, True, 23, 27), # interval 4: for models/triggers 1-8 (21, False, False, 24, 28), (22, False, False, 24, 28), @@ -181,20 +181,20 @@ def test_between_two_trigger_after_pipeline() -> None: # (model_id, currently_active_model, currently_trained_model, start_interval, end_interval) expected_eval_requests = [ # interval 1: for models/triggers 1-8 - (21, False, True, 0, 5 - 1), - (26, False, False, 0, 5 - 1), - (28, False, False, 0, 5 - 1), - (29, False, False, 0, 5 - 1), + (21, False, True, 0, 0), + (26, False, False, 0, 0), + (28, False, False, 0, 0), + (29, False, False, 0, 0), # 
interval 2: for models/triggers 1-8 - (21, True, False, 5, 8 - 1), - (26, False, True, 5, 8 - 1), - (28, False, False, 5, 8 - 1), - (29, False, False, 5, 8 - 1), + (21, True, False, 5, 7), + (26, False, True, 5, 7), + (28, False, False, 5, 7), + (29, False, False, 5, 7), # interval 3: for models/triggers 1-8 - (21, False, False, 8, 14 - 1), - (26, True, False, 8, 14 - 1), - (28, False, True, 8, 14 - 1), - (29, False, False, 8, 14 - 1), + (21, False, False, 8, 10), + (26, True, False, 8, 10), + (28, False, True, 8, 10), + (29, False, False, 8, 10), # interval 4: for models/triggers 1-8 (21, False, False, 14, 16), (26, False, False, 14, 16), diff --git a/modyn/tests/supervisor/internal/triggers/test_timetrigger.py b/modyn/tests/supervisor/internal/triggers/test_timetrigger.py index 7e8a398e6..3a98bb341 100644 --- a/modyn/tests/supervisor/internal/triggers/test_timetrigger.py +++ b/modyn/tests/supervisor/internal/triggers/test_timetrigger.py @@ -4,7 +4,7 @@ def test_initialization() -> None: trigger = TimeTrigger(TimeTriggerConfig(every="2s")) - assert trigger.trigger_every_s == 2 + assert trigger.config.every_seconds == 2 assert trigger.next_trigger_at is None