Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Cost tracker for CostTriggers #599

Merged
merged 15 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions modyn/supervisor/internal/triggers/cost/cost_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from collections import deque

import numpy as np
from sklearn import linear_model


class CostTracker:
    """Track trigger costs and forecast training times with a linear model.

    Observes a stream of trigger costs (wall clock time measurements) and
    maintains a linear model, assuming a linear relationship between the
    number of samples and the time it takes to process them.
    """

    def __init__(self, window_size: int = 1000) -> None:
        """Create a tracker that keeps a sliding window of measurements.

        Args:
            window_size: How many triggers into the past should be considered
                for the linear model. Must be at least 1.

        Raises:
            ValueError: If `window_size` is smaller than 1.
        """
        # Validate up front: a zero-length window would silently discard every
        # measurement, leaving nothing to fit the model on.
        if window_size < 1:
            raise ValueError(f"window_size must be at least 1, got {window_size}.")

        # Pairs of (number of samples, resulting training time) for the last
        # `window_size` triggers; the deque evicts the oldest pair on overflow.
        self.measurements: deque[tuple[int, float]] = deque(maxlen=window_size)

        self._linear_model = linear_model.Ridge()

    def inform_trigger(self, number_samples: int, elapsed_time: float) -> None:
        """Record a new measurement and refit the linear model on the window.

        Args:
            number_samples: Number of samples of the new data batch.
            elapsed_time: Time measured for processing that batch.
        """
        self.measurements.append((number_samples, elapsed_time))

        # The model is fitted on a 2D feature matrix with one column (the
        # sample counts) and a flat list of observed times.
        samples = np.array([m[0] for m in self.measurements]).reshape(-1, 1)
        observations = [m[1] for m in self.measurements]

        self._linear_model.fit(samples, observations)

    def needs_calibration(self) -> bool:
        """Check whether the tracker still lacks the data needed to forecast.

        After one trigger inform, the tracker forecasts with a constant
        model. From the second trigger on, the model can also learn a slope
        and start making meaningful forecasts.

        Returns:
            True if no measurement has been recorded yet or the model has not
            been fitted (it has no `coef_` attribute), False otherwise.
        """
        return not self.measurements or not hasattr(self._linear_model, "coef_")

    def forecast_training_time(self, number_samples: int) -> float:
        """Forecast the training time for a given number of samples.

        Args:
            number_samples: Number of samples to forecast the time for.

        Returns:
            The training time predicted by the linear model.

        Raises:
            AssertionError: If the tracker has not received any data yet.
        """
        # Raised explicitly rather than via `assert` so the guard is not
        # stripped under `python -O`; the exception type and message are kept
        # so existing callers and tests keep working.
        if self.needs_calibration():
            raise AssertionError("The tracker needs more data to make a forecast.")

        prediction = self._linear_model.predict(np.array([[number_samples]]))
        # Convert the NumPy scalar to a builtin float, as the annotation promises.
        return float(prediction[0])
robinholzi marked this conversation as resolved.
Show resolved Hide resolved
50 changes: 50 additions & 0 deletions modyn/tests/supervisor/internal/triggers/cost/test_cost_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from collections import deque

import numpy as np
import pytest

from modyn.supervisor.internal.triggers.cost.cost_tracker import CostTracker


@pytest.fixture
def tracker() -> CostTracker:
    """Provide a fresh `CostTracker` whose window holds three measurements."""
    small_window_tracker = CostTracker(window_size=3)
    return small_window_tracker


def test_initial_state_and_needs_calibration(tracker: CostTracker) -> None:
    """A fresh tracker is empty and uncalibrated until the first inform."""
    # Nothing has been recorded yet, so the tracker cannot forecast.
    assert not tracker.measurements
    assert tracker.needs_calibration()

    # A single measurement is enough to leave the uncalibrated state.
    tracker.inform_trigger(number_samples=10, elapsed_time=0.5)
    calibrated = not tracker.needs_calibration()
    assert calibrated


def test_inform_trigger_updates_measurements(tracker: CostTracker) -> None:
    """Measurements accumulate up to the window size, then the oldest is evicted."""
    tracker.inform_trigger(10, 0.5)

    # Each step: (measurement to report, expected window content afterwards).
    steps = [
        ((20, 1.0), [(10, 0.5), (20, 1.0)]),
        ((30, 1.5), [(10, 0.5), (20, 1.0), (30, 1.5)]),
        # The window holds three entries, so the first measurement drops out.
        ((40, 2.0), [(20, 1.0), (30, 1.5), (40, 2.0)]),
    ]
    for (n_samples, elapsed), expected_window in steps:
        tracker.inform_trigger(n_samples, elapsed)
        assert tracker.measurements == deque(expected_window, maxlen=3)


def test_forecast_training_time_and_model_retraining(
    tracker: CostTracker,
) -> None:
    """Forecasting fails without data, is constant after one inform, and the
    model coefficients change as further measurements arrive."""
    # Without any measurement the tracker must refuse to forecast.
    with pytest.raises(AssertionError, match="The tracker needs more data to make a forecast."):
        tracker.forecast_training_time(20)

    # The first inform only configures a constant model, so every query
    # yields the single observed time.
    tracker.inform_trigger(10, 0.5)
    for queried_samples in (20, 0):
        assert tracker.forecast_training_time(queried_samples) == 0.5

    tracker.inform_trigger(20, 1.0)
    coef_after_two = tracker._linear_model.coef_.copy()

    # A third measurement must lead to a refit with different coefficients.
    tracker.inform_trigger(30, 1.5)
    coef_after_three = tracker._linear_model.coef_.copy()
    assert not np.array_equal(coef_after_two, coef_after_three)
Loading