From 91c5fa3954bc153da4ae62b399479bf188d90063 Mon Sep 17 00:00:00 2001 From: Robin Holzinger Date: Tue, 20 Aug 2024 18:16:08 +0200 Subject: [PATCH] feat: Add PerformanceTrigger decision policies (#590) --- modyn/evaluator/internal/pytorch_evaluator.py | 2 +- .../internal/triggers/performance/__init__.py | 0 .../performance/data_density_tracker.py | 62 +++ .../triggers/performance/decision_policy.py | 201 +++++++++ .../performance/performance_tracker.py | 132 ++++++ modyn/supervisor/internal/utils/forecast.py | 23 + ...olicy.py => test_drift_decision_policy.py} | 0 .../performance/test_data_density_tracker.py | 93 ++++ .../test_performance_decision_policy.py | 408 ++++++++++++++++++ .../performance/test_performance_tracker.py | 131 ++++++ 10 files changed, 1051 insertions(+), 1 deletion(-) create mode 100644 modyn/supervisor/internal/triggers/performance/__init__.py create mode 100644 modyn/supervisor/internal/triggers/performance/data_density_tracker.py create mode 100644 modyn/supervisor/internal/triggers/performance/decision_policy.py create mode 100644 modyn/supervisor/internal/triggers/performance/performance_tracker.py create mode 100644 modyn/supervisor/internal/utils/forecast.py rename modyn/tests/supervisor/internal/triggers/drift/{test_decision_policy.py => test_drift_decision_policy.py} (100%) create mode 100644 modyn/tests/supervisor/internal/triggers/performance/test_data_density_tracker.py create mode 100644 modyn/tests/supervisor/internal/triggers/performance/test_performance_decision_policy.py create mode 100644 modyn/tests/supervisor/internal/triggers/performance/test_performance_tracker.py diff --git a/modyn/evaluator/internal/pytorch_evaluator.py b/modyn/evaluator/internal/pytorch_evaluator.py index da9ba9b77..1abe14c80 100644 --- a/modyn/evaluator/internal/pytorch_evaluator.py +++ b/modyn/evaluator/internal/pytorch_evaluator.py @@ -103,7 +103,7 @@ def _single_interval_evaluate(self, dataloader: torch.utils.data.DataLoader, int ) self._info(f"Finished evaluation of {interval_idx}. Putting items into queue...") - self._metric_result_queue.put((interval_idx, eval_result.metric_results.items()), timeout=30) + self._metric_result_queue.put((interval_idx, list(eval_result.metric_results.items())), timeout=30) self._info( f"Finished evaluation of {interval_idx}: {eval_result.num_samples} samples. " f"Queue size = {self._metric_result_queue.qsize()}" diff --git a/modyn/supervisor/internal/triggers/performance/__init__.py b/modyn/supervisor/internal/triggers/performance/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/modyn/supervisor/internal/triggers/performance/data_density_tracker.py b/modyn/supervisor/internal/triggers/performance/data_density_tracker.py new file mode 100644 index 000000000..83791407c --- /dev/null +++ b/modyn/supervisor/internal/triggers/performance/data_density_tracker.py @@ -0,0 +1,62 @@ +from collections import deque + +from modyn.const.types import ForecastingMethod +from modyn.supervisor.internal.utils.forecast import forecast_value + + +class DataDensityTracker: + """Observes a stream of data chunks and estimates the time density of the + data. + + Assumes that the data chunks are ordered by time. For the first + chunks only number of samples, start and end time of a batch is + considered. Starting with the second batch, the time between the + last sample of the previous batch and the first sample of the + current batch is considered as well. + + Most use cases have constant batch sizes. 
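+
+    Illustrative example (made-up numbers, not from any particular pipeline):
+    a batch of 100 samples spanning 50 seconds corresponds to a density of
+    100 / 50 = 2 samples per second; `forecast_density` extrapolates the next
+    density from a rolling window of such per-batch ratios.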
+    """
+
+    def __init__(self, window_size: int) -> None:
+        """
+        Args:
+            window_size: How many batches the memory for the rolling average should hold.
+        """
+        self.batch_memory: deque[tuple[int, int]] = deque(maxlen=window_size)
+        """Memory of the last `window_size` batches containing the number of
+        samples and the time range of the batch in seconds."""
+
+        self._previous_batch_end_time: int | None = None
+
+    def inform_data(self, data: list[tuple[int, int]]) -> None:
+        """Informs the tracker about a new data batch."""
+        if len(data) == 0:
+            return
+
+        num_seconds = (
+            data[-1][1] - self._previous_batch_end_time
+            if self._previous_batch_end_time is not None
+            else data[-1][1] - data[0][1]
+        )
+
+        self.batch_memory.append((len(data), num_seconds))
+        self._previous_batch_end_time = data[-1][1]
+
+    @property
+    def previous_batch_num_samples(self) -> int:
+        """Returns the number of samples in the last batch."""
+        assert len(self.batch_memory) > 0, "No data in memory, calibration needed."
+        return self.batch_memory[-1][0]
+
+    def needs_calibration(self) -> bool:
+        """Returns True if the tracker has not seen any data yet and therefore cannot forecast."""
+        return len(self.batch_memory) == 0
+
+    def forecast_density(self, method: ForecastingMethod = "ridge_regression") -> float:
+        """Forecasts the data density based on the current memory.
+
+        Args:
+            method: The forecasting method to use.
+
+        Returns:
+            The forecasted data density as a ratio of samples per second.
+        """
+        ratio_series = [num_samples / num_seconds for num_samples, num_seconds in self.batch_memory]
+        return forecast_value(observations=ratio_series, method=method)
diff --git a/modyn/supervisor/internal/triggers/performance/decision_policy.py b/modyn/supervisor/internal/triggers/performance/decision_policy.py
new file mode 100644
index 000000000..3c4f9370a
--- /dev/null
+++ b/modyn/supervisor/internal/triggers/performance/decision_policy.py
@@ -0,0 +1,201 @@
+from abc import ABC, abstractmethod
+
+from modyn.config.schema.pipeline.trigger.performance.criterion import (
+    DynamicPerformanceThresholdCriterion,
+    StaticNumberAvoidableMisclassificationCriterion,
+    StaticPerformanceThresholdCriterion,
+)
+from modyn.const.types import ForecastingMethod, TriggerEvaluationMode
+from modyn.supervisor.internal.triggers.performance.data_density_tracker import (
+    DataDensityTracker,
+)
+from modyn.supervisor.internal.triggers.performance.performance_tracker import (
+    PerformanceTracker,
+)
+
+
+class PerformanceDecisionPolicy(ABC):
+    """Decision policy that makes the binary trigger decisions based on
+    observations of a performance metric."""
+
+    @abstractmethod
+    def evaluate_decision(
+        self,
+        update_interval: int,
+        evaluation_scores: dict[str, float],
+        data_density: DataDensityTracker,
+        performance_tracker: PerformanceTracker,
+        mode: TriggerEvaluationMode,
+        method: ForecastingMethod,
+    ) -> bool:
+        """Evaluate the decision based on the given observation.
+
+        At the time of calling this, the performance_tracker has already been updated with the new performance
+        value to allow for forecast-based decisions. Likewise, data_density has already been updated with the
+        new data interval.
+
+        Args:
+            update_interval: The interval in which the decision is made.
+            evaluation_scores: The observed evaluation scores (metric name -> metric value).
+            data_density: The data density tracker, updated with the new data interval.
+            performance_tracker: The performance tracker, updated with the new performance value.
+            mode: The mode in which the decision should be evaluated.
+            method: The forecasting method used for lookahead estimates.
+
+        Returns:
+            The final trigger decision.
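+
+        Example (illustrative numbers): for the static threshold policy with a threshold of 0.8,
+        an observed accuracy of 0.75 yields a trigger in "hindsight" mode; in "lookahead" mode a
+        trigger can additionally fire when the current value is still above the threshold but the
+        forecasted next value is not.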
+        """
+
+    def inform_trigger(self) -> None:
+        """Inform the decision policy that a trigger has been invoked."""
+
+
+class StaticPerformanceThresholdDecisionPolicy(PerformanceDecisionPolicy):
+    """Decision policy that makes the binary trigger decision based on a
+    static performance threshold: it triggers when the configured metric falls
+    below the threshold."""
+
+    def __init__(self, config: StaticPerformanceThresholdCriterion):
+        self.config = config
+
+    def evaluate_decision(
+        self,
+        update_interval: int,
+        evaluation_scores: dict[str, float],
+        data_density: DataDensityTracker,
+        performance_tracker: PerformanceTracker,
+        mode: TriggerEvaluationMode,
+        method: ForecastingMethod,
+    ) -> bool:
+        if mode == "hindsight":
+            return evaluation_scores[self.config.metric] < self.config.metric_threshold
+
+        # lookahead: also trigger if the forecasted value of the configured metric drops below the threshold
+        return (evaluation_scores[self.config.metric] < self.config.metric_threshold) or (
+            performance_tracker.forecast_next_performance(self.config.metric, method) < self.config.metric_threshold
+        )
+
+
+class DynamicPerformanceThresholdDecisionPolicy(PerformanceDecisionPolicy):
+    """Decision policy that makes the binary trigger decision based on a
+    dynamic threshold: it triggers when the observed value falls more than the
+    allowed deviation below the expected (forecasted) performance."""
+
+    def __init__(self, config: DynamicPerformanceThresholdCriterion):
+        self.config = config
+
+    def evaluate_decision(
+        self,
+        update_interval: int,
+        evaluation_scores: dict[str, float],
+        data_density: DataDensityTracker,
+        performance_tracker: PerformanceTracker,
+        mode: TriggerEvaluationMode,
+        method: ForecastingMethod,
+    ) -> bool:
+        threshold = (
+            performance_tracker.forecast_expected_performance(self.config.metric, method)
+            - self.config.allowed_deviation
+        )
+
+        if mode == "hindsight":
+            return evaluation_scores[self.config.metric] < threshold
+
+        return (evaluation_scores[self.config.metric] < threshold) or (
+            performance_tracker.forecast_next_performance(self.config.metric, method) < threshold
+        )
+
+
+class StaticNumberAvoidableMisclassificationDecisionPolicy(PerformanceDecisionPolicy):
+    """Decision policy that makes the binary trigger decision based on a
+    static threshold on the number of cumulated avoidable misclassifications."""
+
+    def __init__(self, config: StaticNumberAvoidableMisclassificationCriterion):
+        """
+        Args:
+            config: The criterion configuration containing the threshold of cumulated avoidable misclassifications.
+        """
+
+        self.config = config
+        self.cumulated_avoidable_misclassifications = 0
+
+    # pylint: disable=too-many-locals
+    def evaluate_decision(
+        self,
+        update_interval: int,
+        evaluation_scores: dict[str, float],
+        data_density: DataDensityTracker,
+        performance_tracker: PerformanceTracker,
+        mode: TriggerEvaluationMode,
+        method: ForecastingMethod,
+    ) -> bool:
+        """Utilizes the state of `DataDensityTracker` and `PerformanceTracker`
+        to make the decision.
+
+        We support both the "hindsight" and the "lookahead" mode.
+
+        In the "hindsight" mode, the decision is made based on the current performance and the cumulated
+        avoidable misclassifications.
+
+        - Formalization:
+            - historic observations:
+                - data_cum_since_last_trigger: The cumulated data points since the last trigger.
+                - avoidable_misclassifications_since_last_trigger: The cumulated avoidable misclassifications since
+                    the last trigger.
+
+        In the "lookahead" mode, the decision is made based on the current performance, the cumulated avoidable
+        misclassifications, future performance estimates and future data density estimates.
+        Similar to the "hindsight" mode, we first check if the current performance already leads to a transgression
+        of the threshold and therefore to a trigger.
+ + If that's not the case we estimate the cumulated avoidable misclassifications until the next point of update. + If we expect a transgression of the threshold before the next update point, we trigger. + + This forward looking approach tries to avoid exceeding the misclassification budget in the first place. + """ + + # compute the number of avoidable misclassifications by retrieving the actual misclassifications + # and the expected misclassifications through the expected accuracy for the last interval. + previous_interval_num_misclassifications = performance_tracker.previous_batch_num_misclassifications + + # the expected performance won't change unless there's a trigger + expected_accuracy = ( + performance_tracker.forecast_expected_accuracy(method=method) + if self.config.expected_accuracy is None + else self.config.expected_accuracy + ) + previous_expected_num_misclassifications = (1 - expected_accuracy) * data_density.previous_batch_num_samples + new_avoidable_misclassifications = ( + previous_interval_num_misclassifications - previous_expected_num_misclassifications + ) + if new_avoidable_misclassifications < 0 and not self.config.allow_reduction: + new_avoidable_misclassifications = 0 + + self.cumulated_avoidable_misclassifications += round(new_avoidable_misclassifications) + + if mode == "hindsight": + return self.cumulated_avoidable_misclassifications >= self.config.avoidable_misclassification_threshold + + if mode == "lookahead": + # past misclassifications already exceed the threshold, forecasting not needed + if self.cumulated_avoidable_misclassifications >= self.config.avoidable_misclassification_threshold: + return True + + forecasted_data_density = data_density.forecast_density(method=method) + forecast_accuracy = performance_tracker.forecast_next_accuracy(method=method) + + accuracy_delta = expected_accuracy - forecast_accuracy + if accuracy_delta < 0 and not self.config.allow_reduction: + accuracy_delta = 0 + + # new misclassification = accuracy * samples; samples = data_density * interval_duration + forecast_new_avoidable_misclassifications = accuracy_delta * forecasted_data_density * update_interval + + forecasted_misclassifications = ( + self.cumulated_avoidable_misclassifications + forecast_new_avoidable_misclassifications + ) + + return forecasted_misclassifications >= self.config.avoidable_misclassification_threshold + + raise ValueError(f"Unknown mode: {mode}") + + def inform_trigger(self) -> None: + """Resets the cumulated avoidable misclassifications.""" + self.cumulated_avoidable_misclassifications = 0 diff --git a/modyn/supervisor/internal/triggers/performance/performance_tracker.py b/modyn/supervisor/internal/triggers/performance/performance_tracker.py new file mode 100644 index 000000000..ccdf81316 --- /dev/null +++ b/modyn/supervisor/internal/triggers/performance/performance_tracker.py @@ -0,0 +1,132 @@ +from collections import deque + +from modyn.const.types import ForecastingMethod +from modyn.supervisor.internal.utils.forecast import forecast_value + + +class PerformanceTracker: + """Observes a stream of performance evaluations and estimates performance + on the next chunk. + + While no trigger happens, the estimated performances is calculated + from the series of evaluations after every of the last n-triggers. + The next observed performance is also forecasted from the series of + evaluations since the last trigger. When a trigger happens, this + series of observations evaluations is reset. 
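+
+    Illustrative example (made-up numbers): an evaluation over 100 samples with
+    20 misclassifications corresponds to an accuracy of 1 - 20 / 100 = 0.8; the
+    accuracy forecasts below are derived from such per-evaluation ratios.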
+ + Provides both the wrapped performance metrics as well as accuracy + information. + """ + + def __init__(self, trigger_eval_window_size: int) -> None: + """ + Args: + window_size: How many evaluations after triggers should be kept in memory. + """ + self.trigger_evaluation_memory: deque[tuple[int, int, dict[str, float]]] = deque( + maxlen=trigger_eval_window_size + ) + """Memory of the last `window_size` evaluations after triggers with + their number of samples, misclassifications and evaluation scores for + different metrics. + + After every trigger, the memory is updated with the new + evaluation. + """ + + self.since_last_trigger: list[tuple[int, int, dict[str, float]]] = [] + """Memory of the evaluations since the last trigger with their number + of samples, misclassifications and evaluation scores for different + metrics. + + Upon trigger, this memory is reset. + """ + + def inform_evaluation( + self, + num_samples: int, + num_misclassifications: int, + evaluation_scores: dict[str, float], + ) -> None: + """Informs the tracker about a new evaluation.""" + self.since_last_trigger.append((num_samples, num_misclassifications, evaluation_scores)) + + def inform_trigger( + self, + num_samples: int, + num_misclassifications: int, + evaluation_scores: dict[str, float], + ) -> None: + """Informs the tracker about a new trigger and restarts the tracker of + expected performance evaluations. + + The expected performance evaluations are the evaluations right after triggers (`since_last_trigger`). + """ + self.trigger_evaluation_memory.append((num_samples, num_misclassifications, evaluation_scores)) + + # reset the since_last_trigger tracker: first element in the new series is the performance right after trigger + self.since_last_trigger = [(num_samples, num_misclassifications, evaluation_scores)] + + @property + def previous_batch_num_misclassifications(self) -> int: + """Returns the number of misclassifications in the previous batch.""" + return self.since_last_trigger[-1][1] + + def forecast_expected_accuracy(self, method: ForecastingMethod = "ridge_regression") -> float: + """Forecasts the expected accuracy based on the current memory of + evaluations right after triggers. + + Returns: + The forecasted accuracy. + """ + return forecast_value( + observations=[1 - p[1] / p[0] for p in self.trigger_evaluation_memory], + method=method, + ) + + def forecast_next_accuracy(self, method: ForecastingMethod = "ridge_regression") -> float: + """Forecasts the next accuracy (assuming no trigger) based on the + memory of evaluations since the last trigger. + + Returns: + The forecasted (observed) accuracy. + """ + return max( + 0, + min( + 1, + forecast_value( + observations=[1 - p[1] / p[0] for p in self.since_last_trigger], + method=method, + ), + ), + ) + + def forecast_expected_performance(self, metric: str, method: ForecastingMethod = "ridge_regression") -> float: + """Forecasts the performance based on the current memory of evaluations + right after triggers. + + Args: + metric: The metric to forecast the performance for. + method: The method to use for forecasting. + + Returns: + The forecasted performance. + """ + return forecast_value( + observations=[p[2][metric] for p in self.trigger_evaluation_memory], + method=method, + ) + + def forecast_next_performance(self, metric: str, method: ForecastingMethod = "ridge_regression") -> float: + """Forecasts the next performance based on the memory of evaluations + since the last trigger. + + Args: + metric: The metric to forecast the performance for. 
+ method: The method to use for forecasting. + + Returns: + The forecasted (observed) performance. + """ + return forecast_value(observations=[p[2][metric] for p in self.since_last_trigger], method=method) diff --git a/modyn/supervisor/internal/utils/forecast.py b/modyn/supervisor/internal/utils/forecast.py new file mode 100644 index 000000000..7909191b9 --- /dev/null +++ b/modyn/supervisor/internal/utils/forecast.py @@ -0,0 +1,23 @@ +"""A simple one-dimensional time series forecasting utility.""" + +from sklearn import linear_model + +from modyn.const.types import ForecastingMethod + + +def forecast_value( + observations: list[float], + method: ForecastingMethod = "ridge_regression", + min_observations_for_ridge: int = 5, +) -> float: + """Forecasts the next value based on a series of past observations.""" + + assert len(observations) > 0, "No trigger happened yet." + + if len(observations) < min_observations_for_ridge or method == "rolling_average": + return sum(observations) / len(observations) + + # Ridge regression estimator for scalar time series forecasting + reg = linear_model.Ridge(alpha=0.5) + reg.fit([[i] for i in range(len(observations))], observations) + return reg.predict([[len(observations)]])[0] diff --git a/modyn/tests/supervisor/internal/triggers/drift/test_decision_policy.py b/modyn/tests/supervisor/internal/triggers/drift/test_drift_decision_policy.py similarity index 100% rename from modyn/tests/supervisor/internal/triggers/drift/test_decision_policy.py rename to modyn/tests/supervisor/internal/triggers/drift/test_drift_decision_policy.py diff --git a/modyn/tests/supervisor/internal/triggers/performance/test_data_density_tracker.py b/modyn/tests/supervisor/internal/triggers/performance/test_data_density_tracker.py new file mode 100644 index 000000000..a0df26d2c --- /dev/null +++ b/modyn/tests/supervisor/internal/triggers/performance/test_data_density_tracker.py @@ -0,0 +1,93 @@ +import pytest + +from modyn.supervisor.internal.triggers.performance.data_density_tracker import ( + DataDensityTracker, +) + + +def test_initial_state() -> None: + tracker = DataDensityTracker(window_size=3) + assert len(tracker.batch_memory) == 0 + assert tracker._previous_batch_end_time is None + + +def test_inform_data_empty_batch() -> None: + tracker = DataDensityTracker(window_size=3) + tracker.inform_data([]) + assert len(tracker.batch_memory) == 0 # Should not change anything + + +def test_inform_data_batches() -> None: + tracker = DataDensityTracker(window_size=3) + assert tracker.needs_calibration() + + with pytest.raises(AssertionError): + tracker.previous_batch_num_samples + + tracker.inform_data([(1, 10), (2, 20), (3, 30)]) + assert not tracker.needs_calibration() + assert len(tracker.batch_memory) == 1 + assert tracker._previous_batch_end_time == 30 + assert tracker.batch_memory[-1] == (3, 20) # (len(data), end_time - start_time) + assert tracker.previous_batch_num_samples == 3 + + tracker.inform_data([(4, 40), (5, 50), (6, 60)]) + assert len(tracker.batch_memory) == 2 + assert tracker.batch_memory[-1] == ( + 3, + 30, + ) # Time gap is 10 due to the interval between batches + assert tracker.previous_batch_num_samples == 3 + + +def test_inform_data_window_rollover() -> None: + tracker = DataDensityTracker(window_size=3) + tracker.inform_data([(1, 10), (2, 20), (3, 30)]) + tracker.inform_data([(4, 40), (5, 50), (6, 60)]) + tracker.inform_data([(7, 70), (8, 80), (9, 90)]) + tracker.inform_data([(10, 100), (11, 110), (12, 120)]) # This should push out the first batch + assert 
len(tracker.batch_memory) == 3 + assert tracker.batch_memory[0] == ( + 3, + 30, + ) # First entry in the deque should be the second batch + + +def test_forecast_density_empty_memory() -> None: + tracker = DataDensityTracker(window_size=3) + with pytest.raises(AssertionError): + tracker.forecast_density() + + +def test_forecast_density_simple_average() -> None: + tracker = DataDensityTracker(window_size=3) + tracker.inform_data([(1, 10), (2, 20), (3, 30)]) # density: 3/20[s] + tracker.inform_data([(4, 40), (5, 50), (6, 60)]) # density: 3/30[s] + density = tracker.forecast_density() + assert density == (3 / 20 + 3 / 30) / 2 + + +def test_forecast_density_ridge_regression() -> None: + tracker = DataDensityTracker(window_size=5) + tracker.inform_data([(1, 10), (2, 20), (3, 30)]) + tracker.inform_data([(4, 40), (5, 50), (6, 60)]) + tracker.inform_data([(7, 70), (8, 80), (9, 90)]) + tracker.inform_data([(10, 100), (11, 110), (12, 120)]) + tracker.inform_data([(13, 130), (14, 140), (15, 150)]) + tracker.inform_data([(16, 160), (17, 170), (18, 180)]) + density = tracker.forecast_density("ridge_regression") + assert isinstance(density, float) + density = tracker.forecast_density("rolling_average") + + +def test_forecast_density_with_varied_batches() -> None: + tracker = DataDensityTracker(window_size=5) + tracker.inform_data([(1, 10), (2, 20)]) + assert tracker.previous_batch_num_samples == 2 + tracker.inform_data([(3, 30), (4, 40), (5, 50)]) + assert tracker.previous_batch_num_samples == 3 + tracker.inform_data([(6, 60), (7, 70)]) + assert tracker.previous_batch_num_samples == 2 + density = tracker.forecast_density() + assert isinstance(density, float) + assert density > 0 # Ensure a positive density value diff --git a/modyn/tests/supervisor/internal/triggers/performance/test_performance_decision_policy.py b/modyn/tests/supervisor/internal/triggers/performance/test_performance_decision_policy.py new file mode 100644 index 000000000..b8adaa1eb --- /dev/null +++ b/modyn/tests/supervisor/internal/triggers/performance/test_performance_decision_policy.py @@ -0,0 +1,408 @@ +from unittest.mock import MagicMock, PropertyMock, patch + +import pytest + +from modyn.config.schema.pipeline.trigger.performance.criterion import ( + DynamicPerformanceThresholdCriterion, + StaticNumberAvoidableMisclassificationCriterion, + StaticPerformanceThresholdCriterion, +) +from modyn.supervisor.internal.triggers.performance.data_density_tracker import ( + DataDensityTracker, +) +from modyn.supervisor.internal.triggers.performance.decision_policy import ( + DynamicPerformanceThresholdDecisionPolicy, + StaticNumberAvoidableMisclassificationDecisionPolicy, + StaticPerformanceThresholdDecisionPolicy, +) +from modyn.supervisor.internal.triggers.performance.performance_tracker import ( + PerformanceTracker, +) + + +@pytest.fixture +def static_threshold_policy() -> StaticPerformanceThresholdDecisionPolicy: + return StaticPerformanceThresholdDecisionPolicy( + config=StaticPerformanceThresholdCriterion(metric_threshold=0.8, metric="acc") + ) + + +@pytest.fixture +def dynamic_threshold_policy() -> DynamicPerformanceThresholdDecisionPolicy: + return DynamicPerformanceThresholdDecisionPolicy( + config=DynamicPerformanceThresholdCriterion(allowed_deviation=0.2, metric="acc") + ) + + +@pytest.fixture +def misclassification_policy() -> StaticNumberAvoidableMisclassificationDecisionPolicy: + return StaticNumberAvoidableMisclassificationDecisionPolicy( + config=StaticNumberAvoidableMisclassificationCriterion( + allow_reduction=True, 
avoidable_misclassification_threshold=10 + ) + ) + + +# -------------------------------------------------------------------------------------------------------------------- # +# Dummies # +# -------------------------------------------------------------------------------------------------------------------- # + + +@pytest.fixture +def dummy_data_density_tracker() -> DataDensityTracker: + return DataDensityTracker(window_size=3) + + +@pytest.fixture +def dummy_performance_tracker() -> PerformanceTracker: + return PerformanceTracker(trigger_eval_window_size=3) + + +# -------------------------------------------------------------------------------------------------------------------- # +# StaticPerformanceThresholdDecisionPolicy # +# -------------------------------------------------------------------------------------------------------------------- # + + +def test_static_performance_hindsight( + dummy_data_density_tracker: DataDensityTracker, + dummy_performance_tracker: PerformanceTracker, + static_threshold_policy: StaticPerformanceThresholdDecisionPolicy, +) -> None: + """Test static threshold decision policy in hindsight mode.""" + eval_decision_kwargs = { + "update_interval": 10, + "data_density": dummy_data_density_tracker, + "performance_tracker": dummy_performance_tracker, + "mode": "hindsight", + "method": "rolling_average", + } + assert static_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0.79}) + assert not static_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0.81}) + + with pytest.raises(KeyError): + static_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"NOT_ACC": 0.81}) + + +@patch.object(PerformanceTracker, "forecast_next_performance", side_effect=[0.81, 0.79]) +def test_static_performance_forecast( + mock_forecast_next_performance: MagicMock, + dummy_performance_tracker: PerformanceTracker, + dummy_data_density_tracker: DataDensityTracker, + static_threshold_policy: StaticPerformanceThresholdDecisionPolicy, +) -> None: + """Test static threshold decision policy in lookahead mode.""" + eval_decision_kwargs = { + "update_interval": 10, + "data_density": dummy_data_density_tracker, + "performance_tracker": dummy_performance_tracker, + "mode": "lookahead", + "method": "rolling_average", + } + # current performance already below threshold, trigger independent of forecast + assert static_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0.79}) + + # current performance above threshold, trigger only if forecast is below threshold + assert not static_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0.81}) + assert static_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0.81}) + + assert mock_forecast_next_performance.call_count == 2 + + +# -------------------------------------------------------------------------------------------------------------------- # +# DynamicPerformanceThresholdDecisionPolicy # +# -------------------------------------------------------------------------------------------------------------------- # + + +@patch.object(PerformanceTracker, "forecast_expected_performance", side_effect=[0.5, 0.5, -1]) +def test_dynamic_performance_hindsight( + mock_forecast_expected_performance: MagicMock, + dummy_performance_tracker: PerformanceTracker, + dummy_data_density_tracker: DataDensityTracker, + dynamic_threshold_policy: DynamicPerformanceThresholdDecisionPolicy, 
+) -> None: + """Test dynamic threshold decision policy in hindsight mode.""" + + eval_decision_kwargs = { + "update_interval": 10, + "data_density": dummy_data_density_tracker, + "performance_tracker": dummy_performance_tracker, + "mode": "hindsight", + "method": "rolling_average", + } + # current performance already below threshold, trigger independent of forecast + assert dynamic_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0.5 - 0.21}) + assert not dynamic_threshold_policy.evaluate_decision( + **eval_decision_kwargs, evaluation_scores={"acc": 0.5 - 0.19} + ) # allowed deviation not reached + + assert mock_forecast_expected_performance.call_count == 2 + + with pytest.raises(KeyError): + dynamic_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"NOT_ACC": 0.5}) + + +@patch.object(PerformanceTracker, "forecast_next_performance", side_effect=[0.29, 0.31]) +@patch.object( + PerformanceTracker, + "forecast_expected_performance", + side_effect=[0.5, 0.5, 0.5], +) +def test_dynamic_performance_forecast( + mock_forecast_expected_performance: MagicMock, + mock_forecast_next_performance: MagicMock, + dummy_performance_tracker: PerformanceTracker, + dummy_data_density_tracker: DataDensityTracker, + dynamic_threshold_policy: DynamicPerformanceThresholdDecisionPolicy, +) -> None: + """Test dynamic threshold decision policy in forecast mode.""" + + eval_decision_kwargs = { + "update_interval": 10, + "data_density": dummy_data_density_tracker, + "performance_tracker": dummy_performance_tracker, + "mode": "lookahead", + "method": "rolling_average", + } + # current performance already below threshold, trigger independent of forecast + assert dynamic_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0.29}) + + assert mock_forecast_expected_performance.call_count == 1 + assert mock_forecast_next_performance.call_count == 0 + + # current performance above threshold, trigger only if forecast is below threshold + assert dynamic_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0.5}) + assert not dynamic_threshold_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0.5}) + + assert mock_forecast_expected_performance.call_count == 3 + assert mock_forecast_next_performance.call_count == 2 + + +# -------------------------------------------------------------------------------------------------------------------- # +# StaticNumberAvoidableMisclassificationDecisionPolicy # +# -------------------------------------------------------------------------------------------------------------------- # + + +# TODO: self.config.expected_accuracy + + +@patch.object( + DataDensityTracker, + "previous_batch_num_samples", + side_effect=[100, 100, 100, 100, 100], + new_callable=PropertyMock, +) +@patch.object( + PerformanceTracker, + "forecast_expected_accuracy", + side_effect=[1.0, 0.95, 1.0, 1.0, 1.0], +) +@patch.object( + PerformanceTracker, + "previous_batch_num_misclassifications", + side_effect=[5, 9, 1, 9, 4], + new_callable=PropertyMock, +) +def test_misclassification_hindsight( + mock_previous_batch_num_misclassifications: MagicMock, + mock_forecast_expected_accuracy: MagicMock, + mock_previous_batch_num_samples: MagicMock, + dummy_performance_tracker: PerformanceTracker, + dummy_data_density_tracker: DataDensityTracker, + misclassification_policy: StaticNumberAvoidableMisclassificationDecisionPolicy, +) -> None: + """Test static number avoidable misclassification 
policy in hindsight + mode.""" + eval_decision_kwargs = { + "update_interval": 10, + "data_density": dummy_data_density_tracker, + "performance_tracker": dummy_performance_tracker, + "mode": "hindsight", + "method": "rolling_average", + } + + assert misclassification_policy.cumulated_avoidable_misclassifications == 0 + + # don't trigger below the 10th misclassification + + # observed misclassifications=5, with expected accuracy of 1.0 every misclassification is avoidable + assert not misclassification_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0}) + assert misclassification_policy.cumulated_avoidable_misclassifications == 5 + assert mock_previous_batch_num_misclassifications.call_count == 1 + + # observed misclassifications=8, expected misclassifications: 5 --> 9-5=4 avoidable misclassifications + # cumulated_avoidable_misclassifications: 5+4 + assert not misclassification_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0}) + assert misclassification_policy.cumulated_avoidable_misclassifications == 5 + 4 + assert mock_previous_batch_num_misclassifications.call_count == 2 + + # observed misclassifications: 1, with expected accuracy of 1.0 every misclassification is avoidable + assert misclassification_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0}) + assert misclassification_policy.cumulated_avoidable_misclassifications == 10 + assert mock_previous_batch_num_misclassifications.call_count == 3 + + misclassification_policy.inform_trigger() # reset misclassifications + assert misclassification_policy.cumulated_avoidable_misclassifications == 0 + + # observed misclassifications: 9, with expected accuracy of 1.0 every misclassification is avoidable + assert not misclassification_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0}) + assert misclassification_policy.cumulated_avoidable_misclassifications == 9 + assert mock_previous_batch_num_misclassifications.call_count == 4 + + # observed misclassifications: 4, with expected accuracy of 1.0 every misclassification is avoidable + assert misclassification_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0}) + assert misclassification_policy.cumulated_avoidable_misclassifications == 13 + assert mock_previous_batch_num_misclassifications.call_count == 5 + + +@patch.object( + DataDensityTracker, + "previous_batch_num_samples", + side_effect=[100, 100, 100], + new_callable=PropertyMock, +) +@patch.object(PerformanceTracker, "forecast_expected_accuracy", side_effect=[0, 0, 0]) +@patch.object( + PerformanceTracker, + "previous_batch_num_misclassifications", + side_effect=[90, 70, 70], + new_callable=PropertyMock, +) +def test_misclassification_static_expected_performance( + mock_previous_batch_num_misclassifications: MagicMock, + mock_forecast_expected_accuracy: MagicMock, + mock_previous_batch_num_samples: MagicMock, + dummy_performance_tracker: PerformanceTracker, + dummy_data_density_tracker: DataDensityTracker, + misclassification_policy: StaticNumberAvoidableMisclassificationDecisionPolicy, +) -> None: + """Test static number avoidable misclassification policy in hindsight + mode.""" + misclassification_policy.config.expected_accuracy = 0.25 + misclassification_policy.config.allow_reduction = True + + eval_decision_kwargs = { + "update_interval": 10, + "data_density": dummy_data_density_tracker, + "performance_tracker": dummy_performance_tracker, + "mode": "hindsight", + "method": "rolling_average", + } + # 
expected misclassifications = (1-0.25)*100 = 75, observed misclassifications = 90 + assert misclassification_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0}) + assert misclassification_policy.cumulated_avoidable_misclassifications == 15 + assert mock_previous_batch_num_misclassifications.call_count == 1 + assert mock_previous_batch_num_samples.call_count == 1 + + misclassification_policy.inform_trigger() # reset misclassifications + misclassification_policy.cumulated_avoidable_misclassifications = 10 + + # negative avoidable misclassifications + # expected misclassifications = (1-0.25)*100 = 75, observed misclassifications = 70 + assert not misclassification_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0}) + assert misclassification_policy.cumulated_avoidable_misclassifications == 5 + assert mock_previous_batch_num_misclassifications.call_count == 2 + assert mock_previous_batch_num_samples.call_count == 2 + + # forbid reduction: with reduction cumulated_avoidable_misclassifications reduced from 5 to 0, here constant at 5 + misclassification_policy.config.allow_reduction = False + assert not misclassification_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0}) + assert misclassification_policy.cumulated_avoidable_misclassifications == 5 + assert mock_previous_batch_num_misclassifications.call_count == 3 + assert mock_previous_batch_num_samples.call_count == 3 + + # not used in the case of static expected performance + assert mock_forecast_expected_accuracy.call_count == 0 + + +@patch.object( + DataDensityTracker, + "previous_batch_num_samples", + side_effect=[100, 100, 100], + new_callable=PropertyMock, +) +@patch.object(DataDensityTracker, "forecast_density", side_effect=[1, 1]) +@patch.object(PerformanceTracker, "forecast_expected_accuracy", side_effect=[]) +@patch.object(PerformanceTracker, "forecast_next_accuracy", side_effect=[0.4, 0.2]) +@patch.object(PerformanceTracker, "forecast_next_performance", side_effect=[]) +@patch.object( + PerformanceTracker, + "previous_batch_num_misclassifications", + side_effect=[60, 57, 57], + new_callable=PropertyMock, +) +def test_misclassification_forecast( + mock_previous_batch_num_misclassifications: MagicMock, + mock_forecast_next_performance: MagicMock, + mock_forecast_next_accuracy: MagicMock, + mock_forecast_expected_accuracy: MagicMock, + mock_forecast_density: MagicMock, + mock_previous_batch_num_samples: MagicMock, + dummy_performance_tracker: PerformanceTracker, + dummy_data_density_tracker: DataDensityTracker, + misclassification_policy: StaticNumberAvoidableMisclassificationDecisionPolicy, +) -> None: + """Test static number avoidable misclassification policy in forecast + mode.""" + misclassification_policy.config.expected_accuracy = 0.5 + + eval_decision_kwargs = { + "data_density": dummy_data_density_tracker, + "performance_tracker": dummy_performance_tracker, + "mode": "lookahead", + "method": "rolling_average", + } + + assert misclassification_policy.cumulated_avoidable_misclassifications == 0 + + # 50 expected misclassifications, 60 observed misclassifications + # 10 past avoidable misclassifications, forecasting not needed, already exceeding + assert misclassification_policy.evaluate_decision( + **eval_decision_kwargs, evaluation_scores={"acc": 0}, update_interval=0 + ) + assert misclassification_policy.cumulated_avoidable_misclassifications == 10 + assert mock_previous_batch_num_misclassifications.call_count == 1 + assert 
mock_previous_batch_num_samples.call_count == 1 + assert mock_forecast_next_accuracy.call_count == 0 + assert mock_forecast_density.call_count == 0 + + misclassification_policy.inform_trigger() # reset misclassifications + mock_previous_batch_num_misclassifications.reset_mock() + mock_previous_batch_num_samples.reset_mock() + + # 7 avoidable past misclassifications (57 observed, 50 expected) + # 1 forecasted misclassifications: exp acc=0.5, forecasted acc=0.4, update interval 10, data density=1 + # --> 8 avoidable misclassifications --> don't trigger + assert not misclassification_policy.evaluate_decision( + **eval_decision_kwargs, evaluation_scores={"acc": 0}, update_interval=10 + ) + # forecasted misclassifications not persisted in the counter + assert misclassification_policy.cumulated_avoidable_misclassifications == 7 + assert mock_previous_batch_num_misclassifications.call_count == 1 + assert mock_previous_batch_num_samples.call_count == 1 + assert mock_forecast_next_accuracy.call_count == 1 + assert mock_forecast_density.call_count == 1 + + # reset + misclassification_policy.cumulated_avoidable_misclassifications = 0 + mock_previous_batch_num_misclassifications.reset_mock() + mock_previous_batch_num_samples.reset_mock() + mock_forecast_next_accuracy.reset_mock() + mock_forecast_density.reset_mock() + + # 7 avoidable past misclassifications (57 observed, 50 expected) + # 3 forecasted misclassifications: exp acc=0.5, forecasted acc=0.2, update interval 10, data density=1 + # --> 10 avoidable misclassifications --> don't trigger + assert misclassification_policy.evaluate_decision( + **eval_decision_kwargs, evaluation_scores={"acc": 0}, update_interval=10 + ) + assert misclassification_policy.cumulated_avoidable_misclassifications == 7 + assert mock_previous_batch_num_misclassifications.call_count == 1 + assert mock_previous_batch_num_samples.call_count == 1 + assert mock_forecast_next_accuracy.call_count == 1 + assert mock_forecast_density.call_count == 1 + + # we only need the accuracy values here + assert mock_forecast_next_performance.call_count == 0 + + # we use a static expected accuracy to make things easier + assert mock_forecast_expected_accuracy.call_count == 0 diff --git a/modyn/tests/supervisor/internal/triggers/performance/test_performance_tracker.py b/modyn/tests/supervisor/internal/triggers/performance/test_performance_tracker.py new file mode 100644 index 000000000..1b4c4d578 --- /dev/null +++ b/modyn/tests/supervisor/internal/triggers/performance/test_performance_tracker.py @@ -0,0 +1,131 @@ +import pytest + +from modyn.supervisor.internal.triggers.performance.performance_tracker import ( + PerformanceTracker, +) + + +def test_initial_state() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=3) + with pytest.raises(IndexError): + tracker.previous_batch_num_misclassifications + assert len(tracker.trigger_evaluation_memory) == 0 + assert len(tracker.since_last_trigger) == 0 + + +def test_inform_evaluation() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=3) + tracker.inform_evaluation(10, 2, {"acc": 0.8}) + assert tracker.previous_batch_num_misclassifications == 2 + assert len(tracker.since_last_trigger) == 1 + assert tracker.since_last_trigger[0] == (10, 2, {"acc": 0.8}) + + +def test_inform_trigger() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=3) + tracker.inform_evaluation(10, 2, {"acc": 0.8}) + tracker.inform_trigger(10, 1, {"acc": 0.9}) + assert tracker.previous_batch_num_misclassifications == 1 + assert 
len(tracker.trigger_evaluation_memory) == 1 + assert tracker.trigger_evaluation_memory[-1] == (10, 1, {"acc": 0.9}) + assert len(tracker.since_last_trigger) == 1 # Reset after trigger + + +def test_inform_trigger_memory_rollover() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=3) + tracker.inform_trigger(10, 2, {"acc": 0.8}) + tracker.inform_trigger(20, 3, {"acc": 0.85}) + tracker.inform_trigger(10, 1, {"acc": 0.9}) + tracker.inform_trigger(20, 1, {"acc": 0.95}) # This should push out the first evaluation + assert tracker.previous_batch_num_misclassifications == 1 + assert len(tracker.trigger_evaluation_memory) == 3 + # First entry should be the second trigger + assert tracker.trigger_evaluation_memory[0] == (20, 3, {"acc": 0.85}) + + +def test_forecast_expected_performance_no_trigger() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=3) + with pytest.raises(AssertionError): + tracker.forecast_expected_performance(metric="acc") + + +def test_forecast_expected_performance_simple_average() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=3) + tracker.inform_trigger(10, 2, {"acc": 0.8}) + tracker.inform_trigger(20, 3, {"acc": 0.85}) + performance = tracker.forecast_expected_performance(metric="acc", method="rolling_average") + assert performance == 0.825 # Simple average of 0.8 and 0.85 + performance = tracker.forecast_expected_performance(metric="acc", method="ridge_regression") + # Simple average of 0.8 and 0.85 (not enough samples for ridge regression) + assert performance == 0.825 + + with pytest.raises(KeyError): + tracker.forecast_expected_performance(metric="NOT_ACC", method="ridge_regression") + + +def test_forecast_expected_performance_ridge_regression() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=5) + tracker.inform_trigger(10, 2, {"acc": 0.8}) + tracker.inform_trigger(20, 3, {"acc": 0.85}) + tracker.inform_trigger(10, 1, {"acc": 0.9}) + tracker.inform_trigger(20, 1, {"acc": 0.95}) + tracker.inform_trigger(10, 0, {"acc": 1.0}) + tracker.inform_trigger(20, 0, {"acc": 1.0}) + performance = tracker.forecast_expected_performance(metric="acc", method="rolling_average") + assert isinstance(performance, float) # Should run ridge regression + assert performance == (0.85 + 0.9 + 0.95 + 1.0 + 1.0) / 5 + performance_ridge = tracker.forecast_expected_performance(metric="acc", method="ridge_regression") + assert 0.9 <= performance_ridge + + +def test_forecast_next_performance_no_trigger() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=3) + with pytest.raises(AssertionError): + tracker.forecast_next_performance(metric="acc", method="ridge_regression") + + +def test_forecast_next_performance_simple_average() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=3) + tracker.inform_evaluation(10, 2, {"acc": 0.8}) + tracker.inform_evaluation(20, 3, {"acc": 0.85}) + performance = tracker.forecast_next_performance(metric="acc", method="rolling_average") + assert performance == 0.825 # Simple average of 0.8 and 0.85 + + +def test_forecast_next_performance_ridge_regression() -> None: + tracker = PerformanceTracker(trigger_eval_window_size=5) + tracker.inform_evaluation(10, 1, {"acc": 0.8}) + tracker.inform_evaluation(10, 1, {"acc": 0.85}) + tracker.inform_evaluation(10, 1, {"acc": 0.9}) + tracker.inform_evaluation(10, 1, {"acc": 0.95}) + tracker.inform_evaluation(10, 1, {"acc": 1.0}) + performance = tracker.forecast_next_performance(metric="acc") + assert isinstance(performance, float) + + +def 
test_forecast_next_performance_with_no_evaluations() -> None:
+    tracker = PerformanceTracker(trigger_eval_window_size=3)
+    tracker.inform_trigger(10, 1, {"acc": 0.8})
+    tracker.inform_trigger(10, 1, {"acc": 0.85})
+    performance = tracker.forecast_next_performance(metric="acc")
+    assert isinstance(performance, float)
+    assert performance == 0.85  # should return the evaluation recorded by the last trigger
+
+
+def test_accuracy_forecasts() -> None:
+    tracker = PerformanceTracker(trigger_eval_window_size=2)
+    tracker.inform_evaluation(10, 1, {"acc": 0})
+    tracker.inform_evaluation(10, 2, {"acc": 0})
+    tracker.inform_evaluation(10, 3, {"acc": 0})
+    tracker.inform_evaluation(10, 4, {"acc": 0})
+    tracker.inform_evaluation(10, 5, {"acc": 0})
+    performance = tracker.forecast_next_accuracy(method="rolling_average")
+    assert abs(performance - (9 + 8 + 7 + 6 + 5) / 5 / 10) < 1e-5
+
+    tracker.inform_trigger(10, 1, {"acc": 0})
+    tracker.inform_trigger(10, 2, {"acc": 0})
+    tracker.inform_trigger(10, 3, {"acc": 0})
+    tracker.inform_trigger(10, 4, {"acc": 0})
+    tracker.inform_trigger(10, 5, {"acc": 0})
+    performance = tracker.forecast_expected_accuracy("rolling_average")  # window size 2: averages the last two triggers
+    assert abs(performance - (6 + 5) / 2 / 10) < 1e-5