Skip to content
This repository was archived by the owner on Sep 3, 2022. It is now read-only.

Commit ee86996

Browse files
committed
Inception Package Improvements (#138)
* Fix an issue that prediction right after preprocessing fails in inception package local run. * Remove the "labels_file" parameter from inception preprocess/train/predict. Instead it will get labels from training data. Prediction graph will return labels. Make online prediction works with GCS images. "%%ml alpha deploy" now also check for "/model" subdir if needed. Other minor improvements. * Make local batch prediction really batched. Batch prediction input may not have to include target column. Sort labels, so it is consistent between preprocessing and training. Follow up other core review comments. * Follow up code review comments.
1 parent 707659e commit ee86996

File tree

10 files changed

+197
-133
lines changed

10 files changed

+197
-133
lines changed

datalab/mlalpha/_cloud_models.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,16 @@ def get(self, version_name):
151151
def _wait_for_long_running_operation(self, response):
152152
if 'name' not in response:
153153
raise Exception('Invaid response from service. Cannot find "name" field.')
154+
print('Waiting for job "%s"' % response['name'])
154155
while True:
155156
response = self._api.projects().operations().get(name=response['name']).execute()
156157
if 'done' not in response or response['done'] != True:
157158
time.sleep(3)
158159
else:
159160
if 'error' in response:
160-
print response['error']
161+
print(response['error'])
162+
else:
163+
print('Done.')
161164
break
162165

163166
def deploy(self, version_name, path):
@@ -173,7 +176,10 @@ def deploy(self, version_name, path):
173176
if not path.startswith('gs://'):
174177
raise Exception('Invalid path. Only Google Cloud Storage path (gs://...) is accepted.')
175178
if not datalab.storage.Item.from_url(os.path.join(path, 'export.meta')).exists():
176-
raise Exception('Cannot find export.meta from given path.')
179+
# try appending '/model' sub dir.
180+
path = os.path.join(path, 'model')
181+
if not datalab.storage.Item.from_url(os.path.join(path, 'export.meta')).exists():
182+
raise Exception('Cannot find export.meta from given path.')
177183

178184
body = {'name': self._model_name}
179185
parent = 'projects/' + self._project_id

solutionbox/inception/datalab_solutions/inception/_cloud.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def __init__(self, checkpoint=None):
4242
if self._checkpoint is None:
4343
self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL
4444

45-
def preprocess(self, input_csvs, labels_file, output_dir, pipeline_option=None):
45+
def preprocess(self, input_csvs, output_dir, pipeline_option=None):
4646
"""Cloud preprocessing with Cloud DataFlow."""
4747

4848
job_name = 'preprocess-inception-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
@@ -60,22 +60,19 @@ def preprocess(self, input_csvs, labels_file, output_dir, pipeline_option=None):
6060

6161
opts = beam.pipeline.PipelineOptions(flags=[], **options)
6262
p = beam.Pipeline('DataflowPipelineRunner', options=opts)
63-
_preprocess.configure_pipeline(
64-
p, self._checkpoint, input_csvs, labels_file, output_dir, job_name)
63+
_preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_name)
6564
p.run()
6665

67-
def train(self, labels_file, input_dir, batch_size, max_steps, output_path,
66+
def train(self, input_dir, batch_size, max_steps, output_path,
6867
region, scale_tier):
6968
"""Cloud training with CloudML trainer service."""
7069

7170
import datalab.mlalpha as mlalpha
72-
num_classes = len(_util.get_labels(labels_file))
7371
job_args = {
7472
'input_dir': input_dir,
7573
'output_path': output_path,
7674
'max_steps': max_steps,
7775
'batch_size': batch_size,
78-
'num_classes': num_classes,
7976
'checkpoint': self._checkpoint
8077
}
8178
job_request = {
@@ -89,16 +86,14 @@ def train(self, labels_file, input_dir, batch_size, max_steps, output_path,
8986
job_id = 'inception_train_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
9087
return cloud_runner.run(job_id)
9188

92-
def predict(self, model_id, image_files, labels_file):
89+
def predict(self, model_id, image_files):
9390
"""Cloud prediction with CloudML prediction service."""
9491

9592
import datalab.mlalpha as mlalpha
9693
parts = model_id.split('.')
9794
if len(parts) != 2:
9895
raise Exception('Invalid model name for cloud prediction. Use "model.version".')
9996

100-
labels = _util.get_labels(labels_file)
101-
labels.append('UNKNOWN')
10297
data = []
10398
for ii, img_file in enumerate(image_files):
10499
with ml.util._file.open_local_or_gcs(img_file, 'rb') as f:
@@ -110,6 +105,11 @@ def predict(self, model_id, image_files, labels_file):
110105

111106
cloud_predictor = mlalpha.CloudPredictor(parts[0], parts[1])
112107
predictions = cloud_predictor.predict(data)
113-
labels_and_scores = [(labels[x['prediction']], x['scores'][x['prediction']])
108+
if len(predictions) == 0:
109+
raise Exception('Prediction results are empty.')
110+
# Although prediction results contains a labels list in each instance, they are all the same
111+
# so taking the first one.
112+
labels = predictions[0]['labels']
113+
labels_and_scores = [(x['prediction'], x['scores'][labels.index(x['prediction'])])
114114
for x in predictions]
115115
return labels_and_scores

solutionbox/inception/datalab_solutions/inception/_local.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,32 +42,31 @@ def __init__(self, checkpoint=None):
4242
if self._checkpoint is None:
4343
self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL
4444

45-
def preprocess(self, input_csvs, labels_file, output_dir):
45+
def preprocess(self, input_csvs, output_dir):
4646
"""Local preprocessing with local DataFlow."""
4747

4848
job_id = 'inception_preprocessed_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
4949
p = beam.Pipeline('DirectPipelineRunner')
50-
_preprocess.configure_pipeline(p, self._checkpoint, input_csvs, labels_file,
51-
output_dir, job_id)
50+
_preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_id)
5251
p.run()
5352

54-
def train(self, labels_file, input_dir, batch_size, max_steps, output_dir):
53+
def train(self, input_dir, batch_size, max_steps, output_dir):
5554
"""Local training."""
5655

57-
num_classes = len(_util.get_labels(labels_file))
58-
model = _model.Model(num_classes, 0.5, self._checkpoint)
56+
labels = _util.get_labels(input_dir)
57+
model = _model.Model(labels, 0.5, self._checkpoint)
5958
task_data = {'type': 'master', 'index': 0}
6059
task = type('TaskSpec', (object,), task_data)
6160
_trainer.Trainer(input_dir, batch_size, max_steps, output_dir,
6261
model, None, task).run_training()
6362

64-
def predict(self, model_dir, image_files, labels_file):
63+
def predict(self, model_dir, image_files):
6564
"""Local prediction."""
6665

67-
return _predictor.predict(model_dir, image_files, labels_file)
66+
return _predictor.predict(model_dir, image_files)
6867

6968

70-
def batch_predict(self, model_dir, input_csv, labels_file, output_file, output_bq_table):
69+
def batch_predict(self, model_dir, input_csv, output_file, output_bq_table):
7170
"""Local batch prediction."""
7271

73-
return _predictor.batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table)
72+
return _predictor.batch_predict(model_dir, input_csv, output_file, output_bq_table)

solutionbox/inception/datalab_solutions/inception/_model.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ def __init__(self):
5959
class Model(object):
6060
"""TensorFlow model for the flowers problem."""
6161

62-
def __init__(self, label_count, dropout, inception_checkpoint_file):
63-
self.label_count = label_count
62+
def __init__(self, labels, dropout, inception_checkpoint_file):
63+
self.labels = labels
64+
self.labels.sort()
6465
self.dropout = dropout
6566
self.inception_checkpoint_file = inception_checkpoint_file
6667

@@ -203,7 +204,7 @@ def build_graph(self, data_paths, batch_size, graph_mod):
203204
'label':
204205
tf.FixedLenFeature(
205206
shape=[1], dtype=tf.int64,
206-
default_value=[self.label_count]),
207+
default_value=[len(self.labels)]),
207208
'embedding':
208209
tf.FixedLenFeature(
209210
shape=[BOTTLENECK_TENSOR_SIZE], dtype=tf.float32)
@@ -215,7 +216,7 @@ def build_graph(self, data_paths, batch_size, graph_mod):
215216

216217
# We assume a default label, so the total number of labels is equal to
217218
# label_count+1.
218-
all_labels_count = self.label_count + 1
219+
all_labels_count = len(self.labels) + 1
219220
with tf.name_scope('final_ops'):
220221
softmax, logits = self.add_final_training_ops(
221222
embeddings,
@@ -316,12 +317,28 @@ def build_prediction_graph(self):
316317

317318
# To extract the id, we need to add the identity function.
318319
keys = tf.identity(keys_placeholder)
320+
labels = self.labels + ['UNKNOWN']
321+
predicted_label = tf.contrib.lookup.index_to_string(tensors.predictions[0],
322+
mapping=labels)
323+
# Need to duplicate the labels by num_of_instances so the output is one batch
324+
# (all output members share the same outer dimension).
325+
# The labels are needed for client to match class scores list.
326+
labels_tensor = tf.expand_dims(tf.constant(labels), 0)
327+
num_instance = tf.shape(keys)
328+
labels_tensors_n = tf.tile(labels_tensor, tf.concat(0, [num_instance, [1]]))
329+
319330
outputs = {
320331
'key': keys.name,
321-
'prediction': tensors.predictions[0].name,
322-
'scores': tensors.predictions[1].name
332+
'prediction': predicted_label.name,
333+
'labels': labels_tensors_n.name,
334+
'scores': tensors.predictions[1].name,
323335
}
324336
tf.add_to_collection('outputs', json.dumps(outputs))
337+
# Add table init op to collection so online prediction will load the model and run it.
338+
# TODO: initialize_all_tables is going to be deprecated but the replacement
339+
# tf.tables_initializer does not exist in 0.12 yet.
340+
init_tables_op = tf.initialize_all_tables()
341+
tf.add_to_collection(tf.contrib.session_bundle.constants.INIT_OP_KEY, init_tables_op)
325342

326343
def export(self, last_checkpoint, output_dir):
327344
"""Builds a prediction graph and xports the model.
@@ -340,8 +357,7 @@ def export(self, last_checkpoint, output_dir):
340357
last_checkpoint)
341358
saver = tf.train.Saver()
342359
saver.export_meta_graph(filename=os.path.join(output_dir, 'export.meta'))
343-
saver.save(
344-
sess, os.path.join(output_dir, 'export'), write_meta_graph=False)
360+
saver.save(sess, os.path.join(output_dir, 'export'), write_meta_graph=False)
345361

346362
def format_metric_values(self, metric_values):
347363
"""Formats metric values - used for logging purpose."""

solutionbox/inception/datalab_solutions/inception/_package.py

Lines changed: 23 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
Datalab will look for functions with the above names.
2424
"""
2525

26+
import google.cloud.ml as ml
2627
import logging
2728
import os
2829
import urllib
@@ -35,14 +36,12 @@
3536
from . import _util
3637

3738

38-
def local_preprocess(input_csvs, labels_file, output_dir, checkpoint=None):
39+
def local_preprocess(input_csvs, output_dir, checkpoint=None):
3940
"""Preprocess data locally. Produce output that can be used by training efficiently.
4041
Args:
4142
input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
4243
Preprocessing will concatenate the data inside all files and split them into
4344
train/eval dataset. Can be local or GCS path.
44-
labels_file: The path to the labels file which lists all labels, each in a separate line.
45-
It can be a local or a GCS path.
4645
output_dir: The output directory to use. Preprocessing will create a sub directory under
4746
it for each run, and also update "latest" file which points to the latest preprocessed
4847
directory. Users are responsible for cleanup. Can be local or GCS path.
@@ -52,19 +51,17 @@ def local_preprocess(input_csvs, labels_file, output_dir, checkpoint=None):
5251
print 'Local preprocessing...'
5352
# TODO: Move this to a new process to avoid pickling issues
5453
# TODO: Expose train/eval split ratio
55-
_local.Local(checkpoint).preprocess(input_csvs, labels_file, output_dir)
54+
_local.Local(checkpoint).preprocess(input_csvs, output_dir)
5655
print 'Done'
5756

5857

59-
def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None,
60-
pipeline_option=None):
58+
def cloud_preprocess(input_csvs, output_dir, checkpoint=None, pipeline_option=None):
6159
"""Preprocess data in Cloud with DataFlow.
6260
Produce output that can be used by training efficiently.
6361
Args:
6462
input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
6563
Preprocessing will concatenate the data inside all files and split them into
6664
train/eval dataset. GCS paths only.
67-
labels_file: The GCS path to the labels file which lists all labels, each in a separate line.
6865
output_dir: The output directory to use. Preprocessing will create a sub directory under
6966
it for each run, and also update "latest" file which points to the latest preprocessed
7067
directory. Users are responsible for cleanup. GCS path only.
@@ -74,8 +71,7 @@ def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None,
7471
# TODO: Move this to a new process to avoid pickling issues
7572
# TODO: Expose train/eval split ratio
7673
# TODO: Consider exposing explicit train/eval datasets
77-
_cloud.Cloud(checkpoint=checkpoint).preprocess(input_csvs, labels_file, output_dir,
78-
pipeline_option)
74+
_cloud.Cloud(checkpoint=checkpoint).preprocess(input_csvs, output_dir, pipeline_option)
7975
if (_util.is_in_IPython()):
8076
import IPython
8177

@@ -87,11 +83,9 @@ def cloud_preprocess(input_csvs, labels_file, output_dir, checkpoint=None,
8783
IPython.display.display_html(html, raw=True)
8884

8985

90-
def local_train(labels_file, input_dir, batch_size, max_steps, output_dir, checkpoint=None):
86+
def local_train(input_dir, batch_size, max_steps, output_dir, checkpoint=None):
9187
"""Train model locally. The output can be used for local prediction or for online deployment.
9288
Args:
93-
labels_file: The path to the labels file which lists all labels, each in a separate line.
94-
It can be a local or a GCS path.
9589
input_dir: A directory path containing preprocessed results. Can be local or GCS path.
9690
batch_size: size of batch used for training.
9791
max_steps: number of steps to train.
@@ -104,27 +98,25 @@ def local_train(labels_file, input_dir, batch_size, max_steps, output_dir, check
10498
logger.setLevel(logging.INFO)
10599
print 'Local training...'
106100
try:
107-
_local.Local(checkpoint).train(labels_file, input_dir, batch_size, max_steps, output_dir)
101+
_local.Local(checkpoint).train(input_dir, batch_size, max_steps, output_dir)
108102
finally:
109103
logger.setLevel(original_level)
110104
print 'Done'
111105

112106

113-
def cloud_train(labels_file, input_dir, batch_size, max_steps, output_dir,
107+
def cloud_train(input_dir, batch_size, max_steps, output_dir,
114108
region, scale_tier='BASIC', checkpoint=None):
115109
"""Train model in the cloud with CloudML trainer service.
116110
The output can be used for local prediction or for online deployment.
117111
Args:
118-
labels_file: The path to the labels file which lists all labels, each in a separate line.
119-
GCS path only.
120112
input_dir: A directory path containing preprocessed results. GCS path only.
121113
batch_size: size of batch used for training.
122114
max_steps: number of steps to train.
123115
output_dir: The output directory to use. GCS path only.
124116
checkpoint: the Inception checkpoint to use.
125117
"""
126118

127-
job_info = _cloud.Cloud(checkpoint=checkpoint).train(labels_file, input_dir, batch_size,
119+
job_info = _cloud.Cloud(checkpoint=checkpoint).train(input_dir, batch_size,
128120
max_steps, output_dir, region, scale_tier)
129121
if (_util.is_in_IPython()):
130122
import IPython
@@ -146,58 +138,55 @@ def _display_predict_results(results, show_image):
146138
if show_image is True:
147139
IPython.display.display_html('<p style="font-size:28px">%s(%.5f)</p>' % label_and_score,
148140
raw=True)
149-
IPython.display.display(IPython.display.Image(filename=image_file))
141+
with ml.util._file.open_local_or_gcs(image_file, mode='r') as f:
142+
IPython.display.display(IPython.display.Image(data=f.read()))
150143
else:
151144
IPython.display.display_html(
152145
'<p>%s&nbsp&nbsp%s(%.5f)</p>' % ((image_file,) + label_and_score), raw=True)
153146
else:
154147
print results
155148

156149

157-
def local_predict(model_dir, image_files, labels_file, show_image=True):
150+
def local_predict(model_dir, image_files, show_image=True):
158151
"""Predict using an offline model.
159152
Args:
160153
model_dir: The directory of a trained inception model. Can be local or GCS paths.
161154
image_files: The paths to the image files to predict labels. Can be local or GCS paths.
162-
labels_file: The path to the labels file which lists all labels, each in a separate line.
163-
Can be local or GCS paths.
164155
show_image: Whether to show images in the results.
165156
"""
166-
167-
labels_and_scores = _local.Local().predict(model_dir, image_files, labels_file)
157+
print('Predicting...')
158+
labels_and_scores = _local.Local().predict(model_dir, image_files)
168159
results = zip(image_files, labels_and_scores)
169160
_display_predict_results(results, show_image)
161+
print('Done')
170162

171163

172-
def cloud_predict(model_id, image_files, labels_file, show_image=True):
164+
def cloud_predict(model_id, image_files, show_image=True):
173165
"""Predict using a deployed (online) model.
174166
Args:
175167
model_id: The deployed model id in the form of "model.version".
176168
image_files: The paths to the image files to predict labels. GCS paths only.
177-
labels_file: The path to the labels file which lists all labels, each in a separate line.
178-
GCS paths only.
179169
show_image: Whether to show images in the results.
180170
"""
181-
182-
labels_and_scores = _cloud.Cloud().predict(model_id, image_files, labels_file)
171+
print('Predicting...')
172+
labels_and_scores = _cloud.Cloud().predict(model_id, image_files)
183173
results = zip(image_files, labels_and_scores)
184174
_display_predict_results(results, show_image)
185175

186176

187-
def local_batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table=None):
177+
def local_batch_predict(model_dir, input_csv, output_file, output_bq_table=None):
188178
"""Batch predict using an offline model.
189179
Args:
190180
model_dir: The directory of a trained inception model. Can be local or GCS paths.
191181
input_csv: The input csv which include two columns only: image_gs_url, label.
192182
Can be local or GCS paths.
193-
labels_file: The path to the labels file which lists all labels, each in a separate line.
194-
Can be local or GCS paths.
195183
output_file: The output csv file containing prediction results.
196184
output_bq_table: If provided, will also save the results to BigQuery table.
197185
"""
198-
_local.Local().batch_predict(model_dir, input_csv, labels_file, output_file, output_bq_table)
199-
186+
print('Predicting...')
187+
_local.Local().batch_predict(model_dir, input_csv, output_file, output_bq_table)
188+
print('Done')
200189

201-
def cloud_batch_predict(model_dir, image_files, labels_file, show_image=True, output_file=None):
190+
def cloud_batch_predict(model_dir, image_files, show_image=True, output_file=None):
202191
"""Not Implemented Yet"""
203192
pass

0 commit comments

Comments
 (0)