Merge branch 'main' into robin/feat/cost-aware-config-models
robinholzi authored Aug 20, 2024
2 parents dd09c0d + 91c5fa3 commit 6e7ad46
Showing 10 changed files with 1,051 additions and 1 deletion.
2 changes: 1 addition & 1 deletion modyn/evaluator/internal/pytorch_evaluator.py
@@ -103,7 +103,7 @@ def _single_interval_evaluate(self, dataloader: torch.utils.data.DataLoader, int
        )

        self._info(f"Finished evaluation of {interval_idx}. Putting items into queue...")
-       self._metric_result_queue.put((interval_idx, eval_result.metric_results.items()), timeout=30)
+       self._metric_result_queue.put((interval_idx, list(eval_result.metric_results.items())), timeout=30)
        self._info(
            f"Finished evaluation of {interval_idx}: {eval_result.num_samples} samples. "
            f"Queue size = {self._metric_result_queue.qsize()}"
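The one-line change above wraps `metric_results.items()` in `list(...)` before putting the result on the inter-process metric queue. A minimal standalone sketch of the presumable motivation (illustrative, not part of the diff): multiprocessing queues pickle the objects they transport, and dict view objects such as `dict_items` cannot be pickled, while a plain list of tuples can.

# Illustrative sketch, not part of the diff.
import multiprocessing as mp
import pickle

metric_results = {"Accuracy": 0.87, "F1Score": 0.81}

try:
    pickle.dumps(metric_results.items())
except TypeError as err:
    print(f"dict view is not picklable: {err}")  # cannot pickle 'dict_items' object

result_queue = mp.Queue()
# A list of (metric_name, value) tuples pickles fine and survives the process boundary.
result_queue.put((0, list(metric_results.items())), timeout=30)
print(result_queue.get(timeout=30))  # (0, [('Accuracy', 0.87), ('F1Score', 0.81)])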
Empty file.
62 changes: 62 additions & 0 deletions modyn/supervisor/internal/triggers/performance/data_density_tracker.py
@@ -0,0 +1,62 @@
from collections import deque

from modyn.const.types import ForecastingMethod
from modyn.supervisor.internal.utils.forecast import forecast_value


class DataDensityTracker:
    """Observes a stream of data chunks and estimates the time density of the data.

    Assumes that the data chunks are ordered by time. For the first chunk, only the
    number of samples and the start and end time of the batch are considered. Starting
    with the second batch, the time between the last sample of the previous batch and
    the first sample of the current batch is considered as well.

    Most use cases have constant batch sizes.
    """

    def __init__(self, window_size: int) -> None:
        """
        Args:
            window_size: How many batches the memory for the rolling average should hold.
        """
        self.batch_memory: deque[tuple[int, int]] = deque(maxlen=window_size)
        """Memory of the last `window_size` batches containing the number of
        samples and the time range of the batch in seconds."""

        self._previous_batch_end_time: int | None = None

    def inform_data(self, data: list[tuple[int, int]]) -> None:
        """Informs the tracker about a new data batch."""
        if len(data) == 0:
            return

        num_seconds = (
            data[-1][1] - self._previous_batch_end_time
            if self._previous_batch_end_time is not None
            else data[-1][1] - data[0][1]
        )

        self.batch_memory.append((len(data), num_seconds))
        self._previous_batch_end_time = data[-1][1]

    @property
    def previous_batch_num_samples(self) -> int:
        """Returns the number of samples in the last batch."""
        assert len(self.batch_memory) > 0, "No data in memory, calibration needed."
        return self.batch_memory[-1][0]

    def needs_calibration(self) -> bool:
        """Checks if the tracker has enough data for a forecast."""
        return len(self.batch_memory) == 0

    def forecast_density(self, method: ForecastingMethod = "ridge_regression") -> float:
        """Forecasts the data density based on the current memory.

        Returns:
            The forecasted data density as a ratio of samples per second.
        """
        ratio_series = [num_samples / num_seconds for num_samples, num_seconds in self.batch_memory]
        return forecast_value(observations=ratio_series, method=method)
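An illustrative usage sketch of the tracker (not part of the diff), assuming the modyn package from this repository is importable. Batches are taken here to be lists of (sample_id, unix_timestamp) pairs; only the second element is read as a timestamp, and the exact forecast value depends on modyn's `forecast_value` helper.

# Illustrative usage sketch, not part of the diff.
from modyn.supervisor.internal.triggers.performance.data_density_tracker import DataDensityTracker

tracker = DataDensityTracker(window_size=3)
assert tracker.needs_calibration()  # no batches observed yet

# First batch: 4 samples spanning 30 seconds -> density is measured inside the batch.
tracker.inform_data([(0, 100), (1, 110), (2, 120), (3, 130)])

# Later batches also count the gap to the previous batch's last sample
# (e.g. t=130 to t=190 -> 60 seconds for 4 samples).
tracker.inform_data([(4, 160), (5, 170), (6, 180), (7, 190)])
tracker.inform_data([(8, 200), (9, 210), (10, 220), (11, 250)])

print(tracker.previous_batch_num_samples)  # 4
print(tracker.forecast_density())          # roughly 0.07-0.13 samples/second, method-dependent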
201 changes: 201 additions & 0 deletions modyn/supervisor/internal/triggers/performance/decision_policy.py
@@ -0,0 +1,201 @@
from abc import ABC, abstractmethod

from modyn.config.schema.pipeline.trigger.performance.criterion import (
    DynamicPerformanceThresholdCriterion,
    StaticNumberAvoidableMisclassificationCriterion,
    StaticPerformanceThresholdCriterion,
)
from modyn.const.types import ForecastingMethod, TriggerEvaluationMode
from modyn.supervisor.internal.triggers.performance.data_density_tracker import (
    DataDensityTracker,
)
from modyn.supervisor.internal.triggers.performance.performance_tracker import (
    PerformanceTracker,
)


class PerformanceDecisionPolicy(ABC):
    """Decision policy that makes the binary trigger decisions based on
    observations of a performance metric."""

    @abstractmethod
    def evaluate_decision(
        self,
        update_interval: int,
        evaluation_scores: dict[str, float],
        data_density: DataDensityTracker,
        performance_tracker: PerformanceTracker,
        mode: TriggerEvaluationMode,
        method: ForecastingMethod,
    ) -> bool:
        """Evaluate the decision based on the given observation.

        At the time of calling this, the performance_tracker has already been updated with
        the new performance value to allow for forecast-based decisions. Also, data_density
        has to be updated with the new data interval.

        Args:
            update_interval: The interval in which the decision is made.
            evaluation_scores: The observed performance metrics.
            mode: The mode in which the decision should be evaluated.
            data_density: The data density tracker, updated with the new data interval.
            performance_tracker: The performance tracker, updated with the new performance value.

        Returns:
            The final trigger decision.
        """

    def inform_trigger(self) -> None:
        """Inform the decision policy that a trigger has been invoked."""


class StaticPerformanceThresholdDecisionPolicy(PerformanceDecisionPolicy):
    """Decision policy that makes the binary trigger decisions based on a
    static threshold."""

    def __init__(self, config: StaticPerformanceThresholdCriterion):
        self.config = config

    def evaluate_decision(
        self,
        update_interval: int,
        evaluation_scores: dict[str, float],
        data_density: DataDensityTracker,
        performance_tracker: PerformanceTracker,
        mode: TriggerEvaluationMode,
        method: ForecastingMethod,
    ) -> bool:
        if mode == "hindsight":
            return evaluation_scores[self.config.metric] < self.config.metric_threshold

        return (evaluation_scores[self.config.metric] < self.config.metric_threshold) or (
            performance_tracker.forecast_next_performance(mode) < self.config.metric_threshold
        )
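An illustrative "hindsight" decision with this policy (not part of the diff). The `SimpleNamespace` stands in for the real `StaticPerformanceThresholdCriterion` config object; since the trackers are not consulted in "hindsight" mode, `None` placeholders are passed purely for brevity.

# Illustrative sketch, not part of the diff.
from types import SimpleNamespace

from modyn.supervisor.internal.triggers.performance.decision_policy import (
    StaticPerformanceThresholdDecisionPolicy,
)

config = SimpleNamespace(metric="Accuracy", metric_threshold=0.8)  # stand-in for the pydantic criterion
policy = StaticPerformanceThresholdDecisionPolicy(config)  # type: ignore[arg-type]

should_trigger = policy.evaluate_decision(
    update_interval=3600,
    evaluation_scores={"Accuracy": 0.75},  # observed accuracy below the 0.8 threshold
    data_density=None,  # type: ignore[arg-type]  # unused in "hindsight" mode
    performance_tracker=None,  # type: ignore[arg-type]  # unused in "hindsight" mode
    mode="hindsight",
    method="ridge_regression",
)
print(should_trigger)  # True, since 0.75 < 0.8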


class DynamicPerformanceThresholdDecisionPolicy(PerformanceDecisionPolicy):
    """Decision policy that makes the binary trigger decisions based on a
    dynamic threshold.

    Triggers when the performance value falls below the expected (forecast)
    performance by more than the allowed deviation.
    """

    def __init__(self, config: DynamicPerformanceThresholdCriterion):
        self.config = config

    def evaluate_decision(
        self,
        update_interval: int,
        evaluation_scores: dict[str, float],
        data_density: DataDensityTracker,
        performance_tracker: PerformanceTracker,
        mode: TriggerEvaluationMode,
        method: ForecastingMethod,
    ) -> bool:
        threshold = performance_tracker.forecast_expected_performance(mode) - self.config.allowed_deviation

        if mode == "hindsight":
            return evaluation_scores[self.config.metric] < threshold

        return (evaluation_scores[self.config.metric] < threshold) or (
            performance_tracker.forecast_next_performance(mode) < threshold
        )


class StaticNumberAvoidableMisclassificationDecisionPolicy(PerformanceDecisionPolicy):
    """Decision policy that makes the binary trigger decisions based on a
    static number of cumulated avoidable misclassifications."""

    def __init__(self, config: StaticNumberAvoidableMisclassificationCriterion):
        """
        Args:
            config: Criterion holding the threshold of cumulated avoidable misclassifications.
        """

        self.config = config
        self.cumulated_avoidable_misclassifications = 0

    # pylint: disable=too-many-locals
    def evaluate_decision(
        self,
        update_interval: int,
        evaluation_scores: dict[str, float],
        data_density: DataDensityTracker,
        performance_tracker: PerformanceTracker,
        mode: TriggerEvaluationMode,
        method: ForecastingMethod,
    ) -> bool:
        """Utilizes the state of `DataDensityTracker` and `PerformanceTracker`
        to make the decision.

        We support both the "hindsight" and the "lookahead" mode.

        In the "hindsight" mode, the decision is made based on the current performance and the
        cumulated avoidable misclassifications.

        - Formalization:
            - historic observation:
                - `data_cum_since_last_trigger`: The cumulated data points since the last trigger.
                - `avoidable_misclassifications_since_last_trigger`: The cumulated avoidable
                  misclassifications since the last trigger.

        In the "lookahead" mode, the decision is made based on the current performance, the cumulated
        avoidable misclassifications, future performance estimates and future data density estimates.
        Similar to the "hindsight" mode, we first check whether the current performance already leads
        to a transgression of the threshold and therefore to a trigger. If that's not the case, we
        estimate the cumulated avoidable misclassifications until the next point of update. If we
        expect a transgression of the threshold before the next update point, we trigger. This
        forward-looking approach tries to avoid exceeding the misclassification budget in the first place.
        """

        # Compute the number of avoidable misclassifications by retrieving the actual misclassifications
        # and the expected misclassifications through the expected accuracy for the last interval.
        previous_interval_num_misclassifications = performance_tracker.previous_batch_num_misclassifications

        # The expected performance won't change unless there's a trigger.
        expected_accuracy = (
            performance_tracker.forecast_expected_accuracy(method=method)
            if self.config.expected_accuracy is None
            else self.config.expected_accuracy
        )
        previous_expected_num_misclassifications = (1 - expected_accuracy) * data_density.previous_batch_num_samples
        new_avoidable_misclassifications = (
            previous_interval_num_misclassifications - previous_expected_num_misclassifications
        )
        if new_avoidable_misclassifications < 0 and not self.config.allow_reduction:
            new_avoidable_misclassifications = 0

        self.cumulated_avoidable_misclassifications += round(new_avoidable_misclassifications)

        if mode == "hindsight":
            return self.cumulated_avoidable_misclassifications >= self.config.avoidable_misclassification_threshold

        if mode == "lookahead":
            # Past misclassifications already exceed the threshold, forecasting not needed.
            if self.cumulated_avoidable_misclassifications >= self.config.avoidable_misclassification_threshold:
                return True

            forecasted_data_density = data_density.forecast_density(method=method)
            forecast_accuracy = performance_tracker.forecast_next_accuracy(method=method)

            accuracy_delta = expected_accuracy - forecast_accuracy
            if accuracy_delta < 0 and not self.config.allow_reduction:
                accuracy_delta = 0

            # New misclassifications = accuracy delta * samples; samples = data_density * interval_duration.
            forecast_new_avoidable_misclassifications = accuracy_delta * forecasted_data_density * update_interval

            forecasted_misclassifications = (
                self.cumulated_avoidable_misclassifications + forecast_new_avoidable_misclassifications
            )

            return forecasted_misclassifications >= self.config.avoidable_misclassification_threshold

        raise ValueError(f"Unknown mode: {mode}")

    def inform_trigger(self) -> None:
        """Resets the cumulated avoidable misclassifications."""
        self.cumulated_avoidable_misclassifications = 0
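To make the accounting in `evaluate_decision` concrete, a small worked example with made-up numbers (illustrative, not part of the diff); in the real policy these values come from the trackers and the criterion config.

# Illustrative arithmetic sketch, not part of the diff; all numbers are made up.
expected_accuracy = 0.90            # configured or forecast expected accuracy
previous_batch_num_samples = 1000   # DataDensityTracker.previous_batch_num_samples
actual_misclassifications = 150     # PerformanceTracker.previous_batch_num_misclassifications
misclassification_budget = 100      # avoidable_misclassification_threshold

# Hindsight part: misclassifications beyond what the expected accuracy would explain.
expected_misclassifications = (1 - expected_accuracy) * previous_batch_num_samples  # ~100.0
cumulated = round(actual_misclassifications - expected_misclassifications)          # 50

# Lookahead part: additional avoidable misclassifications expected until the next update.
forecasted_density = 0.5            # samples per second, DataDensityTracker.forecast_density
forecast_accuracy = 0.85            # PerformanceTracker.forecast_next_accuracy
update_interval = 3600              # seconds until the next evaluation
accuracy_delta = expected_accuracy - forecast_accuracy                              # ~0.05
forecast_new = accuracy_delta * forecasted_density * update_interval                # ~90.0

print(cumulated + forecast_new >= misclassification_budget)  # True -> trigger ahead of the budget breach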