This repository was archived by the owner on Sep 3, 2022. It is now read-only.

Commit e586108

Inception package improvements (#155)
* Inception package improvements.
  - It takes DataSets as input instead of CSV files. It also supports BigQuery source now.
  - Changes to make latest DataFlow and TensorFlow happy.
  - Changes in preprocessing to remove partial support for multiple labels.
  - Other minor improvements.
* Add a comment.
1 parent 94beaf0 commit e586108

File tree: 7 files changed, +92 -61 lines
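For orientation, the reworked entry points take DataSet objects rather than CSV paths. Below is a hedged usage sketch, not taken from the commit: the bucket, file pattern, and table names are hypothetical, the package import path is an assumption, and BigQueryDataSet's constructor keyword is inferred from the dataset.sql usage in the diffs that follow.

import datalab.mlalpha as mlalpha
# Import path is an assumption; the functions live in datalab_solutions/inception/_package.py.
from datalab_solutions import inception

# CSV source: files with image_gs_url,label rows (hypothetical GCS pattern).
csv_data = mlalpha.CsvDataSet(files=['gs://my-bucket/images/train-*.csv'])
inception.local_preprocess(csv_data, 'gs://my-bucket/inception/preprocessed')

# BigQuery source: a SQL query (or a bare table name) selecting image_url and label.
bq_data = mlalpha.BigQueryDataSet(sql='SELECT image_url, label FROM mydataset.images')
inception.cloud_preprocess(bq_data, 'gs://my-bucket/inception/preprocessed')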

datalab/mlalpha/_dataset.py
Lines changed: 2 additions & 1 deletion

@@ -48,7 +48,8 @@ def __init__(self, files, schema=None, schema_file=None):
         self._schema = json.load(f)
     self._files = []
     for file in files:
-      self._files += ml.util._file.glob_files(file)
+      # glob_files() returns unicode strings which doesn't make DataFlow happy. So str().
+      self._files += [str(x) for x in ml.util._file.glob_files(file)]
 
   @property
   def files(self):

solutionbox/inception/datalab_solutions/inception/_cloud.py
Lines changed: 12 additions & 5 deletions

@@ -20,18 +20,18 @@
 import base64
 import collections
 import datetime
-from googleapiclient import discovery
 import google.cloud.ml as ml
 import logging
 import os
 
+
 from . import _model
 from . import _preprocess
 from . import _trainer
 from . import _util
 
 
-_TF_GS_URL= 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl'
+_TF_GS_URL= 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc1-cp27-none-linux_x86_64.whl'
 
 
 class Cloud(object):
@@ -42,9 +42,10 @@ def __init__(self, checkpoint=None):
     if self._checkpoint is None:
       self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL
 
-  def preprocess(self, input_csvs, output_dir, pipeline_option=None):
+  def preprocess(self, dataset, output_dir, pipeline_option=None):
     """Cloud preprocessing with Cloud DataFlow."""
 
+    import datalab.mlalpha as mlalpha
     job_name = 'preprocess-inception-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
     options = {
         'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
@@ -59,9 +60,15 @@ def preprocess(self, input_csvs, output_dir, pipeline_option=None):
       options.update(pipeline_option)
 
     opts = beam.pipeline.PipelineOptions(flags=[], **options)
-    p = beam.Pipeline('DataflowPipelineRunner', options=opts)
-    _preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_name)
+    p = beam.Pipeline('DataflowRunner', options=opts)
+    if type(dataset) is mlalpha.CsvDataSet:
+      _preprocess.configure_pipeline_csv(p, self._checkpoint, dataset.files, output_dir, job_name)
+    elif type(dataset) is mlalpha.BigQueryDataSet:
+      _preprocess.configure_pipeline_bigquery(p, self._checkpoint, dataset.sql, output_dir, job_name)
+    else:
+      raise ValueError('preprocess takes CsvDataSet or BigQueryDataset only.')
     p.run()
+    return job_name
 
   def train(self, input_dir, batch_size, max_steps, output_path,
             region, scale_tier):

solutionbox/inception/datalab_solutions/inception/_local.py
Lines changed: 16 additions & 5 deletions

@@ -42,13 +42,24 @@ def __init__(self, checkpoint=None):
     if self._checkpoint is None:
       self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL
 
-  def preprocess(self, input_csvs, output_dir):
+  def preprocess(self, dataset, output_dir):
     """Local preprocessing with local DataFlow."""
-
+
+    import datalab.mlalpha as mlalpha
     job_id = 'inception_preprocessed_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
-    p = beam.Pipeline('DirectPipelineRunner')
-    _preprocess.configure_pipeline(p, self._checkpoint, input_csvs, output_dir, job_id)
-    p.run()
+    # Project is needed for bigquery data source, even in local run.
+    options = {
+        'project': _util.default_project(),
+    }
+    opts = beam.pipeline.PipelineOptions(flags=[], **options)
+    p = beam.Pipeline('DirectRunner', options=opts)
+    if type(dataset) is mlalpha.CsvDataSet:
+      _preprocess.configure_pipeline_csv(p, self._checkpoint, dataset.files, output_dir, job_id)
+    elif type(dataset) is mlalpha.BigQueryDataSet:
+      _preprocess.configure_pipeline_bigquery(p, self._checkpoint, dataset.sql, output_dir, job_id)
+    else:
+      raise ValueError('preprocess takes CsvDataSet or BigQueryDataset only.')
+    p.run().wait_until_finish()
 
   def train(self, input_dir, batch_size, max_steps, output_dir):
     """Local training."""

solutionbox/inception/datalab_solutions/inception/_model.py
Lines changed: 1 addition & 1 deletion

@@ -389,7 +389,7 @@ def loss(logits, labels):
   """
   labels = tf.to_int64(labels)
   cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
-      logits, labels, name='xentropy')
+      logits=logits, labels=labels, name='xentropy')
   return tf.reduce_mean(cross_entropy, name='xentropy_mean')
 
 
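The keyword-argument form matters because the positional signature of this op changed across TensorFlow releases, and later versions reject positional calls. A small self-contained sketch of the same loss computation, assuming a TF 1.x-era graph session and made-up logits:

import tensorflow as tf

logits = tf.constant([[2.0, 0.5, 0.3], [0.1, 1.7, 0.2]])  # hypothetical scores for 3 classes
labels = tf.constant([0, 1], dtype=tf.int64)
# Named arguments keep the call valid even where TensorFlow enforces keyword-only usage.
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
    logits=logits, labels=labels, name='xentropy')
loss = tf.reduce_mean(cross_entropy, name='xentropy_mean')

with tf.Session() as sess:
    print(sess.run(loss))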

solutionbox/inception/datalab_solutions/inception/_package.py
Lines changed: 10 additions & 13 deletions

@@ -36,12 +36,11 @@
 from . import _util
 
 
-def local_preprocess(input_csvs, output_dir, checkpoint=None):
+def local_preprocess(dataset, output_dir, checkpoint=None):
   """Preprocess data locally. Produce output that can be used by training efficiently.
   Args:
-    input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
-        Preprocessing will concatenate the data inside all files and split them into
-        train/eval dataset. Can be local or GCS path.
+    dataset: data source to preprocess. Can be either datalab.mlalpha.CsvDataset, or
+        datalab.mlalpha.BigQueryDataSet.
     output_dir: The output directory to use. Preprocessing will create a sub directory under
         it for each run, and also update "latest" file which points to the latest preprocessed
         directory. Users are responsible for cleanup. Can be local or GCS path.
@@ -51,17 +50,16 @@ def local_preprocess(input_csvs, output_dir, checkpoint=None):
   print 'Local preprocessing...'
   # TODO: Move this to a new process to avoid pickling issues
   # TODO: Expose train/eval split ratio
-  _local.Local(checkpoint).preprocess(input_csvs, output_dir)
+  _local.Local(checkpoint).preprocess(dataset, output_dir)
   print 'Done'
 
 
-def cloud_preprocess(input_csvs, output_dir, checkpoint=None, pipeline_option=None):
+def cloud_preprocess(dataset, output_dir, checkpoint=None, pipeline_option=None):
   """Preprocess data in Cloud with DataFlow.
   Produce output that can be used by training efficiently.
   Args:
-    input_csvs: A list of CSV files which include two columns only: image_gs_url, label.
-        Preprocessing will concatenate the data inside all files and split them into
-        train/eval dataset. GCS paths only.
+    dataset: data source to preprocess. Can be either datalab.mlalpha.CsvDataset, or
+        datalab.mlalpha.BigQueryDataSet. For CsvDataSet, all files need to be in GCS.
     output_dir: The output directory to use. Preprocessing will create a sub directory under
         it for each run, and also update "latest" file which points to the latest preprocessed
         directory. Users are responsible for cleanup. GCS path only.
@@ -70,14 +68,13 @@ def cloud_preprocess(input_csvs, output_dir, checkpoint=None, pipeline_option=None):
 
   # TODO: Move this to a new process to avoid pickling issues
   # TODO: Expose train/eval split ratio
-  # TODO: Consider exposing explicit train/eval datasets
-  _cloud.Cloud(checkpoint=checkpoint).preprocess(input_csvs, output_dir, pipeline_option)
+  job_name = _cloud.Cloud(checkpoint=checkpoint).preprocess(dataset, output_dir, pipeline_option)
   if (_util.is_in_IPython()):
     import IPython
 
     dataflow_url = 'https://console.developers.google.com/dataflow?project=%s' % \
        _util.default_project()
-    html = 'Job submitted.'
+    html = 'Job "%s" submitted.' % job_name
     html += '<p>Click <a href="%s" target="_blank">here</a> to track preprocessing job. <br/>' \
        % dataflow_url
     IPython.display.display_html(html, raw=True)
@@ -126,7 +123,7 @@ def cloud_train(input_dir, batch_size, max_steps, output_dir,
   }
   log_url = 'https://console.developers.google.com/logs/viewer?' + \
      urllib.urlencode(log_url_query_strings)
-  html = 'Job submitted.'
+  html = 'Job "%s" submitted.' % job_info['jobId']
   html += '<p>Click <a href="%s" target="_blank">here</a> to view cloud log. <br/>' % log_url
   IPython.display.display_html(html, raw=True)
 

solutionbox/inception/datalab_solutions/inception/_preprocess.py
Lines changed: 48 additions & 33 deletions

@@ -18,7 +18,7 @@
 
 
 import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 import cStringIO
 import csv
 import google.cloud.ml as ml
@@ -35,8 +35,7 @@
 slim = tf.contrib.slim
 
 error_count = beam.Aggregator('errorCount')
-csv_rows_count = beam.Aggregator('csvRowsCount')
-labels_count = beam.Aggregator('labelsCount')
+rows_count = beam.Aggregator('RowsCount')
 skipped_empty_line = beam.Aggregator('skippedEmptyLine')
 embedding_good = beam.Aggregator('embedding_good')
 embedding_bad = beam.Aggregator('embedding_bad')
@@ -66,27 +65,23 @@ def process(self, context, all_labels):
       self.label_to_id_map[label] = i
 
     # Row format is:
-    # image_uri(,label_ids)*
-    row = context.element
-    if not row:
+    # image_uri,label_id
+    element = context.element
+    if not element:
       context.aggregate_to(skipped_empty_line, 1)
       return
 
-    context.aggregate_to(csv_rows_count, 1)
-    uri = row[0]
+    context.aggregate_to(rows_count, 1)
+    uri = element['image_url']
     if not uri or not uri.startswith('gs://'):
       context.aggregate_to(invalid_uri, 1)
       return
 
-    # In a real-world system, you may want to provide a default id for labels
-    # that were not in the dictionary. In this sample, we will throw an error.
-    # This code already supports multi-label problems if you want to use it.
-    label_ids = [self.label_to_id_map[label.strip()] for label in row[1:]]
-    context.aggregate_to(labels_count, len(label_ids))
-
-    if not label_ids:
+    try:
+      label_id = self.label_to_id_map[element['label'].strip()]
+    except KeyError:
       context.aggregate_to(ignored_unlabeled_image, 1)
-    yield row[0], label_ids
+    yield uri, label_id
 
 
 class ReadImageAndConvertToJpegDoFn(beam.DoFn):
@@ -97,7 +92,7 @@ class ReadImageAndConvertToJpegDoFn(beam.DoFn):
   """
 
   def process(self, context):
-    uri, label_ids = context.element
+    uri, label_id = context.element
 
     try:
       with ml.util._file.open_local_or_gcs(uri, mode='r') as f:
@@ -114,7 +109,7 @@ def process(self, context):
       output = cStringIO.StringIO()
       img.save(output, 'jpeg')
       image_bytes = output.getvalue()
-    yield uri, label_ids, image_bytes
+    yield uri, label_id, image_bytes
 
 
 class EmbeddingsGraph(object):
@@ -250,7 +245,7 @@ def _bytes_feature(value):
     def _float_feature(value):
       return tf.train.Feature(float_list=tf.train.FloatList(value=value))
 
-    uri, label_ids, image_bytes = context.element
+    uri, label_id, image_bytes = context.element
 
     try:
       embedding = self.preprocess_graph.calculate_embedding(image_bytes)
@@ -265,13 +260,11 @@ def _float_feature(value):
       context.aggregate_to(embedding_bad, 1)
 
     example = tf.train.Example(features=tf.train.Features(feature={
-        'image_uri': _bytes_feature([uri]),
+        'image_uri': _bytes_feature([str(uri)]),
        'embedding': _float_feature(embedding.ravel().tolist()),
     }))
 
-    if label_ids:
-      label_ids.sort()
-      example.features.feature['label'].int64_list.value.extend(label_ids)
+    example.features.feature['label'].int64_list.value.append(label_id)
 
     yield example
 
@@ -283,20 +276,31 @@ def partition_for(self, context, num_partitions):
     return 1 if random.random() > 0.7 else 0
 
 
-def configure_pipeline(p, checkpoint_path, input_paths, output_dir, job_id):
-  """Specify PCollection and transformations in pipeline."""
-  output_latest_file = os.path.join(output_dir, 'latest')
+def _get_sources_from_csvs(p, input_paths):
   source_list = []
   for ii, input_path in enumerate(input_paths):
-    input_source = beam.io.TextFileSource(input_path, strip_trailing_newlines=True)
-    source_list.append(p | 'Read input %d' % ii >> beam.Read(input_source))
-  all_sources = source_list | 'Flatten Sources' >> beam.Flatten()
-  labels = (all_sources
-            | 'Parse input for labels' >> beam.Map(lambda line: csv.reader([line]).next()[1])
+    source_list.append(p | 'Read from Csv %d' % ii >>
+                       beam.io.ReadFromText(input_path, strip_trailing_newlines=True))
+  all_sources = (source_list | 'Flatten Sources' >> beam.Flatten()
+      | beam.Map(lambda line: csv.DictReader([line], fieldnames=['image_url', 'label']).next()))
+  return all_sources
+
+
+def _get_sources_from_bigquery(p, query):
+  if len(query.split()) == 1:
+    bq_source = beam.io.BigQuerySource(table=query)
+  else:
+    bq_source = beam.io.BigQuerySource(query=query)
+  query_results = p | 'Read from BigQuery' >> beam.io.Read(bq_source)
+  return query_results
+
+
+def _configure_pipeline_from_source(source, checkpoint_path, output_dir, job_id):
+  labels = (source
+            | 'Parse input for labels' >> beam.Map(lambda x: x['label'])
             | 'Combine labels' >> beam.transforms.combiners.Count.PerElement()
             | 'Get labels' >> beam.Map(lambda label_count: label_count[0]))
-  all_preprocessed = (all_sources
-                      | 'Parse input' >> beam.Map(lambda line: csv.reader([line]).next())
+  all_preprocessed = (source
                       | 'Extract label ids' >> beam.ParDo(ExtractLabelIdsDoFn(),
                                                           beam.pvalue.AsIter(labels))
                       | 'Read and convert to JPEG' >> beam.ParDo(ReadImageAndConvertToJpegDoFn())
@@ -311,8 +315,19 @@ def configure_pipeline(p, checkpoint_path, input_paths, output_dir, job_id):
   eval_save = train_eval[1] | 'Save eval to disk' >> SaveFeatures(preprocessed_eval)
   train_save = train_eval[0] | 'Save train to disk' >> SaveFeatures(preprocessed_train)
   # Make sure we write "latest" file after train and eval data are successfully written.
+  output_latest_file = os.path.join(output_dir, 'latest')
   ([eval_save, train_save, labels_save] | 'Wait for train eval saving' >> beam.Flatten() |
       beam.transforms.combiners.Sample.FixedSizeGlobally('Fixed One', 1) |
       beam.Map(lambda path: job_id) |
       'WriteLatest' >> beam.io.textio.WriteToText(output_latest_file, shard_name_template=''))
 
+
+def configure_pipeline_csv(p, checkpoint_path, input_paths, output_dir, job_id):
+  all_sources = _get_sources_from_csvs(p, input_paths)
+  _configure_pipeline_from_source(all_sources, checkpoint_path, output_dir, job_id)
+
+
+def configure_pipeline_bigquery(p, checkpoint_path, query, output_dir, job_id):
+  all_sources = _get_sources_from_bigquery(p, query)
+  _configure_pipeline_from_source(all_sources, checkpoint_path, output_dir, job_id)
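Both sources now feed ExtractLabelIdsDoFn dict-shaped elements with 'image_url' and 'label' keys: BigQuery rows already arrive as dicts, and CSV lines are converted by the DictReader map above. A short Python 2 sketch of that parsing step, using a hypothetical row:

import csv

line = 'gs://my-bucket/images/cat_001.jpg,cat'  # hypothetical CSV row
row = csv.DictReader([line], fieldnames=['image_url', 'label']).next()
# row == {'image_url': 'gs://my-bucket/images/cat_001.jpg', 'label': 'cat'}
print row['image_url'], row['label']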

solutionbox/inception/datalab_solutions/inception/_trainer.py
Lines changed: 3 additions & 3 deletions

@@ -69,10 +69,10 @@ def evaluate(self, num_eval_batches=None):
     with tf.Graph().as_default() as graph:
       self.tensors = self.model.build_eval_graph(self.eval_data_paths,
                                                  self.batch_size)
-      self.summary = tf.merge_all_summaries()
+      self.summary = tf.summary.merge_all()
       self.saver = tf.train.Saver()
 
-    self.summary_writer = tf.train.SummaryWriter(self.output_path)
+    self.summary_writer = tf.summary.FileWriter(self.output_path)
     self.sv = tf.train.Supervisor(
         graph=graph,
         logdir=self.output_path,
@@ -163,7 +163,7 @@ def run_training(self):
       self.saver = tf.train.Saver()
 
       # Build the summary operation based on the TF collection of Summaries.
-      self.summary_op = tf.merge_all_summaries()
+      self.summary_op = tf.summary.merge_all()
 
       # Create a "supervisor", which oversees the training process.
       self.sv = tf.train.Supervisor(
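tf.merge_all_summaries() and tf.train.SummaryWriter were renamed to tf.summary.merge_all() and tf.summary.FileWriter around TensorFlow 0.12, which matches the wheel bump in _cloud.py. A minimal sketch of the renamed API, writing to a hypothetical log directory:

import tensorflow as tf

loss = tf.constant(0.25)
tf.summary.scalar('loss', loss)                         # register a summary op
summary_op = tf.summary.merge_all()                     # was tf.merge_all_summaries()
writer = tf.summary.FileWriter('/tmp/inception_logs')   # was tf.train.SummaryWriter

with tf.Session() as sess:
    writer.add_summary(sess.run(summary_op), global_step=0)
writer.close()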
