import json
import glob
import StringIO
+ import subprocess

import pandas as pd
import tensorflow as tf
- import yaml
+
+ from tensorflow.python.lib.io import file_io

from . import preprocess
from . import trainer
from . import predict

- _TF_GS_URL = 'gs://cloud-datalab/deploy/tf/tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl'
-
- # TODO(brandondutra): move this url someplace else.
- _SD_GS_URL = 'gs://cloud-ml-dev_bdt/structured_data-0.1.tar.gz'
+ #_SETUP_PY = '/datalab/packages_setup/structured_data/setup.py'
+ #_TF_VERSION = 'tensorflow-0.12.0rc0-cp27-none-linux_x86_64.whl'
+ #_TF_WHL = '/datalab/packages_setup/structured_data'


def _default_project():
@@ -80,24 +81,36 @@ def _assert_gcs_files(files):
      raise ValueError('File %s is not a gcs path' % f)


- def _run_cmd(cmd):
-   output = subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ def _package_to_staging(staging_package_url):
+   """Repackage this package from local installed location and copy it to GCS.
+
+   Args:
+     staging_package_url: GCS path.
+   """
+   import datalab.mlalpha as mlalpha

-   while True:
-     line = output.stdout.readline().rstrip()
-     print(line)
-     if line == '' and output.poll() != None:
-       break
+   # Find the package root. __file__ is under [package_root]/datalab_solutions/structured_data.
+   package_root = os.path.abspath(
+       os.path.join(os.path.dirname(__file__), '../../'))
+   setup_path = os.path.abspath(
+       os.path.join(os.path.dirname(__file__), 'setup.py'))
+   tar_gz_path = os.path.join(staging_package_url, 'staging', 'sd.tar.gz')

+   print('Building package in %s and uploading to %s' %
+         (package_root, tar_gz_path))
+   mlalpha.package_and_copy(package_root, setup_path, tar_gz_path)

- def local_preprocess(output_dir, input_feature_file, input_file_pattern, schema_file):
+
+   return tar_gz_path
+
+
+ def local_preprocess(output_dir, input_file_pattern, schema_file):
  """Preprocess data locally with Pandas

  Produce analysis used by training.

  Args:
    output_dir: The output directory to use.
-     input_feature_file: Describes defaults and column types.
    input_file_pattern: String. File pattern that will expand into a list of csv
      files.
    schema_file: File path to the schema file.
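
For orientation, the new _package_to_staging helper above tars up this package via datalab.mlalpha.package_and_copy and returns the staged GCS path. A minimal usage sketch, assuming a hypothetical bucket name:

    # Hypothetical staging location; the helper appends 'staging/sd.tar.gz'.
    staged_package = _package_to_staging('gs://my-bucket/my_training_run')
    # staged_package == 'gs://my-bucket/my_training_run/staging/sd.tar.gz'
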
@@ -106,22 +119,20 @@ def local_preprocess(output_dir, input_feature_file, input_file_pattern, schema_
  args = ['local_preprocess',
          '--input_file_pattern=%s' % input_file_pattern,
          '--output_dir=%s' % output_dir,
-           '--schema_file=%s' % schema_file,
-           '--input_feature_file=%s' % input_feature_file]
+           '--schema_file=%s' % schema_file]

  print('Starting local preprocessing.')
  preprocess.local_preprocess.main(args)
  print('Local preprocessing done.')

- def cloud_preprocess(output_dir, input_feature_file, input_file_pattern=None, schema_file=None, bigquery_table=None, project_id=None):
+ def cloud_preprocess(output_dir, input_file_pattern=None, schema_file=None, bigquery_table=None, project_id=None):
  """Preprocess data in the cloud with BigQuery.

  Produce analysis used by training. This can take a while, even for small
  datasets. For small datasets, it may be faster to use local_preprocess.

  Args:
    output_dir: The output directory to use.
-     input_feature_file: Describes defaults and column types.
    input_file_pattern: String. File pattern that will expand into a list of csv
      files.
    schema_file: File path to the schema file.
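
As a quick check of the updated signature (input_feature_file is gone), a hedged sketch of a local call; the import path and file paths are assumptions, not part of this diff:

    from datalab_solutions.structured_data import local_preprocess  # assumed import path

    local_preprocess(output_dir='./preprocess_output',
                     input_file_pattern='./data/train-*.csv',
                     schema_file='./data/schema.json')
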
@@ -131,8 +142,7 @@ def cloud_preprocess(output_dir, input_feature_file, input_file_pattern=None, sc
  _assert_gcs_files([output_dir, input_file_pattern])

  args = ['cloud_preprocess',
-           '--output_dir=%s' % output_dir,
-           '--input_feature_file=%s' % input_feature_file]
+           '--output_dir=%s' % output_dir]

  if input_file_pattern:
    args.append('--input_file_pattern=%s' % input_file_pattern)
@@ -155,9 +165,10 @@ def local_train(train_file_pattern,
                eval_file_pattern,
                preprocess_output_dir,
                output_dir,
-                 transforms_file,
                model_type,
                max_steps,
+                 transforms_file=None,
+                 key_column=None,
                top_n=None,
                layer_sizes=None):
  """Train model locally.
@@ -166,9 +177,55 @@ def local_train(train_file_pattern,
    eval_file_pattern: eval csv file
    preprocess_output_dir: The output directory from preprocessing
    output_dir: Output directory of training.
-     transforms_file: File path to the transforms file.
-     model_type: model type
-     max_steps: Int. Number of training steps to perform.
+     model_type: One of linear_classification, linear_regression,
+       dnn_classification, dnn_regression.
+     max_steps: Int. Number of training steps to perform.
+     transforms_file: File path to the transforms file. Example
+         {
+           "col_A": {"transform": "scale", "default": 0.0},
+           "col_B": {"transform": "scale", "value": 4},
+           # Note col_C is missing, so the default transform is used.
+           "col_D": {"transform": "hash_one_hot", "hash_bucket_size": 4},
+           "col_target": {"transform": "target"},
+           "col_key": {"transform": "key"}
+         }
+       The keys correspond to the columns in the input files as defined by the
+       schema file during preprocessing. Some notes:
+       1) The "key" transform is required, but the "target" transform is
+          optional, as the target column must be the first column in the input
+          data, and all other transforms are optional.
+       2) Default values are optional. These are used if the input data has
+          missing values during training and prediction. If not supplied for a
+          column, the default value for a numerical column is that column's
+          mean value, and for a categorical column the empty string is used.
+       3) For numerical columns, the following transforms are supported:
+          i) {"transform": "identity"}: does nothing to the number. (default)
+          ii) {"transform": "scale"}: scales the column values to [-1, 1].
+          iii) {"transform": "scale", "value": a}: scales the column values
+             to [-a, a].
+
+          For categorical columns, the supported transforms depend on whether
+          the model is a linear or DNN model, because tf.layers is used.
+          For a linear model, the supported transforms are:
+          i) {"transform": "sparse"}: Makes a sparse vector using the full
+             vocabulary associated with the column (default).
+          ii) {"transform": "hash_sparse", "hash_bucket_size": n}: First each
+             string is hashed to an integer in the range [0, n), and then a
+             sparse vector is used.
+
+          For a DNN model, the supported categorical transforms are:
+          i) {"transform": "one_hot"}: A one-hot vector using the full
+             vocabulary is used. (default)
+          ii) {"transform": "embedding", "embedding_dim": d}: Each label is
+             embedded into a d-dimensional space.
+          iii) {"transform": "hash_one_hot", "hash_bucket_size": n}: The label
+             is first hashed into the range [0, n) and then a one-hot encoding
+             is made.
+          iv) {"transform": "hash_embedding", "hash_bucket_size": n,
+             "embedding_dim": d}: First each label is hashed to [0, n), and
+             then each integer is embedded into a d-dimensional space.
+     key_column: key column name. If None, this information is read from the
+       transforms_file.
    top_n: Int. For classification problems, the output graph will contain the
      labels and scores for the top n classes with a default of n=1. Use
      None for regression problems.
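
To make the transforms_file description above concrete, here is a minimal sketch of writing such a file; the column names are illustrative only:

    import json

    transforms = {
        'key_col': {'transform': 'key'},                 # required key transform
        'age': {'transform': 'scale', 'default': 0.0},   # numerical column
        'state': {'transform': 'one_hot'},               # categorical column (DNN model)
    }
    with open('transforms_file.json', 'w') as f:
      json.dump(transforms, f, indent=2)
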
@@ -179,7 +236,19 @@ def local_train(train_file_pattern,
      nodes.
  """
  #TODO(brandondutra): allow other flags to be set like batch size/learner rate
-   #TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >-1.7 are needed.
+
+   if key_column and not transforms_file:
+     # Make a transforms file.
+     transforms_file = os.path.join(output_dir, 'transforms_file.json')
+     file_io.write_string_to_file(
+         transforms_file,
+         json.dumps({key_column: {"transform": "key"}}, indent=2))
+   elif not key_column and transforms_file:
+     pass
+   else:
+     raise ValueError('Exactly one of key_column or transforms_file must be '
+                      'specified.')
+

  args = ['local_train',
          '--train_data_paths=%s' % train_file_pattern,
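
The branch above means callers pass either key_column or transforms_file, never both and never neither; with key_column alone, a one-entry transforms file holding just the key transform is written under output_dir. A hedged sketch of the two call styles (paths and column name are hypothetical):

    # Shortcut: name only the key column; the transforms file is generated.
    local_train(train_file_pattern='./data/train-*.csv',
                eval_file_pattern='./data/eval-*.csv',
                preprocess_output_dir='./preprocess_output',
                output_dir='./train_output',
                model_type='dnn_classification',
                max_steps=1000,
                key_column='key_col',
                layer_sizes=[10, 3, 2])

    # Alternative: supply a hand-written transforms file and omit key_column.
    # local_train(..., transforms_file='./transforms_file.json')
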
@@ -189,8 +258,8 @@ def local_train(train_file_pattern,
          '--transforms_file=%s' % transforms_file,
          '--model_type=%s' % model_type,
          '--max_steps=%s' % str(max_steps)]
-   if layer_sizes:
-     args.extend(['--layer_sizes'] + [str(x) for x in layer_sizes])
+   for i in range(len(layer_sizes or [])):
+     args.append('--layer_size%s=%s' % (i + 1, str(layer_sizes[i])))
  if top_n:
    args.append('--top_n=%s' % str(top_n))

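The layer_sizes handling changed from one repeated --layer_sizes flag to numbered --layer_sizeN flags. A standalone sketch of the expansion, separate from the diff:

    layer_sizes = [10, 3, 2]
    args = []
    for i in range(len(layer_sizes)):
      args.append('--layer_size%s=%s' % (i + 1, str(layer_sizes[i])))
    # args == ['--layer_size1=10', '--layer_size2=3', '--layer_size3=2']
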
@@ -202,12 +271,12 @@ def cloud_train(train_file_pattern,
                eval_file_pattern,
                preprocess_output_dir,
                output_dir,
-                 transforms_file,
                model_type,
                max_steps,
+                 transforms_file=None,
+                 key_column=None,
                top_n=None,
                layer_sizes=None,
-                 staging_bucket=None,
                project_id=None,
                job_name=None,
                scale_tier='STANDARD_1',
@@ -219,9 +288,13 @@ def cloud_train(train_file_pattern,
    eval_file_pattern: eval csv file
    preprocess_output_dir: The output directory from preprocessing
    output_dir: Output directory of training.
-     transforms_file: File path to the transforms file.
-     model_type: model type
+     model_type: One of linear_classification, linear_regression,
+       dnn_classification, dnn_regression.
    max_steps: Int. Number of training steps to perform.
+     transforms_file: File path to the transforms file. See local_train for
+       a long description of this file. Must include the key transform.
+     key_column: key column name. If None, this information is read from the
+       transforms_file.
    top_n: Int. For classification problems, the output graph will contain the
      labels and scores for the top n classes with a default of n=1.
      Use None for regression problems.
@@ -230,8 +303,6 @@ def cloud_train(train_file_pattern,
      will create three DNN layers where the first layer will have 10 nodes,
      the middle layer will have 3 nodes, and the last layer will have 2
      nodes.
-
-     staging_bucket: GCS bucket.
    project_id: String. The GCE project to use. Defaults to the notebook's
      default project id.
    job_name: String. Job name as listed on the Dataflow service. If None, a
@@ -240,11 +311,22 @@ def cloud_train(train_file_pattern,
      in this package. See https://cloud.google.com/ml/reference/rest/v1beta1/projects.jobs#ScaleTier
  """
  #TODO(brandondutra): allow other flags to be set like batch size,
-   # learner rate, custom scale tiers, etc
-   #TODO(brandondutra): doc someplace that TF>=0.12 and cloudml >-1.7 are needed.
+   # learner rate, etc
+
+   if key_column and not transforms_file:
+     # Make a transforms file.
+     transforms_file = os.path.join(output_dir, 'transforms_file.json')
+     file_io.write_string_to_file(
+         transforms_file,
+         json.dumps({key_column: {"transform": "key"}}, indent=2))
+   elif not key_column and transforms_file:
+     pass
+   else:
+     raise ValueError('Exactly one of key_column or transforms_file must be '
+                      'specified.')

  _assert_gcs_files([train_file_pattern, eval_file_pattern,
-                      preprocess_output_dir, transforms_file])
+                      preprocess_output_dir, transforms_file, output_dir])

  # TODO: Convert args to a dictionary so we can use datalab's cloudml trainer.
  args = ['--train_data_paths=%s' % train_file_pattern,
@@ -254,23 +336,21 @@ def cloud_train(train_file_pattern,
          '--transforms_file=%s' % transforms_file,
          '--model_type=%s' % model_type,
          '--max_steps=%s' % str(max_steps)]
-   if layer_sizes:
-     args.extend(['--layer_sizes'] + [str(x) for x in layer_sizes])
+   for i in range(len(layer_sizes or [])):
+     args.append('--layer_size%s=%s' % (i + 1, str(layer_sizes[i])))
  if top_n:
    args.append('--top_n=%s' % str(top_n))

-   # TODO(brandondutra): move these package uris locally, ask for a staging
-   # and copy them there. This package should work without cloudml having to
-   # maintain gs files!!!
  job_request = {
-       'package_uris': [_TF_GS_URL, _SD_GS_URL],
+       'package_uris': [_package_to_staging(output_dir)],
      'python_module': 'datalab_solutions.structured_data.trainer.task',
      'scale_tier': scale_tier,
      'region': region,
      'args': args
  }
  # Local import because cloudml service does not have datalab
-   import datalab.mlaplha
+   import datalab
+   cloud_runner = datalab.mlalpha.CloudRunner(job_request)
  if not job_name:
    job_name = 'structured_data_train_' + datetime.datetime.now().strftime('%y%m%d_%H%M%S')
  job = datalab.mlalpha.Job.submit_training(job_request, job_name)
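
Taken together, cloud_train now stages this package itself (under output_dir/staging/) and submits the job through datalab.mlalpha. A hedged sketch of a notebook-level call, assuming placeholder GCS paths and that the region value used in job_request is accepted as a parameter:

    cloud_train(train_file_pattern='gs://my-bucket/data/train-*.csv',
                eval_file_pattern='gs://my-bucket/data/eval-*.csv',
                preprocess_output_dir='gs://my-bucket/preprocess_output',
                output_dir='gs://my-bucket/train_output',
                model_type='linear_regression',
                max_steps=2000,
                key_column='key_col',
                region='us-central1')
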
@@ -331,7 +411,8 @@ def local_predict(model_dir, data):
  print('Local prediction done.')

  # Read the header file.
-   with open(os.path.join(tmp_dir, 'csv_header.txt'), 'r') as f:
+   header_file = os.path.join(tmp_dir, 'csv_header.txt')
+   with open(header_file, 'r') as f:
    header = f.readline()

  # Print any errors to the screen.
@@ -467,7 +548,9 @@ def cloud_batch_predict(model_dir, prediction_input_file, output_dir,
         '--trained_model_dir=%s' % model_dir,
         '--output_dir=%s' % output_dir,
         '--output_format=%s' % output_format,
-          '--batch_size=%s' % str(batch_size)]
+          '--batch_size=%s' % str(batch_size),
+          '--extra_package=%s' % _package_to_staging(output_dir)]
+   print(cmd)

  if shard_files:
    cmd.append('--shard_files')