# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


| 16 | +"""Provides interface for Datalab. |
| 17 | +
|
| 18 | + Datalab will look for functions with the below names: |
| 19 | + local_preprocess |
| 20 | + local_train |
| 21 | + local_predict |
| 22 | + cloud_preprocess |
| 23 | + cloud_train |
| 24 | + cloud_predict |
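
  Example usage (hypothetical local paths and config file):
    local_preprocess('./input*.csv', './preproc', './transforms_config.json',
                     train_percent=80, eval_percent=10, test_percent=10)
    local_train('./preproc', './transforms_config.json', './training',
                layer_sizes='10 3 2', max_steps=1000)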
| 25 | +""" |
| 26 | + |
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import datetime
import os
import shutil
import subprocess
import tempfile

import yaml

import google.cloud.ml as ml

_TF_GS_URL = 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl'


def _percent_flags(train_percent=None, eval_percent=None, test_percent=None):
  """Convert train/eval/test percents into command line flags.

  Args:
    train_percent: Int in range [0, 100].
    eval_percent: Int in range [0, 100].
    test_percent: Int in range [0, 100].

  Returns:
    Array of strings encoding the command line flags.
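
  Example:
    _percent_flags(80, 10, 10) returns
    ['--train_percent=80', '--eval_percent=10', '--test_percent=10'];
    if all three percents are None or 0, an empty list is returned.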
| 58 | + """ |
  train_percent = train_percent or 0
  eval_percent = eval_percent or 0
  test_percent = test_percent or 0
  if train_percent == 0 and eval_percent == 0 and test_percent == 0:
    percent_flags = []
  else:
    percent_flags = ['--train_percent=%s' % str(train_percent),
                     '--eval_percent=%s' % str(eval_percent),
                     '--test_percent=%s' % str(test_percent)]
  return percent_flags


def _default_project():
  import datalab.context
  context = datalab.context.Context.default()
  return context.project_id


def _is_in_IPython():
  try:
    import IPython  # noqa: F401. We only care whether the import succeeds.
    return True
  except ImportError:
    return False


def _check_transforms_config_file(transforms_config_file):
  """Check that the transforms file has expected values. Not implemented yet."""
  pass


def _run_cmd(cmd):
  """Run a shell command and stream its output until the process exits."""
  output = subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT,
                            stdout=subprocess.PIPE)

  while True:
    line = output.stdout.readline().rstrip()
    print(line)
    if line == '' and output.poll() is not None:
      break


def local_preprocess(input_file_path, output_dir, transforms_config_file,
                     train_percent=None, eval_percent=None, test_percent=None):
  """Preprocess data locally with Beam.

  Produces output that training can consume efficiently, and splits the data
  into three sets (training, eval, and test). {train, eval, test}_percent
  should be nonnegative integers that sum to 100.

  Args:
    input_file_path: String. File pattern that will expand into a list of csv
        files. Preprocessing will automatically split the data into three sets
        for training, evaluation, and testing. Can be a local or GCS path.
    output_dir: The output directory to use; can be a local or GCS path.
    transforms_config_file: File path to the config file.
    train_percent: Int in range [0, 100].
    eval_percent: Int in range [0, 100].
    test_percent: Int in range [0, 100].
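
  Example (hypothetical paths and config file):
    local_preprocess('./input*.csv', './preproc', './transforms_config.json',
                     train_percent=80, eval_percent=10, test_percent=10)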
| 114 | + """ |
  _check_transforms_config_file(transforms_config_file)

  percent_flags = _percent_flags(train_percent, eval_percent, test_percent)
  this_folder = os.path.dirname(os.path.abspath(__file__))

  cmd = ['python',
         os.path.join(this_folder, 'preprocess/preprocess.py'),
         '--input_file_path=%s' % input_file_path,
         '--output_dir=%s' % output_dir,
         '--transforms_config_file=%s' % transforms_config_file] + percent_flags

  print('Local preprocess, running command: %s' % ' '.join(cmd))
  _run_cmd(' '.join(cmd))

  print('Local preprocessing done.')


def cloud_preprocess(input_file_path, output_dir, transforms_config_file,
                     train_percent=None, eval_percent=None, test_percent=None,
                     project_id=None, job_name=None):
  """Preprocess data in the cloud with Dataflow.

  Produces output that training can consume efficiently, and splits the data
  into three sets (training, eval, and test). {train, eval, test}_percent
  should be nonnegative integers that sum to 100.

  Args:
    input_file_path: String. File pattern that will expand into a list of csv
        files. Preprocessing will automatically split the data into three sets
        for training, evaluation, and testing. Can be a local or GCS path.
    output_dir: The output directory to use; can be a local or GCS path.
    transforms_config_file: File path to the config file.
    train_percent: Int in range [0, 100].
    eval_percent: Int in range [0, 100].
    test_percent: Int in range [0, 100].
    project_id: String. The GCE project to use. Defaults to the notebook's
        default project id.
    job_name: String. Job name as listed on the Dataflow service. If None, a
        default job name is selected.
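
  Example (hypothetical GCS paths; project_id defaults to the notebook's
  project when omitted):
    cloud_preprocess('gs://bucket/input*.csv', 'gs://bucket/preproc',
                     'gs://bucket/transforms_config.json',
                     train_percent=80, eval_percent=10, test_percent=10)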
| 154 | + """ |
  _check_transforms_config_file(transforms_config_file)

  percent_flags = _percent_flags(train_percent, eval_percent, test_percent)
  this_folder = os.path.dirname(os.path.abspath(__file__))
  project_id = project_id or _default_project()
  job_name = job_name or ('structured-data-' +
                          datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

  cmd = ['python',
         os.path.join(this_folder, 'preprocess/preprocess.py'),
         '--cloud',
         '--project_id=%s' % project_id,
         '--job_name=%s' % job_name,
         '--input_file_path=%s' % input_file_path,
         '--output_dir=%s' % output_dir,
         '--transforms_config_file=%s' % transforms_config_file] + percent_flags

  print('Cloud preprocess, running command: %s' % ' '.join(cmd))
  _run_cmd(' '.join(cmd))

  print('Cloud preprocessing job submitted.')

  if _is_in_IPython():
    import IPython

    dataflow_url = ('https://console.developers.google.com/dataflow?project=%s'
                    % project_id)
    html = ('<p>Click <a href="%s" target="_blank">here</a> to track '
            'preprocessing job %s.</p><br/>' % (dataflow_url, job_name))
    IPython.display.display_html(html, raw=True)


def local_train(preprocessed_dir, transforms_config_file, output_dir,
                layer_sizes=None, max_steps=None):
  """Train a model locally.

  Args:
    preprocessed_dir: The output directory from preprocessing. Must contain
        files named features_train*.tfrecord.gz, features_eval*.tfrecord.gz,
        and metadata.json. Can be a local or GCS path.
    transforms_config_file: File path to the config file.
    output_dir: Output directory of training.
    layer_sizes: String. Represents the layers in the connected DNN and must
        be set if the model type is DNN. For example, "10 3 2" creates three
        DNN layers where the first layer has 10 nodes, the middle layer has
        3 nodes, and the last layer has 2 nodes.
    max_steps: Int. Number of training steps to perform.
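
  Example (hypothetical paths and config file):
    local_train('./preproc', './transforms_config.json', './training',
                layer_sizes='10 3 2', max_steps=1000)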
| 202 | + """ |
  _check_transforms_config_file(transforms_config_file)

  # TODO(brandondutra): allow other flags to be set, like batch size and
  # learning rate.
  # TODO(brandondutra): document someplace that TF >= 0.12 and
  # cloudml >= 1.7 are needed.

  train_filename = os.path.join(preprocessed_dir, 'features_train*')
  eval_filename = os.path.join(preprocessed_dir, 'features_eval*')
  metadata_filename = os.path.join(preprocessed_dir, 'metadata.json')
  this_folder = os.path.dirname(os.path.abspath(__file__))

  # TODO(brandondutra): remove the cd after b/34221856
  cmd = ['cd %s &&' % this_folder,
         'gcloud beta ml local train',
         '--module-name=trainer.task',
         '--package-path=trainer',
         '--',
         '--train_data_paths=%s' % train_filename,
         '--eval_data_paths=%s' % eval_filename,
         '--metadata_path=%s' % metadata_filename,
         '--output_path=%s' % output_dir,
         '--transforms_config_file=%s' % transforms_config_file]
  if max_steps is not None:
    # Omit --max_steps when not given so the literal string 'None' is not
    # passed to the trainer.
    cmd += ['--max_steps=%s' % str(max_steps)]
  if layer_sizes:
    cmd += ['--layer_sizes %s' % layer_sizes]

  print('Local training, running command: %s' % ' '.join(cmd))
  _run_cmd(' '.join(cmd))
  print('Local training done.')


def cloud_train(preprocessed_dir, transforms_config_file, output_dir,
                staging_bucket,
                layer_sizes=None, max_steps=None, project_id=None,
                job_name=None, scale_tier='BASIC'):
  """Train a model using CloudML.

  Args:
    preprocessed_dir: The output directory from preprocessing. Must contain
        files named features_train*.tfrecord.gz, features_eval*.tfrecord.gz,
        and metadata.json.
    transforms_config_file: File path to the config file.
    output_dir: Output directory of training.
    staging_bucket: GCS bucket.
    layer_sizes: String. Represents the layers in the connected DNN and must
        be set if the model type is DNN. For example, "10 3 2" creates three
        DNN layers where the first layer has 10 nodes, the middle layer has
        3 nodes, and the last layer has 2 nodes.
    max_steps: Int. Number of training steps to perform.
    project_id: String. The GCE project to use. Defaults to the notebook's
        default project id.
    job_name: String. Job name as listed on the CloudML service. If None, a
        default job name is selected.
    scale_tier: The CloudML scale tier. CUSTOM tiers are currently not
        supported in this package. See
        https://cloud.google.com/ml/reference/rest/v1beta1/projects.jobs#ScaleTier
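
  Example (hypothetical GCS paths and buckets; all three paths must be in GCS):
    cloud_train('gs://bucket/preproc', 'gs://bucket/transforms_config.json',
                'gs://bucket/training', 'gs://staging-bucket',
                layer_sizes='10 3 2', max_steps=5000)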
| 256 | + """ |
  _check_transforms_config_file(transforms_config_file)

  # TODO(brandondutra): allow other flags to be set, like batch size,
  # learning rate, custom scale tiers, etc.
  # TODO(brandondutra): document someplace that TF >= 0.12 and
  # cloudml >= 1.7 are needed.

  if (not preprocessed_dir.startswith('gs://')
      or not transforms_config_file.startswith('gs://')
      or not output_dir.startswith('gs://')):
    print('ERROR: preprocessed_dir, transforms_config_file, and output_dir '
          'must all be in GCS.')
    return

  # Training will fail if there are files in the output folder. Check now and
  # fail fast.
  if ml.util._file.glob_files(os.path.join(output_dir, '*')):
    print('ERROR: output_dir should be empty. Use another folder.')
    return

  # TODO(brandondutra): remove the tf stuff once the cloudml service is past 0.11
  temp_dir = tempfile.mkdtemp()
  subprocess.check_call(['gsutil', 'cp', _TF_GS_URL, temp_dir])
  tf_local_package = os.path.join(temp_dir, os.path.basename(_TF_GS_URL))

  # Build the training config file.
  training_config_file_path = tempfile.mkstemp(dir=temp_dir)[1]
  training_config = {'trainingInput': {'scaleTier': scale_tier}}
  with open(training_config_file_path, 'w') as f:
    f.write(yaml.dump(training_config, default_flow_style=False))

  train_filename = os.path.join(preprocessed_dir, 'features_train*')
  eval_filename = os.path.join(preprocessed_dir, 'features_eval*')
  metadata_filename = os.path.join(preprocessed_dir, 'metadata.json')
  this_folder = os.path.dirname(os.path.abspath(__file__))
  project_id = project_id or _default_project()
  job_name = job_name or ('structured_data_train_' +
                          datetime.datetime.now().strftime('%Y%m%d%H%M%S'))

  # TODO(brandondutra): remove the cd after b/34221856
  cmd = ['cd %s &&' % this_folder,
         'gcloud beta ml jobs submit training %s' % job_name,
         '--module-name=trainer.task',
         '--staging-bucket=%s' % staging_bucket,
         '--async',
         '--package-path=trainer',  # os.path.join(this_folder, 'trainer') once b/34221856 is fixed.
         '--packages=%s' % tf_local_package,
         '--config=%s' % training_config_file_path,
         '--',
         '--train_data_paths=%s' % train_filename,
         '--eval_data_paths=%s' % eval_filename,
         '--metadata_path=%s' % metadata_filename,
         '--output_path=%s' % output_dir,
         '--transforms_config_file=%s' % transforms_config_file]
  if max_steps is not None:
    # Omit --max_steps when not given so the literal string 'None' is not
    # passed to the trainer.
    cmd += ['--max_steps=%s' % str(max_steps)]
  if layer_sizes:
    cmd += ['--layer_sizes %s' % layer_sizes]

  print('CloudML training, running command: %s' % ' '.join(cmd))
  _run_cmd(' '.join(cmd))

  print('CloudML training job submitted.')

  if _is_in_IPython():
    import IPython

    job_url = ('https://console.developers.google.com/ml/jobs?project=%s'
               % project_id)
    html = ('<p>Click <a href="%s" target="_blank">here</a> to track '
            'the training job %s.</p><br/>' % (job_url, job_name))
    IPython.display.display_html(html, raw=True)

  # Delete the temp files made.
  shutil.rmtree(temp_dir)


def local_predict():
  """Not Implemented Yet."""
  print('local_predict')


def cloud_predict():
  """Not Implemented Yet."""
  print('cloud_predict')


def local_batch_predict():
  """Not Implemented Yet."""
  print('local_batch_predict')


def cloud_batch_predict():
  """Not Implemented Yet."""
  print('cloud_batch_predict')