This repository was archived by the owner on Sep 3, 2022. It is now read-only.

Inception Package Improvements #138

Merged 4 commits on Jan 30, 2017.
10 changes: 8 additions & 2 deletions datalab/mlalpha/_cloud_models.py
@@ -151,13 +151,16 @@ def get(self, version_name):
  def _wait_for_long_running_operation(self, response):
    if 'name' not in response:
      raise Exception('Invalid response from service. Cannot find "name" field.')
    print('Waiting for job "%s"' % response['name'])
    while True:
      response = self._api.projects().operations().get(name=response['name']).execute()
      if 'done' not in response or response['done'] != True:
        time.sleep(3)
      else:
        if 'error' in response:
-         print response['error']
+         print(response['error'])
        else:
          print('Done.')
Contributor: What's done? Maybe print "job xxxx finished"? Your call.

Contributor Author: There is "Waiting for job xxxxxx" right above it, so it should be clear.

        break

  def deploy(self, version_name, path):
@@ -173,7 +176,10 @@ def deploy(self, version_name, path):
    if not path.startswith('gs://'):
      raise Exception('Invalid path. Only Google Cloud Storage path (gs://...) is accepted.')
    if not datalab.storage.Item.from_url(os.path.join(path, 'export.meta')).exists():
-     raise Exception('Cannot find export.meta from given path.')
+     # try appending '/model' sub dir.
+     path = os.path.join(path, 'model')
+     if not datalab.storage.Item.from_url(os.path.join(path, 'export.meta')).exists():
+       raise Exception('Cannot find export.meta from given path.')

    body = {'name': self._model_name}
    parent = 'projects/' + self._project_id
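The polling pattern above is how the package waits on a CloudML long-running operation. A minimal standalone sketch, folding in the reviewer's suggestion of a more specific completion message (`api` and `op_name` are hypothetical stand-ins for the service client and operation name used in `_cloud_models.py`):

```python
import time

def wait_for_operation(api, op_name, poll_interval=3):
  """Polls a long-running operation until the service marks it done."""
  print('Waiting for job "%s"' % op_name)
  while True:
    response = api.projects().operations().get(name=op_name).execute()
    if response.get('done'):
      if 'error' in response:
        print(response['error'])
      else:
        # Echo the job name so it is clear which job finished.
        print('Job "%s" finished.' % op_name)
      return response
    time.sleep(poll_interval)
```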
20 changes: 10 additions & 10 deletions solutionbox/inception/datalab_solutions/inception/_cloud.py
@@ -42,7 +42,7 @@ def __init__(self, checkpoint=None):
    if self._checkpoint is None:
      self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL

-  def preprocess(self, input_csvs, labels_file, output_dir, pipeline_option=None):
+  def preprocess(self, input_csvs, output_dir, pipeline_option=None):
    """Cloud preprocessing with Cloud DataFlow."""

    job_name = 'preprocess-inception-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
@@ -60,22 +60,19 @@ def preprocess(self, input_csvs, labels_file, output_dir, pipeline_option=None):

    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    p = beam.Pipeline('DataflowPipelineRunner', options=opts)
-   _preprocess.configure_pipeline(
-       p, self._checkpoint, input_csvs, labels_file, output_dir, job_name)
+   _preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_name)
    p.run()

-  def train(self, labels_file, input_dir, batch_size, max_steps, output_path,
+  def train(self, input_dir, batch_size, max_steps, output_path,
            region, scale_tier):
    """Cloud training with CloudML trainer service."""

    import datalab.mlalpha as mlalpha
-   num_classes = len(_util.get_labels(labels_file))
    job_args = {
      'input_dir': input_dir,
      'output_path': output_path,
      'max_steps': max_steps,
      'batch_size': batch_size,
-     'num_classes': num_classes,
      'checkpoint': self._checkpoint
    }
    job_request = {
@@ -89,16 +86,14 @@ def train(self, labels_file, input_dir, batch_size, max_steps, output_path,
    job_id = 'inception_train_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
    return cloud_runner.run(job_id)

-  def predict(self, model_id, image_files, labels_file):
+  def predict(self, model_id, image_files):
    """Cloud prediction with CloudML prediction service."""

    import datalab.mlalpha as mlalpha
    parts = model_id.split('.')
    if len(parts) != 2:
      raise Exception('Invalid model name for cloud prediction. Use "model.version".')

-   labels = _util.get_labels(labels_file)
-   labels.append('UNKNOWN')
    data = []
    for ii, img_file in enumerate(image_files):
      with ml.util._file.open_local_or_gcs(img_file, 'rb') as f:
@@ -110,6 +105,11 @@

    cloud_predictor = mlalpha.CloudPredictor(parts[0], parts[1])
    predictions = cloud_predictor.predict(data)
-   labels_and_scores = [(labels[x['prediction']], x['scores'][x['prediction']])
+   if len(predictions) == 0:
+     raise Exception('Prediction results are empty.')
+   # Although prediction results contain a labels list in each instance, they are all
+   # the same, so take the first one.
+   labels = predictions[0]['labels']
Contributor: Could cloud_predictor.predict return an empty list? If so, predictions[0]['labels'] is not safe.

Contributor Author: Done.

+   labels_and_scores = [(x['prediction'], x['scores'][labels.index(x['prediction'])])
                         for x in predictions]
    return labels_and_scores
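The guard and label extraction added above can be read in isolation. A hedged sketch of the response handling, assuming `predictions` is a list of dicts shaped like the CloudML response (each instance carrying `prediction`, `scores`, and an identical `labels` list):

```python
def extract_labels_and_scores(predictions):
  """Maps each predicted label to its score using the shared labels list."""
  if len(predictions) == 0:
    raise Exception('Prediction results are empty.')
  # Every instance carries the same labels list, so the first copy suffices.
  labels = predictions[0]['labels']
  return [(x['prediction'], x['scores'][labels.index(x['prediction'])])
          for x in predictions]

# Example shape, for illustration only:
# predictions = [{'prediction': 'rose', 'scores': [0.1, 0.85, 0.05],
#                 'labels': ['daisy', 'rose', 'UNKNOWN']}]
```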
19 changes: 9 additions & 10 deletions solutionbox/inception/datalab_solutions/inception/_local.py
@@ -42,32 +42,31 @@ def __init__(self, checkpoint=None):
    if self._checkpoint is None:
      self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL

-  def preprocess(self, input_csvs, labels_file, output_dir):
+  def preprocess(self, input_csvs, output_dir):
    """Local preprocessing with local DataFlow."""

    job_id = 'inception_preprocessed_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
    p = beam.Pipeline('DirectPipelineRunner')
-   _preprocess.configure_pipeline(p, self._checkpoint, input_csvs, labels_file,
-                                  output_dir, job_id)
+   _preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_id)
    p.run()

-  def train(self, labels_file, input_dir, batch_size, max_steps, output_dir):
+  def train(self, input_dir, batch_size, max_steps, output_dir):
    """Local training."""

-   num_classes = len(_util.get_labels(labels_file))
-   model = _model.Model(num_classes, 0.5, self._checkpoint)
+   labels = _util.get_labels(input_dir)
+   model = _model.Model(labels, 0.5, self._checkpoint)
    task_data = {'type': 'master', 'index': 0}
    task = type('TaskSpec', (object,), task_data)
    _trainer.Trainer(input_dir, batch_size, max_steps, output_dir,
                     model, None, task).run_training()

-  def predict(self, model_dir, image_files, labels_file):
+  def predict(self, model_dir, image_files):
    """Local prediction."""

-   return _predictor.predict(model_dir, image_files, labels_file)
+   return _predictor.predict(model_dir, image_files)


-  def batch_predict(self, model_dir, input_csv, labels_file, output_file, output_bq_table):
+  def batch_predict(self, model_dir, input_csv, output_file, output_bq_table):
    """Local batch prediction."""

-   return _predictor.batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table)
+   return _predictor.batch_predict(model_dir, input_csv, output_file, output_bq_table)
32 changes: 24 additions & 8 deletions solutionbox/inception/datalab_solutions/inception/_model.py
@@ -59,8 +59,9 @@ def __init__(self):
class Model(object):
  """TensorFlow model for the flowers problem."""

-  def __init__(self, label_count, dropout, inception_checkpoint_file):
-    self.label_count = label_count
+  def __init__(self, labels, dropout, inception_checkpoint_file):
+    self.labels = labels
+    self.labels.sort()
Contributor: I'm not sure if we should do this. What if the labels were x1, x2, x3, ..., x20? These labels are perfectly 'sorted', but when you call sort, it is going to mess up the order ('x20' < 'x3').
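The reviewer's concern, illustrated: Python's `sort()` is lexicographic, so `'x20'` sorts before `'x3'`. A natural-sort key (shown here only as a possible fix, not part of this PR) would keep numeric suffixes in order:

```python
import re

labels = ['x1', 'x2', 'x3', 'x20']
print(sorted(labels))  # ['x1', 'x2', 'x20', 'x3'] -- lexicographic order

def natural_key(s):
  # Split into digit/non-digit runs so numeric parts compare as integers.
  return [int(t) if t.isdigit() else t for t in re.split(r'(\d+)', s)]

print(sorted(labels, key=natural_key))  # ['x1', 'x2', 'x3', 'x20']
```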

    self.dropout = dropout
    self.inception_checkpoint_file = inception_checkpoint_file

@@ -203,7 +204,7 @@ def build_graph(self, data_paths, batch_size, graph_mod):
      'label':
          tf.FixedLenFeature(
              shape=[1], dtype=tf.int64,
-             default_value=[self.label_count]),
+             default_value=[len(self.labels)]),
      'embedding':
          tf.FixedLenFeature(
              shape=[BOTTLENECK_TENSOR_SIZE], dtype=tf.float32)
@@ -215,7 +216,7 @@

    # We assume a default label, so the total number of labels is equal to
    # label_count+1.
-   all_labels_count = self.label_count + 1
+   all_labels_count = len(self.labels) + 1
    with tf.name_scope('final_ops'):
      softmax, logits = self.add_final_training_ops(
          embeddings,
@@ -316,12 +317,28 @@ def build_prediction_graph(self):

    # To extract the id, we need to add the identity function.
    keys = tf.identity(keys_placeholder)
+   labels = self.labels + ['UNKNOWN']
Contributor: Need to check that "UNKNOWN" is not already used.

Contributor Author: This is hard. The model assumes there is a default label, so we have to add a default label to match the number. If 'UNKNOWN' already exists, there will be two 'UNKNOWN's, which seems fine? Any meaningful label we choose may already exist in the list.

+   predicted_label = tf.contrib.lookup.index_to_string(tensors.predictions[0],
+                                                       mapping=labels)
+   # Need to duplicate the labels by num_of_instances so the output is one batch
+   # (all output members share the same outer dimension).
+   # The labels are needed for the client to match the class scores list.
+   labels_tensor = tf.expand_dims(tf.constant(labels), 0)
+   num_instance = tf.shape(keys)
+   labels_tensors_n = tf.tile(labels_tensor, tf.concat(0, [num_instance, [1]]))
Contributor: Nice!


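What the tiling does, as a toy example (written against the TF 0.12-era API this PR targets, where `tf.concat` takes the axis argument first; the label values here are illustrative):

```python
import tensorflow as tf

labels_tensor = tf.expand_dims(tf.constant(['daisy', 'rose', 'UNKNOWN']), 0)  # shape [1, 3]
num_instance = tf.constant([2])                 # pretend the batch holds 2 instances
multiples = tf.concat(0, [num_instance, [1]])   # [2, 1]: repeat rows, keep columns
tiled = tf.tile(labels_tensor, multiples)       # shape [2, 3]: one labels row per instance
```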
    outputs = {
      'key': keys.name,
-     'prediction': tensors.predictions[0].name,
-     'scores': tensors.predictions[1].name
+     'prediction': predicted_label.name,
+     'labels': labels_tensors_n.name,
+     'scores': tensors.predictions[1].name,
    }
    tf.add_to_collection('outputs', json.dumps(outputs))
+   # Add table init op to collection so online prediction will load the model and run it.
+   # TODO: initialize_all_tables is going to be deprecated but the replacement
+   # tf.tables_initializer does not exist in 0.12 yet.
+   init_tables_op = tf.initialize_all_tables()
Contributor: I didn't know what this function did, so I looked it up and found that it is deprecated. See https://github.com/tensorflow/tensorflow/blob/master/tensorflow/g3doc/api_docs/python/functions_and_classes/shard6/tf.initialize_all_tables.md

Contributor Author (@qimingj, Jan 26, 2017): The replacement is not in tf 0.12 yet. I added a TODO. This function is needed if you have tf.contrib.lookup.index_to_string.

+   tf.add_to_collection(tf.contrib.session_bundle.constants.INIT_OP_KEY, init_tables_op)
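For reference, the replacement named in the TODO: in TensorFlow releases after 0.12, `tf.initialize_all_tables()` gives way to `tf.tables_initializer()`, which initializes the lookup tables that `tf.contrib.lookup.index_to_string` depends on. A sketch of the same two lines once the newer API is available:

```python
# Hypothetical post-0.12 equivalent of the lines above.
init_tables_op = tf.tables_initializer()
tf.add_to_collection(tf.contrib.session_bundle.constants.INIT_OP_KEY, init_tables_op)
```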

  def export(self, last_checkpoint, output_dir):
    """Builds a prediction graph and exports the model.
@@ -340,8 +357,7 @@ def export(self, last_checkpoint, output_dir):
                          last_checkpoint)
      saver = tf.train.Saver()
      saver.export_meta_graph(filename=os.path.join(output_dir, 'export.meta'))
-     saver.save(
-         sess, os.path.join(output_dir, 'export'), write_meta_graph=False)
+     saver.save(sess, os.path.join(output_dir, 'export'), write_meta_graph=False)

  def format_metric_values(self, metric_values):
    """Formats metric values - used for logging purpose."""
57 changes: 23 additions & 34 deletions solutionbox/inception/datalab_solutions/inception/_package.py
@@ -23,6 +23,7 @@
Datalab will look for functions with the above names.
"""

+import google.cloud.ml as ml
import logging
import os
import urllib
@@ -35,14 +36,12 @@
from . import _util


-def local_preprocess(input_csvs, labels_file, output_dir, checkpoint=None):
+def local_preprocess(input_csvs, output_dir, checkpoint=None):
  """Preprocess data locally. Produce output that can be used by training efficiently.
  Args:
    input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
        Preprocessing will concatenate the data inside all files and split them into
        train/eval dataset. Can be local or GCS path.
-   labels_file: The path to the labels file which lists all labels, each in a separate line.
-       It can be a local or a GCS path.
    output_dir: The output directory to use. Preprocessing will create a sub directory under
        it for each run, and also update "latest" file which points to the latest preprocessed
        directory. Users are responsible for cleanup. Can be local or GCS path.
@@ -52,19 +51,17 @@ def local_preprocess(input_csvs, labels_file, output_dir, checkpoint=None):
  print 'Local preprocessing...'
  # TODO: Move this to a new process to avoid pickling issues
  # TODO: Expose train/eval split ratio
- _local.Local(checkpoint).preprocess(input_csvs, labels_file, output_dir)
+ _local.Local(checkpoint).preprocess(input_csvs, output_dir)
  print 'Done'


-def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None,
-                     pipeline_option=None):
+def cloud_preprocess(input_csvs, output_dir, checkpoint=None, pipeline_option=None):
  """Preprocess data in Cloud with DataFlow.
  Produce output that can be used by training efficiently.
  Args:
    input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
        Preprocessing will concatenate the data inside all files and split them into
        train/eval dataset. GCS paths only.
-   labels_file: The GCS path to the labels file which lists all labels, each in a separate line.
    output_dir: The output directory to use. Preprocessing will create a sub directory under
        it for each run, and also update "latest" file which points to the latest preprocessed
        directory. Users are responsible for cleanup. GCS path only.
@@ -74,8 +71,7 @@ def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None,
  # TODO: Move this to a new process to avoid pickling issues
  # TODO: Expose train/eval split ratio
  # TODO: Consider exposing explicit train/eval datasets
- _cloud.Cloud(checkpoint=checkpoint).preprocess(input_csvs, labels_file, output_dir,
-                                                pipeline_option)
+ _cloud.Cloud(checkpoint=checkpoint).preprocess(input_csvs, output_dir, pipeline_option)
  if (_util.is_in_IPython()):
    import IPython

@@ -87,11 +83,9 @@ def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None,
    IPython.display.display_html(html, raw=True)


-def local_train(labels_file, input_dir, batch_size, max_steps, output_dir, checkpoint=None):
+def local_train(input_dir, batch_size, max_steps, output_dir, checkpoint=None):
  """Train model locally. The output can be used for local prediction or for online deployment.
  Args:
-   labels_file: The path to the labels file which lists all labels, each in a separate line.
-       It can be a local or a GCS path.
    input_dir: A directory path containing preprocessed results. Can be local or GCS path.
    batch_size: size of batch used for training.
    max_steps: number of steps to train.
@@ -104,27 +98,25 @@ def local_train(labels_file, input_dir, batch_size, max_steps, output_dir, checkpoint=None):
  logger.setLevel(logging.INFO)
  print 'Local training...'
  try:
-   _local.Local(checkpoint).train(labels_file, input_dir, batch_size, max_steps, output_dir)
+   _local.Local(checkpoint).train(input_dir, batch_size, max_steps, output_dir)
  finally:
    logger.setLevel(original_level)
  print 'Done'


-def cloud_train(labels_file, input_dir, batch_size, max_steps, output_dir,
+def cloud_train(input_dir, batch_size, max_steps, output_dir,
                region, scale_tier='BASIC', checkpoint=None):
  """Train model in the cloud with CloudML trainer service.
  The output can be used for local prediction or for online deployment.
  Args:
-   labels_file: The path to the labels file which lists all labels, each in a separate line.
-       GCS path only.
    input_dir: A directory path containing preprocessed results. GCS path only.
    batch_size: size of batch used for training.
    max_steps: number of steps to train.
    output_dir: The output directory to use. GCS path only.
    checkpoint: the Inception checkpoint to use.
  """

- job_info = _cloud.Cloud(checkpoint=checkpoint).train(labels_file, input_dir, batch_size,
+ job_info = _cloud.Cloud(checkpoint=checkpoint).train(input_dir, batch_size,
                                                       max_steps, output_dir, region, scale_tier)
  if (_util.is_in_IPython()):
    import IPython
@@ -146,58 +138,55 @@ def _display_predict_results(results, show_image):
    if show_image is True:
      IPython.display.display_html('<p style="font-size:28px">%s(%.5f)</p>' % label_and_score,
                                   raw=True)
-     IPython.display.display(IPython.display.Image(filename=image_file))
+     with ml.util._file.open_local_or_gcs(image_file, mode='r') as f:
+       IPython.display.display(IPython.display.Image(data=f.read()))
    else:
      IPython.display.display_html(
          '<p>%s&nbsp&nbsp%s(%.5f)</p>' % ((image_file,) + label_and_score), raw=True)
  else:
    print results


-def local_predict(model_dir, image_files, labels_file, show_image=True):
+def local_predict(model_dir, image_files, show_image=True):
  """Predict using an offline model.
  Args:
    model_dir: The directory of a trained inception model. Can be local or GCS paths.
    image_files: The paths to the image files to predict labels. Can be local or GCS paths.
-   labels_file: The path to the labels file which lists all labels, each in a separate line.
-       Can be local or GCS paths.
    show_image: Whether to show images in the results.
  """

- labels_and_scores = _local.Local().predict(model_dir, image_files, labels_file)
+ print('Predicting...')
Contributor: Missing "done predicting"? Likewise below too. Printing "STAGE..." and "Done" is not consistent across the functions; for example, local_batch_predict does not print anything.

Contributor Author: Done.

+ labels_and_scores = _local.Local().predict(model_dir, image_files)
  results = zip(image_files, labels_and_scores)
  _display_predict_results(results, show_image)
+ print('Done')


-def cloud_predict(model_id, image_files, labels_file, show_image=True):
+def cloud_predict(model_id, image_files, show_image=True):
  """Predict using a deployed (online) model.
  Args:
    model_id: The deployed model id in the form of "model.version".
    image_files: The paths to the image files to predict labels. GCS paths only.
-   labels_file: The path to the labels file which lists all labels, each in a separate line.
-       GCS paths only.
    show_image: Whether to show images in the results.
  """

- labels_and_scores = _cloud.Cloud().predict(model_id, image_files, labels_file)
+ print('Predicting...')
+ labels_and_scores = _cloud.Cloud().predict(model_id, image_files)
  results = zip(image_files, labels_and_scores)
  _display_predict_results(results, show_image)


-def local_batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table=None):
+def local_batch_predict(model_dir, input_csv, output_file, output_bq_table=None):
  """Batch predict using an offline model.
  Args:
    model_dir: The directory of a trained inception model. Can be local or GCS paths.
    input_csv: The input csv which include two columns only: image_gs_url, label.
        Can be local or GCS paths.
-   labels_file: The path to the labels file which lists all labels, each in a separate line.
-       Can be local or GCS paths.
    output_file: The output csv file containing prediction results.
    output_bq_table: If provided, will also save the results to BigQuery table.
  """
- _local.Local().batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table)
+ print('Predicting...')
+ _local.Local().batch_predict(model_dir, input_csv, output_file, output_bq_table)
+ print('Done')


-def cloud_batch_predict(model_dir, image_files, labels_file, show_image=True, output_file=None):
+def cloud_batch_predict(model_dir, image_files, show_image=True, output_file=None):
  """Not Implemented Yet"""
  pass
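Taken together, the signature changes remove `labels_file` from the whole workflow: labels now travel with the preprocessed data and the exported model. A hypothetical end-to-end use of the reworked API (the import path and all bucket paths are placeholders, not from this PR):

```python
from datalab_solutions.inception import _package as inception

# Preprocess, train, and predict without ever passing a labels_file.
inception.local_preprocess(['gs://my-bucket/data.csv'], 'gs://my-bucket/preprocessed')
inception.local_train('gs://my-bucket/preprocessed', batch_size=32,
                      max_steps=1000, output_dir='gs://my-bucket/model')
inception.local_predict('gs://my-bucket/model',
                        ['gs://my-bucket/images/rose.jpg'], show_image=False)
```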