Update samples/core/tfx-oss to tfx==0.14.0 and kfp=0.1.31 (#2385)
* minor fixes to tfx-oss README

* update location of tfx/examples

* update location of tfx/examples
ucdmkt authored and k8s-ci-robot committed Oct 16, 2019
1 parent 09171d7 commit 758ef3a
Showing 3 changed files with 33 additions and 43 deletions.
samples/core/tfx-oss/README.md (4 additions, 4 deletions)

@@ -20,8 +20,8 @@ then activate the environment.

Install TFX and Kubeflow Pipelines SDK
```
-pip3 install tfx==0.13.0 --upgrade
-pip3 install kfp --upgrade
+pip3 install 'tfx==0.14.0' --upgrade
+pip3 install 'kfp>=0.1.31' --upgrade
```

Upload the utility code to your storage bucket. You can modify this code if needed for a different dataset.
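
For context, the upload amounts to a one-line `tf.gfile` copy; a sketch under TF 1.x, with the README's bucket and path placeholders left in place:

```python
import tensorflow as tf

# Copy the utility module to GCS so pipeline steps can load it.
tf.gfile.Copy('utils/taxi_utils.py',
              'gs://<my bucket>/<path>/taxi_utils.py',
              overwrite=True)
```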
@@ -37,11 +37,11 @@ gfile.Copy('utils/taxi_utils.py', 'gs://<my bucket>/<path>/taxi_utils.py')

## Configure the TFX Pipeline

-Modify the pipeline configurations at
+Modify the pipeline configurations at
```
TFX Example.ipynb
```
-Configure
+Configure
- Set `_input_bucket` to the GCS directory where you've copied taxi_utils.py, i.e. gs://<my bucket>/<path>/
- Set `_output_bucket` to the GCS directory where you want the results to be written
- Set the GCP project ID (replace my-gcp-project). Note that it should be the project ID, not the project name.
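
Taken together, those settings amount to a notebook cell along these lines; a sketch only, where `_input_bucket` and `_output_bucket` are named in the README, and `_project_id` is a hypothetical name for the project setting in `TFX Example.ipynb`:

```python
# Illustrative configuration values; replace every placeholder.
_input_bucket = 'gs://<my bucket>/<path>/'     # where taxi_utils.py was copied
_output_bucket = 'gs://<my bucket>/<output>/'  # where pipeline results are written
_project_id = 'my-gcp-project'                 # the project ID, not the project name
```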
samples/core/tfx-oss/TFX Example.ipynb (2 additions, 2 deletions)

@@ -17,7 +17,7 @@
"metadata": {},
"outputs": [],
"source": [
"!pip3 install tfx==0.13.0 --upgrade\n",
"!pip3 install 'tfx==0.14.0' --upgrade\n",
"!python3 -m pip install 'kfp>=0.1.31' --quiet\n"
]
},
@@ -370,4 +370,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
-}
+}
samples/core/tfx-oss/utils/taxi_utils.py (27 additions, 37 deletions)

@@ -23,14 +23,9 @@
from __future__ import division
from __future__ import print_function

-import os
-
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
-from tensorflow_transform.beam.tft_beam_io import transform_fn_io
-from tensorflow_transform.saved import saved_transform_io
-from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.tf_metadata import schema_utils

# Categorical features are assumed to each have a maximum value in the dataset.
@@ -81,11 +76,11 @@ def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec


-def _gzip_reader_fn():
+def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
-  return tf.TFRecordReader(
-      options=tf.python_io.TFRecordOptions(
-          compression_type=tf.python_io.TFRecordCompressionType.GZIP))
+  return tf.data.TFRecordDataset(
+      filenames,
+      compression_type='GZIP')


def _fill_in_missing(x):
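
A note on the change above: the queue-based `tf.TFRecordReader` is constructed with no arguments, while a `tf.data`-style reader function receives the filenames and returns a dataset. A minimal sketch of the new contract, assuming TF 1.14-era APIs:

```python
import tensorflow as tf

def _gzip_reader_fn(filenames):
  """Maps a filenames tensor to a dataset of serialized records."""
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')
```

Utilities such as `tf.data.experimental.make_batched_features_dataset` (used later in this file) call the reader with the list of input files, which is why the new signature takes `filenames`.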
@@ -132,7 +127,8 @@ def preprocessing_fn(inputs):

  for key in _BUCKET_FEATURE_KEYS:
    outputs[_transformed_name(key)] = tft.bucketize(
-        _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT)
+        _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT,
+        always_return_num_quantiles=False)

  for key in _CATEGORICAL_FEATURE_KEYS:
    outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])
@@ -154,7 +150,7 @@ def _build_estimator(config, hidden_units=None, warm_start_from=None):
"""Build an estimator for predicting the tipping behavior of taxi riders.
Args:
config: tf.contrib.learn.RunConfig defining the runtime environment for the
config: tf.estimator.RunConfig defining the runtime environment for the
estimator (including model_dir).
hidden_units: [int], the layer sizes of the DNN (input layer first)
warm_start_from: Optional directory to warm start from.
@@ -196,12 +192,11 @@ def _build_estimator(config, hidden_units=None, warm_start_from=None):
      warm_start_from=warm_start_from)


-def _example_serving_receiver_fn(transform_output, schema):
+def _example_serving_receiver_fn(tf_transform_output, schema):
  """Build the serving in inputs.
  Args:
-    transform_output: directory in which the tf-transform model was written
-      during the preprocessing step.
+    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.
  Returns:
@@ -214,21 +209,18 @@ def _example_serving_receiver_fn(transform_output, schema):
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

-  _, transformed_features = (
-      saved_transform_io.partially_apply_saved_transform(
-          os.path.join(transform_output, transform_fn_io.TRANSFORM_FN_DIR),
-          serving_input_receiver.features))
+  transformed_features = tf_transform_output.transform_raw_features(
+      serving_input_receiver.features)

  return tf.estimator.export.ServingInputReceiver(
      transformed_features, serving_input_receiver.receiver_tensors)
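
This is the heart of the tft 0.14 migration: rather than locating the transform graph on disk via `transform_fn_io.TRANSFORM_FN_DIR` and re-applying it with `saved_transform_io.partially_apply_saved_transform`, a `TFTransformOutput` applies it directly. A sketch of the pattern, with a placeholder path and a single illustrative raw feature:

```python
import tensorflow as tf
import tensorflow_transform as tft

# Wraps the directory the Transform component wrote (path is a placeholder).
tf_transform_output = tft.TFTransformOutput('gs://<my bucket>/transform_output')

# Raw feature tensors keyed by name; in the sample these come from the serving
# input receiver, and the keys must match the raw (pre-transform) schema.
raw_features = {'trip_miles': tf.placeholder(tf.float32, shape=[None])}

# Re-applies the learned transform graph inside the current TF graph.
transformed_features = tf_transform_output.transform_raw_features(raw_features)
```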


-def _eval_input_receiver_fn(transform_output, schema):
+def _eval_input_receiver_fn(tf_transform_output, schema):
  """Build everything needed for the tf-model-analysis to run the model.
  Args:
-    transform_output: directory in which the tf-transform model was written
-      during the preprocessing step.
+    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.
  Returns:
@@ -250,10 +242,8 @@ def _eval_input_receiver_fn(transform_output, schema):

  # Now that we have our raw examples, process them through the tf-transform
  # function computed during the preprocessing step.
-  _, transformed_features = (
-      saved_transform_io.partially_apply_saved_transform(
-          os.path.join(transform_output, transform_fn_io.TRANSFORM_FN_DIR),
-          features))
+  transformed_features = tf_transform_output.transform_raw_features(
+      features)

  # The key name MUST be 'examples'.
  receiver_tensors = {'examples': serialized_tf_example}
@@ -268,27 +258,25 @@ def _eval_input_receiver_fn(transform_output, schema):
      labels=transformed_features[_transformed_name(_LABEL_KEY)])


-def _input_fn(filenames, transform_output, batch_size=200):
+def _input_fn(filenames, tf_transform_output, batch_size=200):
  """Generates features and labels for training or evaluation.
  Args:
    filenames: [str] list of CSV files to read data from.
-    transform_output: directory in which the tf-transform model was written
-      during the preprocessing step.
+    tf_transform_output: A TFTransformOutput.
    batch_size: int First dimension size of the Tensors returned by input_fn
  Returns:
    A (features, indices) tuple where features is a dictionary of
      Tensors, and indices is a single Tensor of label indices.
  """
-  metadata_dir = os.path.join(transform_output,
-                              transform_fn_io.TRANSFORMED_METADATA_DIR)
-  transformed_metadata = metadata_io.read_metadata(metadata_dir)
-  transformed_feature_spec = transformed_metadata.schema.as_feature_spec()
+  transformed_feature_spec = (
+      tf_transform_output.transformed_feature_spec().copy())

-  transformed_features = tf.contrib.learn.io.read_batch_features(
+  dataset = tf.data.experimental.make_batched_features_dataset(
      filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)

+  transformed_features = dataset.make_one_shot_iterator().get_next()
  # We pop the label because we do not want to use it as a feature while we're
  # training.
  return transformed_features, transformed_features.pop(
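
The `_input_fn` rewrite bundles the same migration twice: `TFTransformOutput.transformed_feature_spec()` replaces the manual `metadata_io` / `transform_fn_io` plumbing, and `tf.data.experimental.make_batched_features_dataset` with a one-shot iterator replaces the deprecated `tf.contrib.learn.io.read_batch_features`. Roughly, under TF 1.14 graph-mode assumptions and with placeholder paths and keys:

```python
import tensorflow as tf
import tensorflow_transform as tft

def _gzip_reader_fn(filenames):
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')

tf_transform_output = tft.TFTransformOutput('gs://<my bucket>/transform_output')
feature_spec = tf_transform_output.transformed_feature_spec().copy()

dataset = tf.data.experimental.make_batched_features_dataset(
    'gs://<my bucket>/transformed_examples-*.gz',  # placeholder file pattern
    batch_size=200,
    features=feature_spec,
    reader=_gzip_reader_fn)

# In graph mode, a one-shot iterator yields the batched feature tensors.
features = dataset.make_one_shot_iterator().get_next()
label = features.pop('tips_xf')  # hypothetical transformed label key
```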
@@ -318,22 +306,24 @@ def trainer_fn(hparams, schema):
  train_batch_size = 40
  eval_batch_size = 40

+  tf_transform_output = tft.TFTransformOutput(hparams.transform_output)
+
  train_input_fn = lambda: _input_fn(  # pylint: disable=g-long-lambda
      hparams.train_files,
-      hparams.transform_output,
+      tf_transform_output,
      batch_size=train_batch_size)

  eval_input_fn = lambda: _input_fn(  # pylint: disable=g-long-lambda
      hparams.eval_files,
-      hparams.transform_output,
+      tf_transform_output,
      batch_size=eval_batch_size)

  train_spec = tf.estimator.TrainSpec(  # pylint: disable=g-long-lambda
      train_input_fn,
      max_steps=hparams.train_steps)

  serving_receiver_fn = lambda: _example_serving_receiver_fn(  # pylint: disable=g-long-lambda
-      hparams.transform_output, schema)
+      tf_transform_output, schema)

  exporter = tf.estimator.FinalExporter('chicago-taxi', serving_receiver_fn)
  eval_spec = tf.estimator.EvalSpec(
@@ -358,7 +348,7 @@ def trainer_fn(hparams, schema):

  # Create an input receiver for TFMA processing
  receiver_fn = lambda: _eval_input_receiver_fn(  # pylint: disable=g-long-lambda
-      hparams.transform_output, schema)
+      tf_transform_output, schema)

  return {
      'estimator': estimator,
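
For orientation, the objects assembled in `trainer_fn` are the standard inputs to TF 1.x Estimator training; a generic sketch of how an estimator, train spec, and eval spec are driven (not code from this sample):

```python
import tensorflow as tf

# Runs training and periodic evaluation given the kinds of objects
# trainer_fn builds above (estimator, train_spec, eval_spec).
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
```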
