|
| 1 | +"""Kuiper's test module.""" |
| 2 | + |
| 3 | +from typing import Optional, Union |
| 4 | + |
| 5 | +import numpy as np # type: ignore |
| 6 | +from scipy.special import comb, factorial # type: ignore |
| 7 | +from scipy.stats import ks_2samp # type: ignore |
| 8 | + |
| 9 | +from frouros.callbacks.batch.base import BaseCallbackBatch |
| 10 | +from frouros.detectors.data_drift.base import NumericalData, UnivariateData |
| 11 | +from frouros.detectors.data_drift.batch.statistical_test.base import ( |
| 12 | + BaseStatisticalTest, |
| 13 | + StatisticalResult, |
| 14 | +) |
| 15 | + |
| 16 | + |
| 17 | +class KuiperTest(BaseStatisticalTest): |
| 18 | + """Kuiper's test [kuiper1960tests]_ detector. |
| 19 | +
|
| 20 | + :param callbacks: callbacks, defaults to None |
| 21 | + :type callbacks: Optional[Union[BaseCallbackBatch, list[BaseCallbackBatch]]] |
| 22 | +
|
| 23 | + :References: |
| 24 | +
|
| 25 | + .. [kuiper1960tests] Kuiper, Nicolaas H. |
| 26 | + "Tests concerning random points on a circle." |
| 27 | + Nederl. Akad. Wetensch. Proc. Ser. A. Vol. 63. No. 1. 1960. |
| 28 | +
|
| 29 | + :Example: |
| 30 | +
|
| 31 | + >>> from frouros.detectors.data_drift import KuiperTest |
| 32 | + >>> import numpy as np |
| 33 | + >>> np.random.seed(seed=31) |
| 34 | + >>> X = np.random.normal(loc=0, scale=1, size=100) |
| 35 | + >>> Y = np.random.normal(loc=1, scale=1, size=100) |
| 36 | + >>> detector = KuiperTest() |
| 37 | + >>> _ = detector.fit(X=X) |
| 38 | + >>> detector.compare(X=Y)[0] |
| 39 | + StatisticalResult(statistic=0.55, p_value=5.065664859580971e-13) |
| 40 | + """ # noqa: E501 # pylint: disable=line-too-long |
| 41 | + |
| 42 | + def __init__( # noqa: D107 |
| 43 | + self, |
| 44 | + callbacks: Optional[Union[BaseCallbackBatch, list[BaseCallbackBatch]]] = None, |
| 45 | + ) -> None: |
| 46 | + super().__init__( |
| 47 | + data_type=NumericalData(), |
| 48 | + statistical_type=UnivariateData(), |
| 49 | + callbacks=callbacks, |
| 50 | + ) |
| 51 | + |
| 52 | + @staticmethod |
| 53 | + def _statistical_test( |
| 54 | + X_ref: np.ndarray, # noqa: N803 |
| 55 | + X: np.ndarray, |
| 56 | + **kwargs, |
| 57 | + ) -> StatisticalResult: |
| 58 | + statistic, p_value = KuiperTest._kuiper( |
| 59 | + X=X_ref, |
| 60 | + Y=X, |
| 61 | + **kwargs, |
| 62 | + ) |
| 63 | + test = StatisticalResult( |
| 64 | + statistic=statistic, |
| 65 | + p_value=p_value, |
| 66 | + ) |
| 67 | + return test |
| 68 | + |
| 69 | + @staticmethod |
| 70 | + def _false_positive_probability( |
| 71 | + D: float, # noqa: N803 |
| 72 | + N: float, |
| 73 | + ) -> float: |
| 74 | + """ |
| 75 | + Compute the false positive probability for the Kuiper's test. |
| 76 | +
|
| 77 | + NOTE: This function is a modified version of the |
| 78 | + `kuiper_false_positive_probability` function from the |
| 79 | + `astropy.stats` <https://docs.astropy.org/en/stable/api/astropy.stats.kuiper_false_positive_probability.html#astropy.stats.kuiper_false_positive_probability> package. # noqa: E501 |
| 80 | +
|
| 81 | + :param D: Kuiper's statistic |
| 82 | + :param D: float |
| 83 | + :param N: effective size of the sample |
| 84 | + :type N: float |
| 85 | + :return: false positive probability |
| 86 | + :rtype: float |
| 87 | + """ |
| 88 | + if D < 2.0 / N: |
| 89 | + return 1.0 - factorial(N) * (D - 1.0 / N) ** (N - 1) |
| 90 | + |
| 91 | + if D < 3.0 / N: |
| 92 | + k = -(N * D - 1.0) / 2.0 |
| 93 | + r = np.sqrt(k**2 - (N * D - 2.0) ** 2 / 2.0) |
| 94 | + a, b = -k + r, -k - r |
| 95 | + return 1 - ( |
| 96 | + factorial(N - 1) |
| 97 | + * (b ** (N - 1) * (1 - a) - a ** (N - 1) * (1 - b)) |
| 98 | + / N ** (N - 2) |
| 99 | + / (b - a) |
| 100 | + ) |
| 101 | + |
| 102 | + if (D > 0.5 and N % 2 == 0) or (D > (N - 1.0) / (2.0 * N) and N % 2 == 1): |
| 103 | + # NOTE: the upper limit of this sum is taken from Stephens 1965 |
| 104 | + t = np.arange(np.floor(N * (1 - D)) + 1) |
| 105 | + y = D + t / N |
| 106 | + Tt = y ** (t - 3) * ( # noqa: N806 |
| 107 | + y**3 * N |
| 108 | + - y**2 * t * (3 - 2 / N) |
| 109 | + + y * t * (t - 1) * (3 - 2 / N) / N |
| 110 | + - t * (t - 1) * (t - 2) / N**2 |
| 111 | + ) |
| 112 | + term1 = comb(N, t) |
| 113 | + term2 = (1 - D - t / N) ** (N - t - 1) |
| 114 | + # term1 is formally finite, but is approximated by numpy as np.inf for |
| 115 | + # large values, so we set them to zero manually when they would be |
| 116 | + # multiplied by zero anyway |
| 117 | + term1[(term1 == np.inf) & (term2 == 0)] = 0.0 |
| 118 | + return (Tt * term1 * term2).sum() |
| 119 | + |
| 120 | + z = D * np.sqrt(N) |
| 121 | + # When m*z>18.82 (sqrt(-log(finfo(double))/2)), exp(-2m**2z**2) |
| 122 | + # underflows. Cutting off just before avoids triggering a (pointless) |
| 123 | + # underflow warning if `under="warn"`. |
| 124 | + ms = np.arange(1, 18.82 / z) |
| 125 | + S1 = ( # noqa: N806 |
| 126 | + 2 * (4 * ms**2 * z**2 - 1) * np.exp(-2 * ms**2 * z**2) |
| 127 | + ).sum() |
| 128 | + S2 = ( # noqa: N806 |
| 129 | + ms**2 * (4 * ms**2 * z**2 - 3) * np.exp(-2 * ms**2 * z**2) |
| 130 | + ).sum() |
| 131 | + return S1 - 8 * D / 3 * S2 |
| 132 | + |
| 133 | + @staticmethod |
| 134 | + def _kuiper( |
| 135 | + X: np.ndarray, # noqa: N803 |
| 136 | + Y: np.ndarray, |
| 137 | + ) -> tuple[float, float]: |
| 138 | + """Kuiper's test. |
| 139 | +
|
| 140 | + :param X: reference data |
| 141 | + :type X: numpy.ndarray |
| 142 | + :param Y: test data |
| 143 | + :type Y: numpy.ndarray |
| 144 | + :return: Kuiper's statistic and p-value |
| 145 | + :rtype: tuple[float, float] |
| 146 | + """ |
| 147 | + X = np.sort(X) # noqa: N806 |
| 148 | + Y = np.sort(Y) # noqa: N806 |
| 149 | + X_size = len(X) # noqa: N806 |
| 150 | + Y_size = len(Y) # noqa: N806 |
| 151 | + |
| 152 | + statistic = ks_2samp( |
| 153 | + data1=X, |
| 154 | + data2=Y, |
| 155 | + alternative="two-sided", |
| 156 | + ).statistic |
| 157 | + sample_effective_size = X_size * Y_size / float(X_size + Y_size) |
| 158 | + |
| 159 | + p_value = KuiperTest._false_positive_probability( |
| 160 | + D=statistic, |
| 161 | + N=sample_effective_size, |
| 162 | + ) |
| 163 | + |
| 164 | + return statistic, p_value |
0 commit comments