This repository was archived by the owner on Sep 3, 2022. It is now read-only.

Commit 47df9fe

brandondutra authored and qimingj committed
first full-feature version of structured data is done (#139)
* Added the preprocessing/training files. Preprocessing is connected with Datalab; training is not yet fully connected with Datalab.
* Added the training interface.
* Local/cloud training ready for review.
* Saving work.
* Cloud online prediction is done.
* Split the config file into two (schema/transforms) and updated the unit tests.
* Local preprocess/train working.
* 1) Merged --model_type and --problem_type. 2) Online/local prediction is done.
* Added batch prediction.
* All prediction is done; going to make a merge request next.
* Update _package.py: removed some white space and added a print statement to local_predict.
* Preprocessing puts a copy of the schema in the output dir, so there is no need to pass the schema to train in Datalab.
* Tests can be run from any folder above the test folder via python -m unittest discover. Also, the training test will parse the output of training and check that the loss is small.
1 parent bddf668 commit 47df9fe
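
For reference, the single --transforms_config_file that the old code consumed is split by this commit into a schema file and a transforms file. A minimal pair, mirrored from the make_csv_data test fixture in e2e_functions.py below and shown here as Python dicts (regression case, where 'target' is numerical):

  # Schema: describes every csv column.
  schema = {'column_names': ['key', 'target', 'num1', 'num2', 'num3',
                             'str1', 'str2', 'str3'],
            'key_column': 'key',
            'target_column': 'target',
            'numerical_columns': ['num1', 'num2', 'num3', 'target'],
            'categorical_columns': ['str1', 'str2', 'str3']}

  # Transforms: per-column transform configs. Columns left out (num3 and
  # str3 in the fixture) fall back to default transforms.
  transforms = {'num1': {'transform': 'identity'},
                'num2': {'transform': 'identity'},
                'str1': {'transform': 'one_hot'},
                'str2': {'transform': 'one_hot'}}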

File tree

8 files changed: +687 −344 lines changed


solutionbox/structured_data/datalab_solutions/structured_data/_package.py

Lines changed: 164 additions & 49 deletions
Large diffs are not rendered by default.

solutionbox/structured_data/datalab_solutions/structured_data/preprocess/preprocess.py

Lines changed: 65 additions & 40 deletions
@@ -62,15 +62,15 @@ def parse_arguments(argv):
                       type=int,
                       help='Percent of input data for test dataset.')
   parser.add_argument('--output_dir',
-                     type=str,
+                      type=str,
                       required=True,
                       help=('Google Cloud Storage or Local directory in which '
                             'to place outputs.'))
-  parser.add_argument('--transforms_config_file',
-                      type=str,
+  parser.add_argument('--schema_file',
+                      type=str,
                       required=True,
-                      help=('File describing the schema and transforms of '
-                            'each column in the csv data files.'))
+                      help=('File describing the schema of each column in the '
+                            'csv data files.'))
   parser.add_argument('--job_name',
                       type=str,
                       help=('If using --cloud, the job name as listed in'
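
With the new flag, a local preprocessing run looks roughly like this (flag names are taken from this file's argument parser and from run_preprocess in e2e_functions.py below; the paths are placeholders):

  python preprocess.py \
      --input_file_path=/tmp/data.csv \
      --output_dir=/tmp/preproc \
      --schema_file=/tmp/schema.json \
      --train_percent=80 --eval_percent=10 --test_percent=10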
@@ -93,17 +93,47 @@ def parse_arguments(argv):
 
   # args.job_name will not be used unless --cloud is used.
   if not args.job_name:
-    args.job_name = ('structured-data-' +
+    args.job_name = ('structured-data-' +
                      datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
 
   return args
 
 
+def load_and_check_config(schema_file_path):
+  """Checks that the schema file is well formatted."""
+
+  try:
+    json_str = ml.util._file.load_file(schema_file_path)
+    config = json.loads(json_str)
+  except:
+    print('ERROR reading schema file.')
+    sys.exit(1)
+
+  model_columns = (config.get('numerical_columns', [])
+                   + config.get('categorical_columns', []))
+  if config['target_column'] not in model_columns:
+    print('ERROR: target not listed as a numerical or categorical column.')
+    sys.exit(1)
+
+  if set(config['column_names']) != set(model_columns + [config['key_column']]):
+    print('ERROR: column_names do not match what is listed in the other fields.')
+    sys.exit(1)
+
+  if set(config['numerical_columns']) & set(config['categorical_columns']):
+    print('ERROR: numerical_columns and categorical_columns must be disjoint.')
+    sys.exit(1)
+
+  if config['key_column'] in model_columns:
+    print('ERROR: key_column should not be listed in numerical_columns or categorical_columns.')
+    sys.exit(1)
+
+  return config
+
+
 def preprocessing_features(args):
 
   # Read the config file.
-  json_str = ml.util._file.load_file(args.transforms_config_file)
-  config = json.loads(json_str)
+  config = load_and_check_config(args.schema_file)
 
   column_names = config['column_names']
 
@@ -114,48 +144,38 @@ def preprocessing_features(args):
 
   # Extract target feature
   target_name = config['target_column']
-  if config['problem_type'] == 'regression':
+  key_name = config['key_column']
+  if target_name in config.get('numerical_columns', []):
     feature_set[target_name] = features.target(target_name).continuous()
   else:
     feature_set[target_name] = features.target(target_name).discrete()
 
 
   # Extract numeric features
-  if 'numerical' in config:
-    for name, transform_config in config['numerical'].iteritems():
-      transform = transform_config['transform']
-      default = transform_config.get('default', None)
-      if transform == 'scale':
-        feature_set[name] = features.numeric(name, default=default).scale()
-      elif transform == 'max_abs_scale':
-        feature_set[name] = features.numeric(name, default=default).max_abs_scale(transform_config['value'])
-      elif transform == 'identity':
-        feature_set[name] = features.numeric(name, default=default).identity()
-      else:
-        print('Error: unkown numerical transform name %s in %s' % (transform, str(transform_config)))
-        sys.exit(1)
+  for name in config.get('numerical_columns', []):
+    if name == target_name or name == key_name:
+      continue
+    # apply identity to all numerical features.
+    default = config.get('defaults', {}).get(name, None)
+    feature_set[name] = features.numeric(name, default=default).identity()
 
   # Extract categorical features
-  if 'categorical' in config:
-    for name, transform_config in config['categorical'].iteritems():
-      transform = transform_config['transform']
-      default = transform_config.get('default', None)
-      frequency_threshold = transform_config.get('frequency_threshold', 5)
-      if transform == 'one_hot' or transform == 'embedding':
-        feature_set[name] = features.categorical(
-            name,
-            default=default,
-            frequency_threshold=frequency_threshold)
-      else:
-        print('Error: unkown categorical transform name %s in %s' % (transform, str(transform_config)))
-        sys.exit(1)
+  for name in config.get('categorical_columns', []):
+    if name == target_name or name == key_name:
+      continue
+    # apply sparse transform to all categorical features.
+    default = config.get('defaults', {}).get(name, None)
+    feature_set[name] = features.categorical(
+        name,
+        default=default,
+        frequency_threshold=1).sparse(use_counts=True)
 
   return feature_set, column_names
 
 
-
 def preprocess(pipeline, feature_set, column_names, input_file_path,
-               train_percent, eval_percent, test_percent, output_dir):
+               schema_file, train_percent, eval_percent, test_percent,
+               output_dir):
   """Builds the preprocessing Dataflow pipeline.
 
   The input files are split into a training, eval and test sets, and the SDK
@@ -206,11 +226,14 @@ def _partition_fn(row_unused, num_partitions_unused):  # pylint: disable=unused-
           >> io.SaveFeatures(
               os.path.join(output_dir, 'features_test')))
   # pylint: enable=expression-not-assigned
+  # Put a copy of the schema file in the output folder. Datalab will look for
+  # it there.
+  ml.util._file.copy_file(schema_file, os.path.join(output_dir, 'schema.json'))
 
 
-def run_dataflow(feature_set, column_names, input_file_path, train_percent,
-                 eval_percent, test_percent, output_dir, cloud, project_id,
-                 job_name):
+def run_dataflow(feature_set, column_names, input_file_path, schema_file,
+                 train_percent, eval_percent, test_percent, output_dir, cloud,
+                 project_id, job_name):
   """Run Preprocessing as a Dataflow pipeline."""
 
   # Configure the pipeline.
@@ -233,6 +256,7 @@ def run_dataflow(feature_set, column_names, input_file_path, train_percent,
       feature_set=feature_set,
       column_names=column_names,
       input_file_path=input_file_path,
+      schema_file=schema_file,
       train_percent=train_percent,
       eval_percent=eval_percent,
       test_percent=test_percent,
@@ -251,6 +275,7 @@ def main(argv=None):
       feature_set=feature_set,
       column_names=column_names,
       input_file_path=args.input_file_path,
+      schema_file=args.schema_file,
       train_percent=args.train_percent,
       eval_percent=args.eval_percent,
       test_percent=args.test_percent,
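
To make the new load_and_check_config checks concrete, here is a minimal schema that passes validation (column names are illustrative; the field names are the ones this diff introduces):

  config = {'column_names': ['key', 'target', 'num1', 'str1'],
            'key_column': 'key',
            'target_column': 'target',
            'numerical_columns': ['num1', 'target'],  # numerical target => regression
            'categorical_columns': ['str1']}
  # Passes all four checks: the target is a model column, column_names
  # equals the model columns plus the key, the numerical and categorical
  # sets are disjoint, and the key is not a model column. Listing 'key'
  # under numerical_columns, for example, would make preprocessing exit
  # with an error.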

solutionbox/structured_data/datalab_solutions/structured_data/setup.py

Lines changed: 3 additions & 1 deletion
@@ -24,4 +24,6 @@
       version=VERSION,
       packages=['trainer', 'preprocess'],
       author='Google',
-      author_email='cloudml-feedback@google.com',)
+      author_email='cloudml-feedback@google.com',
+      test_suite='nose.collector',
+      tests_require=['nose'])
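
With the nose hooks above, and per the commit message, the unit tests can now be launched from any folder above the test folder, for example:

  python -m unittest discover

With the new test_suite/tests_require entries, python setup.py test should also pick up the same suite via nose.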

solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py

Lines changed: 52 additions & 34 deletions
@@ -13,9 +13,11 @@
 # limitations under the License.
 # ==============================================================================
 
+
+import os
 import random
 import subprocess
-import os
+
 
 def make_csv_data(filename, num_rows, problem_type):
   random.seed(12321)
@@ -29,9 +31,9 @@ def make_csv_data(filename, num_rows, problem_type):
     str2 = random.choice(['abc', 'def', 'ghi', 'jkl', 'mno', 'pqr'])
     str3 = random.choice(['car', 'truck', 'van', 'bike', 'train', 'drone'])
 
-    map1 = {'red':2, 'blue':6, 'green':4, 'pink':-5, 'yellow':-6, 'brown':-1, 'black':7}
-    map2 = {'abc':10, 'def':1, 'ghi':1, 'jkl':1, 'mno':1, 'pqr':1}
-    map3 = {'car':5, 'truck':10, 'van':15, 'bike':20, 'train':25, 'drone': 30}
+    map1 = {'red': 2, 'blue': 6, 'green': 4, 'pink': -5, 'yellow': -6, 'brown': -1, 'black': 7}
+    map2 = {'abc': 10, 'def': 1, 'ghi': 1, 'jkl': 1, 'mno': 1, 'pqr': 1}
+    map3 = {'car': 5, 'truck': 10, 'van': 15, 'bike': 20, 'train': 25, 'drone': 30}
 
     # Build some model.
     t = 0.5 + 0.5*num1 -2.5*num2 + num3
@@ -56,67 +58,83 @@ def make_csv_data(filename, num_rows, problem_type):
         str3=str3)
     f1.write(csv_line)
 
-  config = {'column_names': ['key', 'target', 'num1', 'num2', 'num3',
+  schema = {'column_names': ['key', 'target', 'num1', 'num2', 'num3',
                              'str1', 'str2', 'str3'],
             'key_column': 'key',
             'target_column': 'target',
-            'problem_type': problem_type,
-            'model_type': '',
-            'numerical': {'num1': {'transform': 'identity'},
-                          'num2': {'transform': 'identity'},
-                          'num3': {'transform': 'identity'}},
-            'categorical': {'str1': {'transform': 'one_hot'},
-                            'str2': {'transform': 'one_hot'},
-                            'str3': {'transform': 'one_hot'}}
+            'numerical_columns': ['num1', 'num2', 'num3'],
+            'categorical_columns': ['str1', 'str2', 'str3']
            }
-  return config
-
+  if problem_type == 'classification':
+    schema['categorical_columns'] += ['target']
+  else:
+    schema['numerical_columns'] += ['target']
 
+  # use defaults for num3 and str3
+  transforms = {'num1': {'transform': 'identity'},
+                'num2': {'transform': 'identity'},
+                # 'num3': {'transform': 'identity'},
+                'str1': {'transform': 'one_hot'},
+                'str2': {'transform': 'one_hot'},
+                # 'str3': {'transform': 'one_hot'}
+               }
+  return schema, transforms
 
 
-def run_preprocess(output_dir, csv_filename, config_filename,
+def run_preprocess(output_dir, csv_filename, schema_filename,
                    train_percent='80', eval_percent='10', test_percent='10'):
-  cmd = ['python', './preprocess/preprocess.py',
+  preprocess_script = os.path.abspath(
+      os.path.join(os.path.dirname(__file__), '../preprocess/preprocess.py'))
+  cmd = ['python', preprocess_script,
         '--output_dir', output_dir,
-        '--input_file_path', csv_filename,
-        '--transforms_config_file', config_filename,
+         '--input_file_path', csv_filename,
+         '--schema_file', schema_filename,
         '--train_percent', train_percent,
         '--eval_percent', eval_percent,
         '--test_percent', test_percent,
        ]
-  print('Current working directoyr: %s' % os.getcwd())
   print('Going to run command: %s' % ' '.join(cmd))
   subprocess.check_call(cmd, stderr=open(os.devnull, 'wb'))
 
-def run_training(output_dir, input_dir, config_filename, extra_args=[]):
-  """Runs Training via gcloud alpha ml local train.
+
+def run_training(output_dir, input_dir, schema_filename, transforms_filename,
+                 max_steps, extra_args=[]):
+  """Runs Training via gcloud beta ml local train.
 
   Args:
     output_dir: the trainer's output folder
-    input_folder: should contain features_train*, features_eval*, and
+    input_dir: should contain features_train*, features_eval*, and
         metadata.json.
-    config_filename: path to the config file
+    schema_filename: path to the schema file
+    transforms_filename: path to the transforms file.
+    max_steps: int. max training steps.
    extra_args: array of strings, passed to the trainer.
+
+  Returns:
+    The stderr of training as one string. TF writes to stderr, so basically, the
+    output of training.
   """
   train_filename = os.path.join(input_dir, 'features_train*')
   eval_filename = os.path.join(input_dir, 'features_eval*')
   metadata_filename = os.path.join(input_dir, 'metadata.json')
-  cmd = ['gcloud alpha ml local train',
+
+  # Gcloud has the fun bug that you have to be in the parent folder of task.py
+  # when you call it. So cd there first.
+  task_parent_folder = os.path.abspath(
+      os.path.join(os.path.dirname(__file__), '..'))
+  cmd = ['cd %s &&' % task_parent_folder,
+         'gcloud beta ml local train',
         '--module-name=trainer.task',
         '--package-path=trainer',
        '--',
        '--train_data_paths=%s' % train_filename,
        '--eval_data_paths=%s' % eval_filename,
        '--metadata_path=%s' % metadata_filename,
        '--output_path=%s' % output_dir,
-        '--transforms_config_file=%s' % config_filename,
-        '--max_steps=2500'] + extra_args
-  print('Current working directoyr: %s' % os.getcwd())
+         '--schema_file=%s' % schema_filename,
+         '--transforms_file=%s' % transforms_filename,
+         '--max_steps=%s' % max_steps] + extra_args
   print('Going to run command: %s' % ' '.join(cmd))
-  sp = subprocess.Popen(' '.join(cmd), shell=True, stderr=subprocess.PIPE) #open(os.devnull, 'wb'))
+  sp = subprocess.Popen(' '.join(cmd), shell=True, stderr=subprocess.PIPE)
   _, err = sp.communicate()
-  err = err.splitlines()
-  print 'last line'
-  print err[len(err)-1]
-
-  stderr=subprocess.PIPE
+  return err
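
Because run_training now returns the trainer's stderr, the training test can check convergence as described in the commit message. A sketch of such a check (the 'loss = ' log pattern and the 1.0 threshold are assumptions about TensorFlow's logging, not taken from this diff):

  import re
  from e2e_functions import run_training

  err = run_training(output_dir='/tmp/train', input_dir='/tmp/preproc',
                     schema_filename='/tmp/schema.json',
                     transforms_filename='/tmp/transforms.json',
                     max_steps=2500)
  # TF periodically writes lines like '... loss = 0.0123' to stderr.
  losses = re.findall(r'loss = ([0-9.eE+-]+)', err)
  assert losses and float(losses[-1]) < 1.0, 'final training loss is not small'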
