Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add DynamicRollingAverageThresholdPolicy #593

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/pipeline/triggering/DRIFT_TRIGGERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,23 @@ classDiagram

class DynamicThresholdCriterion {
int window_size = 10
}

class DynamicPercentileThresholdCriterion {
float percentile = 0.05
}

class DynamicRollingAverageThresholdCriterion {
float deviation = 0.05
bool absolute = False
}

DecisionCriterion <|-- ThresholdDecisionCriterion
DecisionCriterion <|-- DynamicThresholdCriterion

DynamicThresholdCriterion <|-- DynamicPercentileThresholdCriterion
DynamicThresholdCriterion <|-- DynamicRollingAverageThresholdCriterion


```

### DriftDecisionPolicy Hierarchy
Expand Down Expand Up @@ -226,8 +238,16 @@ classDiagram
}

class DynamicDecisionPolicy {
<<abstract>>
+DynamicThresholdCriterion config
+Deque~float~ score_observations
}

class DynamicPercentileThresholdPolicy {
+bool evaluate_decision(float distance)
}

class DynamicRollingAverageThresholdPolicy {
+bool evaluate_decision(float distance)
}

Expand All @@ -238,6 +258,8 @@ classDiagram

DriftDecisionPolicy <|-- ThresholdDecisionPolicy
DriftDecisionPolicy <|-- DynamicDecisionPolicy
DynamicDecisionPolicy <|-- DynamicPercentileThresholdPolicy
DynamicDecisionPolicy <|-- DynamicRollingAverageThresholdPolicy
DriftDecisionPolicy <|-- HypothesisTestDecisionPolicy


Expand Down
31 changes: 27 additions & 4 deletions modyn/config/schema/pipeline/trigger/drift/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,41 @@ class ThresholdDecisionCriterion(ModynBaseModel):
needs_calibration: Literal[False] = Field(False)


class DynamicThresholdCriterion(ModynBaseModel):
id: Literal["DynamicThresholdCriterion"] = "DynamicThresholdCriterion"
class _DynamicThresholdCriterion(ModynBaseModel):
window_size: int = Field(10)
needs_calibration: Literal[True] = Field(True)


class DynamicPercentileThresholdCriterion(_DynamicThresholdCriterion):
"""Dynamic threshold based on an extremeness percentile of the previous
distance values."""

id: Literal["DynamicPercentileThresholdCriterion"] = "DynamicPercentileThresholdCriterion"
percentile: float = Field(
0.05,
description="The percentile that a threshold has to be in to trigger a drift event.",
)
needs_calibration: Literal[True] = Field(True)


class DynamicRollingAverageThresholdCriterion(_DynamicThresholdCriterion):
"""Triggers when a new distance value deviates from the rolling average by
a certain amount or percentage."""

id: Literal["DynamicRollingAverageThresholdCriterion"] = "DynamicRollingAverageThresholdCriterion"
deviation: float = Field(
0.05,
description="The deviation from the rolling average that triggers a drift event.",
)
absolute: bool = Field(
False,
description="Whether the deviation is absolute or relative to the rolling average.",
)


DynamicThresholdCriterion = DynamicPercentileThresholdCriterion | DynamicRollingAverageThresholdCriterion

DecisionCriterion = Annotated[
ThresholdDecisionCriterion | DynamicThresholdCriterion,
ThresholdDecisionCriterion | DynamicPercentileThresholdCriterion | DynamicRollingAverageThresholdCriterion,
Field(discriminator="id"),
]

Expand Down
17 changes: 12 additions & 5 deletions modyn/supervisor/internal/triggers/datadrifttrigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@
DriftWindowingStrategy,
TimeWindowingStrategy,
)
from modyn.config.schema.pipeline.trigger.drift.metric import ThresholdDecisionCriterion
from modyn.config.schema.pipeline.trigger.drift.metric import (
DynamicPercentileThresholdCriterion,
DynamicRollingAverageThresholdCriterion,
ThresholdDecisionCriterion,
)
from modyn.config.schema.pipeline.trigger.drift.result import MetricResult
from modyn.supervisor.internal.triggers.drift.decision_policy import (
DriftDecisionPolicy,
DynamicDecisionPolicy,
DynamicPercentileThresholdPolicy,
DynamicRollingAverageThresholdPolicy,
ThresholdDecisionPolicy,
)
from modyn.supervisor.internal.triggers.drift.detection_window.amount import (
Expand Down Expand Up @@ -397,7 +402,9 @@ def _setup_decision_policies(
metric_config.num_permutations is None
), "Modyn doesn't allow hypothesis testing, it doesn't work in our context"
if isinstance(criterion, ThresholdDecisionCriterion):
policies[metric_name] = ThresholdDecisionPolicy(config)
elif isinstance(criterion, DynamicDecisionPolicy):
policies[metric_name] = DynamicDecisionPolicy(config)
policies[metric_name] = ThresholdDecisionPolicy(criterion)
elif isinstance(criterion, DynamicPercentileThresholdCriterion):
policies[metric_name] = DynamicPercentileThresholdPolicy(criterion)
elif isinstance(criterion, DynamicRollingAverageThresholdCriterion):
policies[metric_name] = DynamicRollingAverageThresholdPolicy(criterion)
return policies
57 changes: 44 additions & 13 deletions modyn/supervisor/internal/triggers/drift/decision_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,58 @@ def evaluate_decision(self, distance: float) -> bool:

class DynamicDecisionPolicy(DriftDecisionPolicy):
"""Decision policy that will make the binary is_drift decisions based on a
dynamic threshold.
dynamic threshold."""

def __init__(self, config: DynamicThresholdCriterion):
self.config = config
self.score_observations: deque = deque(maxlen=self.config.window_size)


class DynamicPercentileThresholdPolicy(DynamicDecisionPolicy):
robinholzi marked this conversation as resolved.
Show resolved Hide resolved
"""Dynamic threshold based on an extremeness percentile of the previous
distance values.

We compare a new distance value with the series of previous distance values
and decide if it's more extreme than a certain percentile of the series. Therefore we count the
`num_more_extreme` values that are greater than the new distance and compare it with the
`percentile` threshold.

TODO: we might want to also support some rolling average policy that will trigger if a distance deviates
from the average by a certain amount.
"""

def __init__(self, config: DynamicThresholdCriterion):
self.config = config
self.score_observations: deque = deque(maxlen=self.config.window_size)
def evaluate_decision(self, distance: float) -> bool:
if len(self.score_observations) == 0:
self.score_observations.append(distance)
return True

sorted_observations = list(sorted(self.score_observations))

threshold = sorted_observations[
min(
max(
robinholzi marked this conversation as resolved.
Show resolved Hide resolved
0,
int(len(sorted_observations) * (1.0 - self.config.percentile)) - 1,
robinholzi marked this conversation as resolved.
Show resolved Hide resolved
),
len(sorted_observations) - 1,
)
] # - 1 from length to index space
self.score_observations.append(distance)

return distance > threshold


class DynamicRollingAverageThresholdPolicy(DynamicDecisionPolicy):
robinholzi marked this conversation as resolved.
Show resolved Hide resolved
"""Triggers when a new distance value deviates from the rolling average by
a certain amount or percentage."""

def evaluate_decision(self, distance: float) -> bool:
num_more_extreme = sum(1 for score in self.score_observations if score >= distance)
trigger = True
if len(self.score_observations) > 0:
perc = num_more_extreme / len(self.score_observations)
trigger = perc < self.config.percentile
if not self.score_observations:
self.score_observations.append(distance)
return True

rolling_average = sum(self.score_observations) / len(self.score_observations)
deviation = distance - rolling_average

self.score_observations.append(distance)
return trigger

if self.config.absolute:
return deviation >= self.config.deviation
return deviation >= self.config.deviation * rolling_average
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import pytest

from modyn.config.schema.pipeline.trigger.drift.metric import DynamicThresholdCriterion, ThresholdDecisionCriterion
from modyn.supervisor.internal.triggers.drift.decision_policy import DynamicDecisionPolicy, ThresholdDecisionPolicy
from modyn.config.schema.pipeline.trigger.drift.metric import (
DynamicPercentileThresholdCriterion,
DynamicRollingAverageThresholdCriterion,
ThresholdDecisionCriterion,
)
from modyn.supervisor.internal.triggers.drift.decision_policy import (
DynamicPercentileThresholdPolicy,
DynamicRollingAverageThresholdPolicy,
ThresholdDecisionPolicy,
)


def test_threshold_decision_policy() -> None:
Expand All @@ -14,29 +22,29 @@ def test_threshold_decision_policy() -> None:

@pytest.mark.parametrize("percentile", [0.1, 0.5, 0.9])
def test_dynamic_decision_policy_initial(percentile: float) -> None:
config = DynamicThresholdCriterion(window_size=3, percentile=percentile)
policy = DynamicDecisionPolicy(config)
config = DynamicPercentileThresholdCriterion(window_size=3, percentile=percentile)
policy = DynamicPercentileThresholdPolicy(config)

# Initially, the deque is empty, so any value should trigger a drift
assert policy.evaluate_decision(0.5)


def test_dynamic_decision_policy_with_observations() -> None:
config = DynamicThresholdCriterion(window_size=3, percentile=0.5)
policy = DynamicDecisionPolicy(config)
config = DynamicPercentileThresholdCriterion(window_size=4, percentile=0.5)
policy = DynamicPercentileThresholdPolicy(config)

# Add initial observations
policy.score_observations.extend([0.4, 0.6, 0.7])
policy.score_observations.extend([0.4, 0.5, 0.6, 0.7])

# Testing with various distances
assert not policy.evaluate_decision(0.3) # Less than all observations
assert policy.evaluate_decision(0.8) # Greater than all observations
assert not policy.evaluate_decision(0.5) # 0.5 is at the 50th percentile
assert not policy.evaluate_decision(0.6)


def test_dynamic_decision_policy_window_size() -> None:
config = DynamicThresholdCriterion(window_size=3, percentile=0.5)
policy = DynamicDecisionPolicy(config)
config = DynamicPercentileThresholdCriterion(window_size=3, percentile=0.5)
policy = DynamicPercentileThresholdPolicy(config)

# Add observations to fill the window
policy.evaluate_decision(0.4)
Expand All @@ -49,8 +57,8 @@ def test_dynamic_decision_policy_window_size() -> None:


def test_dynamic_decision_policy_percentile() -> None:
config = DynamicThresholdCriterion(window_size=4, percentile=0.75)
policy = DynamicDecisionPolicy(config)
config = DynamicPercentileThresholdCriterion(window_size=4, percentile=0.25)
policy = DynamicPercentileThresholdPolicy(config)

# Add observations
policy.evaluate_decision(0.4)
Expand All @@ -61,3 +69,18 @@ def test_dynamic_decision_policy_percentile() -> None:
assert not policy.evaluate_decision(0.5)
assert policy.evaluate_decision(0.8)
assert not policy.evaluate_decision(0.7)


def test_dynamic_decision_policy_average() -> None:
config = DynamicRollingAverageThresholdCriterion(window_size=2, deviation=0.1, absolute=True)
policy = DynamicRollingAverageThresholdPolicy(config)

# Add observations
policy.evaluate_decision(1.0)
policy.evaluate_decision(0.6)
policy.evaluate_decision(0.7)
policy.evaluate_decision(0.9)

assert not policy.evaluate_decision(0.7) # avg: 0.8
assert not policy.evaluate_decision(0.8) # avg: 0.8 (not >=0.1 deviation)
assert not policy.evaluate_decision(0.85) # avg: 0.75
Loading