# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- """TF.Transform analyzers."""
+ """Functions that involve a full pass over the dataset.
+
+ This module contains functions that are used in the preprocessing function to
+ define a full-pass operation, such as computing the sum, min, max or unique
+ values of a tensor over the entire dataset. This is implemented as a
+ reduction operation in the Beam implementation.
+
+ From the user's point of view, an analyzer appears as a regular TensorFlow
+ function, i.e. it accepts and returns tensors. However, it is represented in
+ the graph as an `Analyzer`, which is not a TensorFlow op but a placeholder
+ for the computation that takes place outside of TensorFlow.
+ """


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function


import tensorflow as tf
- from tensorflow_transform import api


- def _get_output_shape(x, reduce_instance_dims):
-   """Determines the shape of the output of a numerical analyzer.
+ ANALYZER_COLLECTION = 'tft_analyzers'
+
+
+ class Analyzer(object):
+   """An operation-like class for full-pass analyses of data.
+
+   An Analyzer is like a tf.Operation except that it requires computation over
+   the full dataset. E.g. sum(my_tensor) will compute the sum of the values of
+   my_tensor over all instances in the dataset. The Analyzer class contains the
+   inputs to this computation, and placeholders which will later be converted
+   to constants during a call to AnalyzeDataset.

  Args:
-     x: An input `Column` wrapping a `Tensor`.
-     reduce_instance_dims: If true, collapses the batch and instance dimensions
-       to arrive at a single scalar output. If False, only collapses the batch
-       dimension and outputs a vector of the same shape as the output.
+     inputs: The inputs to the analyzer.
+     output_dtypes_and_shapes: List of pairs of (dtype, shape) for each output.
+     spec: A description of the computation to be done.

-   Returns:
-     The shape to use for the output placeholder.
+   Raises:
+     ValueError: If the inputs are not all `Tensor`s.

  """
+
+   def __init__(self, inputs, output_dtypes_and_shapes, spec):
+     for tensor in inputs:
+       if not isinstance(tensor, tf.Tensor):
+         raise ValueError('Analyzers can only accept `Tensor`s as inputs')
+     self._inputs = inputs
+     self._outputs = [tf.placeholder(dtype, shape)
+                      for dtype, shape in output_dtypes_and_shapes]
+     self._spec = spec
+     tf.add_to_collection(ANALYZER_COLLECTION, self)
+
+   @property
+   def inputs(self):
+     return self._inputs
+
+   @property
+   def outputs(self):
+     return self._outputs
+
+   @property
+   def spec(self):
+     return self._spec
+
+
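A minimal sketch of the class in isolation (the bare `object()` spec is a stand-in; real specs such as `NumericCombineSpec` are defined below):

    x = tf.placeholder(tf.float32, [None])
    analyzer = Analyzer([x], [(tf.float32, ())], spec=object())
    # The output is an ordinary placeholder; a later call to AnalyzeDataset
    # replaces it with the constant computed over the full dataset.
    y = analyzer.outputs[0]
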
+ class NumericCombineSpec(object):
+   """Operation to combine numeric values."""
+
+   MIN = 'min'
+   MAX = 'max'
+   SUM = 'sum'
+
+   def __init__(self, dtype, combiner_type, reduce_instance_dims):
+     self._dtype = dtype
+     self._combiner_type = combiner_type
+     self._reduce_instance_dims = reduce_instance_dims
+
+   @property
+   def dtype(self):
+     return self._dtype
+
+   @property
+   def combiner_type(self):
+     return self._combiner_type
+
+   @property
+   def reduce_instance_dims(self):
+     return self._reduce_instance_dims
+
+
+ def _numeric_combine(x, combiner_type, reduce_instance_dims=True):
+   """Applies an analyzer with a `NumericCombineSpec` to the given input."""
+   if not isinstance(x, tf.Tensor):
+     raise TypeError('Expected a Tensor, but got %r' % x)
+
  if reduce_instance_dims:
-     # Numerical analyzers produce scalar output by default
-     return ()
+     # If reducing over all dimensions, result is scalar.
+     shape = ()
+   elif x.shape.dims is not None:
+     # If reducing over batch dimensions, with known shape, the result will be
+     # the same shape as the input, but without the batch.
+     shape = x.shape.as_list()[1:]
  else:
-     in_shape = x.tensor.shape
-     if in_shape:
-       # The output will be the same shape as the input, but without the batch.
-       return in_shape.as_list()[1:]
-     else:
-       return None
+     # If reducing over batch dimensions, with unknown shape, the result will
+     # also have unknown shape.
+     shape = None
+   with tf.name_scope(combiner_type):
+     spec = NumericCombineSpec(x.dtype, combiner_type, reduce_instance_dims)
+     return Analyzer([x], [(x.dtype, shape)], spec).outputs[0]
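
To make the shape logic concrete, a sketch with a hypothetical batched input:

    x = tf.placeholder(tf.float32, [None, 3])
    # Reducing over all dimensions yields a placeholder of shape ().
    _numeric_combine(x, NumericCombineSpec.SUM)
    # Reducing over only the batch dimension yields a placeholder of shape [3].
    _numeric_combine(x, NumericCombineSpec.SUM, reduce_instance_dims=False)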


def min(x, reduce_instance_dims=True):  # pylint: disable=redefined-builtin
-   """Computes the minimum of a `Column`.
+   """Computes the minimum of the values of a `Tensor` over the whole dataset.

  Args:
-     x: An input `Column` wrapping a `Tensor`.
+     x: A `Tensor`.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
-       dimension and outputs a vector of the same shape as the output.
+       dimension and outputs a `Tensor` of the same shape as the input.

  Returns:
-     A `Statistic`.
+     A `Tensor`.
  """
-   if not isinstance(x.tensor, tf.Tensor):
-     raise TypeError('Expected a Tensor, but got %r' % x.tensor)
-
-   arg_dict = {'reduce_instance_dims': reduce_instance_dims}
-
-   # pylint: disable=protected-access
-   return api._AnalyzerOutput(
-       tf.placeholder(x.tensor.dtype, _get_output_shape(
-           x, reduce_instance_dims)), api.CanonicalAnalyzers.MIN, [x], arg_dict)
+   return _numeric_combine(x, NumericCombineSpec.MIN, reduce_instance_dims)


def max(x, reduce_instance_dims=True):  # pylint: disable=redefined-builtin
-   """Computes the maximum of a `Column`.
+   """Computes the maximum of the values of a `Tensor` over the whole dataset.

  Args:
-     x: An input `Column` wrapping a `Tensor`.
+     x: A `Tensor`.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a `Tensor` of the same shape as the input.

  Returns:
-     A `Statistic`.
+     A `Tensor`.
  """
-   if not isinstance(x.tensor, tf.Tensor):
-     raise TypeError('Expected a Tensor, but got %r' % x.tensor)
-
-   arg_dict = {'reduce_instance_dims': reduce_instance_dims}
-   # pylint: disable=protected-access
-   return api._AnalyzerOutput(
-       tf.placeholder(x.tensor.dtype, _get_output_shape(
-           x, reduce_instance_dims)), api.CanonicalAnalyzers.MAX, [x], arg_dict)
+   return _numeric_combine(x, NumericCombineSpec.MAX, reduce_instance_dims)


def sum(x, reduce_instance_dims=True):  # pylint: disable=redefined-builtin
-   """Computes the sum of a `Column`.
+   """Computes the sum of the values of a `Tensor` over the whole dataset.

  Args:
-     x: An input `Column` wrapping a `Tensor`.
+     x: A `Tensor`.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a `Tensor` of the same shape as the input.

  Returns:
-     A `Statistic`.
+     A `Tensor`.
  """
-   if not isinstance(x.tensor, tf.Tensor):
-     raise TypeError('Expected a Tensor, but got %r' % x.tensor)
-
-   arg_dict = {'reduce_instance_dims': reduce_instance_dims}
-   # pylint: disable=protected-access
-   return api._AnalyzerOutput(
-       tf.placeholder(x.tensor.dtype, _get_output_shape(
-           x, reduce_instance_dims)), api.CanonicalAnalyzers.SUM, [x], arg_dict)
+   return _numeric_combine(x, NumericCombineSpec.SUM, reduce_instance_dims)
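
A usage sketch for the three combiners (hypothetical input tensor; note that `min`, `max` and `sum` here shadow the Python builtins):

    x = tf.placeholder(tf.float32, [None, 2])
    lo = min(x)                              # scalar: dataset-wide minimum
    hi = max(x, reduce_instance_dims=False)  # shape [2]: per-component maximum
    total = sum(x)                           # scalar: dataset-wide sum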


def size(x, reduce_instance_dims=True):
-   """Computes the total size of instances in a `Column`.
+   """Computes the total size of instances in a `Tensor` over the whole dataset.

  Args:
-     x: An input `Column` wrapping a `Tensor`.
+     x: A `Tensor`.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a `Tensor` of the same shape as the input.

  Returns:
-     A `Statistic`.
+     A `Tensor`.
  """
-   if not isinstance(x.tensor, tf.Tensor):
-     raise TypeError('Expected a Tensor, but got %r' % x.tensor)
-
-   # Note: Calling `sum` defined in this module, not the builtin.
-   return sum(api.map(tf.ones_like, x), reduce_instance_dims)
+   with tf.name_scope('size'):
+     # Note: Calling `sum` defined in this module, not the builtin.
+     return sum(tf.ones_like(x), reduce_instance_dims)


def mean(x, reduce_instance_dims=True):
-   """Computes the mean of the values in a `Column`.
+   """Computes the mean of the values of a `Tensor` over the whole dataset.

  Args:
-     x: An input `Column` wrapping a `Tensor`.
+     x: A `Tensor`.
    reduce_instance_dims: By default collapses the batch and instance dimensions
      to arrive at a single scalar output. If False, only collapses the batch
      dimension and outputs a `Tensor` of the same shape as the input.

  Returns:
-     A `Column` with an underlying `Tensor` of shape [1], containing the mean.
+     A `Tensor` containing the mean.
  """
-   if not isinstance(x.tensor, tf.Tensor):
-     raise TypeError('Expected a Tensor, but got %r' % x.tensor)
-
-   # Note: Calling `sum` defined in this module, not the builtin.
-   return api.map_statistics(tf.divide,
-                             sum(x, reduce_instance_dims),
-                             size(x, reduce_instance_dims))
+   with tf.name_scope('mean'):
+     # Note: Calling `sum` defined in this module, not the builtin.
+     return tf.divide(
+         sum(x, reduce_instance_dims), size(x, reduce_instance_dims))
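
Since `mean` is simply `sum` divided by `size`, the result is again a placeholder that is filled in at analysis time, for example:

    x = tf.placeholder(tf.float32, [None])
    avg = mean(x)  # equivalent to tf.divide(sum(x), size(x))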
+
+
+ class UniquesSpec(object):
+   """Operation to compute unique values."""
+
+   def __init__(self, dtype, top_k, frequency_threshold):
+     self._dtype = dtype
+     self._top_k = top_k
+     self._frequency_threshold = frequency_threshold
+
+   @property
+   def dtype(self):
+     return self._dtype
+
+   @property
+   def top_k(self):
+     return self._top_k
+
+   @property
+   def frequency_threshold(self):
+     return self._frequency_threshold


def uniques(x, top_k=None, frequency_threshold=None):
-   """Returns the unique values of the input tensor.
+   """Computes the unique values of a `Tensor` over the whole dataset.

-   Computes the unique values taken by the input column `x`, which can be backed
-   by a `Tensor` or `SparseTensor` of any size. The unique values will be
-   aggregated over all dimensions of `x` and all instances.
+   Computes the unique values taken by `x`, which can be a `Tensor` or
+   `SparseTensor` of any size. The unique values will be aggregated over all
+   dimensions of `x` and all instances.

  The unique values are sorted by decreasing frequency and then decreasing
  value.

  Args:
-     x: An input `Column` wrapping a `Tensor` or `SparseTensor`.
+     x: An input `Tensor` or `SparseTensor`.
    top_k: Limit the generated vocabulary to the first `top_k` elements. If set
      to None, the full vocabulary is generated.
    frequency_threshold: Limit the generated vocabulary only to elements whose
      frequency is >= the supplied threshold. If set to None, the full
      vocabulary is generated.

  Returns:
    The unique values of `x`.
@@ -184,15 +254,11 @@ def uniques(x, top_k=None, frequency_threshold=None):
  if frequency_threshold is not None:
    frequency_threshold = int(frequency_threshold)
    if frequency_threshold < 0:
-       raise ValueError('frequency_threshold must be non-negative, but got: %r' %
-                        frequency_threshold)
-
-   if isinstance(x.tensor, tf.SparseTensor):
-     values = x.tensor.values
-   else:
-     values = x.tensor
-   arg_dict = {'top_k': top_k, 'frequency_threshold': frequency_threshold}
-   # Create an output placeholder whose shape is a 1-d tensor of unknown size.
-   # pylint: disable=protected-access
-   return api._AnalyzerOutput(tf.placeholder(values.dtype, (None,)),
-                              api.CanonicalAnalyzers.UNIQUES, [x], arg_dict)
+       raise ValueError(
+           'frequency_threshold must be non-negative, but got: %r' %
+           frequency_threshold)
+   if isinstance(x, tf.SparseTensor):
+     x = x.values
+   with tf.name_scope('uniques'):
+     spec = UniquesSpec(x.dtype, top_k, frequency_threshold)
+     return Analyzer([x], [(x.dtype, [None])], spec).outputs[0]
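
A usage sketch with a hypothetical string feature:

    tokens = tf.placeholder(tf.string, [None])
    # Yields a rank-1 placeholder; AnalyzeDataset fills in the vocabulary,
    # most frequent values first, keeping at most 1000 values that occur at
    # least 5 times.
    vocab = uniques(tokens, top_k=1000, frequency_threshold=5)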