This repository was archived by the owner on Sep 3, 2022. It is now read-only.

Commit cf827ba

Cloudmlsm (#229)
* csv prediction graph done
* csv works, but not json!!!
* sw, train working
* cloud training working
* finished census sample, cleaned up interface
* review comments
1 parent 8e67dbf commit cf827ba

9 files changed (+652, -574 lines)


solutionbox/structured_data/datalab_solutions/structured_data/_package.py

Lines changed: 250 additions & 225 deletions
Large diffs are not rendered by default.

solutionbox/structured_data/datalab_solutions/structured_data/master_setup.py

Lines changed: 3 additions & 0 deletions
@@ -63,6 +63,9 @@ def get_version():
     long_description="""
 """,
     install_requires=[
+        'tensorflow==1.0',
+        'protobuf==3.1.0',
+        'google-cloud-dataflow==0.5.5'
     ],
     package_data={
     },

solutionbox/structured_data/datalab_solutions/structured_data/predict/predict.py

Lines changed: 51 additions & 82 deletions
@@ -76,6 +76,7 @@ def parse_arguments(argv):
                       action='store_false',
                       help='Don\'t shard files')
   parser.set_defaults(shard_files=True)
+
   parser.add_argument('--output_format',
                       choices=['csv', 'json'],
                       default='csv',
@@ -104,55 +105,6 @@ def parse_arguments(argv):
   return args
 
 
-class FixMissingTarget(beam.DoFn):
-  """A DoFn to fix missing target columns."""
-
-  def __init__(self, trained_model_dir):
-    """Reads the schema file and extracted the expected number of columns.
-
-    Args:
-      trained_model_dir: path to model.
-
-    Raises:
-      ValueError: if schema.json not found in trained_model_dir
-    """
-    from tensorflow.python.lib.io import file_io
-    import json
-    import os
-
-    schema_path = os.path.join(trained_model_dir, 'schema.json')
-    if not file_io.file_exists(schema_path):
-      raise ValueError('schema.json missing from %s' % schema_path)
-    schema = json.loads(file_io.read_file_to_string(schema_path))
-    self._num_expected_columns = len(schema)
-
-  def process(self, element):
-    """Fixes csv line if target is missing.
-
-    The first column is assumed to be the target column, and the TF graph
-    expects to always parse the target column, even in prediction. Below,
-    we check how many csv columns there are, and if the target is missing, we
-    prepend a ',' to denote the missing column.
-
-    Example:
-      'target,key,value1,...' -> 'target,key,value1,...' (no change)
-      'key,value1,...' -> ',key,value1,...' (add missing target column)
-
-    The value of the missing target column comes from the default value given
-    to tf.decode_csv in the graph.
-    """
-    import apache_beam as beam
-
-    num_columns = len(element.split(','))
-    if num_columns == self._num_expected_columns:
-      yield element
-    elif num_columns + 1 == self._num_expected_columns:
-      yield ',' + element
-    else:
-      yield beam.pvalue.SideOutputValue('errors',
-                                        ('bad columns', element))
-
-
 class EmitAsBatchDoFn(beam.DoFn):
   """A DoFn that buffers the records and emits them batch by batch."""

@@ -185,22 +137,22 @@ def __init__(self, trained_model_dir):
     self._session = None
 
   def start_bundle(self, element=None):
-    from tensorflow.contrib.session_bundle import session_bundle
+    from tensorflow.python.saved_model import tag_constants
+    from tensorflow.contrib.session_bundle import bundle_shim
     import json
 
-    self._session, _ = session_bundle.load_session_bundle_from_path(
-        self._trained_model_dir)
-
-    # input_alias_map {'input_csv_string': tensor_name}
-    self._input_alias_map = json.loads(
-        self._session.graph.get_collection('inputs')[0])
-
-    # output_alias_map {'target_from_input': tensor_name, 'key': ...}
-    self._output_alias_map = json.loads(
-        self._session.graph.get_collection('outputs')[0])
+    self._session, meta_graph = bundle_shim.load_session_bundle_or_saved_model_bundle_from_path(self._trained_model_dir, tags=[tag_constants.SERVING])
+    signature = meta_graph.signature_def['serving_default']
 
+    # get the mappings between aliases and tensor names
+    # for both inputs and outputs
+    self._input_alias_map = {friendly_name: tensor_info_proto.name
+                             for (friendly_name, tensor_info_proto) in signature.inputs.items()}
+    self._output_alias_map = {friendly_name: tensor_info_proto.name
+                              for (friendly_name, tensor_info_proto) in signature.outputs.items()}
     self._aliases, self._tensor_names = zip(*self._output_alias_map.items())
 
+
   def finish_bundle(self, element=None):
     self._session.close()
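Note on the change above: bundle_shim can load either the legacy session_bundle format or a standard SavedModel, which is what lets old and new exports coexist during the migration. A minimal sketch of the SavedModel-only path using the public TF 1.x loader (load_alias_maps and export_dir are hypothetical, not part of this commit):

import tensorflow as tf

def load_alias_maps(export_dir):
  """Loads a SavedModel export and returns (session, inputs, outputs)."""
  graph = tf.Graph()
  with graph.as_default():
    session = tf.Session()
    meta_graph = tf.saved_model.loader.load(
        session, [tf.saved_model.tag_constants.SERVING], export_dir)
  signature = meta_graph.signature_def['serving_default']
  # Map friendly aliases to the tensor names they resolve to,
  # mirroring what start_bundle does above.
  inputs = {name: info.name for name, info in signature.inputs.items()}
  outputs = {name: info.name for name, info in signature.outputs.items()}
  return session, inputs, outputs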

@@ -220,6 +172,11 @@ def process(self, element):
 
     feed_dict = collections.defaultdict(list)
     for line in element:
+
+      # Remove trailing newline.
+      if line.endswith('\n'):
+        line = line[:-1]
+
       feed_dict[self._input_alias_map.values()[0]].append(line)
       num_in_batch += 1

@@ -311,26 +268,41 @@ def __init__(self, args):
     self._output_format = args.output_format
     self._output_dir = args.output_dir
 
-    # See if the target vocab should be loaded.
+    # Get the BQ schema if csv.
     if self._output_format == 'csv':
-      from tensorflow.contrib.session_bundle import session_bundle
-      import json
-
-      self._session, _ = session_bundle.load_session_bundle_from_path(
-          args.trained_model_dir)
-
-      # output_alias_map {'target_from_input': tensor_name, 'key': ...}
-      output_alias_map = json.loads(
-          self._session.graph.get_collection('outputs')[0])
-
-      self._header = sorted(output_alias_map.keys())
-      self._session.close()
-
+      from tensorflow.python.saved_model import tag_constants
+      from tensorflow.contrib.session_bundle import bundle_shim
+      from tensorflow.core.framework import types_pb2
+
+      session, meta_graph = bundle_shim.load_session_bundle_or_saved_model_bundle_from_path(args.trained_model_dir, tags=[tag_constants.SERVING])
+      signature = meta_graph.signature_def['serving_default']
+
+      self._schema = []
+      for friendly_name in sorted(signature.outputs):
+        tensor_info_proto = signature.outputs[friendly_name]
+
+        # TODO(brandondutra): Could dtype be DT_INVALID?
+        # Consider getting the dtype from the graph via
+        # session.graph.get_tensor_by_name(tensor_info_proto.name).dtype
+        dtype = tensor_info_proto.dtype
+        if dtype == types_pb2.DT_FLOAT or dtype == types_pb2.DT_DOUBLE:
+          bq_type = 'FLOAT'
+        elif dtype == types_pb2.DT_INT32 or dtype == types_pb2.DT_INT64:
+          bq_type = 'INTEGER'
+        else:
+          bq_type = 'STRING'
+
+        self._schema.append({'mode': 'NULLABLE',
+                             'name': friendly_name,
+                             'type': bq_type})
+      session.close()
 
   def apply(self, datasets):
     return self.expand(datasets)
 
   def expand(self, datasets):
+    import json
+
     tf_graph_predictions, errors = datasets
 
     if self._output_format == 'json':
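The dtype-to-BigQuery-type mapping introduced above is small enough to check in isolation. A sketch with the logic pulled out into a standalone helper (to_bq_type is hypothetical; the commit inlines this):

from tensorflow.core.framework import types_pb2

def to_bq_type(dtype):
  """Maps a TensorFlow dtype enum value to a BigQuery column type."""
  if dtype in (types_pb2.DT_FLOAT, types_pb2.DT_DOUBLE):
    return 'FLOAT'
  if dtype in (types_pb2.DT_INT32, types_pb2.DT_INT64):
    return 'INTEGER'
  return 'STRING'  # everything else falls back to STRING

assert to_bq_type(types_pb2.DT_INT64) == 'INTEGER'
assert to_bq_type(types_pb2.DT_STRING) == 'STRING'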
@@ -344,15 +316,16 @@ def expand(self, datasets):
               shard_name_template=self._shard_name_template))
     elif self._output_format == 'csv':
       # make a csv header file
-      csv_coder = CSVCoder(self._header)
+      header = [col['name'] for col in self._schema]
+      csv_coder = CSVCoder(header)
       _ = (
           tf_graph_predictions.pipeline
           | 'Make CSV Header'
-          >> beam.Create([csv_coder.make_header_string()])
-          | 'Write CSV Header File'
+          >> beam.Create([json.dumps(self._schema, indent=2)])
+          | 'Write CSV Schema File'
           >> beam.io.textio.WriteToText(
               os.path.join(self._output_dir, 'csv_header'),
-              file_name_suffix='.txt',
+              file_name_suffix='.json',
               shard_name_template=''))
 
       # Write the csv predictions
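With this change the sidecar file carries the full BigQuery-style schema as JSON rather than a bare header string, but the csv column order is still recoverable from it. A small hypothetical consumer (file name and usage are illustrative):

import json

# Stands in for the csv_header.json file written by the pipeline above.
with open('csv_header.json') as f:
  schema = json.load(f)
header = [col['name'] for col in schema]
print(','.join(header))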
@@ -387,15 +360,11 @@ def make_prediction_pipeline(pipeline, args):
     pipeline: the pipeline
     args: command line args
   """
-
-
   predicted_values, errors = (
       pipeline
       | 'Read CSV Files'
       >> beam.io.ReadFromText(args.predict_data,
                               strip_trailing_newlines=True)
-      | 'Is Target Missing'
-      >> beam.ParDo(FixMissingTarget(args.trained_model_dir))
       | 'Batch Input'
       >> beam.ParDo(EmitAsBatchDoFn(args.batch_size))
       | 'Run TF Graph on Batches'

solutionbox/structured_data/datalab_solutions/structured_data/preprocess/local_preprocess.py

Lines changed: 5 additions & 0 deletions
@@ -106,6 +106,11 @@ def _init_numerical_results():
         categorical_results[col_name].update([parsed_line[col_name]])
       else:
         # numerical column.
+
+        # if empty, skip
+        if not parsed_line[col_name].strip():
+          continue
+
         numerical_results[col_name]['min'] = (
             min(numerical_results[col_name]['min'],
                 float(parsed_line[col_name])))
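The new guard matters because float('') raises ValueError, so a single empty cell would otherwise abort the whole numerical scan. A minimal sketch of the same skip-empties pattern (numeric_min_max is illustrative, not repo code):

def numeric_min_max(values):
  """Returns (min, max) over the non-empty string values."""
  lo, hi = float('inf'), float('-inf')
  for v in values:
    if not v.strip():  # skip empty cells instead of crashing on float('')
      continue
    lo, hi = min(lo, float(v)), max(hi, float(v))
  return lo, hi

assert numeric_min_max(['1', '', '3.5']) == (1.0, 3.5)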

solutionbox/structured_data/datalab_solutions/structured_data/test/e2e_functions.py

Lines changed: 9 additions & 6 deletions
@@ -58,7 +58,7 @@ def make_csv_data(filename, num_rows, problem_type, keep_target=True):
       t = 102
 
     if keep_target:
-      csv_line = "{target},{id},{num1},{num2},{num3},{str1},{str2},{str3}\n".format(
+      csv_line = "{id},{target},{num1},{num2},{num3},{str1},{str2},{str3}\n".format(
           id=i,
           target=t,
           num1=num1,
@@ -89,16 +89,16 @@ def make_preprocess_schema(filename, problem_type):
     problem_type: regression or classification
   """
   schema = [
-      {
-          "mode": "REQUIRED",
-          "name": "target",
-          "type": ("STRING" if problem_type == 'classification' else "FLOAT")
-      },
       {
           "mode": "NULLABLE",
           "name": "key",
           "type": "STRING"
       },
+      {
+          "mode": "REQUIRED",
+          "name": "target",
+          "type": ("STRING" if problem_type == 'classification' else "FLOAT")
+      },
       {
           "mode": "NULLABLE",
           "name": "num1",
@@ -199,7 +199,10 @@ def run_training(
   make_csv_data('raw_train_regression.csv', 5000, 'regression', True)
   make_csv_data('raw_eval_regression.csv', 1000, 'regression', True)
   make_csv_data('raw_predict_regression.csv', 100, 'regression', False)
+  make_preprocess_schema('schema_regression.json', 'regression')
 
   make_csv_data('raw_train_classification.csv', 5000, 'classification', True)
   make_csv_data('raw_eval_classification.csv', 1000, 'classification', True)
   make_csv_data('raw_predict_classification.csv', 100, 'classification', False)
+  make_preprocess_schema('schema_classification.json', 'classification')
+
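The two new make_preprocess_schema calls write BigQuery-style schema files whose field order matches the reordered csv columns (key first, then target). Roughly what the regression file starts with, per the schema literal above (feature entries elided):

import json

schema = [
    {"mode": "NULLABLE", "name": "key", "type": "STRING"},
    {"mode": "REQUIRED", "name": "target", "type": "FLOAT"},
    # ...one entry per feature column (num1, ...) follows...
]
with open('schema_regression.json', 'w') as f:
  json.dump(schema, f, indent=2)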

solutionbox/structured_data/datalab_solutions/structured_data/test/test_trainer.py

Lines changed: 15 additions & 8 deletions
@@ -103,14 +103,17 @@ def _check_training_screen_output(self, accuracy=None, loss=None):
     """
     # Print the last line of training output which has the loss value.
     lines = self._training_screen_output.splitlines()
-    last_line = lines[len(lines)-1]
+    for line in lines:
+      if line.startswith('INFO:tensorflow:Saving dict for global step %s:' % 2500):
+        last_line = line
+        break
     print(last_line)
 
     # supports positive numbers (int, real) with exponential form support.
     positive_number_re = re.compile('[+]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?')
 
     # Check it made it to step 2500
-    saving_num_re = re.compile('Saving evaluation summary for step \d+')
+    saving_num_re = re.compile('global_step = \d+')
     saving_num = saving_num_re.findall(last_line)
     # saving_num == ['Saving evaluation summary for step NUM']
     self.assertEqual(len(saving_num), 1)
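For context, the check now keys off the evaluation summary line that TensorFlow logs when saving eval metrics, rather than whatever happens to be the last line of output. A representative (not captured) log line and the new regex:

import re

sample = ('INFO:tensorflow:Saving dict for global step 2500: '
          'accuracy = 0.92, global_step = 2500, loss = 0.042')
assert re.findall(r'global_step = \d+', sample) == ['global_step = 2500']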
@@ -142,12 +145,16 @@ def _check_training_screen_output(self, accuracy=None, loss=None):
 
 
   def _check_train_files(self):
-    model_folder = os.path.join(self._train_output, 'model')
-    self.assertTrue(os.path.isfile(os.path.join(model_folder, 'checkpoint')))
-    self.assertTrue(os.path.isfile(os.path.join(model_folder, 'export')))
-    self.assertTrue(os.path.isfile(os.path.join(model_folder, 'export.meta')))
-    self.assertTrue(os.path.isfile(os.path.join(model_folder, 'schema.json')))
-    self.assertTrue(os.path.isfile(os.path.join(model_folder, 'transforms.json')))
+    model_folder = os.path.join(self._train_output,
+                                'train/export/prediction_model')
+    self.assertTrue(
+        os.path.isfile(os.path.join(model_folder, 'saved_model.pb')))
+    self.assertTrue(
+        os.path.isfile(os.path.join(model_folder, 'variables/variables.index')))
+    self.assertTrue(
+        os.path.isfile(os.path.join(model_folder, 'assets.extra/schema.json')))
+    self.assertTrue(
+        os.path.isfile(os.path.join(model_folder, 'assets.extra/transforms.json')))
 
 
   def testRegressionDnn(self):
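The assertions above follow the standard SavedModel export layout (saved_model.pb beside variables/ and, here, an assets.extra/ directory). A quick way to eyeball an export (the path is hypothetical; variables shard names can vary):

import os

export_dir = 'train/export/prediction_model'  # hypothetical local path
for root, _, files in os.walk(export_dir):
  for name in files:
    print(os.path.join(root, name))
# Expected, roughly:
#   .../saved_model.pb
#   .../variables/variables.index
#   .../variables/variables.data-00000-of-00001
#   .../assets.extra/schema.json
#   .../assets.extra/transforms.json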
