Commit 4b45bfe

tf-transform-team authored and zoyahav committed
Project import generated by Copybara.
PiperOrigin-RevId: 189351689
1 parent 681e1ef commit 4b45bfe

11 files changed: +116 -79 lines changed


RELEASE.md

Lines changed: 4 additions & 5 deletions

@@ -4,13 +4,12 @@
 
 ## Bug Fixes and Other Changes
 * Trim min/max value in `tft.bucketize` where the computed number of bucket
-  boundaries is more than requested. Updated documentation to
-  clearly indicate that the number of buckets is computed using approximate
-  algorithms, and that computed number can be more or less than requested.
+  boundaries is more than requested. Updated documentation to clearly indicate
+  that the number of buckets is computed using approximate algorithms, and that
+  computed number can be more or less than requested.
 * Change the namespace used for Beam metrics from `tensorflow_transform` to
   `tfx.Transform`.
-* Export all required names from top level module, so only one import is needed
-  in user code.
+* Update Beam metrics to also log vocabulary sizes.
 
 ## Breaking changes
 
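For context on the first bullet, a minimal sketch of the kind of `tft.bucketize` call the note refers to (the feature name 'income' and the bucket count are hypothetical, not taken from this commit):

import tensorflow_transform as tft

def preprocessing_fn(inputs):
  # Request 10 buckets. Boundaries come from an approximate quantiles
  # algorithm, so the computed boundary count can differ from the request;
  # this fix trims the min/max boundaries when too many are produced, and the
  # docs now state the result may have slightly more or fewer buckets.
  return {'income_bucketized': tft.bucketize(inputs['income'], num_buckets=10)}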

examples/census_example.py

Lines changed: 21 additions & 12 deletions

@@ -32,6 +32,11 @@
 from tensorflow.contrib import lookup
 from tensorflow.contrib.learn.python.learn.utils import input_fn_utils
 
+from tensorflow_transform.beam import impl as beam_impl
+from tensorflow_transform.beam.tft_beam_io import transform_fn_io
+from tensorflow_transform.coders import csv_coder
+from tensorflow_transform.coders import example_proto_coder
+from tensorflow_transform.saved import saved_transform_io
 from tensorflow_transform.tf_metadata import dataset_metadata
 from tensorflow_transform.tf_metadata import dataset_schema
 from tensorflow_transform.tf_metadata import metadata_io
@@ -128,7 +133,7 @@ def convert_label(label):
   # The "with" block will create a pipeline, and run that pipeline at the exit
   # of the block.
   with beam.Pipeline() as pipeline:
-    with tft.Context(temp_dir=tempfile.mkdtemp()):
+    with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
       # Create a coder to read the census data with the schema. To do this we
       # need to list all columns in order since the schema doesn't specify the
       # order of columns in the csv.
@@ -138,7 +143,7 @@ def convert_label(label):
           'capital-gain', 'capital-loss', 'hours-per-week', 'native-country',
           'label'
       ]
-      converter = tft.CsvCoder(ordered_columns, RAW_DATA_METADATA.schema)
+      converter = csv_coder.CsvCoder(ordered_columns, RAW_DATA_METADATA.schema)
 
      # Read in raw data and convert using CSV converter. Note that we apply
      # some Beam transformations here, which will not be encoded in the TF
@@ -159,12 +164,13 @@ def convert_label(label):
      # raw_data.
      raw_dataset = (raw_data, RAW_DATA_METADATA)
      transformed_dataset, transform_fn = (
-          raw_dataset | tft.AnalyzeAndTransformDataset(preprocessing_fn))
+          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))
      transformed_data, transformed_metadata = transformed_dataset
 
      _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
          os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE),
-          coder=tft.ExampleProtoCoder(transformed_metadata.schema))
+          coder=example_proto_coder.ExampleProtoCoder(
+              transformed_metadata.schema))
 
      # Now apply transform function to test data. In this case we also remove
      # the header line from the CSV file and the trailing period at the end of
@@ -182,21 +188,22 @@ def convert_label(label):
      raw_test_dataset = (raw_test_data, RAW_DATA_METADATA)
 
      transformed_test_dataset = (
-          (raw_test_dataset, transform_fn) | tft.TransformDataset())
+          (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
      # Don't need transformed data schema, it's the same as before.
      transformed_test_data, _ = transformed_test_dataset
 
      _ = transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
          os.path.join(working_dir, TRANSFORMED_TEST_DATA_FILEBASE),
-          coder=tft.ExampleProtoCoder(transformed_metadata.schema))
+          coder=example_proto_coder.ExampleProtoCoder(
+              transformed_metadata.schema))
 
      # Will write a SavedModel and metadata to two subdirectories of
-      # working_dir, given by tft.TRANSFORM_FN_DIR and
-      # tft.TRANSFORMED_METADATA_DIR respectively.
+      # working_dir, given by transform_fn_io.TRANSFORM_FN_DIR and
+      # transform_fn_io.TRANSFORMED_METADATA_DIR respectively.
      _ = (
          transform_fn
          | 'WriteTransformFn' >>
-          tft.WriteTransformFn(working_dir))
+          transform_fn_io.WriteTransformFn(working_dir))
 
 # Functions for training
 
@@ -214,7 +221,8 @@ def _make_training_input_fn(working_dir, filebase, batch_size):
     The input function for training or eval.
   """
   transformed_metadata = metadata_io.read_metadata(
-      os.path.join(working_dir, tft.TRANSFORMED_METADATA_DIR))
+      os.path.join(
+          working_dir, transform_fn_io.TRANSFORMED_METADATA_DIR))
   transformed_feature_spec = transformed_metadata.schema.as_feature_spec()
 
   def input_fn():
@@ -257,8 +265,9 @@ def serving_input_fn():
   # Apply the transform function that was used to generate the materialized
   # data.
   _, transformed_features = (
-      tft.partially_apply_saved_transform(
-          os.path.join(working_dir, tft.TRANSFORM_FN_DIR), raw_features))
+      saved_transform_io.partially_apply_saved_transform(
+          os.path.join(working_dir, transform_fn_io.TRANSFORM_FN_DIR),
+          raw_features))
 
   return input_fn_utils.InputFnOps(transformed_features, None, default_inputs)
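The recurring change in this and the following example files: Beam-specific entry points are no longer re-exported from the top-level `tft` namespace and must be imported from their defining modules. A before/after sketch of the pattern, using only names that appear in this diff:

# Before this commit: the top-level module re-exported the Beam pieces.
import tensorflow_transform as tft
# with tft.Context(temp_dir=...):
#   raw_dataset | tft.AnalyzeAndTransformDataset(preprocessing_fn)

# After this commit: import each piece from its defining module.
from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.coders import csv_coder
from tensorflow_transform.coders import example_proto_coder
from tensorflow_transform.saved import saved_transform_io
# with beam_impl.Context(temp_dir=...):
#   raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocessing_fn)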

examples/sentiment_example.py

Lines changed: 27 additions & 15 deletions

@@ -30,6 +30,10 @@
 from apache_beam.io import tfrecordio
 from tensorflow.contrib import learn
 from tensorflow.contrib.learn.python.learn.utils import input_fn_utils
+from tensorflow_transform.beam import impl as beam_impl
+from tensorflow_transform.beam.tft_beam_io import transform_fn_io
+from tensorflow_transform.coders import example_proto_coder
+from tensorflow_transform.saved import saved_transform_io
 from tensorflow_transform.tf_metadata import dataset_metadata
 from tensorflow_transform.tf_metadata import dataset_schema
 from tensorflow_transform.tf_metadata import metadata_io
@@ -137,14 +141,16 @@ def read_and_shuffle_data(
           (train_neg_filepattern, train_pos_filepattern))
       | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
           os.path.join(working_dir, SHUFFLED_TRAIN_DATA_FILEBASE),
-          coder=tft.ExampleProtoCoder(RAW_DATA_METADATA.schema)))
+          coder=example_proto_coder.ExampleProtoCoder(
+              RAW_DATA_METADATA.schema)))
   _ = (
       pipeline
       | 'ReadAndShuffleTest' >> ReadAndShuffleData(
           (test_neg_filepattern, test_pos_filepattern))
       | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
           os.path.join(working_dir, SHUFFLED_TEST_DATA_FILEBASE),
-          coder=tft.ExampleProtoCoder(RAW_DATA_METADATA.schema)))
+          coder=example_proto_coder.ExampleProtoCoder(
+              RAW_DATA_METADATA.schema)))
   # pylint: enable=no-value-for-parameter
 
 
@@ -161,20 +167,22 @@ def transform_data(working_dir):
   """
 
   with beam.Pipeline() as pipeline:
-    with tft.Context(temp_dir=tempfile.mkdtemp()):
+    with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
      train_data = (
          pipeline |
          'ReadTrain' >> tfrecordio.ReadFromTFRecord(
              os.path.join(working_dir,
                           SHUFFLED_TRAIN_DATA_FILEBASE + '*'),
-              coder=tft.ExampleProtoCoder(RAW_DATA_METADATA.schema)))
+              coder=example_proto_coder.ExampleProtoCoder(
+                  RAW_DATA_METADATA.schema)))
 
      test_data = (
          pipeline |
          'ReadTest' >> tfrecordio.ReadFromTFRecord(
              os.path.join(working_dir,
                           SHUFFLED_TEST_DATA_FILEBASE + '*'),
-              coder=tft.ExampleProtoCoder(RAW_DATA_METADATA.schema)))
+              coder=example_proto_coder.ExampleProtoCoder(
+                  RAW_DATA_METADATA.schema)))
 
      def preprocessing_fn(inputs):
        """Preprocess input columns into transformed columns."""
@@ -193,34 +201,36 @@ def preprocessing_fn(inputs):
 
      (transformed_train_data, transformed_metadata), transform_fn = (
          (train_data, RAW_DATA_METADATA)
-          | 'AnalyzeAndTransform' >> tft.AnalyzeAndTransformDataset(
+          | 'AnalyzeAndTransform' >> beam_impl.AnalyzeAndTransformDataset(
              preprocessing_fn))
 
      transformed_test_data, _ = (
          ((test_data, RAW_DATA_METADATA), transform_fn)
-          | 'Transform' >> tft.TransformDataset())
+          | 'Transform' >> beam_impl.TransformDataset())
 
      _ = (
          transformed_train_data
          | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
              os.path.join(working_dir,
                           TRANSFORMED_TRAIN_DATA_FILEBASE),
-              coder=tft.ExampleProtoCoder(transformed_metadata.schema)))
+              coder=example_proto_coder.ExampleProtoCoder(
+                  transformed_metadata.schema)))
 
      _ = (
          transformed_test_data
          | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
              os.path.join(working_dir,
                           TRANSFORMED_TEST_DATA_FILEBASE),
-              coder=tft.ExampleProtoCoder(transformed_metadata.schema)))
+              coder=example_proto_coder.ExampleProtoCoder(
+                  transformed_metadata.schema)))
 
      # Will write a SavedModel and metadata to two subdirectories of
-      # working_dir, given by tft.TRANSFORM_FN_DIR and
-      # tft.TRANSFORMED_METADATA_DIR respectively.
+      # working_dir, given by transform_fn_io.TRANSFORM_FN_DIR and
+      # transform_fn_io.TRANSFORMED_METADATA_DIR respectively.
      _ = (
          transform_fn
          | 'WriteTransformFn' >>
-          tft.WriteTransformFn(working_dir))
+          transform_fn_io.WriteTransformFn(working_dir))
 
 
 # Functions for training
@@ -239,7 +249,8 @@ def _make_training_input_fn(working_dir, filebase, batch_size):
     The input function for training or eval.
   """
   transformed_metadata = metadata_io.read_metadata(
-      os.path.join(working_dir, tft.TRANSFORMED_METADATA_DIR))
+      os.path.join(
+          working_dir, transform_fn_io.TRANSFORMED_METADATA_DIR))
   transformed_feature_spec = transformed_metadata.schema.as_feature_spec()
 
   def input_fn():
@@ -282,8 +293,9 @@ def serving_input_fn():
   # Apply the transform function that was used to generate the materialized
   # data.
   _, transformed_features = (
-      tft.partially_apply_saved_transform(
-          os.path.join(working_dir, tft.TRANSFORM_FN_DIR), raw_features))
+      saved_transform_io.partially_apply_saved_transform(
+          os.path.join(working_dir, transform_fn_io.TRANSFORM_FN_DIR),
+          raw_features))
 
   return input_fn_utils.InputFnOps(transformed_features, None, default_inputs)

examples/simple_example.py

Lines changed: 3 additions & 2 deletions

@@ -23,6 +23,7 @@
 
 import tensorflow as tf
 import tensorflow_transform as tft
+import tensorflow_transform.beam.impl as beam_impl
 from tensorflow_transform.tf_metadata import dataset_metadata
 from tensorflow_transform.tf_metadata import dataset_schema
 
@@ -59,9 +60,9 @@ def preprocessing_fn(inputs):
        tf.float32, [], dataset_schema.FixedColumnRepresentation())
 }))
 
-with tft.Context(temp_dir=tempfile.mkdtemp()):
+with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
   transformed_dataset, transform_fn = (  # pylint: disable=unused-variable
-      (raw_data, raw_data_metadata) | tft.AnalyzeAndTransformDataset(
+      (raw_data, raw_data_metadata) | beam_impl.AnalyzeAndTransformDataset(
          preprocessing_fn))
 
  transformed_data, transformed_metadata = transformed_dataset  # pylint: disable=unused-variable

getting_started.md

Lines changed: 4 additions & 4 deletions

@@ -120,7 +120,7 @@ raw_data = [
 raw_data_metadata = ...
 transformed_dataset, transform_fn = (
-    (raw_data, raw_data_metadata) | tft.AnalyzeAndTransformDataset(
+    (raw_data, raw_data_metadata) | beam_impl.AnalyzeAndTransformDataset(
        preprocessing_fn))
 transformed_data, transformed_metadata = transformed_dataset
 ```
@@ -278,7 +278,7 @@ between reading the lines of the CSV file, and applying the converter that
 converts each CSV row to an instance in the in-memory format.
 
 ```
-converter = tft.CsvCoder(ordered_columns, raw_data_schema)
+converter = csv_coder.CsvCoder(ordered_columns, raw_data_schema)
 
 raw_data = (
     p
@@ -351,13 +351,13 @@ shards that are written.
 ```
 transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
     transformed_eval_data_base,
-    coder=tft.ExampleProtoCoder(transformed_metadata))
+    coder=example_proto_coder.ExampleProtoCoder(transformed_metadata))
 ```
 
 In addition to the training data, we also write out the metadata.
 
 ```
-transformed_metadata | 'WriteMetadata' >> tft.WriteMetadata(
+transformed_metadata | 'WriteMetadata' >> beam_metadata_io.WriteMetadata(
    transformed_metadata_file, pipeline=p)
 ```
 
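The guide's snippets now assume the module-level imports are in scope. A hedged sketch of the imports these getting_started examples rely on (most paths are taken from the example files in this commit; the `beam_metadata_io` path is an assumption, inferred to live alongside `transform_fn_io`):

from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.beam.tft_beam_io import beam_metadata_io  # assumed path
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.coders import csv_coder
from tensorflow_transform.coders import example_proto_coder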

tensorflow_transform/__init__.py

Lines changed: 0 additions & 12 deletions

@@ -16,18 +16,6 @@
 # pylint: disable=wildcard-import
 from tensorflow_transform.analyzers import *
 from tensorflow_transform.api import apply_function
-from tensorflow_transform.beam.impl import AnalyzeAndTransformDataset
-from tensorflow_transform.beam.impl import AnalyzeDataset
-from tensorflow_transform.beam.impl import Context
-from tensorflow_transform.beam.impl import TransformDataset
-from tensorflow_transform.beam.tft_beam_io.transform_fn_io import ReadTransformFn
-from tensorflow_transform.beam.tft_beam_io.transform_fn_io import TRANSFORM_FN_DIR
-from tensorflow_transform.beam.tft_beam_io.transform_fn_io import TRANSFORMED_METADATA_DIR
-from tensorflow_transform.beam.tft_beam_io.transform_fn_io import WriteTransformFn
-from tensorflow_transform.coders.csv_coder import CsvCoder
-from tensorflow_transform.coders.example_proto_coder import ExampleProtoCoder
 from tensorflow_transform.mappers import *
 from tensorflow_transform.pretrained_models import *
-from tensorflow_transform.saved.saved_transform_io import apply_saved_transform
-from tensorflow_transform.saved.saved_transform_io import partially_apply_saved_transform
 # pylint: enable=wildcard-import
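After this change the top-level module keeps only the core TF-side API (analyzers, mappers, `apply_function`, pretrained models). A hedged sketch of user code that should still work unchanged, assuming `tft.scale_to_0_1` is among the names re-exported by the surviving wildcard import of `tensorflow_transform.mappers`:

import tensorflow_transform as tft

def preprocessing_fn(inputs):
  # Mappers and analyzers remain reachable through the top-level namespace;
  # only the Beam, coder, and saved-model pieces moved (see the sketch above).
  return {'x_scaled': tft.scale_to_0_1(inputs['x'])}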

tensorflow_transform/analyzers.py

Lines changed: 29 additions & 10 deletions

@@ -28,6 +28,7 @@
 from __future__ import division
 from __future__ import print_function
 
+import collections
 import re
 
 import numpy as np
@@ -38,7 +39,19 @@
 VOCAB_FILENAME_PREFIX = 'vocab_'
 VOCAB_FREQUENCY_FILENAME_PREFIX = 'vocab_frequency_'
 
+# Named tuple with details for each output of an Analyzer.
+_AnalyzerOutputInfo = collections.namedtuple(
+    'AnalyzerOutputInfo', ['name', 'dtype', 'is_asset'])
 
+
+# NOTE: this code is designed so that Analyzer is pickleable, and in particular
+# does not try to pickle a tf.Graph or tf.Tensor which may not be pickleable.
+# This is due to https://issues.apache.org/jira/browse/BEAM-3812. Until that
+# issue is fixed, anything that is a member variable of a Beam PTransform may
+# end up getting pickled. Instances of Analyzer do end up as member variables
+# of a PTransform in our implementation of tf.Transform on Beam currently, so
+# we must avoid directly putting `Tensor`s inside `Analyzer`, and instead use
+# tensor names.
 class Analyzer(object):
   """An operation-like class for full-pass analyses of data.
 
@@ -68,28 +81,37 @@ def __init__(self, inputs, output_dtype_shape_and_is_asset, spec, name):
    for tensor in inputs:
      if not isinstance(tensor, tf.Tensor):
        raise ValueError('Analyzers can only accept `Tensor`s as inputs')
-    self._inputs = inputs
-    self._outputs = []
-    self._output_is_asset_map = {}
+    self._input_tensor_names = [tensor.name for tensor in inputs]
+    self._output_infos = []
    with tf.name_scope(name) as scope:
      self._name = scope
      for dtype, shape, is_asset in output_dtype_shape_and_is_asset:
        output_tensor = tf.placeholder(dtype, shape)
        if is_asset and output_tensor.dtype != tf.string:
          raise ValueError(('Tensor {} cannot represent an asset, because it '
                            'is not a string.').format(output_tensor.name))
-        self._outputs.append(output_tensor)
-        self._output_is_asset_map[output_tensor] = is_asset
+        self._output_infos.append(_AnalyzerOutputInfo(
+            output_tensor.name, output_tensor.dtype, is_asset))
    self._spec = spec
    tf.add_to_collection(ANALYZER_COLLECTION, self)
 
+  @property
+  def input_tensor_names(self):
+    return self._input_tensor_names
+
+  @property
+  def output_infos(self):
+    return self._output_infos
+
   @property
   def inputs(self):
-    return self._inputs
+    return [tf.get_default_graph().get_tensor_by_name(name)
+            for name in self._input_tensor_names]
 
   @property
   def outputs(self):
-    return self._outputs
+    return [tf.get_default_graph().get_tensor_by_name(output_info.name)
+            for output_info in self._output_infos]
 
   @property
   def spec(self):
@@ -99,9 +121,6 @@ def spec(self):
   def name(self):
     return self._name
 
-  def output_is_asset(self, output_tensor):
-    return self._output_is_asset_map[output_tensor]
-
 
 class CombinerSpec(object):
   """Analyze using combiner function.
