feat: Add more powerful drift windowing strategies, warmup and dynamic thresholds #564

Merged
23 commits
9d71dfb
Add more powerful windowing strategies, warumup and dynamic thresholds
robinholzi Jul 4, 2024
244da67
Merge branch 'main' into robinholzi/feat/more-powerful-drift-windows-…
robinholzi Jul 4, 2024
85ed4f5
Merge branch 'main' into robinholzi/feat/more-powerful-drift-windows-…
robinholzi Jul 22, 2024
4ca50ed
Add tests
robinholzi Jul 24, 2024
91e0dda
Merge branch 'main' into robinholzi/feat/more-powerful-drift-windows-…
robinholzi Jul 24, 2024
28dbb9f
Merge branch 'main' into robinholzi/feat/more-powerful-drift-windows-…
robinholzi Jul 26, 2024
6b22b6d
fix linting
robinholzi Jul 26, 2024
0d42176
fix linting
robinholzi Jul 26, 2024
0028e1b
Implement suggestions, v1
robinholzi Aug 7, 2024
9f3b70f
Merge branch 'main' into robinholzi/feat/more-powerful-drift-windows-…
robinholzi Aug 7, 2024
bbacf1e
Integrate suggestions, rename things, more tests, documentation
robinholzi Aug 8, 2024
44f29d1
Fix
robinholzi Aug 8, 2024
836af4f
Merge branch 'main' into robinholzi/feat/more-powerful-drift-windows-…
robinholzi Aug 12, 2024
f950c25
Fix
robinholzi Aug 12, 2024
8912476
Merge branch 'main' into robinholzi/feat/more-powerful-drift-windows-…
robinholzi Aug 12, 2024
c9091aa
Tests and adjustments to warmup
robinholzi Aug 12, 2024
4792fd4
Integrate suggestions, rename things
robinholzi Aug 12, 2024
ed0baaa
fix
robinholzi Aug 12, 2024
ccb6ba3
Fix
robinholzi Aug 13, 2024
46f52d3
Move averaging logic into detector
robinholzi Aug 13, 2024
a2392ad
Merge branch 'main' into robinholzi/feat/more-powerful-drift-windows-…
robinholzi Aug 13, 2024
ea036c1
Final adjustments
robinholzi Aug 14, 2024
9594453
Small fix wrt interval tests
robinholzi Aug 14, 2024
9 changes: 7 additions & 2 deletions docs/pipeline/triggering/DRIFT_TRIGGERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ classDiagram
<<abstract>>
}

DataDriftTrigger "warmup_trigger" *-- "1" Trigger
DataDriftTrigger *-- "1" DataDriftTriggerConfig
DataDriftTrigger *-- "1" DetectionWindows
DataDriftTrigger *-- "|metrics|" DriftDetector
Expand Down Expand Up @@ -91,7 +92,9 @@ classDiagram

### DriftDetector Hierarchy

The `DriftDetector` class is an abstract base class for detectors like `AlibiDriftDetector` and `EvidentlyDriftDetector`, which use different metrics to measure the distance between the current and reference data distributions. It generates distance values.
The `DriftDetector` class is an abstract base class for detectors like `AlibiDriftDetector` and `EvidentlyDriftDetector`, which use different metrics to measure the distance between the current and reference data distributions.
Both underlying drift detection packages generate their own binary drift decision through hypothesis testing or a threshold. In the `DriftDetector` we only use the distance metric
and later derive a binary decision from it using our own threshold-based decision policies. Therefore we ignore the binary decision generated by the underlying drift detection packages.

The `BaseMetric` class hierarchy is a series of Pydantic configuration classes while the `Detectors` are actual business logic classes that implement the distance calculation.

Expand Down Expand Up @@ -193,7 +196,9 @@ classDiagram

The `DriftDecisionPolicy` class is an abstract base class for policies like `ThresholdDecisionPolicy`, `DynamicDecisionPolicy`, and `HypothesisTestDecisionPolicy`.

Each Decision policy wraps one DriftMetric (e.g. MMD, CVM, KS) and one DecisionCriterion (e.g. Threshold, DynamicThreshold, HypothesisTest) to make a decision based on the distance metric. It e.g. observes the series of distance value measurements from it's `DriftMetric` and makes a decision after having calibrated on the seen distances.
Each decision policy wraps one DriftMetric (e.g. MMD, CVM, KS) and one DecisionCriterion (e.g. Threshold, DynamicThreshold, HypothesisTest) to make a decision based on the distance metric. For example, it observes the series of distance measurements from its `DriftMetric` and makes a decision after having calibrated on the distances seen so far.

If a `DecisionPolicy` needs to be calibrated before being able to make a decision, we have to run the `DriftTrigger` with a warm-up period. This warm-up period is defined as a fixed number of intervals where another simple drift policy is used to make decisions while also evaluating the `DecisionPolicy` to calibrate it.
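
The warm-up behaviour described above could be sketched roughly as follows. This is only an illustration under assumptions: `CalibratingPolicy`, `run_with_warmup`, and the "every second interval" warm-up policy are hypothetical names and choices, not Modyn's actual classes or defaults.

```python
from typing import Callable


class CalibratingPolicy:
    """Hypothetical policy that needs past observations before it can decide."""

    def __init__(self) -> None:
        self.seen: list[float] = []

    def evaluate(self, distance: float) -> bool:
        # Report drift if the new distance exceeds every distance seen so far.
        is_drift = bool(self.seen) and distance > max(self.seen)
        self.seen.append(distance)
        return is_drift


def run_with_warmup(
    distances: list[float],
    warmup_intervals: int,
    warmup_policy: Callable[[int], bool],
    policy: CalibratingPolicy,
) -> list[bool]:
    decisions = []
    for i, distance in enumerate(distances):
        # The policy is evaluated on every interval so that it calibrates.
        policy_decision = policy.evaluate(distance)
        if i < warmup_intervals:
            # During warm-up the simple policy makes the triggering decision.
            decisions.append(warmup_policy(i))
        else:
            decisions.append(policy_decision)
    return decisions


decisions = run_with_warmup(
    distances=[0.1, 0.3, 0.2, 0.25, 0.5],
    warmup_intervals=3,
    warmup_policy=lambda i: i % 2 == 0,  # assumed: trigger on every second interval
    policy=CalibratingPolicy(),
)
print(decisions)
```

Note that the calibrating policy's own result is discarded during the first three intervals; only the distances it observed are kept.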

Within one `DataDriftTrigger` the different results from different `DriftMetrics`'s `DriftDecisionPolicies` can be aggregated to a final decision using a voting mechanism (see `DataDriftTriggerConfig.aggregation_strategy`).
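
As a rough illustration of such a voting mechanism, the sketch below aggregates per-metric decisions. The function name `aggregate_decisions` and the strategy names `"majority"` and `"at_least_one"` are assumptions made for this example; the strategies actually offered by `DataDriftTriggerConfig.aggregation_strategy` may differ.

```python
def aggregate_decisions(decisions: dict[str, bool], strategy: str = "majority") -> bool:
    """Combine per-metric drift decisions into one final decision (hypothetical helper)."""
    votes = list(decisions.values())
    if strategy == "majority":
        # Drift if strictly more than half of the metrics report drift.
        return sum(votes) > len(votes) / 2
    if strategy == "at_least_one":
        # Drift if any single metric reports drift.
        return any(votes)
    raise ValueError(f"Unknown aggregation strategy: {strategy}")


per_metric = {"mmd": True, "ks": False, "cvm": True}
print(aggregate_decisions(per_metric, "majority"))
print(aggregate_decisions({"mmd": False, "ks": True}, "at_least_one"))
```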

Expand Down
4 changes: 2 additions & 2 deletions modyn/config/schema/pipeline/trigger/drift/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class DataDriftTriggerConfig(ModynBaseModel):
None,
description=(
"The number of intervals before starting to use the drift detection. Some "
"`DecisionCriteria` use this to calibrate the threshold. During the warmup, every interval will cause "
"a trigger."
"`DecisionCriteria` use this to calibrate the threshold. During the warmup, a simpler `warmup_policy` "
"is consulted for the triggering decision."
),
)
warmup_policy: SimpleTriggerConfig | None = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,7 @@

from pydantic import Field

from modyn.config.schema.base_model import ModynBaseModel


class _BaseWindowingStrategy(ModynBaseModel):
allow_overlap: bool = Field(
False,
description=(
"Whether the windows are allowed to overlap. This is useful for time-based windows."
"If set to False, the current window will be reset after each trigger."
),
)
from .window import _BaseWindowingStrategy


class AmountWindowingStrategy(_BaseWindowingStrategy):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class _BaseWindowingStrategy(ModynBaseModel):
allow_overlap: bool = Field(
False,
description=(
"Whether the windows are allowed to overlap. This is useful for time-based windows."
"Whether the windows are allowed to overlap."
"If set to False, the current window will be reset after each trigger."
),
)
18 changes: 9 additions & 9 deletions modyn/supervisor/internal/triggers/datadrifttrigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ def __init__(self, config: DataDriftTriggerConfig):
self._sample_left_until_detection = (
config.detection_interval_data_points
) # allows to detect drift in a fixed interval
self._windows = _setup_detection_window_manager(config.windowing_strategy)
self._windows = _setup_detection_windows(config.windowing_strategy)
self._triggered_once = False

self.evidently_detector = EvidentlyDriftDetector(config.metrics)
self.alibi_detector = AlibiDriftDetector(config.metrics)

# Every decision policy wraps one metric and is responsible for making decisions based on the metric's results
# and the metric's range of distance values
self.decision_policies = _setup_decision_engines(config)
self.decision_policies = _setup_decision_policies(config)

# [WARMUP CONFIGURATION]
self.warmup_completed = config.warmup_policy is None
Expand Down Expand Up @@ -321,7 +321,7 @@ def _run_detection(
**self.alibi_detector.detect_drift(reference_embeddings, current_embeddings, is_warmup),
}

# make the final decisions with the decision engines
# make the final decisions with the decision policies
for metric_name, metric_result in drift_results.items():
# some metrics return a list of distances (for every sample) instead of a single distance
# we take the mean of the distances to get a scalar distance value
Expand Down Expand Up @@ -377,7 +377,7 @@ def _any_metric_needs_calibration(self) -> bool:
return any(metric.decision_criterion.needs_calibration for metric in self.config.metrics.values())


def _setup_detection_window_manager(
def _setup_detection_windows(
windowing_strategy: DriftWindowingStrategy,
) -> DetectionWindows:
if isinstance(windowing_strategy, AmountWindowingStrategy):
Expand All @@ -387,17 +387,17 @@ def _setup_detection_window_manager(
raise ValueError(f"Unsupported windowing strategy: {windowing_strategy}")


def _setup_decision_engines(
def _setup_decision_policies(
config: DataDriftTriggerConfig,
) -> dict[str, DriftDecisionPolicy]:
decision_engines: dict[str, DriftDecisionPolicy] = {}
policies: dict[str, DriftDecisionPolicy] = {}
for metric_name, metric_config in config.metrics.items():
criterion = metric_config.decision_criterion
assert (
metric_config.num_permutations is None
), "Modyn doesn't allow hypothesis testing, it doesn't work in our context"
if isinstance(criterion, ThresholdDecisionCriterion):
decision_engines[metric_name] = ThresholdDecisionPolicy(config)
policies[metric_name] = ThresholdDecisionPolicy(config)
elif isinstance(criterion, DynamicDecisionPolicy):
decision_engines[metric_name] = DynamicDecisionPolicy(config)
return decision_engines
policies[metric_name] = DynamicDecisionPolicy(config)
return policies
16 changes: 14 additions & 2 deletions modyn/supervisor/internal/triggers/drift/decision_policy.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from abc import ABC, abstractmethod
from collections import deque

from modyn.config.schema.pipeline.trigger.drift.metric import DynamicThresholdCriterion, ThresholdDecisionCriterion
from modyn.config.schema.pipeline.trigger.drift.metric import (
DynamicThresholdCriterion,
ThresholdDecisionCriterion,
)


class DriftDecisionPolicy(ABC):
Expand Down Expand Up @@ -38,7 +41,16 @@ def evaluate_decision(self, distance: float) -> bool:

class DynamicDecisionPolicy(DriftDecisionPolicy):
"""Decision policy that will make the binary is_drift decisions based on a
dynamic threshold."""
dynamic threshold.

We compare a new distance value with the series of previous distance values
and decide if it's more extreme than a certain percentile of the series. Therefore we count the
`num_more_extreme` values that are greater than the new distance and compare it with the
`percentile` threshold.

TODO: we might want to also support some rolling average policy that will trigger if a distance deviates
from the average by a certain amount.
"""

def __init__(self, config: DynamicThresholdCriterion):
self.config = config
Expand Down
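
The percentile comparison described in the `DynamicDecisionPolicy` docstring could look roughly like the sketch below. It is written under assumptions: the class name `PercentileThresholdSketch` and the parameters `window_size` and `percentile` are hypothetical, and the real implementation may handle ties, the empty history, and the window differently.

```python
from collections import deque


class PercentileThresholdSketch:
    """Hypothetical dynamic-threshold policy based on past distance values."""

    def __init__(self, window_size: int, percentile: float) -> None:
        # Bounded history: the oldest distances drop out automatically.
        self.score_observations: deque[float] = deque(maxlen=window_size)
        self.percentile = percentile

    def evaluate_decision(self, distance: float) -> bool:
        if not self.score_observations:
            # Nothing to compare against yet, so no drift can be reported.
            self.score_observations.append(distance)
            return False
        # Count past distances that are at least as extreme as the new one.
        num_more_extreme = sum(1 for score in self.score_observations if score >= distance)
        fraction_more_extreme = num_more_extreme / len(self.score_observations)
        self.score_observations.append(distance)
        # Drift if fewer than `percentile` of the past values are as extreme.
        return fraction_more_extreme < self.percentile


policy = PercentileThresholdSketch(window_size=10, percentile=0.2)
for past_distance in [0.1, 0.12, 0.11, 0.13, 0.1]:
    policy.evaluate_decision(past_distance)  # calibration phase

print(policy.evaluate_decision(0.5))   # far above all past distances
print(policy.evaluate_decision(0.11))  # well inside the observed range
```

With only a handful of observations the decisions are noisy, which is one reason a warm-up period is useful before the policy is trusted.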
28 changes: 18 additions & 10 deletions modyn/supervisor/internal/triggers/drift/detection_window/amount.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from collections import deque

from modyn.config.schema.pipeline.trigger.drift.detection_window import AmountWindowingStrategy
from modyn.config.schema.pipeline.trigger.drift.detection_window import (
AmountWindowingStrategy,
)

from .window import DetectionWindows

Expand All @@ -12,13 +14,19 @@ def __init__(self, config: AmountWindowingStrategy):

# using maxlen the deque will automatically remove the oldest elements if the buffers are full
self.current: deque[tuple[int, int]] = deque(maxlen=config.amount_cur)

# If the reference window is bigger than the current window, we need a reservoir to store the
# pushed out elements from the current window as we might still need them for the reference window. If a
# trigger happens the whole reservoir and the current window will be copied/moved to the reference window.
# Therefore the reservoir should be the size of the difference between the reference and current window.
self.current_reservoir: deque[tuple[int, int]] = deque(maxlen=max(0, config.amount_ref - config.amount_cur))

self.reference: deque[tuple[int, int]] = deque(maxlen=config.amount_ref)

# in overlapping mode (we need dedicated buffer to keep track of the new samples that are not in
# the reference buffer, yet). The current_ and current_reservoir_ are not enough as after
# a trigger they will contain the same elements as before hindering us from copying the
# current elements to the reference buffer (creating duplicates)
# In overlapping mode, we need a dedicated buffer to track new samples that are not yet in the reference buffer.
# The current_ and current_reservoir_ are insufficient because, after a trigger, they will contain the same elements as before,
# which prevents us from copying the current elements to the reference buffer without creating duplicates.
# `exclusive_current` contains exactly the new elements that are not yet in the reference buffer.
self.exclusive_current: deque[tuple[int, int]] = deque(
maxlen=config.amount_ref if self.config.allow_overlap else 0
)
Expand All @@ -31,14 +39,15 @@ def inform_data(self, data: list[tuple[int, int]]) -> None:
self.exclusive_current.extend(data)
else:
# use the existing buffers
num_pushed_out = len(self.current) + len(data) - self.config.amount_cur
remaining_pushes = len(self.current) + len(data) - self.config.amount_cur

# move data from current window to reservoir
# move data from current window to reservoir by first copying the oldest elements into the reservoir
# and then extending the current window with the new data, which automatically removes the oldest elements
for pushed_out in self.current:
if num_pushed_out == 0:
if remaining_pushes == 0:
break
self.current_reservoir.append(pushed_out)
num_pushed_out -= 1
remaining_pushes -= 1

self.current.extend(data)

Expand All @@ -56,5 +65,4 @@ def inform_trigger(self) -> None:
# Move data from current to reference window
self.reference.extend(self.current)

# if allow_overlap, don't reset the current window
self.current.clear()
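
To make the interplay of the current window, the reservoir, and the reference window concrete, here is a small standalone sketch for the non-overlapping case. It uses plain integers instead of `(sample_id, timestamp)` tuples and module-level deques instead of a class. Both simplifications, as well as clamping the push count at zero and clearing the reservoir on a trigger, are assumptions of this sketch rather than a copy of the code in this PR.

```python
from collections import deque

amount_cur, amount_ref = 3, 5

# With maxlen set, a deque silently drops its oldest element when full.
current: deque[int] = deque(maxlen=amount_cur)
# The reservoir holds what falls out of `current` but may still be needed
# for the larger reference window.
reservoir: deque[int] = deque(maxlen=max(0, amount_ref - amount_cur))
reference: deque[int] = deque(maxlen=amount_ref)


def inform_data(data: list[int]) -> None:
    # Number of elements that will fall out of `current` once `data` is added
    # (clamped at zero in this sketch).
    remaining_pushes = max(0, len(current) + len(data) - amount_cur)
    for pushed_out in current:
        if remaining_pushes == 0:
            break
        reservoir.append(pushed_out)
        remaining_pushes -= 1
    current.extend(data)


def inform_trigger() -> None:
    # Oldest data first: reservoir, then the current window.
    reference.extend(reservoir)
    reference.extend(current)
    reservoir.clear()
    current.clear()


inform_data([1, 2, 3])
inform_data([4, 5])   # 1 and 2 move to the reservoir, current becomes [3, 4, 5]
inform_trigger()
print(list(reference))
```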
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from collections import deque

from modyn.config.schema.pipeline.trigger.drift.detection_window import TimeWindowingStrategy
from modyn.config.schema.pipeline.trigger.drift.detection_window import (
TimeWindowingStrategy,
)

from .window import DetectionWindows


class TimeDetectionWindows(DetectionWindows):
def __init__(self, config: TimeWindowingStrategy):
super().__init__()
self.config = config
self.current: deque[tuple[int, int]] = deque()
self.current_reservoir: deque[tuple[int, int]] = deque()
self.reference: deque[tuple[int, int]] = deque()

# in overlapping mode (we need dedicated buffer to keep track of the new samples that are not in
# the reference buffer, yet). The current_ and current_reservoir_ are not enough as after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class DetectionWindows(ABC):

All windows contain tuples with (sample_id, timestamp).

The manager is responsible for the following tasks:
This class is responsible for the following tasks:
- Keep track of the current and reference window
- Update the current window with new data
- Move data from the current and reservoir window to the reference window
Expand Down