# Copyright 2016 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.

"""Implements Cloud ML Eval Results Analysis."""

import apache_beam as beam
from collections import namedtuple

| 18 | +"""Prepresents an eval results CSV file. For example, the content is like: |
| 19 | + 107,Iris-versicolor,1.64827824278e-07,0.999999880791,6.27104979056e-10 |
| 20 | + 100,Iris-versicolor,3.5338824091e-05,0.99996471405,1.32811195375e-09 |
| 21 | + ... |
| 22 | +""" |
| 23 | +CsvEvalResults = namedtuple('CsvEvalResults', 'source, key_index predicted_index score_index_start num_scores') |
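
# Illustrative only: for the sample eval results rows above (key, predicted label, then
# three class scores), a description might look like the following, where the source
# path is hypothetical:
#   eval_results = CsvEvalResults(source='gs://my-bucket/eval_results*',
#                                 key_index=0, predicted_index=1,
#                                 score_index_start=2, num_scores=3)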

"""Represents an eval source CSV file. For example, the content looks like:
  107,Iris-virginica,4.9,2.5,4.5,1.7
  100,Iris-versicolor,5.7,2.8,4.1,1.3
  ...
  The metadata is generated in the preprocessing pipeline. It describes the CSV file,
  including its schema, headers, etc.
"""
CsvEvalSource = namedtuple('CsvEvalSource', 'source metadata')
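
# Illustrative only: the eval source pairs the CSV data with the metadata produced by
# the preprocessing pipeline; both paths below are hypothetical:
#   eval_source = CsvEvalSource(source='gs://my-bucket/eval_source*',
#                               metadata='gs://my-bucket/metadata.yaml')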


class EvalResultsCsvCoder(beam.coders.Coder):
  """A coder to read from an eval results CSV file. Note encode() is only needed for cloud runs.
  """
  def __init__(self, eval_results):
    self._eval_results = eval_results

  def decode(self, csv_line):
    import csv
    source_elem = next(csv.reader([csv_line]))
    key = source_elem[self._eval_results.key_index]
    element = {
      'predicted': source_elem[self._eval_results.predicted_index],
      'scores': source_elem[self._eval_results.score_index_start:
                            self._eval_results.score_index_start + self._eval_results.num_scores]
    }
    return (key, element)

  def encode(self, element):
    return str(element)
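
# Illustrative only: decoding the first sample eval results line above with a
# CsvEvalResults description that has key_index=0, predicted_index=1,
# score_index_start=2 and num_scores=3 would yield:
#   ('107', {'predicted': 'Iris-versicolor',
#            'scores': ['1.64827824278e-07', '0.999999880791', '6.27104979056e-10']})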


class AccuracyFn(beam.CombineFn):
  """A CombineFn that computes accuracy for feature slices.
  """
  def __init__(self, target_column_name):
    self._target_column_name = target_column_name

  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, (correct_sum, count), element):
    if element['predicted'] == element[self._target_column_name]:
      correct_sum += 1
    return correct_sum, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, (correct_sum, count)):
    accuracy = float(correct_sum) / count if count else float('NaN')
    return {'accuracy': accuracy, 'totalWeightedExamples': count}
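
# Illustrative only: a manual walk-through of the CombineFn, assuming the target
# column is named 'species':
#   fn = AccuracyFn('species')
#   acc = fn.create_accumulator()                                              # (0.0, 0)
#   acc = fn.add_input(acc, {'predicted': 'Iris-versicolor', 'species': 'Iris-versicolor'})
#   acc = fn.add_input(acc, {'predicted': 'Iris-versicolor', 'species': 'Iris-virginica'})
#   fn.extract_output(acc)   # {'accuracy': 0.5, 'totalWeightedExamples': 2}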


class FeatureSlicingPipeline(object):
  """The pipeline to generate feature slicing stats, for example accuracy values given
  "species = Iris-versicolor", "education = graduate", etc.
  It is implemented with Dataflow.
  """
  @staticmethod
  def _pair_source_with_key(element):
    key = element['key']
    del element['key']
    return (key, element)

  @staticmethod
  def _join_info((key, info)):
    value = info['source'][0]
    value.update(info['results'][0])
    return (key, value)
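
  # Illustrative only: after CoGroupByKey, each element looks roughly like
  #   ('107', {'source': [{'species': 'Iris-virginica', ...}],
  #            'results': [{'predicted': 'Iris-versicolor', 'scores': [...]}]})
  # (source field names depend on the CSV headers in the metadata); _join_info merges
  # the two dicts into a single per-key record.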

  def _pipeline_def(self, p, eval_source, eval_results, features_to_slice, metrics, output_file,
                    shard_name_template=None):
    import datalab.mlalpha as mlalpha
    import google.cloud.ml.io as io
    import json

    metadata = mlalpha.Metadata(eval_source.metadata)
    target_name, _ = metadata.get_target_name_and_scenario()

    # Load eval source.
    eval_source_coder = io.CsvCoder(metadata.get_csv_headers(), metadata.get_numeric_columns())
    eval_source_data = p | beam.io.ReadFromText(eval_source.source, coder=eval_source_coder) | \
        beam.Map('pair_source_with_key', FeatureSlicingPipeline._pair_source_with_key)

    # Load eval results.
    eval_results_data = p | \
        beam.Read('ReadEvalResults', beam.io.TextFileSource(eval_results.source,
                  coder=EvalResultsCsvCoder(eval_results)))

    # Join source with results by key.
    joined_results = {'source': eval_source_data, 'results': eval_results_data} | \
        beam.CoGroupByKey() | beam.Map('join by key', FeatureSlicingPipeline._join_info)

    # Compute the requested metrics for each feature slice.
    feature_metrics_list = []
    for feature_to_slice in features_to_slice:
      feature_metrics = joined_results | \
          beam.Map('slice_get_key_%s' % feature_to_slice,
                   lambda (k, v), f=feature_to_slice: (v[f], v)) | \
          beam.CombinePerKey('slice_combine_%s' % feature_to_slice,
                             AccuracyFn(target_name)) | \
          beam.Map('slice_prepend_feature_name_%s' % feature_to_slice,
                   lambda (k, v), f=feature_to_slice: ('%s:%s' % (f, k), v))
      feature_metrics_list.append(feature_metrics)

    # Merge per-feature results and write them out as JSON lines.
    feature_metrics_list | beam.Flatten() | \
        beam.Map('ToJsonFormat', lambda (k, v): json.dumps({'feature': k, 'metricValues': v})) | \
        beam.io.WriteToText(output_file, shard_name_template=shard_name_template)
    return p
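
  # Illustrative only: with features_to_slice=['species'], each output line is a JSON
  # record keyed by "<feature>:<value>", e.g. (numbers made up):
  #   {"feature": "species:Iris-versicolor", "metricValues": {"accuracy": 0.9, "totalWeightedExamples": 50}}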


  def run_local(self, eval_source, eval_results, features_to_slice, metrics, output_file):
    """Run the pipeline locally. Blocks execution until it finishes.

    Args:
      eval_source: A CsvEvalSource instance; this is the only supported format for now,
          although more may be added. Note the source can be either a GCS path or a local path.
      eval_results: A CsvEvalResults instance; this is the only supported format for now,
          although more may be added. Note the source can be either a GCS path or a local path.
      features_to_slice: A list of features to slice on. The features must exist in
          eval_source, and can be numeric, categorical, or target.
      metrics: A list of metrics to compute. For classification, it supports "accuracy" and
          "logloss". For regression, it supports "RMSE".
      output_file: The path to a local file holding the aggregated results.
    """
    p = beam.Pipeline('DirectPipelineRunner')
    self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file,
                       shard_name_template='')
    p.run()
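
  # Illustrative only: a minimal local run, assuming the hypothetical eval_source and
  # eval_results values sketched earlier in this file:
  #   fsp = FeatureSlicingPipeline()
  #   fsp.run_local(eval_source, eval_results, features_to_slice=['species'],
  #                 metrics=['accuracy'], output_file='./slicing_results.json')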

  def default_pipeline_options(self, output_dir):
    """Get default Dataflow options. Users can customize them further and then
    pass the options to run_cloud().

    Args:
      output_dir: A GCS path used as the base path for the tmp and staging directories.

    Returns:
      A dictionary of options.
    """
    import datalab.context as context
    import datetime
    import google.cloud.ml as ml
    import os

    options = {
      'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
      'temp_location': os.path.join(output_dir, 'tmp'),
      'job_name': 'feature-slicing-pipeline' + '-' +
                  datetime.datetime.now().strftime('%y%m%d-%H%M%S'),
      'project': context.Context.default().project_id,
      'extra_packages': ['gs://cloud-datalab/dataflow/datalab.tar.gz', ml.sdk_location],
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True
    }
    return options

  def run_cloud(self, eval_source, eval_results, features_to_slice, metrics, output_file,
                pipeline_option=None):
    """Run the pipeline in the cloud. Returns as soon as the job is submitted.
    Calling this function may incur cost since it runs a Dataflow job in Google Cloud.
    If pipeline_option is not specified, make sure you are signed in (through Datalab)
    and a default project is set so credentials and the project can be obtained from the
    global context.

    Args:
      eval_source: A CsvEvalSource instance; this is the only supported format for now,
          although more may be added. The source needs to be a GCS path readable by the
          current signed-in user.
      eval_results: A CsvEvalResults instance; this is the only supported format for now,
          although more may be added. The source needs to be a GCS path readable by the
          current signed-in user.
      features_to_slice: A list of features to slice on. The features must exist in
          eval_source, and can be numeric, categorical, or target.
      metrics: A list of metrics to compute. For classification, it supports "accuracy" and
          "logloss". For regression, it supports "RMSE".
      output_file: A GCS file prefix holding the aggregated results.
      pipeline_option: If not specified, default options are used. It is recommended to
          customize the options based on the defaults obtained from
          default_pipeline_options(). For example,
              options = fsp.default_pipeline_options(output_dir)
              options['num_workers'] = 10
              ...
    """
    import os
    if pipeline_option is None:
      output_dir = os.path.dirname(output_file)
      pipeline_option = self.default_pipeline_options(output_dir)
    opts = beam.pipeline.PipelineOptions(flags=[], **pipeline_option)
    p = beam.Pipeline('DataflowPipelineRunner', options=opts)
    self._pipeline_def(p, eval_source, eval_results, features_to_slice, metrics, output_file)
    p.run()
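

# Illustrative only: a cloud run with customized Dataflow options. All paths and values
# below are hypothetical:
#   fsp = FeatureSlicingPipeline()
#   options = fsp.default_pipeline_options('gs://my-bucket/analysis')
#   options['num_workers'] = 10
#   fsp.run_cloud(eval_source, eval_results, ['species'], ['accuracy'],
#                 'gs://my-bucket/analysis/results', pipeline_option=options)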