fix: Adjust evaluation business logic (#535)
# Motivation

- We want to use the `real_last_timestamp` (start of the next training
interval - 1, i.e. the end of the current training interval) only for
plotting the boxes in the heatmap plot. For decisions w.r.t. currently
active models this doesn't work: e.g. if the next year in a dataset has
no data at the year start, our training interval would extend into this
next year and therefore its model would not be considered for the current
evaluation interval.
- Time triggers should allow starting at a statically defined start point
(and not with the first sample); otherwise, the whole schedule is off by a
couple of days.

# Note

Independently of the bug we fix with the `start_timestamp` setting in the
`timetrigger`, this setting allows us to effectively do pre-training with
the first trigger (e.g. the continuous arxiv dataset has sparse data from
1988 to ~2005). We can simply start the schedule in 2005; the first
trigger then trains on all previous data.
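
As a sketch of the idea (assuming the trigger is configured directly via `TimeTriggerConfig`; the exact pipeline wiring and the chosen values are hypothetical), anchoring the schedule at 2005 could look like this:

```python
from modyn.config.schema.pipeline.trigger import TimeTriggerConfig

# Hypothetical values for illustration: anchor the schedule at 2005-01-01
# (epoch seconds) so the first trigger covers all sparse data from 1988 onwards.
pretraining_trigger = TimeTriggerConfig(
    every="26w",                 # trigger roughly twice a year afterwards
    start_timestamp=1104537600,  # 2005-01-01 00:00:00 UTC
)
# The first trigger fires at start_timestamp + every_seconds, effectively acting
# as a pre-training step over everything observed before that point.
```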
robinholzi authored Jun 20, 2024
1 parent c0b0ae8 commit beb3769
Showing 6 changed files with 57 additions and 38 deletions.
7 changes: 7 additions & 0 deletions modyn/config/schema/pipeline/trigger.py
@@ -13,6 +13,13 @@ class TimeTriggerConfig(ModynBaseModel):
description="Interval length for the trigger as an integer followed by a time unit: s, m, h, d, w, y",
pattern=rf"^\d+{REGEX_TIME_UNIT}$",
)
start_timestamp: int | None = Field(
None,
description=(
"The timestamp at which the triggering schedule starts. First trigger will be at start_timestamp + every."
"Use None to start at the first timestamp of the data."
),
)

@cached_property
def every_seconds(self) -> int:
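
For reference, the `every` string is validated against `^\d+{REGEX_TIME_UNIT}$` (units s, m, h, d, w, y) and converted by the `every_seconds` property. A minimal illustration of how such a conversion could work — not the actual Modyn implementation:

```python
# Illustrative only: one way to turn a validated "every" string into seconds.
# "y" is approximated as 365 days here; the real implementation may differ.
_SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800, "y": 31536000}


def interval_to_seconds(every: str) -> int:
    value, unit = int(every[:-1]), every[-1]
    return value * _SECONDS_PER_UNIT[unit]


assert interval_to_seconds("2s") == 2
assert interval_to_seconds("1d") == 86400
```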
13 changes: 2 additions & 11 deletions modyn/supervisor/internal/eval/handler.py
@@ -87,17 +87,8 @@ def get_eval_requests_after_pipeline(self, df_trainings: pd.DataFrame) -> list[E
) == set()
df_trainings = df_trainings.copy()

# for sparse datasets we want to use next_training_start-1 as training interval end instead of last_timestamp
# as there could be a long gap between the max(sample_time) in one training batch and the min(sample_time) in
# the next training batch.
# e.g. if we want to train for 1.1.2020-31.12.2020 but only have timestamps on 1.1.2020, last_timestamp
# would be 1.1.2020, but the next training would start on 1.1.2021.
df_trainings["real_last_timestamp"] = (
df_trainings["first_timestamp"].shift(-1, fill_value=df_trainings.iloc[-1]["last_timestamp"] + 1) - 1
)

training_intervals: list[tuple[int, int]] = [
(row["first_timestamp"], row["real_last_timestamp"]) for _, row in df_trainings.iterrows()
(row["first_timestamp"], row["last_timestamp"]) for _, row in df_trainings.iterrows()
]
eval_intervals = self.eval_strategy.get_eval_intervals(training_intervals)
df_eval_intervals = pd.DataFrame(
@@ -114,7 +105,7 @@ def get_eval_requests_after_pipeline(self, df_trainings: pd.DataFrame) -> list[E
# Check if a combination is the active one. We first compute if the model was trained
# before the usage starts. last_timestamp (df_trainings) defines the end of the training data;
# active_model_trained_before (df_eval_intervals) defines the center of an eval interval.
df_cross["active_candidate"] = df_cross["real_last_timestamp"] < df_cross["active_model_trained_before"]
df_cross["active_candidate"] = df_cross["last_timestamp"] < df_cross["active_model_trained_before"]

# find the maximum model for every EvalCandidate that doesn't violate that constraint
max_model_id = (
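
The selection logic boils down to a cross join plus a groupby-max. A simplified, self-contained sketch of that logic (column names reduced to the essentials; the real handler works on the full tracking dataframes and eval intervals):

```python
import pandas as pd

df_trainings = pd.DataFrame({
    "model_id": [1, 2, 3],
    "first_timestamp": [0, 10, 20],
    "last_timestamp": [9, 19, 29],
})
eval_candidates = pd.DataFrame({"active_model_trained_before": [15, 25]})

# Cross join: every (model, eval interval center) combination.
df_cross = df_trainings.merge(eval_candidates, how="cross")
# A model is a candidate if its training data ends before the eval interval's center.
df_cross["active_candidate"] = df_cross["last_timestamp"] < df_cross["active_model_trained_before"]
# For each eval interval center, the active model is the newest (max id) candidate.
max_model_id = (
    df_cross[df_cross["active_candidate"]]
    .groupby("active_model_trained_before")["model_id"]
    .max()
)
print(max_model_id)  # model 1 for center 15, model 2 for center 25
```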
19 changes: 11 additions & 8 deletions modyn/supervisor/internal/triggers/timetrigger.py
@@ -12,20 +12,23 @@ class TimeTrigger(Trigger):
Clock starts with the first observed datapoint"""

def __init__(self, config: TimeTriggerConfig):
self.trigger_every_s: int = config.every_seconds
self.config = config
self.next_trigger_at: int | None = None

if self.trigger_every_s < 1:
raise ValueError(f"trigger_every must be > 0, but is {self.trigger_every_s}")
if self.config.every_seconds < 1:
raise ValueError(f"trigger_every must be > 0, but is {self.config.every_seconds}")

super().__init__()

def inform(self, new_data: list[tuple[int, int, int]]) -> Generator[int, None, None]:
if self.next_trigger_at is None:
if len(new_data) > 0:
self.next_trigger_at = new_data[0][1] + self.trigger_every_s # new_data is sorted
if self.config.start_timestamp is not None:
self.next_trigger_at = self.config.start_timestamp + self.config.every_seconds
else:
return
if len(new_data) > 0:
self.next_trigger_at = new_data[0][1] + self.config.every_seconds # new_data is sorted
else:
return

max_timestamp = new_data[-1][1] # new_data is sorted
triggering_indices = []
@@ -42,9 +45,9 @@ def inform(self, new_data: list[tuple[int, int, int]]) -> Generator[int, None, N
# This means that there was a trigger before the first item that we got informed about
# However, there might have been multiple triggers, e.g., if there is one trigger every second
# and 5 seconds have passed since the last item came through
# This is caught by our while loop which increases step by step for `trigger_every_s`.
# This is caught by our while loop which increases step by step for `config.every_seconds`.

triggering_indices.append(idx - 1)
self.next_trigger_at += self.trigger_every_s
self.next_trigger_at += self.config.every_seconds

yield from triggering_indices
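
A small usage sketch of the updated trigger (assuming the import paths implied by the file locations above and that `new_data` tuples are `(sample_key, timestamp, label)`):

```python
from modyn.config.schema.pipeline.trigger import TimeTriggerConfig
from modyn.supervisor.internal.triggers.timetrigger import TimeTrigger

trigger = TimeTrigger(TimeTriggerConfig(every="5s", start_timestamp=0))

# Toy data: keys 0..19 with timestamps 0..19; only the timestamp at index 1 matters.
new_data = [(key, timestamp, 0) for key, timestamp in enumerate(range(20))]

# With start_timestamp=0 the schedule is anchored at t=0 rather than at the first
# sample, so the first trigger point is at t=5 (start_timestamp + every_seconds).
triggering_indices = list(trigger.inform(new_data))
# Each yielded index marks the last sample of a closed trigger interval.
```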
18 changes: 18 additions & 0 deletions modyn/supervisor/internal/utils/time_tools.py
@@ -0,0 +1,18 @@
import pandas as pd


def generate_real_training_end_timestamp(df_trainings: pd.DataFrame) -> pd.Series:
"""
For sparse datasets we want to use next_training_start-1 as training interval end instead of last_timestamp
as there could be a long gap between the max(sample_time) in one training batch and the min(sample_time) in
the next training batch.
e.g. if we want to train for 1.1.2020-31.12.2020 but only have timestamps on 1.1.2020, last_timestamp
would be 1.1.2020, but the next training would start on 1.1.2021.
Args:
df_trainings: The pipeline stage execution tracking information including training and model infos.
Returns:
The real last timestamp series.
"""
return df_trainings["first_timestamp"].shift(-1, fill_value=df_trainings.iloc[-1]["last_timestamp"] + 1) - 1
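
Illustrative usage of the helper with a toy tracking dataframe (values invented for the example):

```python
import pandas as pd

from modyn.supervisor.internal.utils.time_tools import generate_real_training_end_timestamp

df_trainings = pd.DataFrame(
    {
        "first_timestamp": [0, 100, 250],
        "last_timestamp": [10, 120, 260],
    }
)
print(generate_real_training_end_timestamp(df_trainings).tolist())
# -> [99, 249, 260]: each training's "real" end is the next training's start - 1;
#    the last training keeps its own last_timestamp.
```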
36 changes: 18 additions & 18 deletions modyn/tests/supervisor/internal/eval/test_eval_handler.py
@@ -95,9 +95,9 @@ def test_get_eval_requests_after_pipeline() -> None:
# checking the data above, it looks like model 3 is the biggest model with
# last_timestamp < active_model_trained_before;
# however, we use next training's start - 1 as training interval end.
(22, True, False, 8, 12),
(23, False, True, 8, 12),
(26, False, False, 8, 12),
(22, False, False, 8, 12),
(23, True, False, 8, 12),
(26, False, True, 8, 12),
(27, False, False, 8, 12),
(28, False, False, 8, 12),
# interval 2: for models/triggers 1-8
@@ -111,9 +111,9 @@ def test_get_eval_requests_after_pipeline() -> None:
(21, False, False, 23, 27),
(22, False, False, 23, 27),
(23, False, False, 23, 27),
(26, True, False, 23, 27),
(27, False, True, 23, 27),
(28, False, False, 23, 27),
(26, False, False, 23, 27),
(27, True, False, 23, 27),
(28, False, True, 23, 27),
# interval 4: for models/triggers 1-8
(21, False, False, 24, 28),
(22, False, False, 24, 28),
@@ -181,20 +181,20 @@ def test_between_two_trigger_after_pipeline() -> None:
# (model_id, currently_active_model, currently_trained_model, start_interval, end_interval)
expected_eval_requests = [
# interval 1: for models/triggers 1-8
(21, False, True, 0, 5 - 1),
(26, False, False, 0, 5 - 1),
(28, False, False, 0, 5 - 1),
(29, False, False, 0, 5 - 1),
(21, False, True, 0, 0),
(26, False, False, 0, 0),
(28, False, False, 0, 0),
(29, False, False, 0, 0),
# interval 2: for models/triggers 1-8
(21, True, False, 5, 8 - 1),
(26, False, True, 5, 8 - 1),
(28, False, False, 5, 8 - 1),
(29, False, False, 5, 8 - 1),
(21, True, False, 5, 7),
(26, False, True, 5, 7),
(28, False, False, 5, 7),
(29, False, False, 5, 7),
# interval 3: for models/triggers 1-8
(21, False, False, 8, 14 - 1),
(26, True, False, 8, 14 - 1),
(28, False, True, 8, 14 - 1),
(29, False, False, 8, 14 - 1),
(21, False, False, 8, 10),
(26, True, False, 8, 10),
(28, False, True, 8, 10),
(29, False, False, 8, 10),
# interval 4: for models/triggers 1-8
(21, False, False, 14, 16),
(26, False, False, 14, 16),
@@ -4,7 +4,7 @@

def test_initialization() -> None:
trigger = TimeTrigger(TimeTriggerConfig(every="2s"))
assert trigger.trigger_every_s == 2
assert trigger.config.every_seconds == 2
assert trigger.next_trigger_at is None

