| 
 | 1 | +from collections import OrderedDict  | 
 | 2 | +from copy import copy  | 
 | 3 | +from statistics import mean, stdev  | 
 | 4 | + | 
 | 5 | +from controller.sentry.models import Project  | 
 | 6 | + | 
 | 7 | + | 
 | 8 | +class SpikesDetector:  | 
 | 9 | +    def __init__(self, lag=48, threshold=5, influence=0) -> None:  | 
 | 10 | +        """  | 
 | 11 | +        Z-score based algorithm  | 
 | 12 | +        """  | 
 | 13 | + | 
 | 14 | +        # The lag parameter determines how much your data will be smoothed and how adaptive the  | 
 | 15 | +        # algorithm is to changes in the long-term average of the data. The more stationary your  | 
 | 16 | +        # data is, the more lags you should include (this should improve the robustness of the algorithm).  | 
 | 17 | +        # If your data contains time-varying trends, you should consider how quickly you want the algorithm  | 
 | 18 | +        # to adapt to these trends. I.e., if you put lag at 10, it takes 10 'periods' before the algorithm's  | 
 | 19 | +        # threshold is adjusted to any systematic changes in the long-term average. So choose the lag parameter  | 
 | 20 | +        # based on the trending behavior of your data and how adaptive you want the algorithm to be.  | 
 | 21 | +        self.lag = lag  | 
 | 22 | +        # The threshold parameter is the number of standard deviations from the moving mean above which the  | 
 | 23 | +        # algorithm will classify a new datapoint as being a signal. For example, if a new datapoint is 4.0  | 
 | 24 | +        # standard deviations above the moving mean and the threshold parameter is set as 3.5,  | 
 | 25 | +        # the algorithm will identify the datapoint as a signal. This parameter should be set based  | 
 | 26 | +        # on how many signals you expect.  | 
 | 27 | +        # For example, if your data is normally distributed, a threshold (or: z-score) of 3.5  | 
 | 28 | +        # corresponds to a signaling probability of 0.00047 (from this table),  | 
 | 29 | +        # which implies that you expect a signal once every 2128 datapoints (1/0.00047).  | 
 | 30 | +        # The threshold therefore directly influences how sensitive the algorithm is  | 
 | 31 | +        # and thereby also determines how often the algorithm signals.  | 
 | 32 | +        self.threshold = threshold  | 
 | 33 | +        # The influence parameter determines the influence of signals on the algorithm's detection threshold.  | 
 | 34 | +        # If put at 0, signals have no influence on the threshold, such that future signals are detected based  | 
 | 35 | +        # on a threshold that is calculated with a mean and standard deviation that is not influenced by past signals.  | 
 | 36 | +        # If put at 0.5, signals have half the influence of normal data points. Another way to think about this  | 
 | 37 | +        # is that if you put the influence at 0, you implicitly assume stationarity  | 
 | 38 | +        # (i.e. no matter how many signals there are, you always expect the time series to return to the  | 
 | 39 | +        # same average over the long term).  | 
 | 40 | +        # If this is not the case, you should put the influence parameter somewhere between 0 and 1,  | 
 | 41 | +        # depending on the extent to which signals can systematically influence the time-varying trend of the data.  | 
 | 42 | +        # E.g., if signals lead to a structural break of the long-term average of the time series,  | 
 | 43 | +        # the influence parameter should be put high (close to 1)  | 
 | 44 | +        # so the threshold can react to structural breaks quickly.  | 
 | 45 | +        self.influence = influence  | 
 | 46 | + | 
 | 47 | +    @classmethod  | 
 | 48 | +    def from_project(cls, project: Project):  | 
 | 49 | +        return cls(**project.detection_param)  | 
 | 50 | + | 
 | 51 | +    def compute_sentry(self, stats):  | 
 | 52 | +        series = next(  | 
 | 53 | +            (group["series"]["sum(quantity)"] for group in stats["groups"] if group["by"]["outcome"] == "accepted"),  | 
 | 54 | +            None,  | 
 | 55 | +        )  | 
 | 56 | +        if series is None:  | 
 | 57 | +            raise ValueError("No series with accepted outcome")  | 
 | 58 | + | 
 | 59 | +        signal, _, _ = self.compute(series)  | 
 | 60 | + | 
 | 61 | +        annotated_result = OrderedDict((date, signal) for date, signal in zip(stats["intervals"], signal))  | 
 | 62 | +        return annotated_result  | 
 | 63 | + | 
 | 64 | +    def compute(self, data):  | 
 | 65 | +        signals = [0] * self.lag  | 
 | 66 | +        avg_filter = [0] * self.lag  | 
 | 67 | +        std_filter = [0] * self.lag  | 
 | 68 | +        filtered_data = copy(data)  | 
 | 69 | +        avg_filter[self.lag - 1] = mean(data[: self.lag])  | 
 | 70 | +        std_filter[self.lag - 1] = stdev(data[: self.lag])  | 
 | 71 | + | 
 | 72 | +        for i, item in enumerate(data[self.lag :], start=self.lag):  | 
 | 73 | +            if abs(item - avg_filter[i - 1]) > self.threshold * std_filter[i - 1]:  | 
 | 74 | +                signals.append(1 if item > avg_filter[i - 1] else 0)  | 
 | 75 | +                filtered_data[i] = self.influence * item + (1 - self.influence) * filtered_data[i - 1]  | 
 | 76 | +            else:  | 
 | 77 | +                signals.append(0)  | 
 | 78 | +                filtered_data[i] = data[i]  | 
 | 79 | +            avg_filter.append(mean(filtered_data[(i - self.lag) : i]))  | 
 | 80 | +            std_filter.append(stdev(filtered_data[(i - self.lag) : i]))  | 
 | 81 | + | 
 | 82 | +        return signals, avg_filter, std_filter  | 
0 commit comments