
Commit 2c6fca0

Remove old feature-slicing pipeline implementation (replaced by BigQuery); add confusion matrix magic. (#129)

* Remove the old feature-slicing pipeline implementation (replaced by BigQuery). Add the confusion matrix magic.
* Follow up on code review comments. Also fix an Inception issue where the eval loss is NaN when the eval set is smaller than the batch size.
* Fix set union.
1 parent 7c02c14 commit 2c6fca0

File tree

4 files changed: +118 −197 lines


datalab/mlalpha/__init__.py

Lines changed: 1 addition & 2 deletions

@@ -26,8 +26,7 @@
 from ._package import Packager
 from ._cloud_models import CloudModels, CloudModelVersions
 from ._confusion_matrix import ConfusionMatrix
-from ._analysis import CsvEvalResults, CsvEvalSource, EvalResultsCsvCoder, \
-                       AccuracyFn, FeatureSlicingPipeline
+from ._analysis import csv_to_dataframe
 from ._package_runner import PackageRunner
 
 from plotly.offline import init_notebook_mode

datalab/mlalpha/_analysis.py

Lines changed: 27 additions & 194 deletions

@@ -10,206 +10,39 @@
 # or implied. See the License for the specific language governing permissions and limitations under
 # the License.
 
-"""Implements Cloud ML Eval Results Analysis"""
+"""Implements Cloud ML Analysis Helpers"""
 
-import apache_beam as beam
-from collections import namedtuple
 
-"""Prepresents an eval results CSV file. For example, the content is like:
-     107,Iris-versicolor,1.64827824278e-07,0.999999880791,6.27104979056e-10
-     100,Iris-versicolor,3.5338824091e-05,0.99996471405,1.32811195375e-09
-     ...
-"""
-CsvEvalResults = namedtuple('CsvEvalResults', 'source, key_index predicted_index score_index_start num_scores')
+import google.cloud.ml as ml
+import numpy as np
+import pandas as pd
+import yaml
 
-"""Prepresents an eval source CSV file. For example, the content is like:
-     107,Iris-virginica,4.9,2.5,4.5,1.7
-     100,Iris-versicolor,5.7,2.8,4.1,1.3
-     ...
-   The metadata is generated in the preprocessing pipeline. It is used to describe the CSV file,
-   including schema, headers, etc.
-"""
-CsvEvalSource = namedtuple('CsvEvalSource', 'source metadata')
+import datalab.bigquery as bq
 
 
-class EvalResultsCsvCoder(beam.coders.Coder):
-  """A coder to read from Eval results CSV file. Note encode() is only needed in cloud run.
-  """
-  def __init__(self, eval_results):
-    self._eval_results = eval_results
-
-  def decode(self, csv_line):
-    import csv
-    source_elem = next(csv.reader([csv_line]))
-    key = source_elem[self._eval_results.key_index]
-    element = {
-      'predicted': source_elem[self._eval_results.predicted_index],
-      'scores': source_elem[self._eval_results.score_index_start: \
-          self._eval_results.score_index_start+self._eval_results.num_scores]
-    }
-    return (key, element)
-
-  def encode(self, element):
-    return str(element)
-
-
-class AccuracyFn(beam.CombineFn):
-  """A transform to compute accuracy for feature slices.
-  """
-  def __init__(self, target_column_name):
-    self._target_column_name = target_column_name
-
-  def create_accumulator(self):
-    return (0.0, 0)
-
-  def add_input(self, (sum, count), input):
-    new_sum = sum
-    if (input['predicted'] == input[self._target_column_name]):
-      new_sum += 1
-    return new_sum, count + 1
-
-  def merge_accumulators(self, accumulators):
-    sums, counts = zip(*accumulators)
-    return sum(sums), sum(counts)
-
-  def extract_output(self, (sum, count)):
-    accuracy = float(sum) / count if count else float('NaN')
-    return {'accuracy': accuracy, 'totalWeightedExamples': count}
-
-
-class FeatureSlicingPipeline(object):
-  """The pipeline to generate feature slicing stats. For example, accuracy values given
-     "species = Iris-versicolor", "education = graduate", etc.
-     It is implemented with DataFlow.
-  """
-  @staticmethod
-  def _pair_source_with_key(element):
-    key = element['key']
-    del element['key']
-    return (key, element)
-
-  @staticmethod
-  def _join_info((key, info)):
-    value = info['source'][0]
-    value.update(info['results'][0])
-    return (key, value)
-
-  def _pipeline_def(self, p, eval_source, eval_results, features_to_slice, metrics, output_file,
-                    shard_name_template=None):
-    import datalab.mlalpha as mlalpha
-    import google.cloud.ml.io as io
-    import json
-
-    metadata = mlalpha.Metadata(eval_source.metadata)
-    target_name, _ = metadata.get_target_name_and_scenario()
-
-    # Load eval source.
-    eval_source_coder = io.CsvCoder(metadata.get_csv_headers(), metadata.get_numeric_columns())
-    eval_source_data = p | beam.io.ReadFromText(eval_source.source, coder=eval_source_coder) | \
-        beam.Map('pair_source_with_key', FeatureSlicingPipeline._pair_source_with_key)
-
-    # Load eval results.
-    eval_results_data = p | \
-        beam.Read('ReadEvalResults', beam.io.TextFileSource(eval_results.source,
-            coder=EvalResultsCsvCoder(eval_results)))
-
-    # Join source with results by key.
-    joined_results = {'source': eval_source_data, 'results': eval_results_data} | \
-        beam.CoGroupByKey() | beam.Map('join by key', FeatureSlicingPipeline._join_info)
-
-    feature_metrics_list = []
-    for feature_to_slice in features_to_slice:
-      feature_metrics = joined_results | \
-          beam.Map('slice_get_key_%s' % feature_to_slice,
-                   lambda (k,v),f=feature_to_slice: (v[f], v)) | \
-          beam.CombinePerKey('slice_combine_%s' % feature_to_slice,
-                             AccuracyFn(target_name)) | \
-          beam.Map('slice_prepend_feature_name_%s' % feature_to_slice,
-                   lambda (k,v),f=feature_to_slice: ('%s:%s' % (f, k), v))
-      feature_metrics_list.append(feature_metrics)
-
-    feature_metrics_list | beam.Flatten() | \
-        beam.Map('ToJsonFormat', lambda (k,v): json.dumps({'feature': k, 'metricValues': v})) | \
-        beam.io.WriteToText(output_file, shard_name_template=shard_name_template)
-    return p
-
-
-  def run_local(self, eval_source, eval_results, features_to_slice, metrics, output_file):
-    """Run the pipeline locally. Blocks execution until it finishes.
-
-    Args:
-      eval_source: The only supported format is CsvEvalResults now while we may add more.
-          Note the source can be either a GCS path or a local path.
-      eval_results: The only supported format is CsvEvalSource now while we may add more.
-          Note the source can be either a GCS path or a local path.
-      features_to_slice: A list of features to slice on. The features must exist in
-          eval_source, and can be numeric, categorical, or target.
-      metrics: A list of metrics to compute. For classification, it supports "accuracy",
-          "logloss". For regression, it supports "RMSE".
-      output_file: The path to a local file holding the aggregated results.
-    """
-    p = beam.Pipeline('DirectPipelineRunner')
-    self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file,
-                       shard_name_template='')
-    p.run()
-
-
-  def default_pipeline_options(self, output_dir):
-    """Get default DataFlow options. Users can customize it further on top of it and then
-       send the option to run_cloud().
+def csv_to_dataframe(csv_path, schema_path):
+  """Given a CSV file together with its BigQuery schema file in yaml, load
+     content into a dataframe.
 
     Args:
-      output_dir: A GCS path which will be used as base path for tmp and staging dir.
+      csv_path: Input CSV path. Can be local or GCS.
+      schema_path: Input schema path. Can be local or GCS.
 
     Returns:
-      A dictionary of options.
-    """
-    import datalab.context as context
-    import datetime
-    import google.cloud.ml as ml
-    import os
-
-    options = {
-        'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
-        'temp_location': os.path.join(output_dir, 'tmp'),
-        'job_name': 'feature-slicing-pipeline' + '-' + \
-            datetime.datetime.now().strftime('%y%m%d-%H%M%S'),
-        'project': context.Context.default().project_id,
-        'extra_packages': ['gs://cloud-datalab/dataflow/datalab.tar.gz', ml.sdk_location],
-        'teardown_policy': 'TEARDOWN_ALWAYS',
-        'no_save_main_session': True
-    }
-    return options
-
-  def run_cloud(self, eval_source, eval_results, features_to_slice, metrics, output_file,
-                pipeline_option=None):
-    """Run the pipeline in cloud. Returns when the job is submitted.
-       Calling of this function may incur some cost since it runs a DataFlow job in Google Cloud.
-       If pipeline_option is not specified, make sure you are signed in (through Datalab)
-       and a default project is set so it can get credentials and projects from global context.
-
-    Args:
-      eval_source: The only supported format is CsvEvalResults now while we may add more.
-          The source needs to be a GCS path and is readable to current signed in user.
-      eval_results: The only supported format is CsvEvalSource now while we may add more.
-          The source needs to be a GCS path and is readable to current signed in user.
-      features_to_slice: A list of features to slice on. The features must exist in
-          eval_source, and can be numeric, categorical, or target.
-      metrics: A list of metrics to compute. For classification, it supports "accuracy",
-          "logloss". For regression, it supports "RMSE".
-      pipeline_option: If not specified, use default options. Recommend customizing your options
-          based on default one obtained from default_pipeline_options(). For example,
-          options = fsp.default_pipeline_options()
-          options['num_workers'] = 10
-          ...
-      output_file: A GCS file prefix holding the aggregated results.
-    """
-    import os
-    if pipeline_option is None:
-      output_dir = os.path.dirname(output_file)
-      pipeline_option = self.default_pipeline_options(output_dir)
-    opts = beam.pipeline.PipelineOptions(flags=[], **pipeline_option)
-    p = beam.Pipeline('DataflowPipelineRunner', options=opts)
-    self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file)
-    p.run()
-
+      Loaded pandas dataframe.
+  """
+  with ml.util._file.open_local_or_gcs(schema_path, mode='r') as f:
+    schema = yaml.safe_load(f)
+  _MAPPINGS = {
+    'FLOAT': np.float64,
+    'INTEGER': np.int64,
+    'TIMESTAMP': np.datetime64,
+    'BOOLEAN': np.bool,
+  }
+  for item in schema:
+    item['type'] = _MAPPINGS.get(item['type'], object)
+  names = [x['name'] for x in schema]
+  dtype = {x['name']: x['type'] for x in schema}
+  with ml.util._file.open_local_or_gcs(csv_path, mode='r') as f:
+    return pd.read_csv(f, names=names, dtype=dtype)
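A minimal usage sketch of the new csv_to_dataframe helper, assuming a hypothetical CSV at gs://my-bucket/eval.csv with its YAML schema file alongside (the paths and column names below are illustrative, not part of the commit):

  import datalab.mlalpha as mlalpha

  # Hypothetical paths; both local and GCS paths are accepted.
  # The schema file is a YAML list of {name, type} entries, e.g.
  #   - name: target
  #     type: STRING
  #   - name: score
  #     type: FLOAT
  df = mlalpha.csv_to_dataframe('gs://my-bucket/eval.csv',
                                'gs://my-bucket/eval.csv.schema.yaml')
  df.dtypes  # columns typed from the schema (FLOAT -> float64, INTEGER -> int64, others -> object)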

datalab/mlalpha/commands/_ml.py

Lines changed: 86 additions & 0 deletions

@@ -17,9 +17,15 @@
   raise Exception('This module can only be loaded in ipython.')
 
 import collections
+import google.cloud.ml as cloudml
+import matplotlib.pyplot as plt
+import numpy as np
 import os
+import pandas as pd
+from sklearn.metrics import confusion_matrix
 import yaml
 
+
 import datalab.context
 import datalab.mlalpha
 import datalab.utils.commands

@@ -87,6 +93,23 @@ def ml(line, cell=None):
                                     required=True)
   batch_predict_parser.set_defaults(func=_batch_predict)
 
+  confusion_matrix_parser = parser.subcommand('confusion_matrix',
+      'Plot confusion matrix. The source is provided ' +
+      'in one of "csv", "bqtable", and "sql" params.')
+  confusion_matrix_parser.add_argument('--csv',
+                                       help='GCS or local path of CSV file which contains ' +
+                                            '"target", "predicted" columns at least. The CSV ' +
+                                            'either comes with a schema file in the same dir, ' +
+                                            'or specify "headers: name1, name2..." in cell.')
+  confusion_matrix_parser.add_argument('--bqtable',
+                                       help='name of the BigQuery table in the form of ' +
+                                            'dataset.table.')
+  confusion_matrix_parser.add_argument('--sql',
+                                       help='name of the sql module defined in previous cell ' +
+                                            'which should return "target", "predicted", ' +
+                                            'and "count" columns at least in results.')
+  confusion_matrix_parser.set_defaults(func=_confusion_matrix)
+
   namespace = datalab.utils.commands.notebook_environment()
   return datalab.utils.commands.handle_magic_line(line, cell, parser, namespace=namespace)
 

@@ -158,3 +181,66 @@ def _predict(args, cell):
 
 def _batch_predict(args, cell):
   return _run_package(args, cell, 'batch_predict')
+
+
+def _plot_confusion_matrix(cm, labels):
+  plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
+  plt.title('Confusion matrix')
+  plt.colorbar()
+  tick_marks = np.arange(len(labels))
+  plt.xticks(tick_marks, labels, rotation=45)
+  plt.yticks(tick_marks, labels)
+  plt.tight_layout()
+  plt.ylabel('True label')
+  plt.xlabel('Predicted label')
+
+
+def _confusion_matrix_from_csv(input_csv, cell):
+  schema_file = input_csv + '.schema.yaml'
+  headers = None
+  if cell is not None:
+    env = datalab.utils.commands.notebook_environment()
+    config = datalab.utils.commands.parse_config(cell, env)
+    headers_str = config.get('headers', None)
+    if headers_str is not None:
+      headers = [x.strip() for x in headers_str.split(',')]
+  if headers is not None:
+    with cloudml.util._file.open_local_or_gcs(input_csv, mode='r') as f:
+      df = pd.read_csv(f, names=headers)
+  elif cloudml.util._file.file_exists(schema_file):
+    df = datalab.mlalpha.csv_to_dataframe(input_csv, schema_file)
+  else:
+    raise Exception('headers is missing from cell, ' +
+                    'and there is no schema file in the same dir as csv')
+  labels = sorted(set(df['target']) | set(df['predicted']))
+  cm = confusion_matrix(df['target'], df['predicted'], labels=labels)
+  return cm, labels
+
+
+def _confusion_matrix_from_query(sql_module_name, bq_table):
+  if sql_module_name is not None:
+    item = datalab.utils.commands.get_notebook_item(sql_module_name)
+    query, _ = datalab.data.SqlModule.get_sql_statement_with_environment(item, {})
+  else:
+    query = ('select target, predicted, count(*) as count from %s group by target, predicted'
+             % bq_table)
+  dfbq = datalab.bigquery.Query(query).results().to_dataframe()
+  labels = sorted(set(dfbq['target']) | set(dfbq['predicted']))
+  labels_count = len(labels)
+  dfbq['target'] = [labels.index(x) for x in dfbq['target']]
+  dfbq['predicted'] = [labels.index(x) for x in dfbq['predicted']]
+  cm = [[0]*labels_count for i in range(labels_count)]
+  for index, row in dfbq.iterrows():
+    cm[row['target']][row['predicted']] = row['count']
+  return cm, labels
+
+
+def _confusion_matrix(args, cell):
+  if args['csv'] is not None:
+    #TODO: Maybe add cloud run for large CSVs with federated table.
+    cm, labels = _confusion_matrix_from_csv(args['csv'], cell)
+  elif args['sql'] is not None or args['bqtable'] is not None:
+    cm, labels = _confusion_matrix_from_query(args['sql'], args['bqtable'])
+  else:
+    raise Exception('One of "csv", "bqtable", and "sql" param is needed.')
+  _plot_confusion_matrix(cm, labels)
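A sketch of how the new subcommand might be invoked from a Datalab notebook, assuming the %%ml cell magic registered by this module; the CSV path and table name are hypothetical:

  # Plot a confusion matrix from a headerless CSV, supplying the headers in the cell body:
  %%ml confusion_matrix --csv gs://my-bucket/eval_results.csv
  headers: key, target, predicted

  # Or aggregate directly from a BigQuery table that has "target" and "predicted" columns:
  %%ml confusion_matrix --bqtable mydataset.eval_results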

solutionbox/inception/datalab_solutions/inception/_trainer.py

Lines changed: 4 additions & 1 deletion

@@ -41,7 +41,10 @@ class Evaluator(object):
   """Loads variables from latest checkpoint and performs model evaluation."""
 
   def __init__(self, model, data_paths, batch_size, output_path, dataset='eval'):
-    self.num_eval_batches = self._data_size(data_paths) // batch_size
+    data_size = self._data_size(data_paths)
+    if data_size <= batch_size:
+      raise Exception('Data size is smaller than batch size.')
+    self.num_eval_batches = data_size // batch_size
     self.batch_of_examples = []
     self.checkpoint_path = os.path.join(output_path, 'train')
     self.output_path = os.path.join(output_path, dataset)
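This Evaluator change is the NaN fix from the commit message: presumably, when the eval set was smaller than the batch size, the integer division left num_eval_batches at zero, so no batches were averaged and the reported eval loss surfaced as NaN; the guard now fails fast instead. A tiny illustration with made-up numbers:

  data_size, batch_size = 30, 100              # hypothetical sizes, eval set smaller than a batch
  num_eval_batches = data_size // batch_size   # 0 before the fix: nothing gets evaluated
  # Averaging a loss over zero evaluated batches is effectively 0/0, which shows up as NaN.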
