Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions frouros/data_drift/batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .distance_based import (
EMD,
Hellinger,
HistogramIntersection,
JS,
KL,
Expand All @@ -19,6 +20,7 @@
"ChiSquareTest",
"CVMTest",
"EMD",
"Hellinger",
"HistogramIntersection",
"JS",
"KL",
Expand Down
2 changes: 2 additions & 0 deletions frouros/data_drift/batch/distance_based/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Data drift batch distance based detection methods' init."""

from .emd import EMD
from .hellinger import Hellinger
from .histogram_intersection import HistogramIntersection
from .js import JS
from .kl import KL
Expand All @@ -9,6 +10,7 @@

__all__ = [
"EMD",
"Hellinger",
"HistogramIntersection",
"JS",
"KL",
Expand Down
67 changes: 65 additions & 2 deletions frouros/data_drift/batch/distance_based/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


class DistanceBasedBase(DataDriftBatchBase):
"""Abstract class representing a distance based."""
"""Abstract class representing a distance based detector."""

def _apply_method(
self, X_ref_: np.ndarray, X: np.ndarray, **kwargs # noqa: N803
Expand Down Expand Up @@ -47,8 +47,71 @@ def _distance_measure(
pass


class DistanceBinsBasedBase(DistanceBasedBase):
    """Abstract class representing a distance bins based detector.

    Reference and test samples are binned over a shared set of bin edges,
    the counts are divided by the respective sample sizes and the resulting
    vectors are handed to :meth:`_distance_measure_bins`, which subclasses
    must implement.
    """

    def __init__(self, num_bins: int = 10) -> None:
        """Init method.

        :param num_bins: number of bins in which to divide probabilities
        :type num_bins: int
        :raises ValueError: if num_bins is lower than 1
        """
        super().__init__(data_type=NumericalData(), statistical_type=UnivariateData())
        # Assigning through the property validates the value
        self.num_bins = num_bins

    @property
    def num_bins(self) -> int:
        """Number of bins property.

        :return: number of bins in which to divide probabilities
        :rtype: int
        """
        return self._num_bins

    @num_bins.setter
    def num_bins(self, value: int) -> None:
        """Number of bins setter.

        :param value: value to be set
        :type value: int
        :raises ValueError: Value error exception
        """
        if value < 1:
            raise ValueError("value must be greater than 0.")
        self._num_bins = value

    def _distance_measure(
        self, X_ref_: np.ndarray, X: np.ndarray, **kwargs  # noqa: N803
    ) -> DistanceResult:
        """Bin both samples and compute the distance between the bins.

        :param X_ref_: reference data
        :type X_ref_: numpy.ndarray
        :param X: data to compare against the reference data
        :type X: numpy.ndarray
        :return: distance wrapped in a DistanceResult
        :rtype: DistanceResult
        """
        # Use the reference data handed in by the caller instead of reaching
        # for self.X_ref_, so that the X_ref_ argument is actually honoured
        X_ref_percents, X_percents = self._calculate_bins_values(  # noqa: N806
            X_ref_=X_ref_, X=X, num_bins=self.num_bins
        )
        distance_bins = self._distance_measure_bins(X_ref_=X_ref_percents, X=X_percents)
        return DistanceResult(distance=distance_bins)

    @staticmethod
    def _calculate_bins_values(
        X_ref_: np.ndarray, X: np.ndarray, num_bins: int = 10  # noqa: N803
    ) -> "tuple[np.ndarray, np.ndarray]":
        """Compute the fraction of samples per bin for both inputs.

        The annotation is quoted so that it needs no additional import.

        :param X_ref_: reference data
        :type X_ref_: numpy.ndarray
        :param X: data to compare against the reference data
        :type X: numpy.ndarray
        :param num_bins: number of bins in which to divide both samples
        :type num_bins: int
        :return: per-bin fractions of the reference data and of X
        :rtype: Tuple[numpy.ndarray, numpy.ndarray]
        """
        # Bin edges come from the pooled data so both histograms share them
        _, bins = np.histogram(np.hstack((X_ref_, X)), bins=num_bins)
        X_ref_percents = (  # noqa: N806
            np.histogram(a=X_ref_, bins=bins)[0] / X_ref_.shape[0]
        )
        X_percents = np.histogram(a=X, bins=bins)[0] / X.shape[0]  # noqa: N806
        return X_ref_percents, X_percents

    @abc.abstractmethod
    def _distance_measure_bins(
        self, X_ref_: np.ndarray, X: np.ndarray  # noqa: N803
    ) -> float:
        """Compute the distance between two vectors of per-bin fractions.

        :param X_ref_: per-bin fractions of the reference data
        :type X_ref_: numpy.ndarray
        :param X: per-bin fractions of the data being compared
        :type X: numpy.ndarray
        :return: distance value
        :rtype: float
        """


class DistanceProbabilityBasedBase(DistanceBasedBase):
"""Abstract class representing a distance probability based."""
"""Abstract class representing a distance probability based detector."""

def __init__(self, num_bins: int = 100) -> None:
"""Init method.
Expand Down
39 changes: 39 additions & 0 deletions frouros/data_drift/batch/distance_based/hellinger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Hellinger distance module."""

import numpy as np # type: ignore

from frouros.data_drift.batch.distance_based.base import (
DistanceBinsBasedBase,
)


class Hellinger(DistanceBinsBasedBase):
    """Hellinger distance detector working on binned samples."""

    def __init__(self, num_bins: int = 10) -> None:
        """Init method.

        :param num_bins: number of bins in which to divide probabilities
        :type num_bins: int
        """
        super().__init__(num_bins=num_bins)
        # Divisor of the distance, computed once per instance
        self._sqrt_div = np.sqrt(2)

    def _distance_measure_bins(
        self,
        X_ref_: np.ndarray,  # noqa: N803
        X: np.ndarray,  # noqa: N803
    ) -> float:
        """Compute the Hellinger distance between two binned samples.

        :param X_ref_: per-bin values of the reference data
        :type X_ref_: numpy.ndarray
        :param X: per-bin values of the data being compared
        :type X: numpy.ndarray
        :return: Hellinger distance
        :rtype: float
        """
        return self._hellinger(X_ref_=X_ref_, X=X, sqrt_div=self._sqrt_div)

    @staticmethod
    def _hellinger(
        X_ref_: np.ndarray, X: np.ndarray, sqrt_div: float  # noqa: N803
    ) -> float:
        """Hellinger formula.

        :param X_ref_: per-bin values of the reference data
        :type X_ref_: numpy.ndarray
        :param X: per-bin values of the data being compared
        :type X: numpy.ndarray
        :param sqrt_div: value by which the root of the summed squares is divided
        :type sqrt_div: float
        :return: Hellinger distance
        :rtype: float
        """
        root_gap = np.sqrt(X_ref_) - np.sqrt(X)
        squared_gap_total = np.sum(root_gap**2)
        return np.sqrt(squared_gap_total) / sqrt_div
88 changes: 13 additions & 75 deletions frouros/data_drift/batch/distance_based/psi.py
Original file line number Diff line number Diff line change
@@ -1,105 +1,43 @@
"""PSI (Population Stability Index) module."""

import sys
from typing import Optional

import numpy as np # type: ignore

from frouros.data_drift.base import NumericalData, UnivariateData
from frouros.data_drift.batch.distance_based.base import (
DistanceBasedBase,
DistanceBinsBasedBase,
DistanceResult,
)


class PSI(DistanceBasedBase):
class PSI(DistanceBinsBasedBase):
"""PSI (Population Stability Index) algorithm class."""

def __init__(self, num_buckets: int = 10) -> None:
"""Init method.

:param num_buckets: number of buckets
:type num_buckets: int
"""
super().__init__(data_type=NumericalData(), statistical_type=UnivariateData())
self.num_buckets = num_buckets
self.X_ref_num: Optional[int] = None # pylint: disable=invalid-name

@property
def num_buckets(self) -> int:
"""Number of buckets.

:return: number of buckets
:rtype: int
"""
return self._num_buckets

@num_buckets.setter
def num_buckets(self, value: int) -> None:
"""Number of buckets setter.

:param value: value to be set
:type value: Optional[int]
:raises ValueError: Value error exception
"""
if value < 1:
raise ValueError("num buckets must be greater than 0.")
self._num_buckets = value

def fit(
self,
X: np.ndarray, # noqa: N803
) -> None:
"""Fit estimator.

:param X: feature data
:type X: numpy.ndarray
:return: fitted estimator
:rtype: self
"""
super().fit(X=X)
self.X_ref_num = self.X_ref_.shape[0] # type: ignore

def _apply_method(
self, X_ref_: np.ndarray, X: np.ndarray, **kwargs # noqa: N803
) -> DistanceResult:
distance = self._distance_measure(X_ref_=X_ref_, X=X, **kwargs)
return distance

def _distance_measure(
self, X_ref_: np.ndarray, X: np.ndarray, **kwargs # noqa: N803
) -> DistanceResult:
psi = self._psi(
def _distance_measure_bins(
self,
X_ref_: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
) -> float:
# Replace 0.0 values with the smallest number possible
# in order to avoid division by zero
X_ref_[X_ref_ == 0.0] = sys.float_info.min
X[X == 0.0] = sys.float_info.min
distance = self._psi(
X_ref_=X_ref_,
X=X,
X_ref_num=self.X_ref_num, # type: ignore
num_buckets=self.num_buckets,
)
distance = DistanceResult(distance=psi)
return distance

@staticmethod
def _psi(
X_ref_: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
X_ref_num: int, # noqa: N803 # pylint: disable=invalid-name
num_buckets: int,
) -> float:
X_ref_percents = ( # noqa: N806 # pylint: disable=invalid-name
np.histogram(a=X_ref_, bins=num_buckets)[0] / X_ref_num
)
X_percents = np.histogram( # noqa: N806 # pylint: disable=invalid-name
a=X, bins=num_buckets
)[0] / len(
X # noqa: N806
)

# Replace 0.0 values with the smallest number possible
# in order to avoid division by zero
X_ref_percents[X_ref_percents == 0.0] = sys.float_info.min
X_percents[X_percents == 0.0] = sys.float_info.min

psi = np.sum(
(X_percents - X_ref_percents) * np.log(X_percents / X_ref_percents)
)
psi = np.sum((X - X_ref_) * np.log(X / X_ref_))
return psi
24 changes: 24 additions & 0 deletions frouros/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,30 @@ def multivariate_distribution_q() -> Tuple[np.ndarray, np.ndarray]:
return mean, cov


@pytest.fixture(scope="module")
def univariate_distribution_p() -> Tuple[float, float]:
    """Parameters of the univariate distribution p.

    :return: mean and standard deviation of distribution p
    :rtype: Tuple[float, float]
    """
    distribution_params = (1, 1)  # (mean, standard deviation)
    return distribution_params


@pytest.fixture(scope="module")
def univariate_distribution_q() -> Tuple[float, float]:
    """Parameters of the univariate distribution q.

    :return: mean and standard deviation of distribution q
    :rtype: Tuple[float, float]
    """
    distribution_params = (5, 2)  # (mean, standard deviation)
    return distribution_params


@pytest.fixture(scope="module")
def prequential_error():
"""Prequential error.
Expand Down
67 changes: 66 additions & 1 deletion frouros/tests/test_data_drift.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from frouros.data_drift.batch.base import DataDriftBatchBase
from frouros.data_drift.batch.distance_based import (
EMD,
Hellinger,
HistogramIntersection,
PSI,
JS,
Expand Down Expand Up @@ -56,7 +57,6 @@ def test_batch_distance_based_categorical(
"detector, expected_distance",
[
(EMD(), 0.54726161),
(PSI(), 496.21968934),
(JS(), 0.81451218),
(KL(), np.inf),
(HistogramIntersection(), 0.97669491),
Expand Down Expand Up @@ -84,6 +84,71 @@ def test_batch_distance_based_univariate(
assert np.isclose(distance, expected_distance)


@pytest.mark.parametrize(
    "detector, expected_distance",
    [(PSI(), 468.79410784), (Hellinger(), 0.77137797)],
)
def test_batch_distance_bins_based_univariate_different_distribution(
    univariate_distribution_p: Tuple[float, float],
    univariate_distribution_q: Tuple[float, float],
    detector: DataDriftBatchBase,
    expected_distance: float,
    num_samples: int = 500,
) -> None:
    """Test bins based distance on samples from different distributions.

    :param univariate_distribution_p: mean and standard deviation of distribution p
    :type univariate_distribution_p: Tuple[float, float]
    :param univariate_distribution_q: mean and standard deviation of distribution q
    :type univariate_distribution_q: Tuple[float, float]
    :param detector: detector distance
    :type detector: DataDriftBatchBase
    :param expected_distance: expected distance value
    :type expected_distance: float
    :param num_samples: number of samples drawn from each distribution
    :type num_samples: int
    """
    mean_p, std_p = univariate_distribution_p
    mean_q, std_q = univariate_distribution_q

    # Seed before drawing: the expected distances depend on this random stream
    np.random.seed(seed=31)
    reference_sample = np.random.normal(loc=mean_p, scale=std_p, size=num_samples)
    test_sample = np.random.normal(loc=mean_q, scale=std_q, size=num_samples)

    detector.fit(X=reference_sample)
    observed_distance = detector.compare(X=test_sample)

    assert np.isclose(observed_distance, expected_distance)


@pytest.mark.parametrize(
    "detector, expected_distance",
    [(PSI(), 0.01840072), (Hellinger(), 0.04792538)],
)
def test_batch_distance_bins_based_univariate_same_distribution(
    univariate_distribution_p: Tuple[float, float],
    detector: DataDriftBatchBase,
    expected_distance: float,
    num_samples: int = 500,
) -> None:
    """Test bins based distance on two samples from the same distribution.

    :param univariate_distribution_p: mean and standard deviation of distribution p
    :type univariate_distribution_p: Tuple[float, float]
    :param detector: detector distance
    :type detector: DataDriftBatchBase
    :param expected_distance: expected distance value
    :type expected_distance: float
    :param num_samples: number of samples drawn for each of the two samples
    :type num_samples: int
    """
    mean_p, std_p = univariate_distribution_p

    # Seed before drawing: the expected distances depend on this random stream
    np.random.seed(seed=31)
    reference_sample = np.random.normal(loc=mean_p, scale=std_p, size=num_samples)
    test_sample = np.random.normal(loc=mean_p, scale=std_p, size=num_samples)

    detector.fit(X=reference_sample)
    observed_distance = detector.compare(X=test_sample)

    assert np.isclose(observed_distance, expected_distance)


@pytest.mark.parametrize(
"detector, expected_statistic, expected_p_value",
[
Expand Down
Loading