This repository was archived by the owner on Sep 3, 2022. It is now read-only.

Cloudmlsm #229

Merged
merged 6 commits on Feb 23, 2017
475 changes: 250 additions & 225 deletions solutionbox/structured_data/datalab_solutions/structured_data/_package.py

Large diffs are not rendered by default.

@@ -63,6 +63,9 @@ def get_version():
long_description="""
""",
install_requires=[
'tensorflow==1.0',
'protobuf==3.1.0',
'google-cloud-dataflow==0.5.5'
],
package_data={
},
@@ -76,6 +76,7 @@ def parse_arguments(argv):
action='store_false',
help='Don\'t shard files')
parser.set_defaults(shard_files=True)

parser.add_argument('--output_format',
choices=['csv', 'json'],
default='csv',
@@ -104,55 +105,6 @@ def parse_arguments(argv):
return args


class FixMissingTarget(beam.DoFn):
"""A DoFn to fix missing target columns."""

def __init__(self, trained_model_dir):
"""Reads the schema file and extracted the expected number of columns.

Args:
trained_model_dir: path to model.

Raises:
ValueError: if schema.json not found in trained_model_dir
"""
from tensorflow.python.lib.io import file_io
import json
import os

schema_path = os.path.join(trained_model_dir, 'schema.json')
if not file_io.file_exists(schema_path):
raise ValueError('schema.json missing from %s' % schema_path)
schema = json.loads(file_io.read_file_to_string(schema_path))
self._num_expected_columns = len(schema)

def process(self, element):
"""Fixes csv line if target is missing.

The first column is assumed to be the target column, and the TF graph
expects to always parse the target column, even in prediction. Below,
we check how many csv columns there are, and if the target is missing, we
prepend a ',' to denote the missing column.

Example:
'target,key,value1,...' -> 'target,key,value1,...' (no change)
'key,value1,...' -> ',key,value1,...' (add missing target column)

The value of the missing target column comes from the default value given
to tf.decode_csv in the graph.
"""
import apache_beam as beam

num_columns = len(element.split(','))
if num_columns == self._num_expected_columns:
yield element
elif num_columns + 1 == self._num_expected_columns:
yield ',' + element
else:
yield beam.pvalue.SideOutputValue('errors',
('bad columns', element))


class EmitAsBatchDoFn(beam.DoFn):
"""A DoFn that buffers the records and emits them batch by batch."""

@@ -185,22 +137,22 @@ def __init__(self, trained_model_dir):
self._session = None

def start_bundle(self, element=None):
from tensorflow.contrib.session_bundle import session_bundle
from tensorflow.python.saved_model import tag_constants
from tensorflow.contrib.session_bundle import bundle_shim
import json

self._session, _ = session_bundle.load_session_bundle_from_path(
self._trained_model_dir)

# input_alias_map {'input_csv_string': tensor_name}
self._input_alias_map = json.loads(
self._session.graph.get_collection('inputs')[0])

# output_alias_map {'target_from_input': tensor_name, 'key': ...}
self._output_alias_map = json.loads(
self._session.graph.get_collection('outputs')[0])
self._session, meta_graph = (
    bundle_shim.load_session_bundle_or_saved_model_bundle_from_path(
        self._trained_model_dir, tags=[tag_constants.SERVING]))
signature = meta_graph.signature_def['serving_default']

# get the mappings between aliases and tensor names
# for both inputs and outputs
self._input_alias_map = {friendly_name: tensor_info_proto.name
for (friendly_name, tensor_info_proto) in signature.inputs.items()}
self._output_alias_map = {friendly_name: tensor_info_proto.name
for (friendly_name, tensor_info_proto) in signature.outputs.items()}
self._aliases, self._tensor_names = zip(*self._output_alias_map.items())
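# Illustrative example (tensor names assumed, not from the PR): if
# signature.inputs maps 'input_csv_string' to a TensorInfo whose name is
# 'input:0', then self._input_alias_map == {'input_csv_string': 'input:0'}.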


def finish_bundle(self, element=None):
self._session.close()

@@ -220,6 +172,11 @@ def process(self, element):

feed_dict = collections.defaultdict(list)
for line in element:

# Remove trailing newline.
if line.endswith('\n'):
Contributor: line.rstrip('\n')?

line = line[:-1]
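# Note on the suggestion above: line.rstrip('\n') would drop every trailing
# newline, while the endswith/slice pair removes exactly one; for lines read
# from a text source the two are interchangeable.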

feed_dict[self._input_alias_map.values()[0]].append(line)
num_in_batch += 1

@@ -311,26 +268,41 @@ def __init__(self, args):
self._output_format = args.output_format
self._output_dir = args.output_dir

# See if the target vocab should be loaded.
# Get the BQ schema if csv.
if self._output_format == 'csv':
from tensorflow.contrib.session_bundle import session_bundle
import json

self._session, _ = session_bundle.load_session_bundle_from_path(
args.trained_model_dir)

# output_alias_map {'target_from_input': tensor_name, 'key': ...}
output_alias_map = json.loads(
self._session.graph.get_collection('outputs')[0])

self._header = sorted(output_alias_map.keys())
self._session.close()

from tensorflow.python.saved_model import tag_constants
from tensorflow.contrib.session_bundle import bundle_shim
from tensorflow.core.framework import types_pb2

session, meta_graph = (
    bundle_shim.load_session_bundle_or_saved_model_bundle_from_path(
        args.trained_model_dir, tags=[tag_constants.SERVING]))
signature = meta_graph.signature_def['serving_default']

self._schema = []
for friendly_name in sorted(signature.outputs):
tensor_info_proto = signature.outputs[friendly_name]

# TODO(brandondutra): Could dtype be DT_INVALID?
# Consider getting the dtype from the graph via
# session.graph.get_tensor_by_name(tensor_info_proto.name).dtype
dtype = tensor_info_proto.dtype
if dtype == types_pb2.DT_FLOAT or dtype == types_pb2.DT_DOUBLE:
bq_type = 'FLOAT'
elif dtype == types_pb2.DT_INT32 or dtype == types_pb2.DT_INT64:
bq_type = 'INTEGER'
else:
bq_type = 'STRING'

self._schema.append({'mode': 'NULLABLE',
'name': friendly_name,
'type': bq_type})
session.close()
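# Illustrative result (hypothetical output names): a signature with a
# DT_FLOAT output 'predicted' and a DT_STRING output 'key' produces
# self._schema == [
#     {'mode': 'NULLABLE', 'name': 'key', 'type': 'STRING'},
#     {'mode': 'NULLABLE', 'name': 'predicted', 'type': 'FLOAT'}]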

def apply(self, datasets):
return self.expand(datasets)

def expand(self, datasets):
import json

tf_graph_predictions, errors = datasets

if self._output_format == 'json':
@@ -344,15 +316,16 @@ def expand(self, datasets):
shard_name_template=self._shard_name_template))
elif self._output_format == 'csv':
# make a csv schema file
csv_coder = CSVCoder(self._header)
header = [col['name'] for col in self._schema]
csv_coder = CSVCoder(header)
_ = (
tf_graph_predictions.pipeline
| 'Make CSV Header'
>> beam.Create([csv_coder.make_header_string()])
| 'Write CSV Header File'
>> beam.Create([json.dumps(self._schema, indent=2)])
| 'Write CSV Schema File'
>> beam.io.textio.WriteToText(
os.path.join(self._output_dir, 'csv_header'),
file_name_suffix='.txt',
file_name_suffix='.json',
shard_name_template=''))

# Write the csv predictions
@@ -387,15 +360,11 @@ def make_prediction_pipeline(pipeline, args):
pipeline: the pipeline
args: command line args
"""


predicted_values, errors = (
pipeline
| 'Read CSV Files'
>> beam.io.ReadFromText(args.predict_data,
strip_trailing_newlines=True)
| 'Is Target Missing'
>> beam.ParDo(FixMissingTarget(args.trained_model_dir))
| 'Batch Input'
>> beam.ParDo(EmitAsBatchDoFn(args.batch_size))
| 'Run TF Graph on Batches'
@@ -106,6 +106,11 @@ def _init_numerical_results():
categorical_results[col_name].update([parsed_line[col_name]])
else:
# numerical column.

# If empty, skip.
if not parsed_line[col_name].strip():
continue

numerical_results[col_name]['min'] = (
min(numerical_results[col_name]['min'],
float(parsed_line[col_name])))
@@ -58,7 +58,7 @@ def make_csv_data(filename, num_rows, problem_type, keep_target=True):
t = 102

if keep_target:
csv_line = "{target},{id},{num1},{num2},{num3},{str1},{str2},{str3}\n".format(
csv_line = "{id},{target},{num1},{num2},{num3},{str1},{str2},{str3}\n".format(
id=i,
target=t,
num1=num1,
@@ -89,16 +89,16 @@ def make_preprocess_schema(filename, problem_type):
problem_type: regression or classification
"""
schema = [
{
"mode": "REQUIRED",
"name": "target",
"type": ("STRING" if problem_type == 'classification' else "FLOAT")
},
{
"mode": "NULLABLE",
"name": "key",
"type": "STRING"
},
{
"mode": "REQUIRED",
"name": "target",
"type": ("STRING" if problem_type == 'classification' else "FLOAT")
},
{
"mode": "NULLABLE",
"name": "num1",
@@ -199,7 +199,10 @@ def run_training(
make_csv_data('raw_train_regression.csv', 5000, 'regression', True)
make_csv_data('raw_eval_regression.csv', 1000, 'regression', True)
make_csv_data('raw_predict_regression.csv', 100, 'regression', False)
make_preprocess_schema('schema_regression.json', 'regression')

make_csv_data('raw_train_classification.csv', 5000, 'classification', True)
make_csv_data('raw_eval_classification.csv', 1000, 'classification', True)
make_csv_data('raw_predict_classification.csv', 100, 'classification', False)
make_preprocess_schema('schema_classification.json', 'classification')

@@ -103,14 +103,17 @@ def _check_training_screen_output(self, accuracy=None, loss=None):
"""
# Print the last line of training output which has the loss value.
lines = self._training_screen_output.splitlines()
last_line = lines[len(lines)-1]
for line in lines:
if line.startswith('INFO:tensorflow:Saving dict for global step %s:' % 2500):
last_line = line
break
print(last_line)

# supports positive numbers (int, real) with exponential form support.
positive_number_re = re.compile('[+]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?')
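# Illustrative matches: '2500', '0.0512', '1.3e-05'. A leading '-' is never
# part of a match, so negative values are not captured whole.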

# Check it made it to step 2500
saving_num_re = re.compile('Saving evaluation summary for step \d+')
saving_num_re = re.compile('global_step = \d+')
saving_num = saving_num_re.findall(last_line)
# saving_num == ['global_step = NUM']
self.assertEqual(len(saving_num), 1)
@@ -142,12 +145,16 @@ def _check_training_screen_output(self, accuracy=None, loss=None):


def _check_train_files(self):
model_folder = os.path.join(self._train_output, 'model')
self.assertTrue(os.path.isfile(os.path.join(model_folder, 'checkpoint')))
self.assertTrue(os.path.isfile(os.path.join(model_folder, 'export')))
self.assertTrue(os.path.isfile(os.path.join(model_folder, 'export.meta')))
self.assertTrue(os.path.isfile(os.path.join(model_folder, 'schema.json')))
self.assertTrue(os.path.isfile(os.path.join(model_folder, 'transforms.json')))
model_folder = os.path.join(self._train_output,
'train/export/prediction_model')
self.assertTrue(
os.path.isfile(os.path.join(model_folder, 'saved_model.pb')))
self.assertTrue(
os.path.isfile(os.path.join(model_folder, 'variables/variables.index')))
self.assertTrue(
os.path.isfile(os.path.join(model_folder, 'assets.extra/schema.json')))
self.assertTrue(
os.path.isfile(os.path.join(model_folder, 'assets.extra/transforms.json')))
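# The assertions above imply this exported layout:
#   train/export/prediction_model/
#     saved_model.pb
#     variables/variables.index
#     assets.extra/schema.json
#     assets.extra/transforms.json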


def testRegressionDnn(self):