Skip to content

Commit

Permalink
feat: Add IncorporationLatencyTracker (#602)
Browse files Browse the repository at this point in the history
  • Loading branch information
robinholzi authored Aug 27, 2024
1 parent 15ab573 commit 9f1e643
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
class IncorporationLatencyTracker:
def __init__(self) -> None:
self._current_regret = 0.0
"""The current value of the regret metric at the end of the last
interval."""

self._cumulative_latency_regret = 0.0
"""Cumulated regret (latency) reflecting the area under the regret
curve."""

@property
def cumulative_latency_regret(self) -> float:
return self._cumulative_latency_regret

def add_latency(self, regret: float, period_duration: float) -> float:
"""Add the new regret from the last interval to the cumulative regret.
Applicable if the regret can uniformly be aggregated to a scalar for every reporting data
batch. (e.g. batch drift distance)
This assumes uniformity in the aggregated regret value. If the regret is not uniform.
E.g. when it's build from the per-sample regrets of samples with varying timestamps, we
cannot simply assume that every regret component can be weighted by the period duration when adding it to
the cumulative regret. Some samples might only have arrived at the end of the period and thus
need a smaller weight. Consider using `add_latencies`.
Args:
regret_sum: The new regret value at the end of the interval.
interval_duration: The duration of the last period in seconds
Returns:
Most recent cumulative regret value.
"""
self._current_regret += regret
self._cumulative_latency_regret += self._current_regret * period_duration

return self._cumulative_latency_regret

def add_latencies(
self,
regrets: list[tuple[int, float]],
start_timestamp: int,
period_duration: float,
) -> float:
"""Add the new regret after computing the regret sum from a list of
per-sample regrets.
Addressed the non-uniformity in the regret values by computing the regret sum from the per-sample.
Args:
regrets: List of regrets for each sample in the last period with their timestamps.
start_timestamp: The timestamp of the start of the last period.
end_timestamp: The timestamp of the end of the last period.
Returns:
Most recent cumulative regret value.
"""
# We split the regrets into two parts: those that arrived in the last period and those that have been
# around for longer. In the latency cumulation (area under curve), the first make up a triangular shape,
# while the second contribute to the rectangle.

end_timestamp = start_timestamp + period_duration
regrets_durations = [(end_timestamp - timestamp, regret) for timestamp, regret in regrets]
new_regret = sum(regret for _, regret in regrets)

# rectangular shape of the area under the curve for the recently arrived regrets
new_regret_latency = sum(duration * regret for duration, regret in regrets_durations)

# old regret units that still contribute to area under curve (rectangular shape)
old_regret_latency = self._current_regret * period_duration

self._current_regret += new_regret
self._cumulative_latency_regret += old_regret_latency + new_regret_latency

return self._cumulative_latency_regret

def inform_trigger(self) -> None:
"""Informs the tracker about a trigger which will reset the counter."""
self._current_regret = 0
self._cumulative_latency_regret = 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from modyn.supervisor.internal.triggers.cost.incorporation_latency_tracker import (
IncorporationLatencyTracker,
)


def test_incorporation_latency_tracker() -> None:
tracker = IncorporationLatencyTracker()
assert tracker.cumulative_latency_regret == 0.0

cumulative_regret = tracker.add_latency(1.0, 2.0)
assert tracker._current_regret == 1.0
assert cumulative_regret == tracker.cumulative_latency_regret == 2.0

cumulative_regret = tracker.add_latency(0.5, 3.0)
assert tracker._current_regret == 1.5
assert cumulative_regret == tracker.cumulative_latency_regret == 2.0 + 1.5 * 3.0

tracker.inform_trigger()
assert tracker.cumulative_latency_regret == 0.0

cumulative_regret = tracker.add_latency(2.0, 1.0)
assert tracker._current_regret == 2.0
assert cumulative_regret == tracker.cumulative_latency_regret == 2.0


def test_add_latencies() -> None:
tracker = IncorporationLatencyTracker()
assert tracker.cumulative_latency_regret == 0.0

cumulative_regret_1 = tracker.add_latencies([(1, 1.0), (2, 0.5)], 0, 3.0)
assert tracker._current_regret == 1.5
assert cumulative_regret_1 == tracker.cumulative_latency_regret == (1.0 * (3.0 - 1.0) + 0.5 * (3.0 - 2.0))

# period: time=3 to 13
cumulative_regret_2 = tracker.add_latencies([(6, 4.0), (7, 2.0)], 3, 10.0)
assert tracker._current_regret == 1.5 + (4.0 + 2.0)
assert (
cumulative_regret_2
== tracker.cumulative_latency_regret
== cumulative_regret_1 + 1.5 * 10.0 + (4.0 * (13.0 - 6.0) + 2.0 * (13.0 - 7.0))
)

0 comments on commit 9f1e643

Please sign in to comment.