feat: Cost tracker for CostTriggers #599

Merged · 15 commits · Aug 26, 2024
Changes from 14 commits
1 change: 0 additions & 1 deletion docs/pipeline/triggering/DRIFT_TRIGGERS.md
@@ -197,7 +197,6 @@ classDiagram
 bool absolute = False
 }
 
-
 DriftDecisionCriterion <|-- ThresholdDecisionCriterion
 DriftDecisionCriterion <|-- DynamicThresholdCriterion
 
12 changes: 11 additions & 1 deletion modyn/config/schema/pipeline/trigger/__init__.py
@@ -4,6 +4,11 @@
 
 from pydantic import Field
 
+from .cost import * # noqa
+from .cost import (
+    AvoidableMisclassificationCostTriggerConfig,
+    DataIncorporationLatencyCostTriggerConfig,
+)
 from .drift import * # noqa
 from .drift import DataDriftTriggerConfig
 from .ensemble import * # noqa
@@ -14,6 +19,11 @@
 from .simple import SimpleTriggerConfig
 
 TriggerConfig = Annotated[
-    SimpleTriggerConfig | DataDriftTriggerConfig | EnsembleTriggerConfig | PerformanceTriggerConfig,
+    AvoidableMisclassificationCostTriggerConfig
+    | DataIncorporationLatencyCostTriggerConfig
+    | DataDriftTriggerConfig
+    | EnsembleTriggerConfig
+    | PerformanceTriggerConfig
+    | SimpleTriggerConfig,
     Field(discriminator="id"),
 ]
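For illustration, the extended discriminated union can be exercised by validating a plain dict against `TriggerConfig`; the `id` field routes validation to the concrete config class. A minimal sketch, assuming pydantic v2's `TypeAdapter` and illustrative field values:

```python
from pydantic import TypeAdapter

from modyn.config.schema.pipeline.trigger import (
    DataIncorporationLatencyCostTriggerConfig,
    TriggerConfig,
)

raw = {
    "id": "DataIncorporationLatencyCostTrigger",
    "cost_tracking_window_size": 50,  # illustrative value
    "incorporation_delay_per_training_second": 100.0,  # illustrative value
}

# The `id` discriminator selects the matching config class from the union.
config = TypeAdapter(TriggerConfig).validate_python(raw)
assert isinstance(config, DataIncorporationLatencyCostTriggerConfig)
```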
9 changes: 9 additions & 0 deletions modyn/config/schema/pipeline/trigger/cost/__init__.py
@@ -0,0 +1,9 @@
from .cost import (
    AvoidableMisclassificationCostTriggerConfig,
    DataIncorporationLatencyCostTriggerConfig,
)

__all__ = [
    "AvoidableMisclassificationCostTriggerConfig",
    "DataIncorporationLatencyCostTriggerConfig",
]
90 changes: 90 additions & 0 deletions modyn/config/schema/pipeline/trigger/cost/cost.py
@@ -0,0 +1,90 @@
"""Cost aware triggers are evaluate the trade-off between the cost of a trigger
(e.g. measured by training machine time) and the benefit of the trigger (e.g.
measured by a regret metric like data incorporation latency or avoidable
misclassification incorporation latency)."""

from typing import Literal

from pydantic import Field

from modyn.config.schema.base_model import ModynBaseModel

from ..performance.performance import _InternalPerformanceTriggerConfig


class _CostTriggerConfig(ModynBaseModel):
    """Base class for cost-aware trigger policies."""

    cost_tracking_window_size: int = Field(
        description="How many triggers into the past should be considered for the linear cost model (cost w.r.t. data amount)."
    )


class DataIncorporationLatencyCostTriggerConfig(_CostTriggerConfig):
    """Cost-aware trigger policy configuration that uses data incorporation
    latency as a regret metric.

    While no trigger occurs, samples accumulate, forming a curve of un-integrated
    samples over time. Rather than using this count directly, we build an
    area-under-the-curve metric: the area under the un-integrated-samples curve
    measures the time samples have spent in the incorporation queue.

    As this policy operates on the two metrics `time` (cost) and `incorporation_delay`
    (regret), we need a way to express the trade-off between the two. A user has to
    specify, e.g., how many seconds of training time they are willing to invest to
    eradicate a certain amount of cumulative regret (here: incorporation delay).

    `incorporation_delay_per_training_second` is this conversion rate between the cost
    budget (training time) and the regret metric (incorporation latency).

    When the cumulated regret (area under the curve) exceeds the training-time budget,
    a trigger is fired.
    """

    id: Literal["DataIncorporationLatencyCostTrigger"] = Field("DataIncorporationLatencyCostTrigger")

    # Conversion rate between budget (training time) and regret metric (incorporation latency)
    incorporation_delay_per_training_second: float = Field(
        description=(
            "How many seconds of samples waiting in the incorporation queue we are willing "
            "to accept per second of training time saved."
        )
    )


class AvoidableMisclassificationCostTriggerConfig(_CostTriggerConfig, _InternalPerformanceTriggerConfig):
    """Cost-aware trigger policy configuration that uses avoidable
    misclassification incorporation latency as a regret metric.

    We suspect that the cumulated number of misclassifications is very unstable and
    badly conditioned on user input, as it is a linear function of the amount of data.
    Since the training cost is also linear in the amount of data, this would likely
    lead to a very brittle trigger policy. That's why we instead penalize every second
    that an avoidable misclassification remains unaddressed (no trigger).

    This results from combining the data incorporation latency idea with the static
    number-of-misclassifications performance trigger; the policy can be seen as a
    combination of data-incorporation-latency-based cost triggers and
    performance-aware triggers.

    As this policy operates on the two metrics `time` (cost) and
    `misclassification_incorporation_latency` (regret), we need a way to express the
    trade-off between the two. A user has to specify, e.g., how many seconds of
    training time they are willing to invest to eradicate a certain amount of
    cumulative regret (here: avoidable misclassification latency).

    `avoidable_misclassification_latency_per_training_second` is this conversion rate
    between the cost budget (training time) and the regret metric (misclassifications).

    When the regret metric exceeds the budget, a trigger is fired.

    Like performance-aware triggers, this policy supports the same set of
    `decision_criteria` as `PerformanceTriggerConfig`, but implicitly adds a cost
    criterion to the list. It not only evaluates data density and model performance
    but also considers the cost of a trigger, both in terms of wall-clock time and
    number of triggers.

    We use the `_InternalPerformanceTriggerConfig` base class as we cannot override
    the `id` field of `PerformanceTriggerConfig`.
    """

    id: Literal["AvoidableMisclassificationCostTrigger"] = Field("AvoidableMisclassificationCostTrigger")

    # Conversion rate between budget (training time) and regret metric (misclassifications)
    avoidable_misclassification_latency_per_training_second: float = Field(
        description=(
            "How many seconds of unaddressed avoidable misclassifications we are willing "
            "to accept per second of training time saved."
        )
    )
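To make the shared mechanics of the two configs concrete, here is a minimal sketch of the regret-vs.-budget decision rule both docstrings describe. `should_trigger` and all parameter names are hypothetical and not part of this diff:

```python
def should_trigger(
    pending_items: int,
    seconds_since_last_check: float,
    cumulative_regret: float,
    forecasted_training_seconds: float,
    regret_per_training_second: float,
) -> tuple[bool, float]:
    """Hypothetical regret-vs.-budget rule; not part of this PR.

    `pending_items` is the number of un-integrated samples (incorporation
    latency variant) or of unaddressed avoidable misclassifications
    (misclassification variant).
    """
    # Each pending item accrues regret for every second it stays unaddressed,
    # i.e. we accumulate the area under the pending-items-over-time curve.
    cumulative_regret += pending_items * seconds_since_last_check
    # Convert the forecasted training time into regret units via the
    # user-provided conversion rate; trigger once the regret exceeds it.
    budget = forecasted_training_seconds * regret_per_training_second
    return cumulative_regret >= budget, cumulative_regret
```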
44 changes: 44 additions & 0 deletions modyn/supervisor/internal/triggers/cost/cost_tracker.py
@@ -0,0 +1,44 @@
from collections import deque

import numpy as np
from sklearn import linear_model


class CostTracker:
    """Observes a stream of trigger costs (wall-clock time measurements) and
    maintains a linear model assuming a linear relationship between the number
    of samples and the time it takes to process them."""

    def __init__(self, window_size: int = 1000) -> None:
        """
        Args:
            window_size: How many triggers into the past should be considered for the linear model.
        """
        self.measurements: deque[tuple[int, float]] = deque(maxlen=window_size)
        """Measurements of the number of samples and the resulting training
        time for the last `window_size` triggers."""

        self._linear_model = linear_model.Ridge()

    def inform_trigger(self, number_samples: int, elapsed_time: float) -> None:
        """Informs the tracker about a new trigger and refits the cost model."""
        self.measurements.append((number_samples, elapsed_time))

        samples = np.array([x[0] for x in self.measurements]).reshape(-1, 1)
        observations = [x[1] for x in self.measurements]

        self._linear_model.fit(samples, observations)

    def needs_calibration(self) -> bool:
        """Checks whether the tracker still needs data before it can forecast.

        After the first trigger inform, the tracker forecasts with a constant
        model. From the second trigger onwards the model can learn the slope
        and y-intercept and start making meaningful forecasts.
        """
        return len(self.measurements) == 0 or not hasattr(self._linear_model, "coef_")

    def forecast_training_time(self, number_samples: int) -> float:
        """Forecasts the training time for a given number of samples."""
        assert not self.needs_calibration(), "The tracker needs more data to make a forecast."
        return self._linear_model.predict(np.array([[number_samples]]))[0]
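A short usage sketch for the tracker above (values are illustrative; with scikit-learn's default Ridge regularization the fit only approximately interpolates the measurements):

```python
from modyn.supervisor.internal.triggers.cost.cost_tracker import CostTracker

tracker = CostTracker(window_size=10)
assert tracker.needs_calibration()

# The first measurement fits a constant model (coef_ is 0, intercept is the mean).
tracker.inform_trigger(number_samples=1_000, elapsed_time=12.0)
print(tracker.forecast_training_time(4_000))  # ~12.0

# A second measurement lets the model pick up slope and intercept.
tracker.inform_trigger(number_samples=2_000, elapsed_time=22.0)
print(tracker.forecast_training_time(4_000))  # ~42.0, modulo regularization
```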
50 changes: 50 additions & 0 deletions modyn/tests/supervisor/internal/triggers/cost/test_cost_tracker.py
@@ -0,0 +1,50 @@
from collections import deque

import numpy as np
import pytest

from modyn.supervisor.internal.triggers.cost.cost_tracker import CostTracker


@pytest.fixture
def tracker() -> CostTracker:
    return CostTracker(window_size=3)


def test_initial_state_and_needs_calibration(tracker: CostTracker) -> None:
    assert len(tracker.measurements) == 0
    assert tracker.needs_calibration()

    tracker.inform_trigger(10, 0.5)
    assert not tracker.needs_calibration()


def test_inform_trigger_updates_measurements(tracker: CostTracker) -> None:
    tracker.inform_trigger(10, 0.5)
    tracker.inform_trigger(20, 1.0)
    assert tracker.measurements == deque([(10, 0.5), (20, 1.0)], maxlen=3)

    tracker.inform_trigger(30, 1.5)
    assert tracker.measurements == deque([(10, 0.5), (20, 1.0), (30, 1.5)], maxlen=3)

    tracker.inform_trigger(40, 2.0)
    assert tracker.measurements == deque([(20, 1.0), (30, 1.5), (40, 2.0)], maxlen=3)


def test_forecast_training_time_and_model_retraining(
    tracker: CostTracker,
) -> None:
    # Forecasting before informing about any trigger must fail.
    with pytest.raises(AssertionError, match="The tracker needs more data to make a forecast."):
        tracker.forecast_training_time(20)

    tracker.inform_trigger(10, 0.5)
    assert tracker.forecast_training_time(20) == 0.5  # first inform only fits a constant model
    assert tracker.forecast_training_time(0) == 0.5

    tracker.inform_trigger(20, 1.0)
    initial_coef = tracker._linear_model.coef_.copy()

    tracker.inform_trigger(30, 1.5)
    updated_coef = tracker._linear_model.coef_.copy()
    assert not np.array_equal(initial_coef, updated_coef)