import logging
import numpy as np
from scipy.linalg import eigh
from typing import Any, Dict, Optional
from alibi_detect.utils.discretizer import Discretizer
from alibi_detect.utils.distance import abdm, mvdm, multidim_scaling
from alibi_detect.utils.mapping import ohe2ord, ord2num
from alibi_detect.base import BaseDetector, FitMixin, ThresholdMixin, outlier_prediction_dict
logger = logging.getLogger(__name__)
EPSILON = 1e-8
class Mahalanobis(BaseDetector, FitMixin, ThresholdMixin):
def __init__(self,
                 threshold: Optional[float] = None,
                 n_components: int = 3,
                 std_clip: int = 3,
                 start_clip: int = 100,
                 max_n: Optional[int] = None,
                 cat_vars: Optional[dict] = None,
ohe: bool = False,
data_type: str = 'tabular'
) -> None:
"""
Outlier detector for tabular data using the Mahalanobis distance.
Parameters
----------
threshold
Mahalanobis distance threshold used to classify outliers.
n_components
Number of principal components used.
std_clip
Feature-wise stdev used to clip the observations before updating the mean and cov.
start_clip
Number of observations before clipping is applied.
max_n
Algorithm behaves as if it has seen at most max_n points.
cat_vars
Dict with as keys the categorical columns and as values
the number of categories per categorical variable.
ohe
Whether the categorical variables are one-hot encoded (OHE) or not. If not OHE, they are
assumed to have ordinal encodings.
data_type
            Optionally specify the data type (tabular, image or time-series). Added to metadata.
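
        Examples
        --------
        A minimal, illustrative flow on purely numerical data (random values, shown for shapes only):

        >>> import numpy as np
        >>> od = Mahalanobis(threshold=10., n_components=2)
        >>> od.infer_threshold(np.random.randn(100, 4), threshold_perc=95.)
        >>> preds = od.predict(np.random.randn(10, 4))
        >>> preds['data']['is_outlier'].shape
        (10,)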
"""
super().__init__()
if threshold is None:
logger.warning('No threshold level set. Need to infer threshold using `infer_threshold`.')
self.threshold = threshold
self.n_components = n_components
self.std_clip = std_clip
self.start_clip = start_clip
self.max_n = max_n
# variables used in mapping from categorical to numerical values
# keys = categorical columns; values = numerical value for each of the categories
self.cat_vars = cat_vars
self.ohe = ohe
self.d_abs: Dict = {}
# initial parameter values
self.clip: Optional[list] = None
self.mean = 0
self.C = 0
self.n = 0
# set metadata
self.meta['detector_type'] = 'outlier'
self.meta['data_type'] = data_type
self.meta['online'] = True
def fit(self,
X: np.ndarray,
            y: Optional[np.ndarray] = None,
            d_type: str = 'abdm',
            w: Optional[float] = None,
disc_perc: list = [25, 50, 75],
standardize_cat_vars: bool = True,
feature_range: tuple = (-1e10, 1e10),
smooth: float = 1.,
center: bool = True
) -> None:
"""
If categorical variables are present, then transform those to numerical values.
This step is not necessary in the absence of categorical variables.
Parameters
----------
X
            Batch of instances used to infer the pairwise distances between categories.
y
Model class predictions or ground truth labels for X.
Used for 'mvdm' and 'abdm-mvdm' pairwise distance metrics.
Note that this is only compatible with classification problems. For regression problems,
use the 'abdm' distance metric.
d_type
Pairwise distance metric used for categorical variables. Currently, 'abdm', 'mvdm' and 'abdm-mvdm'
are supported. 'abdm' infers context from the other variables while 'mvdm' uses the model predictions.
'abdm-mvdm' is a weighted combination of the two metrics.
w
            Weight (between 0. and 1.) on the 'abdm' distance if d_type equals 'abdm-mvdm'.
disc_perc
List with percentiles used in binning of numerical features used for the 'abdm'
and 'abdm-mvdm' pairwise distance measures.
standardize_cat_vars
Standardize numerical values of categorical variables if True.
feature_range
Tuple with min and max ranges to allow for perturbed instances. Min and max ranges can be floats or
numpy arrays with dimension (1x nb of features) for feature-wise ranges.
smooth
            Smoothing exponent between 0 and 1 for the distances. Lower values of the exponent smooth
            the differences in the distance metric between features.
center
Whether to center the scaled distance measures. If False, the min distance for each feature
except for the feature with the highest raw max distance will be the lower bound of the
feature range, but the upper bound will be below the max feature range.
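
        Examples
        --------
        An illustrative sketch with made-up data, assuming the first column is an
        ordinally encoded categorical variable with three categories:

        >>> import numpy as np
        >>> od = Mahalanobis(threshold=10., cat_vars={0: 3})
        >>> X_cat = np.random.randint(0, 3, size=(100, 1))
        >>> X = np.concatenate([X_cat, np.random.randn(100, 3)], axis=1)
        >>> od.fit(X, d_type='abdm')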
"""
if self.cat_vars is None:
raise TypeError('No categorical variables specified in the "cat_vars" argument.')
        if d_type not in ['abdm', 'mvdm', 'abdm-mvdm']:
            raise ValueError('d_type needs to be "abdm", "mvdm" or "abdm-mvdm". '
                             '{} is not supported.'.format(d_type))
        if d_type == 'abdm-mvdm' and w is None:
            raise ValueError('Weight `w` on the abdm distance needs to be specified '
                             'when d_type equals "abdm-mvdm".')
if self.ohe:
X_ord, cat_vars_ord = ohe2ord(X, self.cat_vars)
else:
X_ord, cat_vars_ord = X, self.cat_vars
# bin numerical features to compute the pairwise distance matrices
cat_keys = list(cat_vars_ord.keys())
n_ord = X_ord.shape[1]
if d_type in ['abdm', 'abdm-mvdm'] and len(cat_keys) != n_ord:
fnames = [str(_) for _ in range(n_ord)]
disc = Discretizer(X_ord, cat_keys, fnames, percentiles=disc_perc)
X_bin = disc.discretize(X_ord)
cat_vars_bin = {k: len(disc.names[k]) for k in range(n_ord) if k not in cat_keys}
else:
X_bin = X_ord
cat_vars_bin = {}
# pairwise distances for categorical variables
if d_type == 'abdm':
d_pair = abdm(X_bin, cat_vars_ord, cat_vars_bin)
elif d_type == 'mvdm':
d_pair = mvdm(X_ord, y, cat_vars_ord, alpha=1)
        if (isinstance(feature_range[0], (int, float)) and
                isinstance(feature_range[1], (int, float))):
            feature_range = (np.ones((1, n_ord)) * feature_range[0],
                             np.ones((1, n_ord)) * feature_range[1])
if d_type == 'abdm-mvdm':
# pairwise distances
d_abdm = abdm(X_bin, cat_vars_ord, cat_vars_bin)
d_mvdm = mvdm(X_ord, y, cat_vars_ord, alpha=1)
# multidim scaled distances
d_abs_abdm = multidim_scaling(d_abdm, n_components=2, use_metric=True,
feature_range=feature_range,
standardize_cat_vars=standardize_cat_vars,
smooth=smooth, center=center,
update_feature_range=False)[0]
d_abs_mvdm = multidim_scaling(d_mvdm, n_components=2, use_metric=True,
feature_range=feature_range,
standardize_cat_vars=standardize_cat_vars,
smooth=smooth, center=center,
update_feature_range=False)[0]
# combine abdm and mvdm
for k, v in d_abs_abdm.items():
self.d_abs[k] = v * w + d_abs_mvdm[k] * (1 - w)
if center: # center the numerical feature values
self.d_abs[k] -= .5 * (self.d_abs[k].max() + self.d_abs[k].min())
else:
self.d_abs = multidim_scaling(d_pair, n_components=2, use_metric=True,
feature_range=feature_range,
standardize_cat_vars=standardize_cat_vars,
smooth=smooth, center=center,
update_feature_range=False)[0]
def infer_threshold(self,
X: np.ndarray,
threshold_perc: float = 95.
) -> None:
"""
Update threshold by a value inferred from the percentage of instances considered to be
outliers in a sample of the dataset.
Parameters
----------
X
Batch of instances.
threshold_perc
Percentage of X considered to be normal based on the outlier score.
"""
# convert categorical variables to numerical values
X = self.cat2num(X)
# compute outlier scores
iscore = self.score(X)
# update threshold
self.threshold = np.percentile(iscore, threshold_perc)
def cat2num(self, X: np.ndarray) -> np.ndarray:
"""
Convert categorical variables to numerical values.
Parameters
----------
X
Batch of instances to analyze.
Returns
-------
Batch of instances where categorical variables are converted to numerical values.
"""
if self.cat_vars is not None: # convert categorical variables
if self.ohe:
X = ohe2ord(X, self.cat_vars)[0]
X = ord2num(X, self.d_abs)
return X
def score(self, X: np.ndarray) -> np.ndarray:
"""
Compute outlier scores.
Parameters
----------
X
Batch of instances to analyze.
Returns
-------
Array with outlier scores for each instance in the batch.
"""
n_batch, n_params = X.shape # batch size and number of features
n_components = min(self.n_components, n_params)
if self.max_n is not None:
n = min(self.n, self.max_n) # n can never be above max_n
else:
n = self.n
# clip X
if self.n > self.start_clip:
X_clip = np.clip(X, self.clip[0], self.clip[1])
else:
X_clip = X
# track mean and covariance matrix
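        # rolling means: new_means[i] is the online mean after the first i + 1 batch
        # instances, i.e. mean + (i + 1) / (n + i + 1) * (batch_mean_i - mean), where
        # batch_mean_i is the mean over the first i + 1 clipped instances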
roll_partial_means = X_clip.cumsum(axis=0) / (np.arange(n_batch) + 1).reshape((n_batch, 1))
coefs = (np.arange(n_batch) + 1.) / (np.arange(n_batch) + n + 1.)
new_means = self.mean + coefs.reshape((n_batch, 1)) * (roll_partial_means - self.mean)
new_means_offset = np.empty_like(new_means)
new_means_offset[0] = self.mean
new_means_offset[1:] = new_means[:-1]
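        # new_means_offset[i] holds the mean *before* instance i is folded in; the
        # weighted outer products B of the centered instances feed the covariance update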
coefs = ((n + np.arange(n_batch)) / (n + np.arange(n_batch) + 1.)).reshape((n_batch, 1, 1))
B = coefs * np.matmul((X_clip - new_means_offset)[:, :, None], (X_clip - new_means_offset)[:, None, :])
cov_batch = (n - 1.) / (n + max(1, n_batch - 1.)) * self.C + 1. / (n + max(1, n_batch - 1.)) * B.sum(axis=0)
# PCA
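        # keep the n_components largest eigenvalues/eigenvectors; note that the
        # `eigvals` keyword was deprecated in SciPy 1.5 in favour of `subset_by_index`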
eigvals, eigvects = eigh(cov_batch, eigvals=(n_params - n_components, n_params - 1))
# projections
proj_x = np.matmul(X, eigvects)
proj_x_clip = np.matmul(X_clip, eigvects)
proj_means = np.matmul(new_means_offset, eigvects)
if isinstance(self.C, int) and self.C == 0:
proj_cov = np.diag(np.zeros(n_components))
else:
proj_cov = np.matmul(eigvects.transpose(), np.matmul(self.C, eigvects))
# outlier scores are computed in the principal component space
coefs = (1. / (n + np.arange(n_batch) + 1.)).reshape((n_batch, 1, 1))
B = coefs * np.matmul((proj_x_clip - proj_means)[:, :, None], (proj_x_clip - proj_means)[:, None, :])
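        # track the inverse of the projected covariance incrementally: once it becomes
        # invertible, each next inverse follows from the previous one via a
        # Sherman-Morrison style rank-1 update instead of a full inversion per instance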
all_C_inv = np.zeros_like(B)
c_inv = None
for i, b in enumerate(B):
if c_inv is None:
if abs(np.linalg.det(proj_cov)) > EPSILON:
c_inv = np.linalg.inv(proj_cov)
all_C_inv[i] = c_inv
continue
else:
if n + i == 0:
continue
proj_cov = (n + i - 1.) / (n + i) * proj_cov + b
continue
else:
c_inv = (n + i - 1.) / float(n + i - 2.) * all_C_inv[i - 1]
BC1 = np.matmul(B[i - 1], c_inv)
all_C_inv[i] = c_inv - 1. / (1. + np.trace(BC1)) * np.matmul(c_inv, BC1)
# update parameters
self.mean = new_means[-1]
self.C = cov_batch
stdev = np.sqrt(np.diag(cov_batch))
self.n += n_batch
if self.n > self.start_clip:
self.clip = [self.mean - self.std_clip * stdev, self.mean + self.std_clip * stdev]
# compute outlier scores
x_diff = proj_x - proj_means
outlier_score = np.matmul(x_diff[:, None, :], np.matmul(all_C_inv, x_diff[:, :, None])).reshape(n_batch)
return outlier_score
def predict(self,
X: np.ndarray,
return_instance_score: bool = True) \
            -> Dict[str, Dict[str, Any]]:
"""
Compute outlier scores and transform into outlier predictions.
Parameters
----------
X
Batch of instances.
return_instance_score
Whether to return instance level outlier scores.
Returns
-------
Dictionary containing ``'meta'`` and ``'data'`` dictionaries.
- ``'meta'`` has the model's metadata.
- ``'data'`` contains the outlier predictions and instance level outlier scores.
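
        Examples
        --------
        Assuming a detector ``od`` that has been fit and has a threshold set:

        >>> preds = od.predict(X)  # doctest: +SKIP
        >>> preds['data']['is_outlier']  # doctest: +SKIP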
"""
# convert categorical variables to numerical values
X = self.cat2num(X)
# compute outlier scores
iscore = self.score(X)
# values above threshold are outliers
outlier_pred = (iscore > self.threshold).astype(int)
# populate output dict
od = outlier_prediction_dict()
od['meta'] = self.meta
od['data']['is_outlier'] = outlier_pred
if return_instance_score:
od['data']['instance_score'] = iscore
return od