Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions frouros/detectors/data_drift/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def X_ref(self, value: Optional[np.ndarray]) -> None: # noqa: N802
self._check_array(X=value)
self._X_ref = value

def fit(self, X: np.ndarray) -> Dict[str, Any]: # noqa: N803
def fit(self, X: np.ndarray, **kwargs) -> Dict[str, Any]: # noqa: N803
"""Fit detector.

:param X: feature data
Expand All @@ -186,7 +186,7 @@ def fit(self, X: np.ndarray) -> Dict[str, Any]: # noqa: N803
self._check_fit_dimensions(X=X)
for callback in self.callbacks: # type: ignore
callback.on_fit_start()
self._fit(X=X)
self._fit(X=X, **kwargs)
for callback in self.callbacks: # type: ignore
callback.on_fit_end()

Expand Down
7 changes: 6 additions & 1 deletion frouros/detectors/data_drift/batch/distance_based/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
pass

Expand Down Expand Up @@ -166,6 +167,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
distance_bins = self._distance_measure_bins(X_ref=X_ref, X=X)
distance = DistanceResult(distance=distance_bins)
Expand All @@ -186,7 +188,9 @@ def _calculate_bins_values(

@abc.abstractmethod
def _distance_measure_bins(
self, X_ref: np.ndarray, X: np.ndarray # noqa: N803
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
) -> float:
pass

Expand Down Expand Up @@ -246,6 +250,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
pass

Expand Down
1 change: 1 addition & 0 deletions frouros/detectors/data_drift/batch/distance_based/emd.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
emd = self._emd(X=X_ref, Y=X, **self.kwargs)
distance = DistanceResult(distance=emd)
Expand Down
5 changes: 3 additions & 2 deletions frouros/detectors/data_drift/batch/distance_based/js.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(
self,
num_bins: int = 10,
callbacks: Optional[Union[Callback, List[Callback]]] = None,
**kwargs
**kwargs,
) -> None:
"""Init method.

Expand All @@ -50,6 +50,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
js = self._js(X=X_ref, Y=X, num_bins=self.num_bins, **self.kwargs)
distance = DistanceResult(distance=js)
Expand All @@ -61,7 +62,7 @@ def _js(
Y: np.ndarray,
*,
num_bins: int,
**kwargs: Dict[str, Any]
**kwargs: Dict[str, Any],
) -> float:
( # noqa: N806
X_ref_rvs,
Expand Down
1 change: 1 addition & 0 deletions frouros/detectors/data_drift/batch/distance_based/kl.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
kl = self._kl(X=X_ref, Y=X, num_bins=self.num_bins, **self.kwargs)
distance = DistanceResult(distance=kl)
Expand Down
176 changes: 160 additions & 16 deletions frouros/detectors/data_drift/batch/distance_based/mmd.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
"""MMD (Maximum Mean Discrepancy) module."""

from typing import Callable, Optional, List, Union
import itertools
import math
from typing import Callable, Iterator, Optional, List, Union

import numpy as np # type: ignore
from scipy.spatial.distance import cdist # type: ignore
import tqdm # type: ignore

from frouros.callbacks import Callback
from frouros.detectors.data_drift.base import MultivariateData
Expand Down Expand Up @@ -43,12 +46,15 @@ class MMD(DistanceBasedBase):
def __init__(
self,
kernel: Callable = rbf_kernel,
chunk_size: Optional[int] = None,
callbacks: Optional[Union[Callback, List[Callback]]] = None,
) -> None:
"""Init method.

:param kernel: kernel function to use
:type kernel: Callable
:param chunk_size:
:type chunk_size: Optional[int]
:param callbacks: callbacks
:type callbacks: Optional[Union[Callback, List[Callback]]]
"""
Expand All @@ -61,13 +67,42 @@ def __init__(
callbacks=callbacks,
)
self.kernel = kernel
self.chunk_size = chunk_size
self._chunk_size_x = None
self._expected_k_x = None
self._X_num_samples = None

@property
def chunk_size(self) -> Optional[int]:
"""Chunk size property.

:return: chunk size to use
:rtype: int
"""
return self._chunk_size

@chunk_size.setter
def chunk_size(self, value: Optional[int]) -> None:
"""Chunk size method setter.

:param value: value to be set
:type value: Optional[int]
:raises TypeError: Type error exception
"""
if value is not None:
if isinstance(value, int): # type: ignore
if value <= 0:
raise ValueError("chunk_size must be greater than 0 or None.")
else:
raise TypeError("chunk_size must be of type int or None.")
self._chunk_size = value

@property
def kernel(self) -> Callable:
"""Kernel property.

:return: kernel function to use
:rtype: Kernel
:rtype: Callable
"""
return self._kernel

Expand All @@ -80,38 +115,147 @@ def kernel(self, value: Callable) -> None:
:raises TypeError: Type error exception
"""
if not isinstance(value, Callable): # type: ignore
raise TypeError("value must be of type Callable.")
raise TypeError("kernel must be of type Callable.")
self._kernel = value

def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
mmd = self._mmd(X=X_ref, Y=X, kernel=self.kernel)
mmd = self._mmd(X=X_ref, Y=X, kernel=self.kernel, **kwargs)
distance_test = DistanceResult(distance=mmd)
return distance_test

def _fit(
self,
X: np.ndarray, # noqa: N803
**kwargs,
) -> None:
super()._fit(X=X)
# Add dimension only for the kernel calculation (if dim == 1)
if X.ndim == 1:
X = np.expand_dims(X, axis=1) # noqa: N806
self._X_num_samples = len(X) # type: ignore # noqa: N806

self._chunk_size_x = (
self._X_num_samples
if self.chunk_size is None
else self.chunk_size # type: ignore
)

X_chunks = self._get_chunks( # noqa: N806
data=X,
chunk_size=self._chunk_size_x, # type: ignore
)
X_chunks_combinations = itertools.product(X_chunks, repeat=2) # noqa: N806

if kwargs.get("verbose", False):
num_chunks = (
math.ceil(self._X_num_samples / self._chunk_size_x) ** 2 # type: ignore
)
k_x_sum = np.array(
[
self.kernel(*X_chunk).sum()
for X_chunk in tqdm.tqdm( # noqa: N806
X_chunks_combinations, total=num_chunks # noqa: N806
)
]
).sum()
else:
k_x_sum = np.array(
[
self.kernel(*X_chunk).sum()
for X_chunk in X_chunks_combinations # noqa: N806
]
).sum()
self._expected_k_x = k_x_sum / (
self._X_num_samples * (self._X_num_samples - 1) # type: ignore
)

@staticmethod
def _mmd(
def _get_chunks(data: np.ndarray, chunk_size: int) -> Iterator:
chunks = (
data[i : i + chunk_size] # noqa: E203
for i in range(0, len(data), chunk_size)
)
return chunks

def _mmd( # pylint: disable=too-many-locals
self,
X: np.ndarray, # noqa: N803
Y: np.ndarray,
*,
kernel: Callable,
**kwargs,
) -> float: # noqa: N803
X_num_samples = X.shape[0] # noqa: N806
Y_num_samples = Y.shape[0] # noqa: N806
data = np.concatenate([X, Y]) # noqa: N806
# Only check for X dimension (X == Y dim comparison has been already made)
if X.ndim == 1:
data = np.expand_dims(data, axis=1)
X = np.expand_dims(X, axis=1) # noqa: N806
Y = np.expand_dims(Y, axis=1) # noqa: N806

X_chunks = self._get_chunks( # noqa: N806
data=X,
chunk_size=self._chunk_size_x, # type: ignore
)
Y_num_samples = len(Y) # noqa: N806
chunk_size_y = Y_num_samples if self.chunk_size is None else self.chunk_size
Y_chunks, Y_chunks_copy = itertools.tee( # noqa: N806
self._get_chunks(
data=Y,
chunk_size=chunk_size_y, # type: ignore
),
2,
)
Y_chunks_combinations = itertools.product( # noqa: N806
Y_chunks,
repeat=2,
)
XY_chunks_combinations = itertools.product( # noqa: N806
X_chunks,
Y_chunks_copy,
)

if kwargs.get("verbose", False):
num_chunks_y = math.ceil(Y_num_samples / self.chunk_size) # type: ignore
num_chunks_y_combinations = num_chunks_y**2
num_chunks_xy = (
math.ceil(len(X) / self._chunk_size_x) * num_chunks_y # type: ignore
)
sum_y = np.array(
[
kernel(*Y_chunk).sum()
for Y_chunk in tqdm.tqdm( # noqa: N806
Y_chunks_combinations, total=num_chunks_y_combinations
)
]
).sum()
sum_xy = np.array(
[
kernel(*XY_chunk).sum()
for XY_chunk in tqdm.tqdm( # noqa: N806
XY_chunks_combinations, total=num_chunks_xy
)
]
).sum()
else:
sum_y = np.array(
[
kernel(*Y_chunk).sum()
for Y_chunk in Y_chunks_combinations # noqa: N806
]
).sum()
sum_xy = np.array(
[
kernel(*XY_chunk).sum()
for XY_chunk in XY_chunks_combinations # noqa: N806
]
).sum()

k_matrix = kernel(X=data, Y=data)
k_x = k_matrix[:X_num_samples, :X_num_samples]
k_y = k_matrix[Y_num_samples:, Y_num_samples:]
k_xy = k_matrix[:X_num_samples, Y_num_samples:]
mmd = (
k_x.sum() / (X_num_samples * (X_num_samples - 1))
+ k_y.sum() / (Y_num_samples * (Y_num_samples - 1))
- 2 * k_xy.sum() / (X_num_samples * Y_num_samples)
self._expected_k_x
+ sum_y / (Y_num_samples * (Y_num_samples - 1))
- 2 * sum_xy / (self._X_num_samples * Y_num_samples) # type: ignore
)
return mmd
52 changes: 51 additions & 1 deletion frouros/tests/integration/test_data_drift.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Test data drift detectors."""

from typing import Tuple
from typing import Tuple, Union

import pytest # type: ignore
import numpy as np # type: ignore
Expand Down Expand Up @@ -243,6 +243,56 @@ def test_batch_distance_based_multivariate_same_distribution(
assert np.isclose(statistic, expected_distance)


@pytest.mark.parametrize(
"detector, expected_distance",
[(MMD(chunk_size=10), 0.12183835), (MMD(chunk_size=None), 0.12183835)],
)
def test_batch_distance_based_chunk_size_valid(
X_ref_multivariate: np.ndarray, # noqa: N803
X_test_multivariate: np.ndarray, # noqa: N803
detector: DataDriftBatchBase,
expected_distance: float,
) -> None:
"""Test batch distance based chunk size valid method.

:param X_ref_multivariate: reference multivariate data
:type X_ref_multivariate: numpy.ndarray
:param X_test_multivariate: test multivariate data
:type X_test_multivariate: numpy.ndarray
:param detector: detector test
:type detector: DataDriftBatchBase
:param expected_distance: expected distance value
:type expected_distance: float
"""
_ = detector.fit(X=X_ref_multivariate)
statistic, _ = detector.compare(X=X_test_multivariate)

assert np.isclose(statistic, expected_distance)


@pytest.mark.parametrize(
"chunk_size, expected_exception",
[
(1.5, TypeError),
("10", TypeError),
(-1, ValueError),
],
)
def test_batch_distance_based_chunk_size_invalid(
chunk_size: Union[int, float, str],
expected_exception: Union[TypeError, ValueError],
) -> None:
"""Test batch distance based chunk size invalid method.

:param chunk_size: chunk size
:type chunk_size: Union[int, float, str]
:param expected_exception: expected exception
:type expected_exception: Union[TypeError, ValueError]
"""
with pytest.raises(expected_exception):
_ = MMD(chunk_size=chunk_size) # type: ignore


@pytest.mark.parametrize(
"detector, expected_statistic, expected_p_value",
[
Expand Down
Loading