
Commit f404e5f

brandondutra and qimingj authored and committed
new preprocessing and training for structured data (#160)
* new preprocessing is done; next: work on training, and then update the tests
* saving work
* sw
* seems to be working, going to do tests next
* got preprocessing test working
* training tests pass!!!
* added exported graph back in
* dl preprocessing for local, cloud/csv, cloud/bigquery DONE :)
* gcloud cloud training works
* cloud dl training working
* oops, these files should not be saved
* removed junk function
* sw
* review comments
* removed cloudml sdk usage + lint
* review comments
1 parent 15e74c0 commit f404e5f

File tree

12 files changed (+1608, -1055 lines)

solutionbox/structured_data/datalab_solutions/structured_data/_package.py

Lines changed: 226 additions & 267 deletions
Large diffs are not rendered by default.
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import cloud_preprocess
import local_preprocess
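The file above simply makes the two preprocessing paths importable from the package. As a usage illustration only (not part of this commit), a minimal sketch follows; the dispatch helper and the assumption that local_preprocess exposes a main(argv) entry point mirroring the cloud_preprocess module shown below are the editor's.

# Illustrative sketch only -- not part of this commit. Assumes local_preprocess
# exposes a main(argv) entry point mirroring cloud_preprocess below.
import cloud_preprocess
import local_preprocess


def run_preprocessing(argv, cloud=False):
  # Route to the BigQuery-backed (cloud) or local preprocessing path.
  module = cloud_preprocess if cloud else local_preprocess
  module.main(argv)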
Lines changed: 264 additions & 0 deletions
@@ -0,0 +1,264 @@
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import json
import os
import sys

from tensorflow.python.lib.io import file_io

INPUT_FEATURES_FILE = 'input_features.json'
SCHEMA_FILE = 'schema.json'

NUMERICAL_ANALYSIS_FILE = 'numerical_analysis.json'
CATEGORICAL_ANALYSIS_FILE = 'vocab_%s.csv'

def parse_arguments(argv):
  """Parse command line arguments.

  Args:
    argv: list of command line arguments, including program name.

  Returns:
    An argparse Namespace object.

  Raises:
    ValueError: for bad parameters
  """
  parser = argparse.ArgumentParser(
      description='Runs Preprocessing on structured data.')
  parser.add_argument('--output_dir',
                      type=str,
                      required=True,
                      help='Google Cloud Storage directory in which to place outputs.')
  parser.add_argument('--input_feature_file',
                      type=str,
                      required=True,
                      help=('JSON file containing feature types'))

  parser.add_argument('--schema_file',
                      type=str,
                      required=False,
                      help=('BigQuery JSON schema file'))
  parser.add_argument('--input_file_pattern',
                      type=str,
                      required=False,
                      help='Input CSV file names. May contain a file pattern')

  # If using a BigQuery table.
  # TODO(brandondutra): maybe also support an SQL input, so the table can be
  # ad-hoc.
  parser.add_argument('--bigquery_table',
                      type=str,
                      required=False,
                      help=('project:dataset.table_name'))

  args = parser.parse_args(args=argv[1:])
  print(args)

  if not args.output_dir.startswith('gs://'):
    raise ValueError('--output_dir must point to a location on GCS')

  if args.bigquery_table:
    if args.schema_file or args.input_file_pattern:
      raise ValueError('If using --bigquery_table, then --schema_file and '
                       '--input_file_pattern '
                       'are not needed.')
  else:
    if not args.schema_file or not args.input_file_pattern:
      raise ValueError('If not using --bigquery_table, then --schema_file and '
                       '--input_file_pattern '
                       'are required.')

    if not args.input_file_pattern.startswith('gs://'):
      raise ValueError('--input_file_pattern must point to files on GCS')

  return args

def parse_table_name(bigquery_table):
  """Given a string a:b.c, returns b.c.

  Args:
    bigquery_table: full table name, project_id:dataset.table_name

  Returns:
    dataset.table_name

  Raises:
    ValueError: if the table name is not of the form
        project_id:dataset.table_name.
  """

  id_name = bigquery_table.split(':')
  if len(id_name) != 2:
    raise ValueError('Bigquery table name should be in the form '
                     'project_id:dataset.table_name. Got %s' % bigquery_table)
  return id_name[1]

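# (Editor's illustration, not part of this file.) Per the split logic above,
# with placeholder names:
#   parse_table_name('my-project:my_dataset.my_table')  -> 'my_dataset.my_table'
#   parse_table_name('my_dataset.my_table')             -> raises ValueError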
def run_numerical_analysis(table, args, feature_types):
  """Finds min/max values for the numerical columns and writes a json file.

  Args:
    table: Reference to the FederatedTable if bigquery_table is false.
    args: the command line args
    feature_types: parsed contents of the feature types file.
  """
  import datalab.bigquery as bq

  # Get list of numerical columns.
  numerical_columns = []
  for name, config in feature_types.iteritems():
    if config['type'] == 'numerical':
      numerical_columns.append(name)

  # Run the numerical analysis.
  if numerical_columns:
    sys.stdout.write('Running numerical analysis...')
    max_min = [
        'max({name}) as max_{name}, min({name}) as min_{name}'.format(name=name)
        for name in numerical_columns]
    if args.bigquery_table:
      sql = 'SELECT %s from %s' % (', '.join(max_min),
                                   parse_table_name(args.bigquery_table))
      numerical_results = bq.Query(sql).to_dataframe()
    else:
      sql = 'SELECT %s from csv_table' % ', '.join(max_min)
      query = bq.Query(sql, data_sources={'csv_table': table})
      numerical_results = query.to_dataframe()

    # Convert the numerical results to a json file.
    results_dict = {}
    for name in numerical_columns:
      results_dict[name] = {'max': numerical_results.iloc[0]['max_%s' % name],
                            'min': numerical_results.iloc[0]['min_%s' % name]}

    file_io.write_string_to_file(
        os.path.join(args.output_dir, NUMERICAL_ANALYSIS_FILE),
        json.dumps(results_dict, indent=2, separators=(',', ': ')))

    sys.stdout.write('done.\n')

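# (Editor's illustration, not part of this file.) With a single numerical
# column named 'age' (a placeholder), the numerical_analysis.json written
# above would look like:
#   {
#     "age": {
#       "max": 90,
#       "min": 17
#     }
#   }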
def run_categorical_analysis(table, args, feature_types):
  """Finds the vocab values for the categorical columns and writes csv files.

  The vocab files are of the form
  index,categorical_column_name
  0,'abc'
  1,'def'
  2,'ghi'
  ...

  Args:
    table: Reference to the FederatedTable if bigquery_table is false.
    args: the command line args
    feature_types: parsed contents of the feature types file.
  """
  import datalab.bigquery as bq
  categorical_columns = []
  for name, config in feature_types.iteritems():
    if config['type'] == 'categorical':
      categorical_columns.append(name)

  jobs = []
  if categorical_columns:
    sys.stdout.write('Running categorical analysis...')
    for name in categorical_columns:
      if args.bigquery_table:
        table_name = parse_table_name(args.bigquery_table)
      else:
        table_name = 'table_name'

      sql = """
            SELECT
              {name},
            FROM
              {table}
            WHERE
              {name} IS NOT NULL
            GROUP BY
              {name}
      """.format(name=name, table=table_name)
      out_file = os.path.join(args.output_dir,
                              CATEGORICAL_ANALYSIS_FILE % name)

      if args.bigquery_table:
        jobs.append(bq.Query(sql).extract_async(out_file, csv_header=False))
      else:
        query = bq.Query(sql, data_sources={table_name: table})
        jobs.append(query.extract_async(out_file, csv_header=False))

    for job in jobs:
      job.wait()

    sys.stdout.write('done.\n')

def run_analysis(args):
  """Builds the analysis files for training.

  Uses BigQuery tables to do the analysis.

  Args:
    args: command line args
  """
  import datalab.bigquery as bq
  if args.bigquery_table:
    table = bq.Table(args.bigquery_table)
  else:
    schema_list = json.loads(file_io.read_file_to_string(args.schema_file))
    table = bq.FederatedTable().from_storage(
        source=args.input_file_pattern,
        source_format='csv',
        ignore_unknown_values=False,
        max_bad_records=0,
        compressed=False,
        schema=bq.Schema(schema_list))

  feature_types = json.loads(
      file_io.read_file_to_string(args.input_feature_file))

  run_numerical_analysis(table, args, feature_types)
  run_categorical_analysis(table, args, feature_types)

  # Save a copy of the input types to the output location.
  file_io.copy(args.input_feature_file,
               os.path.join(args.output_dir, INPUT_FEATURES_FILE),
               overwrite=True)

  # Save a copy of the schema to the output location.
  if args.schema_file:
    file_io.copy(args.schema_file,
                 os.path.join(args.output_dir, SCHEMA_FILE),
                 overwrite=True)
  else:
    file_io.write_string_to_file(
        os.path.join(args.output_dir, SCHEMA_FILE),
        json.dumps(table.schema._bq_schema, indent=2, separators=(',', ': ')))

def main(argv=None):
  args = parse_arguments(sys.argv if argv is None else argv)
  run_analysis(args)


if __name__ == '__main__':
  main()
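As a usage illustration (not part of the commit), the sketch below shows a plausible --input_feature_file and two ways the script above might be invoked. The feature-file layout is inferred from how feature_types is consumed above (each column maps to a config whose 'type' is 'numerical' or 'categorical'); all bucket, project, table, and column names are placeholders.

# Illustrative only: plausible inputs and invocations for the script above.
# All bucket, project, table, and column names are placeholders; the feature
# file layout is inferred from how feature_types is read by the analysis code.
import json

# A minimal input_features.json: one numerical and one categorical column.
feature_types = {
    'age': {'type': 'numerical'},
    'state': {'type': 'categorical'},
}
print(json.dumps(feature_types, indent=2))

# BigQuery path: pass --bigquery_table and skip the CSV flags.
main(['preprocess',
      '--output_dir', 'gs://my-bucket/preprocess_output',
      '--input_feature_file', 'gs://my-bucket/input_features.json',
      '--bigquery_table', 'my-project:my_dataset.my_table'])

# CSV-on-GCS path: pass --schema_file and --input_file_pattern instead.
main(['preprocess',
      '--output_dir', 'gs://my-bucket/preprocess_output',
      '--input_feature_file', 'gs://my-bucket/input_features.json',
      '--schema_file', 'gs://my-bucket/schema.json',
      '--input_file_pattern', 'gs://my-bucket/data/train-*.csv'])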
