data_transformer.py
from collections import namedtuple

import numpy as np
import pandas as pd
from rdt.transformers import OneHotEncodingTransformer
from sklearn.mixture import BayesianGaussianMixture

SpanInfo = namedtuple("SpanInfo", ["dim", "activation_fn"])
ColumnTransformInfo = namedtuple(
    "ColumnTransformInfo", ["column_name", "column_type",
                            "transform", "transform_aux",
                            "output_info", "output_dimensions"])


class DataTransformer(object):
    """Data Transformer.

    Continuous columns are modeled with a Bayesian GMM and normalized to a
    scalar in [-1, 1] plus a one-hot vector indicating the sampled component.
    Discrete columns are encoded using an RDT ``OneHotEncodingTransformer``.
    """

    def __init__(self, max_clusters=10, weight_threshold=0.005):
        """Create a data transformer.

        Args:
            max_clusters (int):
                Maximum number of Gaussian distributions in the Bayesian GMM.
            weight_threshold (float):
                Minimum mixture weight for a Gaussian component to be kept.
        """
        self._max_clusters = max_clusters
        self._weight_threshold = weight_threshold

    def _fit_continuous(self, column_name, raw_column_data):
        """Train a Bayesian GMM for a continuous column."""
        gm = BayesianGaussianMixture(
            n_components=self._max_clusters,
            weight_concentration_prior_type='dirichlet_process',
            weight_concentration_prior=0.001,
            n_init=1
        )
        gm.fit(raw_column_data.reshape(-1, 1))

        # Keep only the components whose learned weight clears the threshold.
        valid_component_indicator = gm.weights_ > self._weight_threshold
        num_components = valid_component_indicator.sum()

        return ColumnTransformInfo(
            column_name=column_name, column_type="continuous", transform=gm,
            transform_aux=valid_component_indicator,
            output_info=[SpanInfo(1, 'tanh'), SpanInfo(num_components, 'softmax')],
            output_dimensions=1 + num_components)
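
    # Note (illustrative, not in the original source): with the defaults,
    # 10 candidate Gaussians are fit and only those with learned mixture
    # weight above 0.005 survive, so output_dimensions is usually well
    # below 1 + max_clusters.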

    def _fit_discrete(self, column_name, raw_column_data):
        """Fit a one-hot encoder for a discrete column."""
        ohe = OneHotEncodingTransformer()
        ohe.fit(raw_column_data)
        num_categories = len(ohe.dummies)

        return ColumnTransformInfo(
            column_name=column_name, column_type="discrete", transform=ohe,
            transform_aux=None,
            output_info=[SpanInfo(num_categories, 'softmax')],
            output_dimensions=num_categories)

    def fit(self, raw_data, discrete_columns=tuple()):
        """Fit a GMM for continuous columns and a one-hot encoder for discrete columns.

        This step also counts the number of output columns and records the
        span information for each input column.
        """
        self.output_info_list = []
        self.output_dimensions = 0

        if not isinstance(raw_data, pd.DataFrame):
            self.dataframe = False
            raw_data = pd.DataFrame(raw_data)
        else:
            self.dataframe = True

        self._column_raw_dtypes = raw_data.infer_objects().dtypes

        self._column_transform_info_list = []
        for column_name in raw_data.columns:
            raw_column_data = raw_data[column_name].values
            if column_name in discrete_columns:
                column_transform_info = self._fit_discrete(
                    column_name, raw_column_data)
            else:
                column_transform_info = self._fit_continuous(
                    column_name, raw_column_data)

            self.output_info_list.append(column_transform_info.output_info)
            self.output_dimensions += column_transform_info.output_dimensions
            self._column_transform_info_list.append(column_transform_info)

    def _transform_continuous(self, column_transform_info, raw_column_data):
        gm = column_transform_info.transform

        valid_component_indicator = column_transform_info.transform_aux
        num_components = valid_component_indicator.sum()

        means = gm.means_.reshape((1, self._max_clusters))
        stds = np.sqrt(gm.covariances_).reshape((1, self._max_clusters))
        normalized_values = ((raw_column_data - means) / (4 * stds))[:, valid_component_indicator]
        component_probs = gm.predict_proba(raw_column_data)[:, valid_component_indicator]

        # Sample one component per value according to its posterior probability.
        selected_component = np.zeros(len(raw_column_data), dtype='int')
        for i in range(len(raw_column_data)):
            component_prob_t = component_probs[i] + 1e-6
            component_prob_t = component_prob_t / component_prob_t.sum()
            selected_component[i] = np.random.choice(
                np.arange(num_components), p=component_prob_t)

        selected_normalized_value = normalized_values[
            np.arange(len(raw_column_data)), selected_component].reshape([-1, 1])
        selected_normalized_value = np.clip(selected_normalized_value, -.99, .99)

        selected_component_onehot = np.zeros_like(component_probs)
        selected_component_onehot[np.arange(len(raw_column_data)), selected_component] = 1
        return [selected_normalized_value, selected_component_onehot]
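
    # Worked example (an illustrative sketch, not in the original source):
    # if a value x = 5.0 is assigned to a surviving component with mean
    # mu = 3.0 and std sigma = 1.0, the method above encodes it as
    #     alpha = (5.0 - 3.0) / (4 * 1.0) = 0.5   (the 'tanh' span)
    #     beta  = one-hot vector marking that component (the 'softmax' span)
    # so each continuous value occupies 1 + num_components output columns.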

    def _transform_discrete(self, column_transform_info, raw_column_data):
        ohe = column_transform_info.transform
        return [ohe.transform(raw_column_data)]

    def transform(self, raw_data):
        """Take raw data and output matrix data."""
        if not isinstance(raw_data, pd.DataFrame):
            raw_data = pd.DataFrame(raw_data)

        column_data_list = []
        for column_transform_info in self._column_transform_info_list:
            column_data = raw_data[[column_transform_info.column_name]].values
            if column_transform_info.column_type == "continuous":
                column_data_list += self._transform_continuous(
                    column_transform_info, column_data)
            else:
                assert column_transform_info.column_type == "discrete"
                column_data_list += self._transform_discrete(
                    column_transform_info, column_data)

        return np.concatenate(column_data_list, axis=1).astype(float)

    def _inverse_transform_continuous(self, column_transform_info, column_data, sigmas, st):
        gm = column_transform_info.transform
        valid_component_indicator = column_transform_info.transform_aux

        selected_normalized_value = column_data[:, 0]
        selected_component_probs = column_data[:, 1:]

        if sigmas is not None:
            sig = sigmas[st]
            selected_normalized_value = np.random.normal(selected_normalized_value, sig)

        selected_normalized_value = np.clip(selected_normalized_value, -1, 1)
        component_probs = np.ones((len(column_data), self._max_clusters)) * -100
        component_probs[:, valid_component_indicator] = selected_component_probs

        means = gm.means_.reshape([-1])
        stds = np.sqrt(gm.covariances_).reshape([-1])
        selected_component = np.argmax(component_probs, axis=1)

        std_t = stds[selected_component]
        mean_t = means[selected_component]
        column = selected_normalized_value * 4 * std_t + mean_t

        return column
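
    # This reverses the forward encoding: x = alpha * 4 * sigma_k + mu_k, where
    # k is the argmax of the softmax span and alpha is the tanh span (optionally
    # perturbed with Gaussian noise of scale sigmas[st] before clipping).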

    def _inverse_transform_discrete(self, column_transform_info, column_data):
        ohe = column_transform_info.transform
        return ohe.reverse_transform(column_data)

    def inverse_transform(self, data, sigmas=None):
        """Take matrix data and output raw data.

        The output has the same type as the data passed to ``fit``:
        either a numpy array or a pandas DataFrame.
        """
        st = 0
        recovered_column_data_list = []
        column_names = []
        for column_transform_info in self._column_transform_info_list:
            dim = column_transform_info.output_dimensions
            column_data = data[:, st:st + dim]

            if column_transform_info.column_type == 'continuous':
                recovered_column_data = self._inverse_transform_continuous(
                    column_transform_info, column_data, sigmas, st)
            else:
                assert column_transform_info.column_type == 'discrete'
                recovered_column_data = self._inverse_transform_discrete(
                    column_transform_info, column_data)

            recovered_column_data_list.append(recovered_column_data)
            column_names.append(column_transform_info.column_name)
            st += dim

        recovered_data = np.column_stack(recovered_column_data_list)
        recovered_data = (pd.DataFrame(recovered_data, columns=column_names)
                          .astype(self._column_raw_dtypes))
        if not self.dataframe:
            recovered_data = recovered_data.values
        return recovered_data

    def convert_column_name_value_to_id(self, column_name, value):
        """Map a discrete column name and value to their positional ids."""
        discrete_counter = 0
        column_id = 0
        for column_transform_info in self._column_transform_info_list:
            if column_transform_info.column_name == column_name:
                break
            if column_transform_info.column_type == "discrete":
                discrete_counter += 1
            column_id += 1
        else:
            raise ValueError(f"The column_name `{column_name}` doesn't exist in the data.")

        one_hot = column_transform_info.transform.transform(np.array([value]))[0]
        if sum(one_hot) == 0:
            raise ValueError(f"The value `{value}` doesn't exist in the column `{column_name}`.")

        return {
            "discrete_column_id": discrete_counter,
            "column_id": column_id,
            "value_id": np.argmax(one_hot)
        }
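

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module). Column names and
    # data are illustrative; it assumes `rdt` and `scikit-learn` versions
    # compatible with the imports above are installed.
    demo = pd.DataFrame({
        "age": np.random.normal(35, 10, size=100),
        "city": np.random.choice(["NY", "SF", "LA"], size=100),
    })

    transformer = DataTransformer()
    transformer.fit(demo, discrete_columns=("city",))

    encoded = transformer.transform(demo)
    print(encoded.shape)  # (100, transformer.output_dimensions)

    recovered = transformer.inverse_transform(encoded)
    print(recovered.head())  # same columns and dtypes as `demo`

    print(transformer.convert_column_name_value_to_id("city", "NY"))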