This repository was archived by the owner on Sep 3, 2022. It is now read-only.

add structure data preprocessing and training #132

Merged: 3 commits, Jan 20, 2017
12 changes: 12 additions & 0 deletions solutionbox/structured_data/datalab_solutions/__init__.py
@@ -0,0 +1,12 @@
# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.

@@ -0,0 +1,15 @@
# Copyright 2017 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.


from ._package import local_preprocess, cloud_preprocess, local_train, cloud_train, local_predict, \
cloud_predict, local_batch_predict, cloud_batch_predict
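Since this __init__.py re-exports the package entry points, a notebook can call
them directly after import. A minimal sketch; the import path below is an
assumption based on this PR's directory layout and may differ in the released
package:

from datalab_solutions.structured_data import local_preprocess, local_train  # assumed path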
@@ -0,0 +1,350 @@
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Provides interface for Datalab.

Datalab will look for functions with the below names:
local_preprocess
local_train
local_predict
cloud_preprocess
cloud_train
cloud_predict
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import datetime
import os
import shutil
import subprocess
import tempfile

import yaml

import google.cloud.ml as ml

_TF_GS_URL = 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl'


def _percent_flags(train_percent=None, eval_percent=None, test_percent=None):
"""Convert train/eval/test percents into command line flags.

Args:
train_percent: Int in range [0, 100].
eval_percent: Int in range [0, 100].
test_percent: Int in range [0, 100].

Returns:
Array of strings encoding the command line flags.
"""
train_percent = train_percent or 0
eval_percent = eval_percent or 0
test_percent = test_percent or 0
[inline review]
Contributor: If our prediction (both instant and batch) takes raw data, there
is no need for preprocessed test_data?

Contributor (author): I have not used the gcloud prediction commands before,
but this is what I think will happen:

local_predict: new package code. Should take raw instances to match
cloud_predict.

cloud_predict: a call to gcloud beta ml predict. Takes raw instances.

local_batch_predict: new package code. Should take preprocessed tfrecords to
match cloud_batch_predict.

cloud_batch_predict: a call to gcloud beta ml jobs submit prediction. Takes
tfrecord/preprocessed?
[end review]

if train_percent == 0 and eval_percent == 0 and test_percent == 0:
percent_flags = []
else:
percent_flags = ['--train_percent=%s' % str(train_percent),
'--eval_percent=%s' % str(eval_percent),
'--test_percent=%s' % str(test_percent)]
return percent_flags
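For illustration, here is what the flag encoding above produces, derived
directly from the code (the values are arbitrary):

_percent_flags(train_percent=80, eval_percent=10, test_percent=10)
# -> ['--train_percent=80', '--eval_percent=10', '--test_percent=10']
_percent_flags()
# -> [] (all percents default to 0, so no flags are passed through)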


def _default_project():
import datalab.context
context = datalab.context.Context.default()
return context.project_id

def _is_in_IPython():
try:
import IPython
return True
except ImportError:
return False

def _check_transforms_config_file(transforms_config_file):
"""Check that the transforms file has expected values."""
pass


def _run_cmd(cmd):
output = subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)

while True:
line = output.stdout.readline().rstrip()
print(line)
if line == '' and output.poll() is not None:
break
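_run_cmd streams a shell command's combined stdout/stderr line by line until
the subprocess exits; for example:

_run_cmd('echo preprocessing step 1 && echo preprocessing step 2')
# prints each line as the subprocess emits it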

def local_preprocess(input_file_path, output_dir, transforms_config_file,
[inline review]
Contributor: Need a way to take transforms_config from a cell input so that:

  1. A notebook does not need to reference an external file.
  2. It is easy for users to input the config. Datalab is good at taking a
     form as input.

Contributor (author): Fully agree. Before making changes, I want to finish the
prediction stuff. I also want to revisit the config format first. I think
Nikhil now wants separate schema/transform config files. Let me put a doc
together and start a discussion before making changes.
[end review]
train_percent=None, eval_percent=None, test_percent=None):
"""Preprocess data locally with Beam.

Produce output that can be used by training efficiently. Will also split
data into three sets (training, eval, and test). {train, eval, test}_percent
should be nonnegative integers that sum to 100.

Args:
input_file_path: String. File pattern that will expand into a list of csv
files. Preprocessing will automatically split the data into three sets
for training, evaluation, and testing. Can be local or GCS path.
output_dir: The output directory to use; can be local or GCS path.
transforms_config_file: File path to the config file.
train_percent: Int in range [0, 100].
eval_percent: Int in range [0, 100].
test_percent: Int in range [0, 100].
"""
_check_transforms_config_file(transforms_config_file)

percent_flags = _percent_flags(train_percent, eval_percent, test_percent)
this_folder = os.path.dirname(os.path.abspath(__file__))

cmd = ['python',
os.path.join(this_folder, 'preprocess/preprocess.py'),
'--input_file_path=%s' % input_file_path,
'--output_dir=%s' % output_dir,
'--transforms_config_file=%s' % transforms_config_file] + percent_flags

print('Local preprocess, running command: %s' % ' '.join(cmd))
_run_cmd(' '.join(cmd))

print('Local preprocessing done.')
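A usage sketch for a notebook cell; the paths and config file name are
hypothetical, and the transforms config format is still under discussion in
this PR:

local_preprocess(input_file_path='./census/raw-*.csv',
                 output_dir='./census/preprocessed',
                 transforms_config_file='./census/transforms_config.json',
                 train_percent=80, eval_percent=10, test_percent=10)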


def cloud_preprocess(input_file_path, output_dir, transforms_config_file,
train_percent=None, eval_percent=None, test_percent=None,
project_id=None, job_name=None):
"""Preprocess data in the cloud with Dataflow.

Produce output that can be used by training efficiently. Will also split
data into three sets (training, eval, and test). {train, eval, test}_percent
should be nonnegative integers that sum to 100.

Args:
input_file_path: String. File pattern that will expand into a list of csv
files. Preprocessing will automatically split the data into three sets
for training, evaluation, and testing. Can be local or GCS path.
output_dir: The output directory to use; can be local or GCS path.
transforms_config_file: File path to the config file.
train_percent: Int in range [0, 100].
eval_percent: Int in range [0, 100].
test_percent: Int in range [0, 100].
project_id: String. The GCP project to use. Defaults to the notebook's
default project id.
job_name: String. Job name as listed on the Dataflow service. If None, a
default job name is selected.
"""
_check_transforms_config_file(transforms_config_file)

percent_flags = _percent_flags(train_percent, eval_percent, test_percent)
this_folder = os.path.dirname(os.path.abspath(__file__))
project_id = project_id or _default_project()
job_name = job_name or ('structured-data-' +
datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

cmd = ['python',
os.path.join(this_folder, 'preprocess/preprocess.py'),
'--cloud',
'--project_id=%s' % project_id,
'--job_name=%s' % job_name,
'--input_file_path=%s' % input_file_path,
'--output_dir=%s' % output_dir,
'--transforms_config_file=%s' % transforms_config_file] + percent_flags

print('Cloud preprocess, running command: %s' % ' '.join(cmd))
_run_cmd(' '.join(cmd))

print('Cloud preprocessing job submitted.')

if _is_in_IPython():
import IPython

dataflow_url = ('https://console.developers.google.com/dataflow?project=%s'
% project_id)
html = ('<p>Click <a href="%s" target="_blank">here</a> to track '
'preprocessing job %s.</p><br/>' % (dataflow_url, job_name))
IPython.display.display_html(html, raw=True)
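The cloud counterpart, sketched with hypothetical GCS paths and project id:

cloud_preprocess(input_file_path='gs://my-bucket/census/raw-*.csv',
                 output_dir='gs://my-bucket/census/preprocessed',
                 transforms_config_file='gs://my-bucket/census/transforms_config.json',
                 train_percent=80, eval_percent=10, test_percent=10,
                 project_id='my-project')  # job_name defaults to structured-data-<timestamp>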



def local_train(preprocessed_dir, transforms_config_file, output_dir,
layer_sizes=None, max_steps=None):
"""Train model locally.
Args:
preprocessed_dir: The output directory from preprocessing. Must contain
files named features_train*.tfrecord.gz, features_eval*.tfrecord.gz,
and metadata.json. Can be local or GCS path.
transforms_config_file: File path to the config file.
output_dir: Output directory of training.
layer_sizes: String. Represents the layers in a fully connected DNN.
If the model type is DNN, this must be set. For example, "10 3 2"
creates three DNN layers where the first layer has 10 nodes, the
middle layer has 3 nodes, and the last layer has 2 nodes.
max_steps: Int. Number of training steps to perform.
"""
_check_transforms_config_file(transforms_config_file)

#TODO(brandondutra): allow other flags to be set, like batch size/learning rate
#TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >=1.7 are needed.

train_filename = os.path.join(preprocessed_dir, 'features_train*')
eval_filename = os.path.join(preprocessed_dir, 'features_eval*')
metadata_filename = os.path.join(preprocessed_dir, 'metadata.json')
this_folder = os.path.dirname(os.path.abspath(__file__))

#TODO(brandondutra): remove the cd after b/34221856
cmd = ['cd %s &&' % this_folder,
'gcloud beta ml local train',
'--module-name=trainer.task',
'--package-path=trainer',
'--',
'--train_data_paths=%s' % train_filename,
'--eval_data_paths=%s' % eval_filename,
'--metadata_path=%s' % metadata_filename,
'--output_path=%s' % output_dir,
'--transforms_config_file=%s' % transforms_config_file,
'--max_steps=%s' % str(max_steps)]
if layer_sizes:
cmd += ['--layer_sizes %s' % layer_sizes]

print('Local training, running command: %s' % ' '.join(cmd))
_run_cmd(' '.join(cmd))
print('Local training done.')
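A usage sketch with hypothetical paths; layer_sizes='10 3 2' builds the
three-layer DNN described in the docstring above:

local_train(preprocessed_dir='./census/preprocessed',
            transforms_config_file='./census/transforms_config.json',
            output_dir='./census/model',
            layer_sizes='10 3 2',
            max_steps=2000)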


def cloud_train(preprocessed_dir, transforms_config_file, output_dir,
staging_bucket,
layer_sizes=None, max_steps=None, project_id=None,
job_name=None, scale_tier='BASIC'):
"""Train model using CloudML.
Args:
preprocessed_dir: The output directory from preprocessing. Must contain
files named features_train*.tfrecord.gz, features_eval*.tfrecord.gz,
and metadata.json.
transforms_config_file: File path to the config file.
output_dir: Output directory of training.
staging_bucket: GCS bucket.
layer_sizes: String. Represents the layers in a fully connected DNN.
If the model type is DNN, this must be set. For example, "10 3 2"
creates three DNN layers where the first layer has 10 nodes, the
middle layer has 3 nodes, and the last layer has 2 nodes.
max_steps: Int. Number of training steps to perform.
project_id: String. The GCP project to use. Defaults to the notebook's
default project id.
job_name: String. Job name as listed on the CloudML service. If None, a
default job name is selected.
scale_tier: The CloudML scale tier. CUSTOM tiers are currently not supported
in this package. See https://cloud.google.com/ml/reference/rest/v1beta1/projects.jobs#ScaleTier
"""
_check_transforms_config_file(transforms_config_file)

#TODO(brandondutra): allow other flags to be set, like batch size,
# learning rate, custom scale tiers, etc.
#TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >=1.7 are needed.

if (not preprocessed_dir.startswith('gs://')
or not transforms_config_file.startswith('gs://')
or not output_dir.startswith('gs://')):
print('ERROR: preprocessed_dir, transforms_config_file, and output_dir '
'must all be in GCS.')
return

# Training will fail if there are files in the output folder. Check now and
# fail fast.
if ml.util._file.glob_files(os.path.join(output_dir, '*')):
print('ERROR: output_dir should be empty. Use another folder')
return

#TODO(brandondutra): remove the tf stuff once the cloudml service is past 0.11
temp_dir = tempfile.mkdtemp()
subprocess.check_call(['gsutil', 'cp', _TF_GS_URL, temp_dir])
tf_local_package = os.path.join(temp_dir, os.path.basename(_TF_GS_URL))

# Build the training config file.
training_config_file_path = tempfile.mkstemp(dir=temp_dir)[1]
training_config = {'trainingInput': {'scaleTier': scale_tier}}
with open(training_config_file_path, 'w') as f:
f.write(yaml.dump(training_config, default_flow_style=False))
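# For reference (illustrative, derived from the dict above): with the default
# scale_tier of 'BASIC', the file written here contains:
#   trainingInput:
#     scaleTier: BASIC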

train_filename = os.path.join(preprocessed_dir, 'features_train*')
eval_filename = os.path.join(preprocessed_dir, 'features_eval*')
metadata_filename = os.path.join(preprocessed_dir, 'metadata.json')
this_folder = os.path.dirname(os.path.abspath(__file__))
project_id = project_id or _default_project()
job_name = job_name or ('structured_data_train_' +
datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

# TODO(brandondutra): remove the cd after b/34221856
cmd = ['cd %s &&' % this_folder,
'gcloud beta ml jobs submit training %s' % job_name,
'--module-name=trainer.task',
'--staging-bucket=%s' % staging_bucket,
'--async',
'--package-path=%s' % 'trainer', #os.path.join(this_folder, 'trainer'),
'--packages=%s' % tf_local_package,
'--config=%s' % training_config_file_path,
'--',
'--train_data_paths=%s' % train_filename,
'--eval_data_paths=%s' % eval_filename,
'--metadata_path=%s' % metadata_filename,
'--output_path=%s' % output_dir,
'--transforms_config_file=%s' % transforms_config_file,
'--max_steps=%s' % str(max_steps)]
if layer_sizes:
cmd += ['--layer_sizes %s' % layer_sizes]

print('CloudML training, running command: %s' % ' '.join(cmd))
_run_cmd(' '.join(cmd))

[inline review]
Contributor: A "print('Done training')" would be nice here.

Contributor (author): done
[end review]

print('CloudML training job submitted.')

if _is_in_IPython():
import IPython

cloudml_url = ('https://console.developers.google.com/ml/jobs?project=%s'
% project_id)
html = ('<p>Click <a href="%s" target="_blank">here</a> to track '
'the training job %s.</p><br/>' % (cloudml_url, job_name))
IPython.display.display_html(html, raw=True)

# Delete the temp files made
shutil.rmtree(temp_dir)
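A usage sketch with hypothetical GCS locations; all three paths must be on
GCS, and output_dir must be empty, per the checks at the top of cloud_train:

cloud_train(preprocessed_dir='gs://my-bucket/census/preprocessed',
            transforms_config_file='gs://my-bucket/census/transforms_config.json',
            output_dir='gs://my-bucket/census/model',
            staging_bucket='gs://my-bucket',
            layer_sizes='10 3 2',
            max_steps=2000,
            scale_tier='BASIC')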


def local_predict():
"""Not Implemented Yet."""
print('local_predict')


def cloud_predict():
"""Not Implemented Yet."""
print('cloud_predict')


def local_batch_predict():
"""Not Implemented Yet."""
print('local_batch_predict')


def cloud_batch_predict():
"""Not Implemented Yet."""
print('cloud_batch_predict')
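Per the review discussion earlier in this PR, cloud_predict is expected to
wrap gcloud beta ml predict and take raw instances. A rough sketch of that
shape; the function name and gcloud flag names below are assumptions, not
confirmed syntax:

def cloud_predict_sketch(model_name, instances_file):
  """Sketch only: hypothetical wrapper around gcloud beta ml predict."""
  cmd = ['gcloud beta ml predict',
         '--model=%s' % model_name,           # assumed flag name
         '--instances=%s' % instances_file]   # raw, not preprocessed, instances
  _run_cmd(' '.join(cmd))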
