Skip to content
This repository was archived by the owner on Sep 3, 2022. It is now read-only.

Datalab Inception (image classification) solution. #117

Merged
merged 2 commits into from
Jan 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions solutionbox/inception/datalab_solutions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.

14 changes: 14 additions & 0 deletions solutionbox/inception/datalab_solutions/inception/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.


from ._package import local_preprocess, cloud_preprocess, local_train, cloud_train, local_predict, cloud_predict
140 changes: 140 additions & 0 deletions solutionbox/inception/datalab_solutions/inception/_cloud.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Cloud implementation for preprocessing, training and prediction for inception model.
"""

import apache_beam as beam
import base64
import collections
import datetime
from googleapiclient import discovery
import google.cloud.ml as ml
import logging
import os

from . import _model
from . import _preprocess
from . import _trainer
from . import _util


_CLOUDML_DISCOVERY_URL = 'https://storage.googleapis.com/cloud-ml/discovery/' \
'ml_v1beta1_discovery.json'
_TF_GS_URL= 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl'


class Cloud(object):
"""Class for cloud training, preprocessing and prediction."""

def __init__(self, project, checkpoint=None):
self._project = project
self._checkpoint = checkpoint
if self._checkpoint is None:
self._checkpoint = _util._DEFAULT_CHECKPOINT_GSURL

def preprocess(self, input_csvs, labels_file, output_dir, pipeline_option=None):
"""Cloud preprocessing with Cloud DataFlow."""

job_name = 'preprocess-inception-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
options = {
'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
'temp_location': os.path.join(output_dir, 'tmp'),
'job_name': job_name,
'project': self._project,
'extra_packages': [ml.sdk_location, _util._PACKAGE_GS_URL, _TF_GS_URL],
'teardown_policy': 'TEARDOWN_ALWAYS',
'no_save_main_session': True
}
if pipeline_option is not None:
options.update(pipeline_option)

opts = beam.pipeline.PipelineOptions(flags=[], **options)
p = beam.Pipeline('DataflowPipelineRunner', options=opts)
_preprocess.configure_pipeline(
p, self._checkpoint, input_csvs, labels_file, output_dir, job_name)
p.run()

def train(self, labels_file, input_dir, batch_size, max_steps, output_path, credentials,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect this to be the implementation of %ml train, not to be in the inception package itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

%ml train (in datalab code as a separate change) basically runs the function through inspection. The function is not supposed to be called directly.

There are a few reasons that a function is more flexible than program args:

  1. Function arguments can easily be enumerated programmatically (so --dumpargs is easy to implement)
  2. It can display package specific content in Datalab directly. For example, predict() can display image in output along with labels. preprocess() can display html links for DataFlow jobs. train() can display job logs. The output behavior can be different package by package.
  3. There can be extra processing before submitting to cloud. In this case, we get the number of labels from labels file. We can also do more (such as validating package specific args).

region, scale_tier):
"""Cloud training with CloudML trainer service."""

num_classes = len(_util.get_labels(labels_file))
job_id = 'inception_train_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
job_args_dict = {
'input_dir': input_dir,
'output_path': output_path,
'max_steps': max_steps,
'batch_size': batch_size,
'num_classes': num_classes,
'checkpoint': self._checkpoint
}
# convert job_args from dict to list as service required.
job_args = []
for k,v in job_args_dict.iteritems():
if isinstance(v, list):
for item in v:

job_args.append('--' + k)
job_args.append(str(item))
else:
job_args.append('--' + k)
job_args.append(str(v))

job_request = {
'package_uris': _util._PACKAGE_GS_URL,
'python_module': 'datalab_solutions.inception.task',
'scale_tier': scale_tier,
'region': region,
'args': job_args
}
job = {
'job_id': job_id,
'training_input': job_request,
}
cloudml = discovery.build('ml', 'v1beta1', discoveryServiceUrl=_CLOUDML_DISCOVERY_URL,
credentials=credentials)
request = cloudml.projects().jobs().create(body=job,
parent='projects/' + self._project)
request.headers['user-agent'] = 'GoogleCloudDataLab/1.0'
job_info = request.execute()
return job_info

def predict(self, model_id, image_files, labels_file, credentials):
"""Cloud prediction with CloudML prediction service."""

labels = _util.get_labels(labels_file)
data = []
for ii, img_file in enumerate(image_files):
with ml.util._file.open_local_or_gcs(img_file, 'rb') as f:
img = base64.b64encode(f.read())
data.append({
'key': str(ii),
'image_bytes': {'b64': img}
})
parts = model_id.split('.')
if len(parts) != 2:
raise Exception('Invalid model name for cloud prediction. Use "model.version".')
full_version_name = ('projects/%s/models/%s/versions/%s' % (self._project, parts[0], parts[1]))
api = discovery.build('ml', 'v1beta1', credentials=credentials,
discoveryServiceUrl=_CLOUDML_DISCOVERY_URL)
request = api.projects().predict(body={'instances': data}, name=full_version_name)
job_results = request.execute()
if 'predictions' not in job_results:
raise Exception('Invalid response from service. Cannot find "predictions" in response.')
predictions = job_results['predictions']
labels_and_scores = [(labels[x['prediction']], x['scores'][x['prediction']])
for x in predictions]
return labels_and_scores
Loading