From ecdd576396e6d81c50aa354f4fe36404ec3e7ae3 Mon Sep 17 00:00:00 2001 From: Robin Holzinger Date: Fri, 30 Aug 2024 11:17:32 +0200 Subject: [PATCH] feat: Add Avoidable misclassification based `CostTrigger` (#605) --- .../schema/pipeline/trigger/cost/cost.py | 7 +- .../pipeline/trigger/performance/criterion.py | 14 +- .../trigger/performance/performance.py | 32 ++-- ...avoidablemissclassification_costtrigger.py | 94 +++++++++++ .../internal/triggers/batchedtrigger.py | 2 + ...avoidablemissclassification_costtrigger.py | 50 ------ .../cost/incorporation_latency_tracker.py | 5 +- .../triggers/{cost => }/costtrigger.py | 5 +- .../dataincorporationlatency_costtrigger.py | 2 +- .../triggers/performance/decision_policy.py | 66 ++++---- .../misclassification_estimator.py | 77 +++++++++ .../performance/performancetrigger_mixin.py | 151 ++++++++++++++++++ .../internal/triggers/performancetrigger.py | 146 ++--------------- .../test_incorporation_latency_tracker.py | 12 +- .../test_performance_decision_policy.py | 67 ++++---- .../test_performancetrigger_mixin.py | 146 +++++++++++++++++ ...emisclassification_latency_cost_trigger.py | 132 +++++++++++++++ ...test_incorporation_latency_cost_trigger.py | 2 +- .../triggers/test_performancetrigger.py | 44 ++--- 19 files changed, 738 insertions(+), 316 deletions(-) create mode 100644 modyn/supervisor/internal/triggers/avoidablemissclassification_costtrigger.py delete mode 100644 modyn/supervisor/internal/triggers/cost/avoidablemissclassification_costtrigger.py rename modyn/supervisor/internal/triggers/{cost => }/costtrigger.py (94%) rename modyn/supervisor/internal/triggers/{cost => }/dataincorporationlatency_costtrigger.py (94%) create mode 100644 modyn/supervisor/internal/triggers/performance/misclassification_estimator.py create mode 100644 modyn/supervisor/internal/triggers/performance/performancetrigger_mixin.py create mode 100644 modyn/tests/supervisor/internal/triggers/performance/test_performancetrigger_mixin.py create mode 100644 modyn/tests/supervisor/internal/triggers/test_avoidablemisclassification_latency_cost_trigger.py rename modyn/tests/supervisor/internal/triggers/{cost => }/test_incorporation_latency_cost_trigger.py (97%) diff --git a/modyn/config/schema/pipeline/trigger/cost/cost.py b/modyn/config/schema/pipeline/trigger/cost/cost.py index 87227bb1e..593b716f1 100644 --- a/modyn/config/schema/pipeline/trigger/cost/cost.py +++ b/modyn/config/schema/pipeline/trigger/cost/cost.py @@ -9,6 +9,7 @@ from modyn.config.schema.pipeline.trigger.common.batched import BatchedTriggerConfig +from ..performance.criterion import _NumberAvoidableMisclassificationCriterion from ..performance.performance import _InternalPerformanceTriggerConfig @@ -59,7 +60,11 @@ def conversion_factor(self) -> float: return self.incorporation_delay_per_training_second -class AvoidableMisclassificationCostTriggerConfig(_CostTriggerConfig, _InternalPerformanceTriggerConfig): +class AvoidableMisclassificationCostTriggerConfig( + _CostTriggerConfig, + _InternalPerformanceTriggerConfig, + _NumberAvoidableMisclassificationCriterion, +): """Cost aware trigger policy configuration that using the number of avoidable misclassifications integration latency as a regret metric. 
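Both concrete cost triggers share the decision rule sketched below: accumulate a latency-style regret (data-incorporation latency in the existing trigger, avoidable-misclassification latency in the new one), translate it into training-time units via the config's conversion factor (`incorporation_delay_per_training_second` for the data-incorporation variant; the misclassification variant presumably exposes an analogous per-training-second budget), and fire once the regret outweighs the forecasted training time. The helper below is a conceptual sketch with illustrative names, not the actual `CostTrigger` implementation from this patch:

def _should_trigger_sketch(
    cumulative_latency_regret: float,    # e.g. misclassification-seconds from the IncorporationLatencyTracker
    conversion_factor: float,            # regret units tolerated per second of training (from the config)
    forecasted_training_seconds: float,  # e.g. predicted from past trainings (CostTracker)
) -> bool:
    # Express the accumulated regret in "training-second equivalents" and fire once
    # retraining becomes cheaper than letting the regret keep growing.
    return cumulative_latency_regret / conversion_factor >= forecasted_training_seconds
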
diff --git a/modyn/config/schema/pipeline/trigger/performance/criterion.py b/modyn/config/schema/pipeline/trigger/performance/criterion.py index 4de65d669..437e4a200 100644 --- a/modyn/config/schema/pipeline/trigger/performance/criterion.py +++ b/modyn/config/schema/pipeline/trigger/performance/criterion.py @@ -68,13 +68,6 @@ class _NumberAvoidableMisclassificationCriterion(ModynBaseModel): "If not set, the expected performance will be inferred dynamically with a rolling average." ), ) - - -class StaticNumberAvoidableMisclassificationCriterion(_NumberAvoidableMisclassificationCriterion): - id: Literal["StaticNumberMisclassificationCriterion"] = Field("StaticNumberMisclassificationCriterion") - avoidable_misclassification_threshold: int = Field( - description="The threshold for the misclassification rate that will invoke a trigger." - ) allow_reduction: bool = Field( False, description=( @@ -84,6 +77,13 @@ class StaticNumberAvoidableMisclassificationCriterion(_NumberAvoidableMisclassif ) +class StaticNumberAvoidableMisclassificationCriterion(_NumberAvoidableMisclassificationCriterion): + id: Literal["StaticNumberMisclassificationCriterion"] = Field("StaticNumberMisclassificationCriterion") + avoidable_misclassification_threshold: int = Field( + description="The threshold for the misclassification rate that will invoke a trigger." + ) + + # -------------------------------------------------------------------------------------------------------------------- # # Union # # -------------------------------------------------------------------------------------------------------------------- # diff --git a/modyn/config/schema/pipeline/trigger/performance/performance.py b/modyn/config/schema/pipeline/trigger/performance/performance.py index 9edd3e3a4..d8ba26536 100644 --- a/modyn/config/schema/pipeline/trigger/performance/performance.py +++ b/modyn/config/schema/pipeline/trigger/performance/performance.py @@ -31,7 +31,7 @@ def validate_metrics(cls, dataset: EvalDataConfig) -> EvalDataConfig: class _InternalPerformanceTriggerConfig(BatchedTriggerConfig): data_density_window_size: int = Field( - 0, + 20, description="The window size for the data density estimation. Only used for lookahead mode.", ) performance_triggers_window_size: int = Field( @@ -43,17 +43,6 @@ class _InternalPerformanceTriggerConfig(BatchedTriggerConfig): description="Configuration for the evaluation of the performance trigger." ) - decision_criteria: dict[str, PerformanceTriggerCriterion] = Field( - description=( - "The decision criteria to be used for the performance trigger. If any of the criteria is met, " - "the trigger will be executed. The criteria will be evaluated in the order they are defined. " - "Every criterion is linked to a metric. Some of the criteria implicitly only work on accuracy which is " - "the default metric that is always generated and cannot be disabled. To define a " - "`StaticPerformanceThresholdCriterion` on Accuracy, the evaluation config has to define the accuracy metric." 
- ), - min_length=1, - ) - mode: TriggerEvaluationMode = Field( "hindsight", description="Whether to also consider forecasted future performance in the drift decision.", @@ -66,6 +55,21 @@ class _InternalPerformanceTriggerConfig(BatchedTriggerConfig): ), ) + +class PerformanceTriggerConfig(_InternalPerformanceTriggerConfig): + id: Literal["PerformanceTrigger"] = Field("PerformanceTrigger") + + decision_criteria: dict[str, PerformanceTriggerCriterion] = Field( + description=( + "The decision criteria to be used for the performance trigger. If any of the criteria is met, " + "the trigger will be executed. The criteria will be evaluated in the order they are defined. " + "Every criterion is linked to a metric. Some of the criteria implicitly only work on accuracy which is " + "the default metric that is always generated and cannot be disabled. To define a " + "`StaticPerformanceThresholdCriterion` on Accuracy, the evaluation config has to define the accuracy metric." + ), + min_length=1, + ) + @model_validator(mode="after") def validate_decision_criteria(self) -> "PerformanceTriggerConfig": """Assert that all criteria use metrics that are defined in the @@ -77,7 +81,3 @@ def validate_decision_criteria(self) -> "PerformanceTriggerConfig": f"Criterion {criterion.id} uses metric {criterion.metric} which is not defined in the evaluation config." ) return self - - -class PerformanceTriggerConfig(_InternalPerformanceTriggerConfig): - id: Literal["PerformanceTrigger"] = Field("PerformanceTrigger") diff --git a/modyn/supervisor/internal/triggers/avoidablemissclassification_costtrigger.py b/modyn/supervisor/internal/triggers/avoidablemissclassification_costtrigger.py new file mode 100644 index 000000000..a5964ca10 --- /dev/null +++ b/modyn/supervisor/internal/triggers/avoidablemissclassification_costtrigger.py @@ -0,0 +1,94 @@ +import logging + +from typing_extensions import override + +from modyn.config.schema.pipeline.trigger.cost.cost import ( + AvoidableMisclassificationCostTriggerConfig, +) +from modyn.supervisor.internal.triggers.costtrigger import CostTrigger +from modyn.supervisor.internal.triggers.performance.misclassification_estimator import ( + NumberAvoidableMisclassificationEstimator, +) +from modyn.supervisor.internal.triggers.performance.performancetrigger_mixin import ( + PerformanceTriggerMixin, +) +from modyn.supervisor.internal.triggers.trigger import TriggerContext + +logger = logging.getLogger(__name__) + + +class AvoidableMisclassificationCostTrigger(CostTrigger, PerformanceTriggerMixin): + """Triggers when the avoidable misclassification cost incorporation latency + (regret) exceeds the estimated training time.""" + + def __init__(self, config: AvoidableMisclassificationCostTriggerConfig): + CostTrigger.__init__(self, config) + PerformanceTriggerMixin.__init__(self, config) + + self.config = config + self.context: TriggerContext | None = None + + self.misclassification_estimator = NumberAvoidableMisclassificationEstimator( + config.expected_accuracy, config.allow_reduction + ) + + @override + def init_trigger(self, context: TriggerContext) -> None: + # Call CostTrigger's init_trigger method to initialize the trigger context + CostTrigger.init_trigger(self, context) + + # Call PerformanceTriggerMixin's init_trigger method to initialize the internal performance detection state + PerformanceTriggerMixin._init_trigger(self, context) + + @override + def inform_new_model( + self, + most_recent_model_id: int, + number_samples: int | None = None, + training_time: float | None = None, + 
) -> None: + """Update the cost and performance trackers with the new model + metadata.""" + + # Call CostTrigger's inform_new_model method to update the cost tracker + CostTrigger.inform_new_model(self, most_recent_model_id, number_samples, training_time) + + # Call the internal PerformanceTriggerMixin's inform_new_model method to update the performance tracker + PerformanceTriggerMixin._inform_new_model(self, most_recent_model_id, self._last_detection_interval) + + # ---------------------------------------------------------------------------------------------------------------- # + # INTERNAL # + # ---------------------------------------------------------------------------------------------------------------- # + + @override + def _compute_regret_metric(self, batch: list[tuple[int, int]], batch_start: int, batch_duration: int) -> float: + """Compute the regret metric for the current state of the trigger.""" + + self.data_density.inform_data(batch) + num_samples, num_misclassifications, evaluation_scores = self._run_evaluation(interval_data=batch) + + self.performance_tracker.inform_evaluation( + num_samples=num_samples, + num_misclassifications=num_misclassifications, + evaluation_scores=evaluation_scores, + ) + + estimated_new_avoidable_misclassifications, _ = ( + self.misclassification_estimator.estimate_avoidable_misclassifications( + update_interval_samples=self.config.evaluation_interval_data_points, + data_density=self.data_density, + performance_tracker=self.performance_tracker, + method=self.config.forecasting_method, + ) + ) + + # Let's build a latency regret metrics based on the estimated number of avoidable misclassifications. + # Using avoidable misclassification latency makes sense because we generally aim to to trigger + # when many misclassifications could have been avoided. We therefore try to minimize the time that + # misclassifications remain unaddressed while an old model is still in use. + # We chose the latency based area under curve method as a linear model based on the absolute number of + # avoidable misclassifications seems unstable. More advantageous regret non-linear regret functions + # could be explored in the future. 
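+        # Worked example (illustrative numbers): if ~10 new avoidable misclassifications are estimated
+        # for a 100s batch, they are assumed to appear uniformly over the batch and therefore contribute
+        # 10 * 100 / 2 = 500 misclassification-seconds, while misclassifications carried over from earlier
+        # batches contribute their full count * 100s. The running area under this curve is the regret
+        # value returned below, which the parent CostTrigger compares against the estimated training time.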
+ return self._incorporation_latency_tracker.add_latency( + estimated_new_avoidable_misclassifications, batch_duration + ) diff --git a/modyn/supervisor/internal/triggers/batchedtrigger.py b/modyn/supervisor/internal/triggers/batchedtrigger.py index b0e2705b1..b6d4f868f 100644 --- a/modyn/supervisor/internal/triggers/batchedtrigger.py +++ b/modyn/supervisor/internal/triggers/batchedtrigger.py @@ -22,6 +22,7 @@ def __init__(self, config: BatchedTriggerConfig) -> None: # allows to detect drift in a fixed interval self._sample_left_until_detection = config.evaluation_interval_data_points + self._last_detection_interval: list[tuple[int, int]] = [] self._leftover_data: list[tuple[int, int]] = [] """Stores data that was not processed in the last inform call because @@ -69,6 +70,7 @@ def inform( # ----------------------------------------------- Detection ---------------------------------------------- # triggered = self._evaluate_batch(next_detection_interval, trigger_candidate_idx) + self._last_detection_interval = next_detection_interval # ----------------------------------------------- Response ----------------------------------------------- # diff --git a/modyn/supervisor/internal/triggers/cost/avoidablemissclassification_costtrigger.py b/modyn/supervisor/internal/triggers/cost/avoidablemissclassification_costtrigger.py deleted file mode 100644 index 0a949c4cb..000000000 --- a/modyn/supervisor/internal/triggers/cost/avoidablemissclassification_costtrigger.py +++ /dev/null @@ -1,50 +0,0 @@ -from __future__ import annotations - -import logging - -from typing_extensions import override - -from modyn.config.schema.pipeline.trigger.cost.cost import ( - AvoidableMisclassificationCostTriggerConfig, -) -from modyn.supervisor.internal.triggers.cost.costtrigger import CostTrigger -from modyn.supervisor.internal.triggers.trigger import TriggerContext - -logger = logging.getLogger(__name__) - - -class AvoidableMisclassificationCostTrigger(CostTrigger): - """Triggers when the avoidable misclassification cost incorporation latency - (regret) exceeds the estimated training time.""" - - def __init__(self, config: AvoidableMisclassificationCostTriggerConfig): - super().__init__(config) - self.config = config - self.context: TriggerContext | None = None - - self._sample_left_until_detection = config.evaluation_interval_data_points - self._triggered_once = False - - @override - def init_trigger(self, context: TriggerContext) -> None: - self.context = context - - @override - def inform_new_model( - self, - most_recent_model_id: int, - number_samples: int | None = None, - training_time: float | None = None, - ) -> None: - """Update the cost and performance trackers with the new model - metadata.""" - super().inform_new_model(most_recent_model_id, number_samples, training_time) - - # ---------------------------------------------------------------------------------------------------------------- # - # INTERNAL # - # ---------------------------------------------------------------------------------------------------------------- # - - @override - def _compute_regret_metric(self, batch: list[tuple[int, int]], batch_start: int, batch_duration: int) -> float: - """Compute the regret metric for the current state of the trigger.""" - raise NotImplementedError() diff --git a/modyn/supervisor/internal/triggers/cost/incorporation_latency_tracker.py b/modyn/supervisor/internal/triggers/cost/incorporation_latency_tracker.py index dfc333e46..52c4a5391 100644 --- 
a/modyn/supervisor/internal/triggers/cost/incorporation_latency_tracker.py +++ b/modyn/supervisor/internal/triggers/cost/incorporation_latency_tracker.py @@ -31,8 +31,11 @@ def add_latency(self, regret: float, batch_duration: float) -> float: Returns: Most recent cumulative regret value. """ + + # newly arrived `regret` has existed for `batch_duration / 2` seconds on average; + # old regret persists for the entire `batch_duration` + self._cumulative_latency_regret += self._current_regret * batch_duration + regret * (batch_duration / 2.0) self._current_regret += regret - self._cumulative_latency_regret += self._current_regret * batch_duration return self._cumulative_latency_regret diff --git a/modyn/supervisor/internal/triggers/cost/costtrigger.py b/modyn/supervisor/internal/triggers/costtrigger.py similarity index 94% rename from modyn/supervisor/internal/triggers/cost/costtrigger.py rename to modyn/supervisor/internal/triggers/costtrigger.py index ce3e4b81c..8cc35ba54 100644 --- a/modyn/supervisor/internal/triggers/cost/costtrigger.py +++ b/modyn/supervisor/internal/triggers/costtrigger.py @@ -29,13 +29,10 @@ def __init__(self, config: CostTriggerConfig): self.config = config self.context: TriggerContext | None = None - self._sample_left_until_detection = config.evaluation_interval_data_points self._triggered_once = False self._previous_batch_end_time: int | None = None - self._leftover_data: list[tuple[int, int]] = [] - """Stores data that was not processed in the last inform call because - the detection interval was not filled.""" + # cost information self._unincorporated_samples = 0 self._cost_tracker = CostTracker(config.cost_tracking_window_size) self._incorporation_latency_tracker = IncorporationLatencyTracker() diff --git a/modyn/supervisor/internal/triggers/cost/dataincorporationlatency_costtrigger.py b/modyn/supervisor/internal/triggers/dataincorporationlatency_costtrigger.py similarity index 94% rename from modyn/supervisor/internal/triggers/cost/dataincorporationlatency_costtrigger.py rename to modyn/supervisor/internal/triggers/dataincorporationlatency_costtrigger.py index d6e3e9e71..ce7558481 100644 --- a/modyn/supervisor/internal/triggers/cost/dataincorporationlatency_costtrigger.py +++ b/modyn/supervisor/internal/triggers/dataincorporationlatency_costtrigger.py @@ -7,7 +7,7 @@ from modyn.config.schema.pipeline.trigger.cost.cost import ( AvoidableMisclassificationCostTriggerConfig, ) -from modyn.supervisor.internal.triggers.cost.costtrigger import CostTrigger +from modyn.supervisor.internal.triggers.costtrigger import CostTrigger logger = logging.getLogger(__name__) diff --git a/modyn/supervisor/internal/triggers/performance/decision_policy.py b/modyn/supervisor/internal/triggers/performance/decision_policy.py index 3c4f9370a..b1431ced3 100644 --- a/modyn/supervisor/internal/triggers/performance/decision_policy.py +++ b/modyn/supervisor/internal/triggers/performance/decision_policy.py @@ -9,6 +9,9 @@ from modyn.supervisor.internal.triggers.performance.data_density_tracker import ( DataDensityTracker, ) +from modyn.supervisor.internal.triggers.performance.misclassification_estimator import ( + NumberAvoidableMisclassificationEstimator, +) from modyn.supervisor.internal.triggers.performance.performance_tracker import ( PerformanceTracker, ) @@ -21,7 +24,7 @@ class PerformanceDecisionPolicy(ABC): @abstractmethod def evaluate_decision( self, - update_interval: int, + update_interval_samples: int, evaluation_scores: dict[str, float], data_density: DataDensityTracker, 
performance_tracker: PerformanceTracker, @@ -36,7 +39,7 @@ def evaluate_decision( Also, data_density has to be updated with the new data interval. Args: - update_interval: The interval in which the decision is made. + update_interval_samples: The interval in which the decision is made. performance: The observed performance metric. mode: The mode in which the decision should be evaluated. data_density: The data density tracker, updated with the new data interval. @@ -59,7 +62,7 @@ def __init__(self, config: StaticPerformanceThresholdCriterion): def evaluate_decision( self, - update_interval: int, + update_interval_samples: int, evaluation_scores: dict[str, float], data_density: DataDensityTracker, performance_tracker: PerformanceTracker, @@ -87,7 +90,7 @@ def __init__(self, config: DynamicPerformanceThresholdCriterion): def evaluate_decision( self, - update_interval: int, + update_interval_samples: int, evaluation_scores: dict[str, float], data_density: DataDensityTracker, performance_tracker: PerformanceTracker, @@ -116,11 +119,14 @@ def __init__(self, config: StaticNumberAvoidableMisclassificationCriterion): self.config = config self.cumulated_avoidable_misclassifications = 0 + self.misclassification_estimator = NumberAvoidableMisclassificationEstimator( + expected_accuracy=config.expected_accuracy, + allow_reduction=config.allow_reduction, + ) - # pylint: disable=too-many-locals def evaluate_decision( self, - update_interval: int, + update_interval_samples: int, evaluation_scores: dict[str, float], data_density: DataDensityTracker, performance_tracker: PerformanceTracker, @@ -151,45 +157,29 @@ def evaluate_decision( This forward looking approach tries to avoid exceeding the misclassification budget in the first place. """ - # compute the number of avoidable misclassifications by retrieving the actual misclassifications - # and the expected misclassifications through the expected accuracy for the last interval. 
- previous_interval_num_misclassifications = performance_tracker.previous_batch_num_misclassifications - - # the expected performance won't change unless there's a trigger - expected_accuracy = ( - performance_tracker.forecast_expected_accuracy(method=method) - if self.config.expected_accuracy is None - else self.config.expected_accuracy + new_avoidable_misclassifications, forecast_new_avoidable_misclassifications = ( + self.misclassification_estimator.estimate_avoidable_misclassifications( + update_interval_samples=update_interval_samples, + data_density=data_density, + performance_tracker=performance_tracker, + method=method, + ) ) - previous_expected_num_misclassifications = (1 - expected_accuracy) * data_density.previous_batch_num_samples - new_avoidable_misclassifications = ( - previous_interval_num_misclassifications - previous_expected_num_misclassifications + self.cumulated_avoidable_misclassifications += round(new_avoidable_misclassifications) + hindsight_exceeded = ( + self.cumulated_avoidable_misclassifications >= self.config.avoidable_misclassification_threshold ) - if new_avoidable_misclassifications < 0 and not self.config.allow_reduction: - new_avoidable_misclassifications = 0 - self.cumulated_avoidable_misclassifications += round(new_avoidable_misclassifications) + if hindsight_exceeded: + # past misclassifications already exceed the threshold, forecasting not needed + return True if mode == "hindsight": - return self.cumulated_avoidable_misclassifications >= self.config.avoidable_misclassification_threshold + return False if mode == "lookahead": - # past misclassifications already exceed the threshold, forecasting not needed - if self.cumulated_avoidable_misclassifications >= self.config.avoidable_misclassification_threshold: - return True - - forecasted_data_density = data_density.forecast_density(method=method) - forecast_accuracy = performance_tracker.forecast_next_accuracy(method=method) - - accuracy_delta = expected_accuracy - forecast_accuracy - if accuracy_delta < 0 and not self.config.allow_reduction: - accuracy_delta = 0 - - # new misclassification = accuracy * samples; samples = data_density * interval_duration - forecast_new_avoidable_misclassifications = accuracy_delta * forecasted_data_density * update_interval - - forecasted_misclassifications = ( - self.cumulated_avoidable_misclassifications + forecast_new_avoidable_misclassifications + forecasted_misclassifications = self.cumulated_avoidable_misclassifications + round( + forecast_new_avoidable_misclassifications ) return forecasted_misclassifications >= self.config.avoidable_misclassification_threshold diff --git a/modyn/supervisor/internal/triggers/performance/misclassification_estimator.py b/modyn/supervisor/internal/triggers/performance/misclassification_estimator.py new file mode 100644 index 000000000..1a5c1e75a --- /dev/null +++ b/modyn/supervisor/internal/triggers/performance/misclassification_estimator.py @@ -0,0 +1,77 @@ +from modyn.const.types import ForecastingMethod +from modyn.supervisor.internal.triggers.performance.data_density_tracker import ( + DataDensityTracker, +) +from modyn.supervisor.internal.triggers.performance.performance_tracker import ( + PerformanceTracker, +) + + +class NumberAvoidableMisclassificationEstimator: + """Immutable class offering estimation functionality for the number of + avoidable misclassifications. + + Used in `StaticNumberAvoidableMisclassificationDecisionPolicy` to make decisions based on the cumulated + avoidable misclassifications. 
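+
+    The estimate has two parts: for the last interval, the avoidable misclassifications are the observed
+    misclassifications minus the expected ones, i.e.
+    `observed - (1 - expected_accuracy) * previous_batch_num_samples`, clipped at zero unless
+    `allow_reduction` is set. For the upcoming interval, they are forecasted as
+    `(expected_accuracy - forecasted_accuracy) * update_interval_samples`, clipped the same way.
+    `expected_accuracy` is either the statically configured value or inferred dynamically via
+    `performance_tracker.forecast_expected_accuracy`.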
+ """ + + def __init__(self, expected_accuracy: float | None = None, allow_reduction: bool = False): + """ + Args: + threshold: The threshold of cumulated avoidable misclassifications. + """ + + self.expected_accuracy = expected_accuracy + self.allow_reduction = allow_reduction + + def estimate_avoidable_misclassifications( + self, + update_interval_samples: int, + data_density: DataDensityTracker, + performance_tracker: PerformanceTracker, + method: ForecastingMethod, + ) -> tuple[float, float]: + """Utilizes the state of `DataDensityTracker` and `PerformanceTracker` + to estimate the number of avoidable misclassifications. + + We support both the "hindsight" and "forecast" mode. + + In the "hindsight" mode, the decision is made based on the current performance and the cumulated avoidable misclassifications. + + Returns: + Tuple of the number of avoidable misclassifications and the forecasted number of avoidable misclassifications. + """ + + # --------------------------------- compute new_avoidable_misclassifications --------------------------------- # + # compute the number of avoidable misclassifications by retrieving the actual misclassifications + # and the expected misclassifications through the expected accuracy for the last interval. + previous_interval_num_misclassifications = performance_tracker.previous_batch_num_misclassifications + + # the expected performance won't change unless there's a trigger + expected_accuracy = ( + performance_tracker.forecast_expected_accuracy(method=method) + if self.expected_accuracy is None + else self.expected_accuracy + ) + previous_expected_num_misclassifications = (1 - expected_accuracy) * data_density.previous_batch_num_samples + new_avoidable_misclassifications = ( + previous_interval_num_misclassifications - previous_expected_num_misclassifications + ) + if new_avoidable_misclassifications < 0 and not self.allow_reduction: + new_avoidable_misclassifications = 0 + + # --------------------------------- compute new_avoidable_misclassifications --------------------------------- # + # forecasted_data_density = data_density.forecast_density(method=method) + forecast_accuracy = performance_tracker.forecast_next_accuracy(method=method) + + accuracy_delta = expected_accuracy - forecast_accuracy + if accuracy_delta < 0 and not self.allow_reduction: + accuracy_delta = 0 + + # new misclassification = accuracy * samples; samples = data_density * interval_duration + forecast_new_avoidable_misclassifications = accuracy_delta * update_interval_samples + + return ( + new_avoidable_misclassifications, + forecast_new_avoidable_misclassifications, + ) diff --git a/modyn/supervisor/internal/triggers/performance/performancetrigger_mixin.py b/modyn/supervisor/internal/triggers/performance/performancetrigger_mixin.py new file mode 100644 index 000000000..cba14d332 --- /dev/null +++ b/modyn/supervisor/internal/triggers/performance/performancetrigger_mixin.py @@ -0,0 +1,151 @@ +from modyn.config.schema.pipeline.trigger.performance.performance import ( + _InternalPerformanceTriggerConfig, +) +from modyn.evaluator.internal.core_evaluation import perform_evaluation, setup_metrics +from modyn.evaluator.internal.metrics.accuracy import Accuracy +from modyn.supervisor.internal.triggers.performance.data_density_tracker import ( + DataDensityTracker, +) +from modyn.supervisor.internal.triggers.performance.performance_tracker import ( + PerformanceTracker, +) +from modyn.supervisor.internal.triggers.trigger import TriggerContext +from 
modyn.supervisor.internal.triggers.utils.datasets.dataloader_info import ( + DataLoaderInfo, +) +from modyn.supervisor.internal.triggers.utils.datasets.prepare_dataloader import ( + prepare_trigger_dataloader_fixed_keys, +) +from modyn.supervisor.internal.triggers.utils.model.downloader import ModelDownloader +from modyn.supervisor.internal.triggers.utils.model.stateful_model import StatefulModel +from modyn.utils.utils import LABEL_TRANSFORMER_FUNC_NAME, deserialize_function + + +class PerformanceTriggerMixin: + """Mixin that provides internal functionality for performance triggers but + not the trigger policy itself.""" + + def __init__(self, config: _InternalPerformanceTriggerConfig) -> None: + self.config = config + self.context: TriggerContext | None = None + + self.data_density = DataDensityTracker(config.data_density_window_size) + self.performance_tracker = PerformanceTracker(config.performance_triggers_window_size) + + self.model_refresh_needed = False + self.most_recent_model_id: int | None = None + self.dataloader_info: DataLoaderInfo | None = None + self.model_downloader: ModelDownloader | None = None + self.model: StatefulModel | None = None + + self._metrics = setup_metrics(config.evaluation.dataset.metrics) + + self._label_transformer_function = ( + deserialize_function( + config.evaluation.label_transformer_function, + LABEL_TRANSFORMER_FUNC_NAME, + ) + if config.evaluation.label_transformer_function + else None + ) + + # ---------------------------------------------------------------------------------------------------------------- # + # Internal # + # ---------------------------------------------------------------------------------------------------------------- # + + def _init_trigger(self, context: TriggerContext) -> None: + self.context = context + self._init_dataloader_info() + self._init_model_downloader() + + def _inform_new_model(self, most_recent_model_id: int, last_detection_interval: list[tuple[int, int]]) -> None: + """Needs to called by the subclass's inform_new_model method.""" + + self.most_recent_model_id = most_recent_model_id + self.model_refresh_needed = True + + # Perform an evaluation of the NEW model on the last evaluation interval, we will derive expected performance + # forecasts from these evaluations. 
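+        # Note: `inform_trigger` (rather than `inform_evaluation`) marks this result as stemming from a
+        # freshly trained model; such post-trigger results are what `forecast_expected_accuracy` draws on
+        # when the misclassification-based policies need an expected-accuracy baseline.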
+ num_samples, num_misclassifications, evaluation_scores = self._run_evaluation( + interval_data=last_detection_interval + ) + + self.performance_tracker.inform_trigger( + num_samples=num_samples, + num_misclassifications=num_misclassifications, + evaluation_scores=evaluation_scores, + ) + + def _run_evaluation( + self, + interval_data: list[tuple[int, int]], + ) -> tuple[int, int, dict[str, float]]: # pragma: no cover + """Run the evaluation on the given interval data.""" + assert self.most_recent_model_id is not None + assert self.dataloader_info is not None + assert self.model_downloader is not None + assert self.context and self.context.pipeline_config is not None + + evaluation_dataloader = prepare_trigger_dataloader_fixed_keys( + self.dataloader_info, [key for key, _ in interval_data] + ) + + # Download most recent model as stateful model + if self.model_refresh_needed: + self.model = self.model_downloader.setup_manager( + self.most_recent_model_id, self.context.pipeline_config.training.device + ) + self.model_refresh_needed = False + + # Run evaluation + assert self.model is not None + + eval_results = perform_evaluation( + model=self.model, + dataloader=evaluation_dataloader, + device=self.config.evaluation.device, + metrics=self._metrics, + label_transformer_function=self._label_transformer_function, + amp=False, + ) + + evaluation_scores = { + metric_name: metric.get_evaluation_result() for metric_name, metric in self._metrics.items() + } + + accuracy_metric = eval_results.metrics_data["Accuracy"] + assert isinstance(accuracy_metric, Accuracy) + num_misclassifications = accuracy_metric.samples_seen - accuracy_metric.total_correct + + return (eval_results.num_samples, num_misclassifications, evaluation_scores) + + def _init_dataloader_info(self) -> None: + assert self.context + + training_config = self.context.pipeline_config.training + data_config = self.context.pipeline_config.data + + self.dataloader_info = DataLoaderInfo( + pipeline_id=self.context.pipeline_id, + dataset_id=data_config.dataset_id, + num_dataloaders=training_config.dataloader_workers, + batch_size=training_config.batch_size, + bytes_parser=data_config.bytes_parser_function, + transform_list=data_config.transformations, + storage_address=f"{self.context.modyn_config.storage.address}", + selector_address=f"{self.context.modyn_config.selector.address}", + num_prefetched_partitions=training_config.num_prefetched_partitions, + parallel_prefetch_requests=training_config.parallel_prefetch_requests, + shuffle=training_config.shuffle, + tokenizer=data_config.tokenizer, + ) + + def _init_model_downloader(self) -> None: + assert self.context is not None + + self.model_downloader = ModelDownloader( + self.context.modyn_config, + self.context.pipeline_id, + self.context.base_dir, + f"{self.context.modyn_config.modyn_model_storage.address}", + ) diff --git a/modyn/supervisor/internal/triggers/performancetrigger.py b/modyn/supervisor/internal/triggers/performancetrigger.py index 87ce43932..f9cce9cc1 100644 --- a/modyn/supervisor/internal/triggers/performancetrigger.py +++ b/modyn/supervisor/internal/triggers/performancetrigger.py @@ -12,40 +12,26 @@ from modyn.config.schema.pipeline.trigger.performance.performance import ( PerformanceTriggerConfig, ) -from modyn.evaluator.internal.core_evaluation import perform_evaluation, setup_metrics -from modyn.evaluator.internal.metrics.accuracy import Accuracy from modyn.supervisor.internal.triggers.batchedtrigger import BatchedTrigger -from 
modyn.supervisor.internal.triggers.performance.data_density_tracker import ( - DataDensityTracker, -) from modyn.supervisor.internal.triggers.performance.decision_policy import ( DynamicPerformanceThresholdDecisionPolicy, PerformanceDecisionPolicy, StaticNumberAvoidableMisclassificationDecisionPolicy, StaticPerformanceThresholdDecisionPolicy, ) -from modyn.supervisor.internal.triggers.performance.performance_tracker import ( - PerformanceTracker, +from modyn.supervisor.internal.triggers.performance.performancetrigger_mixin import ( + PerformanceTriggerMixin, ) from modyn.supervisor.internal.triggers.trigger import TriggerContext -from modyn.supervisor.internal.triggers.utils.datasets.dataloader_info import ( - DataLoaderInfo, -) -from modyn.supervisor.internal.triggers.utils.datasets.prepare_dataloader import ( - prepare_trigger_dataloader_fixed_keys, -) -from modyn.supervisor.internal.triggers.utils.model.downloader import ModelDownloader -from modyn.supervisor.internal.triggers.utils.model.stateful_model import StatefulModel from modyn.supervisor.internal.triggers.utils.models import ( PerformanceTriggerEvalLog, TriggerPolicyEvaluationLog, ) -from modyn.utils.utils import LABEL_TRANSFORMER_FUNC_NAME, deserialize_function logger = logging.getLogger(__name__) -class PerformanceTrigger(BatchedTrigger): +class PerformanceTrigger(BatchedTrigger, PerformanceTriggerMixin): """Trigger based on the performance of the model. We support a simple performance drift approach that compares the @@ -58,40 +44,20 @@ class PerformanceTrigger(BatchedTrigger): """ def __init__(self, config: PerformanceTriggerConfig) -> None: - super().__init__(config) + BatchedTrigger.__init__(self, config) + PerformanceTriggerMixin.__init__(self, config) self.config = config self.context: TriggerContext | None = None - self.most_recent_model_id: int | None = None - - self.dataloader_info: DataLoaderInfo | None = None - self.model_downloader: ModelDownloader | None = None - self.model: StatefulModel | None = None - - self.data_density = DataDensityTracker(config.data_density_window_size) - self.performance_tracker = PerformanceTracker(config.performance_triggers_window_size) self.decision_policies = _setup_decision_policies(config) self._triggered_once = False - self.model_refresh_needed = False - self._metrics = setup_metrics(config.evaluation.dataset.metrics) - self._last_detection_interval: list[tuple[int, int]] = [] - - self._label_transformer_function = ( - deserialize_function( - config.evaluation.label_transformer_function, - LABEL_TRANSFORMER_FUNC_NAME, - ) - if config.evaluation.label_transformer_function - else None - ) @override def init_trigger(self, context: TriggerContext) -> None: - self.context = context - self._init_dataloader_info() - self._init_model_downloader() + # Call PerformanceTriggerMixin's init_trigger method to initialize the internal performance detection state + PerformanceTriggerMixin._init_trigger(self, context) @override def _evaluate_batch( @@ -101,10 +67,11 @@ def _evaluate_batch( log: TriggerPolicyEvaluationLog | None = None, ) -> bool: # Run the evaluation (even if we don't use the result, e.g. 
for the first forced trigger) - self._last_detection_interval = batch self.data_density.inform_data(batch) - num_samples, num_misclassifications, evaluation_scores = self._run_evaluation(interval_data=batch) + num_samples, num_misclassifications, evaluation_scores = PerformanceTriggerMixin._run_evaluation( + self, interval_data=batch + ) self.performance_tracker.inform_evaluation( num_samples=num_samples, @@ -128,7 +95,7 @@ def _evaluate_batch( triggered = False for policy_name, policy in self.decision_policies.items(): policy_decisions[policy_name] = policy.evaluate_decision( - update_interval=self.config.evaluation_interval_data_points, + update_interval_samples=self.config.evaluation_interval_data_points, evaluation_scores=evaluation_scores, data_density=self.data_density, performance_tracker=self.performance_tracker, @@ -165,95 +132,8 @@ def inform_new_model( number_samples: int | None = None, training_time: float | None = None, ) -> None: - self.most_recent_model_id = most_recent_model_id - self.model_refresh_needed = True - - # Perform an evaluation of the NEW model on the last evaluation interval, we will derive expected performance - # forecasts from these evaluations. - num_samples, num_misclassifications, evaluation_scores = self._run_evaluation( - interval_data=self._last_detection_interval - ) - - self.performance_tracker.inform_trigger( - num_samples=num_samples, - num_misclassifications=num_misclassifications, - evaluation_scores=evaluation_scores, - ) - - # ---------------------------------------------------------------------------------------------------------------- # - # Internal # - # ---------------------------------------------------------------------------------------------------------------- # - - def _run_evaluation(self, interval_data: list[tuple[int, int]]) -> tuple[int, int, dict[str, float]]: - """Run the evaluation on the given interval data.""" - assert self.most_recent_model_id is not None - assert self.dataloader_info is not None - assert self.model_downloader is not None - assert self.context and self.context.pipeline_config is not None - - evaluation_dataloader = prepare_trigger_dataloader_fixed_keys( - self.dataloader_info, [key for key, _ in interval_data] - ) - - # Download most recent model as stateful model - if self.model_refresh_needed: - self.model = self.model_downloader.setup_manager( - self.most_recent_model_id, self.context.pipeline_config.training.device - ) - self.model_refresh_needed = False - - # Run evaluation - assert self.model is not None - - eval_results = perform_evaluation( - model=self.model, - dataloader=evaluation_dataloader, - device=self.config.evaluation.device, - metrics=self._metrics, - label_transformer_function=self._label_transformer_function, - amp=False, - ) - - evaluation_scores = { - metric_name: metric.get_evaluation_result() for metric_name, metric in self._metrics.items() - } - - accuracy_metric = eval_results.metrics_data["Accuracy"] - assert isinstance(accuracy_metric, Accuracy) - num_misclassifications = accuracy_metric.samples_seen - accuracy_metric.total_correct - - return (eval_results.num_samples, num_misclassifications, evaluation_scores) - - def _init_dataloader_info(self) -> None: - assert self.context - - training_config = self.context.pipeline_config.training - data_config = self.context.pipeline_config.data - - self.dataloader_info = DataLoaderInfo( - self.context.pipeline_id, - dataset_id=data_config.dataset_id, - num_dataloaders=training_config.dataloader_workers, - 
batch_size=training_config.batch_size, - bytes_parser=data_config.bytes_parser_function, - transform_list=data_config.transformations, - storage_address=f"{self.context.modyn_config.storage.address}", - selector_address=f"{self.context.modyn_config.selector.address}", - num_prefetched_partitions=training_config.num_prefetched_partitions, - parallel_prefetch_requests=training_config.parallel_prefetch_requests, - shuffle=training_config.shuffle, - tokenizer=data_config.tokenizer, - ) - - def _init_model_downloader(self) -> None: - assert self.context is not None - - self.model_downloader = ModelDownloader( - self.context.modyn_config, - self.context.pipeline_id, - self.context.base_dir, - f"{self.context.modyn_config.modyn_model_storage.address}", - ) + # Delegate to internal implementation + PerformanceTriggerMixin._inform_new_model(self, most_recent_model_id, self._last_detection_interval) def _setup_decision_policies( diff --git a/modyn/tests/supervisor/internal/triggers/cost/test_incorporation_latency_tracker.py b/modyn/tests/supervisor/internal/triggers/cost/test_incorporation_latency_tracker.py index d91982683..bb54de7e6 100644 --- a/modyn/tests/supervisor/internal/triggers/cost/test_incorporation_latency_tracker.py +++ b/modyn/tests/supervisor/internal/triggers/cost/test_incorporation_latency_tracker.py @@ -7,20 +7,20 @@ def test_incorporation_latency_tracker() -> None: tracker = IncorporationLatencyTracker() assert tracker.cumulative_latency_regret == 0.0 - cumulative_regret = tracker.add_latency(1.0, 2.0) + cumulative_regret1 = tracker.add_latency(1.0, 2.0) assert tracker._current_regret == 1.0 - assert cumulative_regret == tracker.cumulative_latency_regret == 2.0 + assert cumulative_regret1 == tracker.cumulative_latency_regret == 0.0 + 0.5 * 2.0 - cumulative_regret = tracker.add_latency(0.5, 3.0) + cumulative_regret2 = tracker.add_latency(0.5, 4.0) assert tracker._current_regret == 1.5 - assert cumulative_regret == tracker.cumulative_latency_regret == 2.0 + 1.5 * 3.0 + assert cumulative_regret2 == tracker.cumulative_latency_regret == cumulative_regret1 + 1.0 * 4.0 + 0.5 * 4.0 * 0.5 tracker.inform_trigger() assert tracker.cumulative_latency_regret == 0.0 - cumulative_regret = tracker.add_latency(2.0, 1.0) + cumulative_regret3 = tracker.add_latency(2.0, 1.0) assert tracker._current_regret == 2.0 - assert cumulative_regret == tracker.cumulative_latency_regret == 2.0 + assert cumulative_regret3 == tracker.cumulative_latency_regret == 2.0 * 1.0 * 0.5 def test_add_latencies() -> None: diff --git a/modyn/tests/supervisor/internal/triggers/performance/test_performance_decision_policy.py b/modyn/tests/supervisor/internal/triggers/performance/test_performance_decision_policy.py index b8adaa1eb..686c03b6b 100644 --- a/modyn/tests/supervisor/internal/triggers/performance/test_performance_decision_policy.py +++ b/modyn/tests/supervisor/internal/triggers/performance/test_performance_decision_policy.py @@ -35,11 +35,11 @@ def dynamic_threshold_policy() -> DynamicPerformanceThresholdDecisionPolicy: @pytest.fixture -def misclassification_policy() -> StaticNumberAvoidableMisclassificationDecisionPolicy: - return StaticNumberAvoidableMisclassificationDecisionPolicy( - config=StaticNumberAvoidableMisclassificationCriterion( - allow_reduction=True, avoidable_misclassification_threshold=10 - ) +def misclassification_criterion() -> StaticNumberAvoidableMisclassificationCriterion: + # Using the config model criterion instead of the final policy to allow for adjustments of the config + # in the tests 
before instantiating the policy + return StaticNumberAvoidableMisclassificationCriterion( + allow_reduction=True, avoidable_misclassification_threshold=10 ) @@ -70,7 +70,7 @@ def test_static_performance_hindsight( ) -> None: """Test static threshold decision policy in hindsight mode.""" eval_decision_kwargs = { - "update_interval": 10, + "update_interval_samples": 10, "data_density": dummy_data_density_tracker, "performance_tracker": dummy_performance_tracker, "mode": "hindsight", @@ -92,7 +92,7 @@ def test_static_performance_forecast( ) -> None: """Test static threshold decision policy in lookahead mode.""" eval_decision_kwargs = { - "update_interval": 10, + "update_interval_samples": 10, "data_density": dummy_data_density_tracker, "performance_tracker": dummy_performance_tracker, "mode": "lookahead", @@ -123,7 +123,7 @@ def test_dynamic_performance_hindsight( """Test dynamic threshold decision policy in hindsight mode.""" eval_decision_kwargs = { - "update_interval": 10, + "update_interval_samples": 10, "data_density": dummy_data_density_tracker, "performance_tracker": dummy_performance_tracker, "mode": "hindsight", @@ -157,7 +157,7 @@ def test_dynamic_performance_forecast( """Test dynamic threshold decision policy in forecast mode.""" eval_decision_kwargs = { - "update_interval": 10, + "update_interval_samples": 10, "data_density": dummy_data_density_tracker, "performance_tracker": dummy_performance_tracker, "mode": "lookahead", @@ -182,9 +182,6 @@ def test_dynamic_performance_forecast( # -------------------------------------------------------------------------------------------------------------------- # -# TODO: self.config.expected_accuracy - - @patch.object( DataDensityTracker, "previous_batch_num_samples", @@ -196,6 +193,7 @@ def test_dynamic_performance_forecast( "forecast_expected_accuracy", side_effect=[1.0, 0.95, 1.0, 1.0, 1.0], ) +@patch.object(PerformanceTracker, "forecast_next_accuracy", return_value=-100) # unused dummy @patch.object( PerformanceTracker, "previous_batch_num_misclassifications", @@ -204,16 +202,19 @@ def test_dynamic_performance_forecast( ) def test_misclassification_hindsight( mock_previous_batch_num_misclassifications: MagicMock, + mock_forecast_next_accuracy: MagicMock, mock_forecast_expected_accuracy: MagicMock, mock_previous_batch_num_samples: MagicMock, dummy_performance_tracker: PerformanceTracker, dummy_data_density_tracker: DataDensityTracker, - misclassification_policy: StaticNumberAvoidableMisclassificationDecisionPolicy, + misclassification_criterion: StaticNumberAvoidableMisclassificationCriterion, ) -> None: """Test static number avoidable misclassification policy in hindsight mode.""" + misclassification_policy = StaticNumberAvoidableMisclassificationDecisionPolicy(config=misclassification_criterion) + eval_decision_kwargs = { - "update_interval": 10, + "update_interval_samples": 10, "data_density": dummy_data_density_tracker, "performance_tracker": dummy_performance_tracker, "mode": "hindsight", @@ -261,6 +262,7 @@ def test_misclassification_hindsight( new_callable=PropertyMock, ) @patch.object(PerformanceTracker, "forecast_expected_accuracy", side_effect=[0, 0, 0]) +@patch.object(PerformanceTracker, "forecast_next_accuracy", return_value=-100) # unused dummy @patch.object( PerformanceTracker, "previous_batch_num_misclassifications", @@ -269,19 +271,21 @@ def test_misclassification_hindsight( ) def test_misclassification_static_expected_performance( mock_previous_batch_num_misclassifications: MagicMock, + mock_forecast_next_accuracy: 
MagicMock, mock_forecast_expected_accuracy: MagicMock, mock_previous_batch_num_samples: MagicMock, dummy_performance_tracker: PerformanceTracker, dummy_data_density_tracker: DataDensityTracker, - misclassification_policy: StaticNumberAvoidableMisclassificationDecisionPolicy, + misclassification_criterion: StaticNumberAvoidableMisclassificationCriterion, ) -> None: """Test static number avoidable misclassification policy in hindsight mode.""" - misclassification_policy.config.expected_accuracy = 0.25 - misclassification_policy.config.allow_reduction = True + misclassification_criterion.expected_accuracy = 0.25 + misclassification_criterion.allow_reduction = True + misclassification_policy = StaticNumberAvoidableMisclassificationDecisionPolicy(config=misclassification_criterion) eval_decision_kwargs = { - "update_interval": 10, + "update_interval_samples": 10, "data_density": dummy_data_density_tracker, "performance_tracker": dummy_performance_tracker, "mode": "hindsight", @@ -304,7 +308,9 @@ def test_misclassification_static_expected_performance( assert mock_previous_batch_num_samples.call_count == 2 # forbid reduction: with reduction cumulated_avoidable_misclassifications reduced from 5 to 0, here constant at 5 - misclassification_policy.config.allow_reduction = False + misclassification_criterion.allow_reduction = False + misclassification_policy = StaticNumberAvoidableMisclassificationDecisionPolicy(config=misclassification_criterion) + misclassification_policy.cumulated_avoidable_misclassifications = 5 assert not misclassification_policy.evaluate_decision(**eval_decision_kwargs, evaluation_scores={"acc": 0}) assert misclassification_policy.cumulated_avoidable_misclassifications == 5 assert mock_previous_batch_num_misclassifications.call_count == 3 @@ -321,8 +327,8 @@ def test_misclassification_static_expected_performance( new_callable=PropertyMock, ) @patch.object(DataDensityTracker, "forecast_density", side_effect=[1, 1]) -@patch.object(PerformanceTracker, "forecast_expected_accuracy", side_effect=[]) -@patch.object(PerformanceTracker, "forecast_next_accuracy", side_effect=[0.4, 0.2]) +@patch.object(PerformanceTracker, "forecast_expected_accuracy", return_value=-100) +@patch.object(PerformanceTracker, "forecast_next_accuracy", side_effect=[-100, 0.4, 0.2]) @patch.object(PerformanceTracker, "forecast_next_performance", side_effect=[]) @patch.object( PerformanceTracker, @@ -339,11 +345,12 @@ def test_misclassification_forecast( mock_previous_batch_num_samples: MagicMock, dummy_performance_tracker: PerformanceTracker, dummy_data_density_tracker: DataDensityTracker, - misclassification_policy: StaticNumberAvoidableMisclassificationDecisionPolicy, + misclassification_criterion: StaticNumberAvoidableMisclassificationCriterion, ) -> None: """Test static number avoidable misclassification policy in forecast mode.""" - misclassification_policy.config.expected_accuracy = 0.5 + misclassification_criterion.expected_accuracy = 0.5 + misclassification_policy = StaticNumberAvoidableMisclassificationDecisionPolicy(config=misclassification_criterion) eval_decision_kwargs = { "data_density": dummy_data_density_tracker, @@ -357,30 +364,32 @@ def test_misclassification_forecast( # 50 expected misclassifications, 60 observed misclassifications # 10 past avoidable misclassifications, forecasting not needed, already exceeding assert misclassification_policy.evaluate_decision( - **eval_decision_kwargs, evaluation_scores={"acc": 0}, update_interval=0 + **eval_decision_kwargs, evaluation_scores={"acc": 0}, 
update_interval_samples=0 ) assert misclassification_policy.cumulated_avoidable_misclassifications == 10 assert mock_previous_batch_num_misclassifications.call_count == 1 assert mock_previous_batch_num_samples.call_count == 1 - assert mock_forecast_next_accuracy.call_count == 0 + assert mock_forecast_next_accuracy.call_count == 1 assert mock_forecast_density.call_count == 0 misclassification_policy.inform_trigger() # reset misclassifications mock_previous_batch_num_misclassifications.reset_mock() mock_previous_batch_num_samples.reset_mock() + mock_forecast_next_accuracy.reset_mock() + mock_forecast_density.reset_mock() # 7 avoidable past misclassifications (57 observed, 50 expected) # 1 forecasted misclassifications: exp acc=0.5, forecasted acc=0.4, update interval 10, data density=1 # --> 8 avoidable misclassifications --> don't trigger assert not misclassification_policy.evaluate_decision( - **eval_decision_kwargs, evaluation_scores={"acc": 0}, update_interval=10 + **eval_decision_kwargs, evaluation_scores={"acc": 0}, update_interval_samples=10 ) # forecasted misclassifications not persisted in the counter assert misclassification_policy.cumulated_avoidable_misclassifications == 7 assert mock_previous_batch_num_misclassifications.call_count == 1 assert mock_previous_batch_num_samples.call_count == 1 assert mock_forecast_next_accuracy.call_count == 1 - assert mock_forecast_density.call_count == 1 + assert mock_forecast_density.call_count == 0 # unused # reset misclassification_policy.cumulated_avoidable_misclassifications = 0 @@ -393,13 +402,13 @@ def test_misclassification_forecast( # 3 forecasted misclassifications: exp acc=0.5, forecasted acc=0.2, update interval 10, data density=1 # --> 10 avoidable misclassifications --> don't trigger assert misclassification_policy.evaluate_decision( - **eval_decision_kwargs, evaluation_scores={"acc": 0}, update_interval=10 + **eval_decision_kwargs, evaluation_scores={"acc": 0}, update_interval_samples=10 ) assert misclassification_policy.cumulated_avoidable_misclassifications == 7 assert mock_previous_batch_num_misclassifications.call_count == 1 assert mock_previous_batch_num_samples.call_count == 1 assert mock_forecast_next_accuracy.call_count == 1 - assert mock_forecast_density.call_count == 1 + assert mock_forecast_density.call_count == 0 # unused # we only need the accuracy values here assert mock_forecast_next_performance.call_count == 0 diff --git a/modyn/tests/supervisor/internal/triggers/performance/test_performancetrigger_mixin.py b/modyn/tests/supervisor/internal/triggers/performance/test_performancetrigger_mixin.py new file mode 100644 index 000000000..7ea853bf6 --- /dev/null +++ b/modyn/tests/supervisor/internal/triggers/performance/test_performancetrigger_mixin.py @@ -0,0 +1,146 @@ +import os +import pathlib +from unittest.mock import MagicMock, patch + +from pytest import fixture + +from modyn.config.schema.pipeline.evaluation.config import EvalDataConfig +from modyn.config.schema.pipeline.evaluation.metrics import ( + AccuracyMetricConfig, + F1ScoreMetricConfig, +) +from modyn.config.schema.pipeline.trigger.performance.performance import ( + PerformanceTriggerEvaluationConfig, + _InternalPerformanceTriggerConfig, +) +from modyn.supervisor.internal.triggers.performance.data_density_tracker import ( + DataDensityTracker, +) +from modyn.supervisor.internal.triggers.performance.performance_tracker import ( + PerformanceTracker, +) +from modyn.supervisor.internal.triggers.performance.performancetrigger_mixin import ( + 
PerformanceTriggerMixin, +) +from modyn.supervisor.internal.triggers.trigger import TriggerContext +from modyn.supervisor.internal.triggers.utils.model.downloader import ModelDownloader + +PIPELINE_ID = 42 +SAMPLE = (10, 1, 1) +BASEDIR = pathlib.Path(os.path.realpath(__file__)).parent / "test_eval_dir" + + +@fixture +def performance_trigger_mixin_config() -> _InternalPerformanceTriggerConfig: + return _InternalPerformanceTriggerConfig( + evaluation_interval_data_points=42, + data_density_window_size=100, + performance_triggers_window_size=10, + evaluation=PerformanceTriggerEvaluationConfig( + device="cuda:0", + dataset=EvalDataConfig( + dataset_id="dummy_dataset", + bytes_parser_function="def bytes_parser_function(data: bytes) -> bytes:\n\treturn data", + batch_size=64, + dataloader_workers=1, + metrics=[ + AccuracyMetricConfig(evaluation_transformer_function=None), + F1ScoreMetricConfig( + evaluation_transformer_function=None, + num_classes=10, + average="macro", + pos_label=1, + ), + ], + ), + label_transformer_function=None, + ), + mode="hindsight", + forecasting_method="rolling_average", + ) + + +def test_create_performance_trigger( + performance_trigger_mixin_config: _InternalPerformanceTriggerConfig, + dummy_trigger_context: TriggerContext, +) -> None: + trigger = PerformanceTriggerMixin(performance_trigger_mixin_config) + assert trigger.config == performance_trigger_mixin_config + assert trigger.context is None + + assert isinstance(trigger.data_density, DataDensityTracker) + assert trigger.data_density.batch_memory.maxlen == performance_trigger_mixin_config.data_density_window_size + assert isinstance(trigger.performance_tracker, PerformanceTracker) + + assert not trigger.model_refresh_needed + assert trigger.most_recent_model_id is None + assert trigger.dataloader_info is None + assert trigger.model_downloader is None + assert trigger.model is None + + assert len(trigger._metrics) == 2 + assert trigger._label_transformer_function is None + + +@patch.object(PerformanceTriggerMixin, "_init_dataloader_info", return_value=None) +@patch.object(PerformanceTriggerMixin, "_init_model_downloader", return_value=None) +def test_init_trigger( + mock_init_model_downloader: MagicMock, + mock_init_dataloader_info: MagicMock, + performance_trigger_mixin_config: _InternalPerformanceTriggerConfig, + dummy_trigger_context: TriggerContext, +) -> None: + trigger = PerformanceTriggerMixin(performance_trigger_mixin_config) + trigger._init_trigger(context=dummy_trigger_context) + assert trigger.context == dummy_trigger_context + mock_init_model_downloader.assert_called_once() + mock_init_dataloader_info.assert_called_once() + + +@patch.object(PerformanceTriggerMixin, "_run_evaluation", side_effect=[(5, 2, {"Accuracy": 0.6})]) +@patch.object(PerformanceTracker, "inform_trigger", return_value=None) +def test_inform_new_model( + mock_inform_trigger: MagicMock, + mock_evaluation: MagicMock, + performance_trigger_mixin_config: _InternalPerformanceTriggerConfig, +) -> None: + trigger = PerformanceTriggerMixin(performance_trigger_mixin_config) + last_detection_interval = [(i, 100 + i) for i in range(5)] + assert not trigger.model_refresh_needed + trigger._inform_new_model(42, last_detection_interval) + assert trigger.most_recent_model_id == 42 + assert trigger.model_refresh_needed # would be reset if _run_evaluation wasn't mocked + + mock_evaluation.assert_called_once_with(interval_data=last_detection_interval) + mock_inform_trigger.assert_called_once_with( + num_samples=5, num_misclassifications=2, 
evaluation_scores={"Accuracy": 0.6}
+    )
+
+
+# Note: we don't test _run_evaluation as this would require more mocking than actual testing
+
+
+def test_init_dataloader_info(
+    performance_trigger_mixin_config: _InternalPerformanceTriggerConfig,
+    dummy_trigger_context: TriggerContext,
+) -> None:
+    trigger = PerformanceTriggerMixin(performance_trigger_mixin_config)
+    trigger.context = dummy_trigger_context
+
+    trigger._init_dataloader_info()
+
+    assert trigger.dataloader_info is not None
+
+
+@patch.object(ModelDownloader, "connect_to_model_storage")
+def test_init_model_downloader(
+    mock_connect_to_model_storage: MagicMock,
+    performance_trigger_mixin_config: _InternalPerformanceTriggerConfig,
+    dummy_trigger_context: TriggerContext,
+) -> None:
+    trigger = PerformanceTriggerMixin(performance_trigger_mixin_config)
+    trigger.context = dummy_trigger_context
+
+    trigger._init_model_downloader()
+
+    assert trigger.model_downloader is not None
diff --git a/modyn/tests/supervisor/internal/triggers/test_avoidablemisclassification_latency_cost_trigger.py b/modyn/tests/supervisor/internal/triggers/test_avoidablemisclassification_latency_cost_trigger.py
new file mode 100644
index 000000000..44e75c6fc
--- /dev/null
+++ b/modyn/tests/supervisor/internal/triggers/test_avoidablemisclassification_latency_cost_trigger.py
@@ -0,0 +1,132 @@
+"""We will mock CostTrigger here as the basic integration between CostTrigger
+and its children is already asserted through
+`AvoidableMisclassificationCostTrigger`."""
+
+from unittest.mock import ANY, MagicMock, patch
+
+from pytest import fixture
+
+from modyn.config.schema.pipeline.evaluation.config import EvalDataConfig
+from modyn.config.schema.pipeline.evaluation.metrics import (
+    AccuracyMetricConfig,
+    F1ScoreMetricConfig,
+)
+from modyn.config.schema.pipeline.trigger.cost.cost import (
+    AvoidableMisclassificationCostTriggerConfig,
+)
+from modyn.config.schema.pipeline.trigger.performance.performance import (
+    PerformanceTriggerEvaluationConfig,
+)
+from modyn.supervisor.internal.triggers.avoidablemissclassification_costtrigger import (
+    AvoidableMisclassificationCostTrigger,
+)
+from modyn.supervisor.internal.triggers.cost.incorporation_latency_tracker import (
+    IncorporationLatencyTracker,
+)
+from modyn.supervisor.internal.triggers.costtrigger import CostTrigger
+from modyn.supervisor.internal.triggers.performance.misclassification_estimator import (
+    NumberAvoidableMisclassificationEstimator,
+)
+from modyn.supervisor.internal.triggers.performance.performancetrigger_mixin import (
+    PerformanceTriggerMixin,
+)
+from modyn.supervisor.internal.triggers.trigger import TriggerContext
+
+
+@fixture
+def trigger_config() -> AvoidableMisclassificationCostTriggerConfig:
+    return AvoidableMisclassificationCostTriggerConfig(
+        # performance trigger mixin
+        evaluation_interval_data_points=42,
+        data_density_window_size=100,
+        performance_triggers_window_size=10,
+        evaluation=PerformanceTriggerEvaluationConfig(
+            device="cuda:0",
+            dataset=EvalDataConfig(
+                dataset_id="dummy_dataset",
+                bytes_parser_function="def bytes_parser_function(data: bytes) -> bytes:\n\treturn data",
+                batch_size=64,
+                dataloader_workers=1,
+                metrics=[
+                    AccuracyMetricConfig(evaluation_transformer_function=None),
+                    F1ScoreMetricConfig(
+                        evaluation_transformer_function=None,
+                        num_classes=10,
+                        average="macro",
+                        pos_label=1,
+                    ),
+                ],
+            ),
+            label_transformer_function=None,
+        ),
+        mode="hindsight",
+        forecasting_method="rolling_average",
+        # cost_trigger
+        cost_tracking_window_size=5,
+ avoidable_misclassification_latency_per_training_second=1.0, + ) + + +@patch.object(PerformanceTriggerMixin, "_init_trigger") +@patch.object(CostTrigger, "init_trigger") +def test_initialization( + mock_cost_trigger_init: MagicMock, + mock_performance_trigger_mixin_init: MagicMock, + trigger_config: AvoidableMisclassificationCostTriggerConfig, + dummy_trigger_context: TriggerContext, +) -> None: + trigger = AvoidableMisclassificationCostTrigger(trigger_config) + assert trigger._sample_left_until_detection == trigger_config.evaluation_interval_data_points + assert not trigger._triggered_once + assert trigger._previous_batch_end_time is None + assert trigger._unincorporated_samples == 0 + + assert isinstance(trigger.misclassification_estimator, NumberAvoidableMisclassificationEstimator) + + assert trigger._cost_tracker.measurements.maxlen == trigger_config.cost_tracking_window_size + assert trigger._incorporation_latency_tracker.cumulative_latency_regret == 0.0 + + assert trigger.context is None + trigger.init_trigger(dummy_trigger_context) + mock_cost_trigger_init.assert_called_once_with(trigger, dummy_trigger_context) + mock_performance_trigger_mixin_init.assert_called_once_with(trigger, dummy_trigger_context) + + +@patch.object(PerformanceTriggerMixin, "_inform_new_model") +@patch.object(CostTrigger, "inform_new_model") +def test_inform_new_model( + mock_cost_trigger_inform_new_model: MagicMock, + mock_performance_trigger_mixin_inform_new_model: MagicMock, + trigger_config: AvoidableMisclassificationCostTriggerConfig, +) -> None: + trigger = AvoidableMisclassificationCostTrigger(trigger_config) + + trigger.inform_new_model(42, 43, 44.0) + mock_cost_trigger_inform_new_model.assert_called_once_with(trigger, 42, 43, 44.0) + mock_performance_trigger_mixin_inform_new_model.assert_called_once_with(trigger, 42, ANY) + + +@patch.object(IncorporationLatencyTracker, "add_latency", return_value=42.0) +@patch.object( + NumberAvoidableMisclassificationEstimator, + "estimate_avoidable_misclassifications", + return_value=(5, -100), +) +@patch.object(PerformanceTriggerMixin, "_run_evaluation", side_effect=[(5, 2, {"Accuracy": 0.6})]) +def test_compute_regret_metric( + mock_run_evaluation: MagicMock, + mock_estimate_avoidable_misclassifications: MagicMock, + mock_add_latency: MagicMock, + trigger_config: AvoidableMisclassificationCostTriggerConfig, +): + trigger = AvoidableMisclassificationCostTrigger(trigger_config) + + batch = [(i, 100 + i) for i in range(5)] + batch_duration = 99 + new_regret_latency = trigger._compute_regret_metric(batch, 0, batch_duration) + + mock_run_evaluation.assert_called_once_with(interval_data=batch) + + mock_estimate_avoidable_misclassifications.assert_called_once() + assert new_regret_latency == 42.0 + mock_add_latency.assert_called_once_with(5, 99) diff --git a/modyn/tests/supervisor/internal/triggers/cost/test_incorporation_latency_cost_trigger.py b/modyn/tests/supervisor/internal/triggers/test_incorporation_latency_cost_trigger.py similarity index 97% rename from modyn/tests/supervisor/internal/triggers/cost/test_incorporation_latency_cost_trigger.py rename to modyn/tests/supervisor/internal/triggers/test_incorporation_latency_cost_trigger.py index 1e50f9e62..2071a2334 100644 --- a/modyn/tests/supervisor/internal/triggers/cost/test_incorporation_latency_cost_trigger.py +++ b/modyn/tests/supervisor/internal/triggers/test_incorporation_latency_cost_trigger.py @@ -6,7 +6,7 @@ DataIncorporationLatencyCostTriggerConfig, ) from 
modyn.supervisor.internal.triggers.cost.cost_tracker import CostTracker -from modyn.supervisor.internal.triggers.cost.dataincorporationlatency_costtrigger import ( +from modyn.supervisor.internal.triggers.dataincorporationlatency_costtrigger import ( DataIncorporationLatencyCostTrigger, ) from modyn.supervisor.internal.triggers.trigger import TriggerContext diff --git a/modyn/tests/supervisor/internal/triggers/test_performancetrigger.py b/modyn/tests/supervisor/internal/triggers/test_performancetrigger.py index 552e68308..23aec3c05 100644 --- a/modyn/tests/supervisor/internal/triggers/test_performancetrigger.py +++ b/modyn/tests/supervisor/internal/triggers/test_performancetrigger.py @@ -29,8 +29,8 @@ StaticNumberAvoidableMisclassificationDecisionPolicy, StaticPerformanceThresholdDecisionPolicy, ) -from modyn.supervisor.internal.triggers.performance.performance_tracker import ( - PerformanceTracker, +from modyn.supervisor.internal.triggers.performance.performancetrigger_mixin import ( + PerformanceTriggerMixin, ) from modyn.supervisor.internal.triggers.performancetrigger import ( PerformanceTrigger, @@ -98,7 +98,6 @@ def misclassifications_criterion() -> StaticNumberAvoidableMisclassificationCrit def test_initialization(performance_trigger_config: PerformanceTriggerConfig) -> None: trigger = PerformanceTrigger(performance_trigger_config) assert trigger._sample_left_until_detection == performance_trigger_config.evaluation_interval_data_points - assert trigger.data_density.batch_memory.maxlen == performance_trigger_config.data_density_window_size assert len(trigger.decision_policies) == 1 assert isinstance( @@ -112,11 +111,9 @@ def test_initialization(performance_trigger_config: PerformanceTriggerConfig) -> assert trigger.most_recent_model_id is None -@patch.object(PerformanceTrigger, "_init_dataloader_info", return_value=None) -@patch.object(PerformanceTrigger, "_init_model_downloader", return_value=None) +@patch.object(PerformanceTriggerMixin, "_init_trigger") def test_init_trigger( - mock_init_model_downloader: MagicMock, - mock_init_dataloader_info: MagicMock, + mock_init_trigger: MagicMock, performance_trigger_config: PerformanceTriggerConfig, dummy_pipeline_config: ModynPipelineConfig, dummy_system_config: ModynConfig, @@ -124,9 +121,7 @@ def test_init_trigger( trigger_context = TriggerContext(PIPELINE_ID, dummy_pipeline_config, dummy_system_config, BASEDIR) trigger = PerformanceTrigger(performance_trigger_config) trigger.init_trigger(context=trigger_context) - assert trigger.context == trigger_context - assert mock_init_model_downloader.called - assert mock_init_dataloader_info.called + mock_init_trigger.assert_called_once_with(trigger, trigger_context) def test_setup_decision_policies( @@ -156,28 +151,19 @@ def test_setup_decision_policies( assert policies["avoidable_misclassifications"].config == misclassifications_criterion -@patch.object(PerformanceTrigger, "_run_evaluation", side_effect=[(5, 2, {"Accuracy": 0.6})]) -@patch.object(PerformanceTracker, "inform_trigger", return_value=None) +@patch.object(PerformanceTriggerMixin, "_inform_new_model") def test_inform_new_model( - mock_inform_trigger: MagicMock, - mock_evaluation: MagicMock, + mock_inform_new_model: MagicMock, performance_trigger_config: PerformanceTriggerConfig, ) -> None: trigger = PerformanceTrigger(performance_trigger_config) - data_interval = [(i, 100 + i) for i in range(5)] - trigger._last_detection_interval = data_interval - assert not trigger.model_refresh_needed - trigger.inform_new_model(42) - assert 
trigger.most_recent_model_id == 42 - assert trigger.model_refresh_needed # would be reset if _run_evaluation wasn't mocked + data = [(i, 100 + i) for i in range(5)] + trigger._last_detection_interval = data + trigger.inform_new_model(42, 43, 44.0) + mock_inform_new_model.assert_called_once_with(trigger, 42, data) - mock_evaluation.assert_called_once_with(interval_data=data_interval) - mock_inform_trigger.assert_called_once_with( - num_samples=5, num_misclassifications=2, evaluation_scores={"Accuracy": 0.6} - ) - -@patch.object(PerformanceTrigger, "_run_evaluation", return_value=(5, 2, {"Accuracy": 0.9})) +@patch.object(PerformanceTriggerMixin, "_run_evaluation", return_value=(5, 2, {"Accuracy": 0.9})) @patch.object(DataDensityTracker, "inform_data", return_value=None) @patch.object( StaticPerformanceThresholdDecisionPolicy, @@ -211,7 +197,7 @@ def test_inform( assert mock_inform_data.call_args_list == [call([(i, 100 + i) for i in range(4)])] assert mock_evaluation.call_args_list == [ # first time within inform, second time within inform_new_model - call(interval_data=[(i, 100 + i) for i in range(4)]), + call(trigger, interval_data=[(i, 100 + i) for i in range(4)]), call(interval_data=[(i, 100 + i) for i in range(4)]), ] @@ -236,7 +222,7 @@ def test_inform( assert mock_inform_data.call_args_list == [call([(i, 100 + i) for i in range(4, 8)])] assert mock_evaluation.call_args_list == [ # first time within inform, second time within inform_new_model - call(interval_data=[(i, 100 + i) for i in range(4, 8)]), + call(trigger, interval_data=[(i, 100 + i) for i in range(4, 8)]), call(interval_data=[(i, 100 + i) for i in range(4, 8)]), ] @@ -256,4 +242,4 @@ def test_inform( assert mock_evaluate_decision.call_count == 1 assert trigger._last_detection_interval == [(i, 100 + i) for i in range(8, 12)] assert mock_inform_data.call_args_list == [call([(i, 100 + i) for i in range(8, 12)])] - assert mock_evaluation.call_args_list == [call(interval_data=[(i, 100 + i) for i in range(8, 12)])] + assert mock_evaluation.call_args_list == [call(trigger, interval_data=[(i, 100 + i) for i in range(8, 12)])]
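
Note on the avoidable-misclassification arithmetic: the decision-policy test earlier in this diff
(test_misclassification_forecast) encodes its expected numbers only in comments ("57 observed, 50
expected", "exp acc=0.5, forecasted acc=0.4, update interval 10, data density=1"). The sketch below
spells that arithmetic out as a standalone illustration; the helper name and signature are
hypothetical and do not mirror the API of NumberAvoidableMisclassificationEstimator or the decision
policy, the data-density factor is taken as 1 as in the tests, and negative forecasts are simply
clamped to zero.

import math


def sketch_avoidable_misclassifications(
    observed_misclassifications: int,
    expected_accuracy: float,
    previous_batch_num_samples: int,
    forecasted_accuracy: float,
    update_interval_samples: int,
) -> float:
    """Hypothetical helper mirroring the arithmetic in the test comments, not the Modyn API."""
    # Past component: observed misclassifications minus what the expected accuracy would allow.
    expected_misclassifications = (1.0 - expected_accuracy) * previous_batch_num_samples
    past_avoidable = observed_misclassifications - expected_misclassifications
    # Forecast component: accuracy shortfall projected over the next update interval.
    # Data density is assumed to be 1 here; negative shortfalls are clamped to zero.
    forecasted_avoidable = max(expected_accuracy - forecasted_accuracy, 0.0) * update_interval_samples
    return past_avoidable + forecasted_avoidable


# 57 observed vs. 50 expected -> 7 past; (0.5 - 0.4) * 10 -> 1 forecasted; 8 total,
# which the test above does not treat as enough to trigger.
assert math.isclose(sketch_avoidable_misclassifications(57, 0.5, 100, 0.4, 10), 8.0)
# (0.5 - 0.2) * 10 -> 3 forecasted; 10 total, which the test above expects to trigger.
assert math.isclose(sketch_avoidable_misclassifications(57, 0.5, 100, 0.2, 10), 10.0)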