This repository was archived by the owner on Sep 3, 2022. It is now read-only.

Remove old feature-slicing pipeline implementation (replaced by BigQuery); add confusion matrix magic. #129

Merged 3 commits on Jan 19, 2017
3 changes: 1 addition & 2 deletions datalab/mlalpha/__init__.py
@@ -26,8 +26,7 @@
from ._package import Packager
from ._cloud_models import CloudModels, CloudModelVersions
from ._confusion_matrix import ConfusionMatrix
from ._analysis import CsvEvalResults, CsvEvalSource, EvalResultsCsvCoder, \
AccuracyFn, FeatureSlicingPipeline
from ._analysis import csv_to_dataframe
from ._package_runner import PackageRunner

from plotly.offline import init_notebook_mode
221 changes: 27 additions & 194 deletions datalab/mlalpha/_analysis.py
@@ -10,206 +10,39 @@
# or implied. See the License for the specific language governing permissions and limitations under
# the License.

"""Implements Cloud ML Eval Results Analysis"""
"""Implements Cloud ML Analysis Helpers"""

import apache_beam as beam
from collections import namedtuple

"""Prepresents an eval results CSV file. For example, the content is like:
107,Iris-versicolor,1.64827824278e-07,0.999999880791,6.27104979056e-10
100,Iris-versicolor,3.5338824091e-05,0.99996471405,1.32811195375e-09
...
"""
CsvEvalResults = namedtuple('CsvEvalResults', 'source, key_index predicted_index score_index_start num_scores')
import google.cloud.ml as ml
import numpy as np
import pandas as pd
import yaml

"""Prepresents an eval source CSV file. For example, the content is like:
107,Iris-virginica,4.9,2.5,4.5,1.7
100,Iris-versicolor,5.7,2.8,4.1,1.3
...
The metadata is generated in the preprocessing pipeline. It is used to describe the CSV file,
including schema, headers, etc.
"""
CsvEvalSource = namedtuple('CsvEvalSource', 'source metadata')
import datalab.bigquery as bq


class EvalResultsCsvCoder(beam.coders.Coder):
  """A coder to read from Eval results CSV file. Note encode() is only needed in cloud run.
  """
  def __init__(self, eval_results):
    self._eval_results = eval_results

  def decode(self, csv_line):
    import csv
    source_elem = next(csv.reader([csv_line]))
    key = source_elem[self._eval_results.key_index]
    element = {
      'predicted': source_elem[self._eval_results.predicted_index],
      'scores': source_elem[self._eval_results.score_index_start: \
          self._eval_results.score_index_start+self._eval_results.num_scores]
    }
    return (key, element)

  def encode(self, element):
    return str(element)


class AccuracyFn(beam.CombineFn):
  """A transform to compute accuracy for feature slices.
  """
  def __init__(self, target_column_name):
    self._target_column_name = target_column_name

  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, (sum, count), input):
    new_sum = sum
    if (input['predicted'] == input[self._target_column_name]):
      new_sum += 1
    return new_sum, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, (sum, count)):
    accuracy = float(sum) / count if count else float('NaN')
    return {'accuracy': accuracy, 'totalWeightedExamples': count}


class FeatureSlicingPipeline(object):
  """The pipeline to generate feature slicing stats. For example, accuracy values given
  "species = Iris-versicolor", "education = graduate", etc.
  It is implemented with DataFlow.
  """
  @staticmethod
  def _pair_source_with_key(element):
    key = element['key']
    del element['key']
    return (key, element)

  @staticmethod
  def _join_info((key, info)):
    value = info['source'][0]
    value.update(info['results'][0])
    return (key, value)

  def _pipeline_def(self, p, eval_source, eval_results, features_to_slice, metrics, output_file,
                    shard_name_template=None):
    import datalab.mlalpha as mlalpha
    import google.cloud.ml.io as io
    import json

    metadata = mlalpha.Metadata(eval_source.metadata)
    target_name, _ = metadata.get_target_name_and_scenario()

    # Load eval source.
    eval_source_coder = io.CsvCoder(metadata.get_csv_headers(), metadata.get_numeric_columns())
    eval_source_data = p | beam.io.ReadFromText(eval_source.source, coder=eval_source_coder) | \
        beam.Map('pair_source_with_key', FeatureSlicingPipeline._pair_source_with_key)

    # Load eval results.
    eval_results_data = p | \
        beam.Read('ReadEvalResults', beam.io.TextFileSource(eval_results.source,
                  coder=EvalResultsCsvCoder(eval_results)))

    # Join source with results by key.
    joined_results = {'source': eval_source_data, 'results': eval_results_data} | \
        beam.CoGroupByKey() | beam.Map('join by key', FeatureSlicingPipeline._join_info)

    feature_metrics_list = []
    for feature_to_slice in features_to_slice:
      feature_metrics = joined_results | \
          beam.Map('slice_get_key_%s' % feature_to_slice,
                   lambda (k,v),f=feature_to_slice: (v[f], v)) | \
          beam.CombinePerKey('slice_combine_%s' % feature_to_slice,
                             AccuracyFn(target_name)) | \
          beam.Map('slice_prepend_feature_name_%s' % feature_to_slice,
                   lambda (k,v),f=feature_to_slice: ('%s:%s' % (f, k), v))
      feature_metrics_list.append(feature_metrics)

    feature_metrics_list | beam.Flatten() | \
        beam.Map('ToJsonFormat', lambda (k,v): json.dumps({'feature': k, 'metricValues': v})) | \
        beam.io.WriteToText(output_file, shard_name_template=shard_name_template)
    return p


  def run_local(self, eval_source, eval_results, features_to_slice, metrics, output_file):
    """Run the pipeline locally. Blocks execution until it finishes.

    Args:
      eval_source: The only supported format is CsvEvalSource now, though we may add more.
        Note the source can be either a GCS path or a local path.
      eval_results: The only supported format is CsvEvalResults now, though we may add more.
        Note the source can be either a GCS path or a local path.
      features_to_slice: A list of features to slice on. The features must exist in
        eval_source, and can be numeric, categorical, or target.
      metrics: A list of metrics to compute. For classification, it supports "accuracy",
        "logloss". For regression, it supports "RMSE".
      output_file: The path to a local file holding the aggregated results.
    """
    p = beam.Pipeline('DirectPipelineRunner')
    self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file,
                       shard_name_template='')
    p.run()

  def default_pipeline_options(self, output_dir):
    """Get default DataFlow options. Users can customize it further on top of it and then
    send the option to run_cloud().
def csv_to_dataframe(csv_path, schema_path):
  """Given a CSV file together with its BigQuery schema file in yaml, load
  content into a dataframe.

  Args:
    output_dir: A GCS path which will be used as base path for tmp and staging dir.
    csv_path: Input CSV path. Can be local or GCS.
    schema_path: Input schema path. Can be local or GCS.
Contributor:
Can you give an example of the schema file in the comment?
After loading the schema file, the order of the column names in the Python object is preserved, right?

Contributor (Author):
The schema would be:

- name: target
  type: STRING
- name: predicted
  type: STRING
...

It's a list of dicts, so it stays ordered even after it is loaded into memory.
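A quick sketch of that ordering point, using PyYAML directly (the two-column schema below is illustrative, not from this PR):

import yaml

schema = yaml.safe_load('- name: target\n  type: STRING\n- name: predicted\n  type: STRING\n')
print([item['name'] for item in schema])  # ['target', 'predicted'] -- a YAML sequence loads as a Python list, so order is kept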


  Returns:
    A dictionary of options.
    """
    import datalab.context as context
    import datetime
    import google.cloud.ml as ml
    import os

    options = {
      'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
      'temp_location': os.path.join(output_dir, 'tmp'),
      'job_name': 'feature-slicing-pipeline' + '-' + \
          datetime.datetime.now().strftime('%y%m%d-%H%M%S'),
      'project': context.Context.default().project_id,
      'extra_packages': ['gs://cloud-datalab/dataflow/datalab.tar.gz', ml.sdk_location],
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True
    }
    return options

  def run_cloud(self, eval_source, eval_results, features_to_slice, metrics, output_file,
                pipeline_option=None):
    """Run the pipeline in cloud. Returns when the job is submitted.
    Calling this function may incur some cost since it runs a DataFlow job in Google Cloud.
    If pipeline_option is not specified, make sure you are signed in (through Datalab)
    and a default project is set so it can get credentials and projects from global context.

    Args:
      eval_source: The only supported format is CsvEvalSource now, though we may add more.
        The source needs to be a GCS path readable to the current signed-in user.
      eval_results: The only supported format is CsvEvalResults now, though we may add more.
        The source needs to be a GCS path readable to the current signed-in user.
      features_to_slice: A list of features to slice on. The features must exist in
        eval_source, and can be numeric, categorical, or target.
      metrics: A list of metrics to compute. For classification, it supports "accuracy",
        "logloss". For regression, it supports "RMSE".
      pipeline_option: If not specified, use default options. We recommend customizing your
        options based on the defaults obtained from default_pipeline_options(). For example,
          options = fsp.default_pipeline_options()
          options['num_workers'] = 10
          ...
      output_file: A GCS file prefix holding the aggregated results.
    """
    import os
    if pipeline_option is None:
      output_dir = os.path.dirname(output_file)
      pipeline_option = self.default_pipeline_options(output_dir)
    opts = beam.pipeline.PipelineOptions(flags=[], **pipeline_option)
    p = beam.Pipeline('DataflowPipelineRunner', options=opts)
    self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file)
    p.run()

    Loaded pandas dataframe.
  """
  with ml.util._file.open_local_or_gcs(schema_path, mode='r') as f:
    schema = yaml.safe_load(f)
  # Map BigQuery schema types to numpy dtypes; any other type loads as object.
  _MAPPINGS = {
    'FLOAT': np.float64,
    'INTEGER': np.int64,
    'TIMESTAMP': np.datetime64,
    'BOOLEAN': np.bool,
  }
  for item in schema:
    item['type'] = _MAPPINGS.get(item['type'], object)
  names = [x['name'] for x in schema]
  dtype = {x['name']: x['type'] for x in schema}
  with ml.util._file.open_local_or_gcs(csv_path, mode='r') as f:
    return pd.read_csv(f, names=names, dtype=dtype)
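A hedged usage sketch of the new helper (the GCS paths and schema contents below are hypothetical):

import datalab.mlalpha as mlalpha

# Assume eval.csv.schema.yaml sits next to the CSV and contains, e.g.:
# - name: key
#   type: INTEGER
# - name: target
#   type: STRING
# - name: predicted
#   type: STRING
df = mlalpha.csv_to_dataframe('gs://my-bucket/eval.csv',
                              'gs://my-bucket/eval.csv.schema.yaml')
print(df.dtypes)  # column names and dtypes follow the schema file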
86 changes: 86 additions & 0 deletions datalab/mlalpha/commands/_ml.py
@@ -17,9 +17,15 @@
  raise Exception('This module can only be loaded in ipython.')

import collections
import google.cloud.ml as cloudml
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
from sklearn.metrics import confusion_matrix
import yaml


import datalab.context
import datalab.mlalpha
import datalab.utils.commands
@@ -87,6 +93,23 @@ def ml(line, cell=None):
      required=True)
  batch_predict_parser.set_defaults(func=_batch_predict)

  confusion_matrix_parser = parser.subcommand('confusion_matrix',
      'Plot confusion matrix. The source is provided ' +
      'in one of "csv", "bqtable", and "sql" params.')
  confusion_matrix_parser.add_argument('--csv',
      help='GCS or local path of CSV file which contains ' +
           '"target", "predicted" columns at least. The CSV ' +
           'either comes with a schema file in the same dir, ' +
           'or specify "headers: name1, name2..." in cell.')
  confusion_matrix_parser.add_argument('--bqtable',
      help='name of the BigQuery table in the form of ' +
           'dataset.table.')
  confusion_matrix_parser.add_argument('--sql',
      help='name of the sql module defined in previous cell ' +
           'which should return "target", "predicted", ' +
           'and "count" columns at least in results.')
  confusion_matrix_parser.set_defaults(func=_confusion_matrix)

  namespace = datalab.utils.commands.notebook_environment()
  return datalab.utils.commands.handle_magic_line(line, cell, parser, namespace=namespace)
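For illustration, the new subcommand might be invoked from a notebook cell roughly like this (bucket, file, and table names are hypothetical):

%%ml confusion_matrix --csv gs://my-bucket/eval.csv
headers: key, target, predicted

or, against a BigQuery table holding prediction results:

%%ml confusion_matrix --bqtable mydataset.eval_results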

@@ -158,3 +181,66 @@ def _predict(args, cell):

def _batch_predict(args, cell):
  return _run_package(args, cell, 'batch_predict')


def _plot_confusion_matrix(cm, labels):
  plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
  plt.title('Confusion matrix')
  plt.colorbar()
  tick_marks = np.arange(len(labels))
  plt.xticks(tick_marks, labels, rotation=45)
  plt.yticks(tick_marks, labels)
  plt.tight_layout()
  plt.ylabel('True label')
  plt.xlabel('Predicted label')


def _confusion_matrix_from_csv(input_csv, cell):
  schema_file = input_csv + '.schema.yaml'
  headers = None
  # Headers may come from the cell body ("headers: name1, name2...");
  # otherwise fall back to a schema file sitting next to the CSV.
  if cell is not None:
    env = datalab.utils.commands.notebook_environment()
    config = datalab.utils.commands.parse_config(cell, env)
    headers_str = config.get('headers', None)
    if headers_str is not None:
      headers = [x.strip() for x in headers_str.split(',')]
  if headers is not None:
    with cloudml.util._file.open_local_or_gcs(input_csv, mode='r') as f:
      df = pd.read_csv(f, names=headers)
  elif cloudml.util._file.file_exists(schema_file):
    df = datalab.mlalpha.csv_to_dataframe(input_csv, schema_file)
  else:
    raise Exception('headers is missing from cell, ' +
                    'and there is no schema file in the same dir as csv')
  labels = sorted(set(df['target']) | set(df['predicted']))
  cm = confusion_matrix(df['target'], df['predicted'], labels=labels)
  return cm, labels
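For reference, a toy sketch of sklearn's confusion_matrix with an explicit labels list (data made up for illustration): row i counts examples whose true label is labels[i], and column j those predicted as labels[j].

from sklearn.metrics import confusion_matrix

y_true = ['a', 'a', 'b']
y_pred = ['a', 'b', 'b']
print(confusion_matrix(y_true, y_pred, labels=['a', 'b']))
# [[1 1]
#  [0 1]]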


def _confusion_matrix_from_query(sql_module_name, bq_table):
  if sql_module_name is not None:
    item = datalab.utils.commands.get_notebook_item(sql_module_name)
    query, _ = datalab.data.SqlModule.get_sql_statement_with_environment(item, {})
  else:
    query = ('select target, predicted, count(*) as count from %s group by target, predicted'
             % bq_table)
  dfbq = datalab.bigquery.Query(query).results().to_dataframe()
  # Use the union of values seen in either column so the matrix is square.
  labels = sorted(set(dfbq['target']) | set(dfbq['predicted']))
  labels_count = len(labels)
  # Replace label values with matrix indices, then fill the dense matrix
  # from the (target, predicted, count) rows returned by BigQuery.
  dfbq['target'] = [labels.index(x) for x in dfbq['target']]
  dfbq['predicted'] = [labels.index(x) for x in dfbq['predicted']]
  cm = [[0]*labels_count for i in range(labels_count)]
  for index, row in dfbq.iterrows():
    cm[row['target']][row['predicted']] = row['count']
  return cm, labels


def _confusion_matrix(args, cell):

Contributor:
this function could call two helpers to make it more readable, like _cm_from_csv and _cm_from_sql

Contributor (Author):
Good idea. Done.

  if args['csv'] is not None:

Contributor:
Ah, ok. So for now the focus is on building a confusion matrix from BigQuery, and package authors are responsible for getting the results into BigQuery. CSV is offered for smaller datasets. Please add a TODO saying something like "add option to load csv data into bigquery", or somehow document that the CSV path is not recommended.

Contributor (Author):
Added TODO. Will address that TODO once the service supports "offline batch prediction".

    # TODO: Maybe add cloud run for large CSVs with federated table.
    cm, labels = _confusion_matrix_from_csv(args['csv'], cell)
  elif args['sql'] is not None or args['bqtable'] is not None:
    cm, labels = _confusion_matrix_from_query(args['sql'], args['bqtable'])
  else:
    raise Exception('One of "csv", "bqtable", and "sql" params is needed.')
  _plot_confusion_matrix(cm, labels)
@@ -41,7 +41,10 @@ class Evaluator(object):
"""Loads variables from latest checkpoint and performs model evaluation."""

def __init__(self, model, data_paths, batch_size, output_path, dataset='eval'):
self.num_eval_batches = self._data_size(data_paths) // batch_size
data_size = self._data_size(data_paths)
if data_size <= batch_size:
raise Exception('Data size is smaller than batch size.')
self.num_eval_batches = data_size // batch_size
self.batch_of_examples = []
self.checkpoint_path = os.path.join(output_path, 'train')
self.output_path = os.path.join(output_path, dataset)
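The new guard matters because floor division would otherwise silently yield zero evaluation batches; a quick sketch with made-up numbers:

data_size, batch_size = 50, 100
num_eval_batches = data_size // batch_size  # == 0 -- evaluation would process nothing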