forked from kubeflow/pipelines
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Added CsvExampleGen component * Switched to using some processing code from the component class Needs testing * Renamed output_examples to example_artifacts for consistency with the original component * Fixed the docstring a bit * Added StatisticsGen First draft * Added SchemaGen First draft * Fixed the input_dict construction * Use None defaults * Switched to TFX container image * Updated component definitions * Fixed StatisticsGen and SchemaGen Input artifacts must have splits. Split URIs should end with '/'. The components now work. Also printing component_class_instance for debugging. * Printing component instance in CsvExampleGen * Moved components to directories * Updated the sample TFX pipeline * Renamed ExamplesPath to Examples for data passing components * Corrected output_component_file paths * Added the Transform component The component uses almost completely generic code. * Added the Trainer component * Added the Evaluator component * Added the ExampleValidator component * Added the BigQueryExampleGen component * Added the ImportExampleGen component * Updated the sample Added ExampleValidator, Transform, Trainer, Evaluator * Upgraded to TFX 0.15.0 * Upgraded the sample to 0.15.0 * Silence Flake8 for annotations
- Loading branch information
1 parent
0b684fd
commit b634720
Showing
21 changed files
with
3,364 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
# flake8: noqa TODO | ||
|
||
from kfp.components import InputPath, OutputPath | ||
|
||
|
||
def Evaluator(
    examples_path: InputPath('Examples'),
    model_exports_path: InputPath('Model'),
    #model_path: InputPath('Model'),

    output_path: OutputPath('ModelEval'),

    feature_slicing_spec: 'JsonObject: evaluator_pb2.FeatureSlicingSpec' = None,
):
    """
    A TFX component to evaluate models trained by a TFX Trainer component.

    The Evaluator component performs model evaluations in the TFX pipeline and
    the resultant metrics can be viewed in a Jupyter notebook. It uses the
    input examples generated from the
    [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)
    component to evaluate the models.

    Specifically, it can provide:
    - metrics computed on entire training and eval dataset
    - tracking metrics over time
    - model quality performance on different feature slices

    ## Exporting the EvalSavedModel in Trainer

    In order to setup Evaluator in a TFX pipeline, an EvalSavedModel needs to be
    exported during training, which is a special SavedModel containing
    annotations for the metrics, features, labels, and so on in your model.
    Evaluator uses this EvalSavedModel to compute metrics.

    As part of this, the Trainer component creates eval_input_receiver_fn,
    analogous to the serving_input_receiver_fn, which will extract the features
    and labels from the input data. As with serving_input_receiver_fn, there are
    utility functions to help with this.

    Please see https://www.tensorflow.org/tfx/model_analysis for more details.

    Args:
      examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen
        component. @Ark-kun: Must have the eval split. _required_
      model_exports: A Channel of 'ModelExportPath' type, usually produced by
        Trainer component. Will be deprecated in the future for the `model`
        parameter.
      #model: Future replacement of the `model_exports` argument.
      feature_slicing_spec:
        [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
        instance that describes how Evaluator should slice the data.
    Returns:
      output: Channel of `ModelEvalPath` to store the evaluation results.

    Either `model_exports` or `model` must be present in the input arguments.
    """
    from tfx.components.evaluator.component import Evaluator
    component_class = Evaluator
    # Input channels whose on-disk artifacts are laid out as one
    # sub-directory per split (e.g. train/, eval/).
    input_channels_with_splits = {'examples'}
    # Fixed: was `{}`, which is an empty *dict*; a set matches the type of
    # input_channels_with_splits. (Currently unused for Evaluator.)
    output_channels_with_splits = set()

    import os
    from google.protobuf import json_format, message
    from tfx.types import Artifact, channel_utils

    # Snapshot the function arguments by name so they can be looked up
    # while walking the component spec below. Must be taken before any
    # further locals are introduced by the loops.
    arguments = locals().copy()

    component_class_args = {}

    # Execution parameters: JSON-serialized proto parameters are parsed
    # into real proto messages; all other values are passed through as-is.
    for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
        argument_value_obj = argument_value = arguments.get(name, None)
        if argument_value is None:
            continue
        parameter_type = execution_parameter.type
        if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple
            argument_value_obj = parameter_type()
            json_format.Parse(argument_value, argument_value_obj)
        component_class_args[name] = argument_value_obj

    # Input channels: wrap each provided input path in Artifact objects.
    # Inputs with splits get one Artifact per split sub-directory.
    for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
        artifact_path = arguments.get(name + '_path')
        if artifact_path is None:
            # Robustness fix: skip optional spec inputs that have no
            # corresponding `*_path` argument (e.g. the future `model`
            # input) instead of raising KeyError.
            continue
        artifacts = []
        if name in input_channels_with_splits:
            # Recovering splits
            splits = sorted(os.listdir(artifact_path))
            for split in splits:
                artifact = Artifact(type_name=channel_parameter.type_name)
                artifact.split = split
                # TFX expects artifact URIs to end with '/'.
                artifact.uri = os.path.join(artifact_path, split) + '/'
                artifacts.append(artifact)
        else:
            artifact = Artifact(type_name=channel_parameter.type_name)
            artifact.uri = artifact_path + '/' # ?
            artifacts.append(artifact)
        component_class_args[name] = channel_utils.as_channel(artifacts)

    component_class_instance = component_class(**component_class_args)

    input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()}
    output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()}
    exec_properties = component_class_instance.exec_properties

    # Generating paths for output artifacts
    for name, artifacts in output_dict.items():
        base_artifact_path = arguments[name + '_path']
        for artifact in artifacts:
            artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is ''

    print('component instance: ' + str(component_class_instance))

    # Run the component's executor in-process with the assembled dicts.
    #executor = component_class.EXECUTOR_SPEC.executor_class() # Same
    executor = component_class_instance.executor_spec.executor_class()
    executor.Do(
        input_dict=input_dict,
        output_dict=output_dict,
        exec_properties=exec_properties,
    )
|
||
|
||
if __name__ == '__main__':
    # When run as a script, compile this function into a reusable KFP
    # component definition (component.yaml) targeting the TFX 0.15 image.
    import kfp

    kfp.components.func_to_container_op(
        Evaluator,
        output_component_file='component.yaml',
        base_image='tensorflow/tfx:0.15.0',
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
# NOTE(review): this file is generated from the Evaluator function by
# kfp.components.func_to_container_op — regenerate it rather than editing
# by hand; the embedded Python below must stay in sync with the source.
name: Evaluator
description: |
  A TFX component to evaluate models trained by a TFX Trainer component.
  The Evaluator component performs model evaluations in the TFX pipeline and
  the resultant metrics can be viewed in a Jupyter notebook. It uses the
  input examples generated from the
  [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)
  component to evaluate the models.
  Specifically, it can provide:
  - metrics computed on entire training and eval dataset
  - tracking metrics over time
  - model quality performance on different feature slices
  ## Exporting the EvalSavedModel in Trainer
  In order to setup Evaluator in a TFX pipeline, an EvalSavedModel needs to be
  exported during training, which is a special SavedModel containing
  annotations for the metrics, features, labels, and so on in your model.
  Evaluator uses this EvalSavedModel to compute metrics.
  As part of this, the Trainer component creates eval_input_receiver_fn,
  analogous to the serving_input_receiver_fn, which will extract the features
  and labels from the input data. As with serving_input_receiver_fn, there are
  utility functions to help with this.
  Please see https://www.tensorflow.org/tfx/model_analysis for more details.
  Args:
    examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen
      component. @Ark-kun: Must have the eval split. _required_
    model_exports: A Channel of 'ModelExportPath' type, usually produced by
      Trainer component. Will be deprecated in the future for the `model`
      parameter.
    #model: Future replacement of the `model_exports` argument.
    feature_slicing_spec:
      [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
      instance that describes how Evaluator should slice the data.
  Returns:
    output: Channel of `ModelEvalPath` to store the evaluation results.
  Either `model_exports` or `model` must be present in the input arguments.
# Declared component interface: two required input artifacts, one optional
# JSON-serialized proto parameter, one output artifact.
inputs:
- name: examples
  type: Examples
- name: model_exports
  type: Model
- name: feature_slicing_spec
  type: 'JsonObject: evaluator_pb2.FeatureSlicingSpec'
  optional: true
outputs:
- name: output
  type: ModelEval
implementation:
  container:
    image: tensorflow/tfx:0.15.0
    command:
    - python3
    - -u
    - -c
    - |
      class OutputPath:
          '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.'''
          def __init__(self, type=None):
              self.type = type

      class InputPath:
          '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.'''
          def __init__(self, type=None):
              self.type = type

      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def Evaluator(
          examples_path: InputPath('Examples'),
          model_exports_path: InputPath('Model'),
          #model_path: InputPath('Model'),
          output_path: OutputPath('ModelEval'),
          feature_slicing_spec: 'JsonObject: evaluator_pb2.FeatureSlicingSpec' = None,
      ):
          """
          A TFX component to evaluate models trained by a TFX Trainer component.
          The Evaluator component performs model evaluations in the TFX pipeline and
          the resultant metrics can be viewed in a Jupyter notebook. It uses the
          input examples generated from the
          [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)
          component to evaluate the models.
          Specifically, it can provide:
          - metrics computed on entire training and eval dataset
          - tracking metrics over time
          - model quality performance on different feature slices
          ## Exporting the EvalSavedModel in Trainer
          In order to setup Evaluator in a TFX pipeline, an EvalSavedModel needs to be
          exported during training, which is a special SavedModel containing
          annotations for the metrics, features, labels, and so on in your model.
          Evaluator uses this EvalSavedModel to compute metrics.
          As part of this, the Trainer component creates eval_input_receiver_fn,
          analogous to the serving_input_receiver_fn, which will extract the features
          and labels from the input data. As with serving_input_receiver_fn, there are
          utility functions to help with this.
          Please see https://www.tensorflow.org/tfx/model_analysis for more details.
          Args:
            examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen
              component. @Ark-kun: Must have the eval split. _required_
            model_exports: A Channel of 'ModelExportPath' type, usually produced by
              Trainer component. Will be deprecated in the future for the `model`
              parameter.
            #model: Future replacement of the `model_exports` argument.
            feature_slicing_spec:
              [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
              instance that describes how Evaluator should slice the data.
          Returns:
            output: Channel of `ModelEvalPath` to store the evaluation results.
          Either `model_exports` or `model` must be present in the input arguments.
          """
          from tfx.components.evaluator.component import Evaluator
          component_class = Evaluator
          input_channels_with_splits = {'examples'}
          output_channels_with_splits = {}
          import json
          import os
          from google.protobuf import json_format, message
          from tfx.types import Artifact, channel_utils
          arguments = locals().copy()
          component_class_args = {}
          for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
              argument_value_obj = argument_value = arguments.get(name, None)
              if argument_value is None:
                  continue
              parameter_type = execution_parameter.type
              if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple
                  argument_value_obj = parameter_type()
                  json_format.Parse(argument_value, argument_value_obj)
              component_class_args[name] = argument_value_obj
          for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
              artifact_path = arguments[name + '_path']
              artifacts = []
              if name in input_channels_with_splits:
                  # Recovering splits
                  splits = sorted(os.listdir(artifact_path))
                  for split in splits:
                      artifact = Artifact(type_name=channel_parameter.type_name)
                      artifact.split = split
                      artifact.uri = os.path.join(artifact_path, split) + '/'
                      artifacts.append(artifact)
              else:
                  artifact = Artifact(type_name=channel_parameter.type_name)
                  artifact.uri = artifact_path + '/' # ?
                  artifacts.append(artifact)
              component_class_args[name] = channel_utils.as_channel(artifacts)
          component_class_instance = component_class(**component_class_args)
          input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()}
          output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()}
          exec_properties = component_class_instance.exec_properties
          # Generating paths for output artifacts
          for name, artifacts in output_dict.items():
              base_artifact_path = arguments[name + '_path']
              for artifact in artifacts:
                  artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is ''
          print('component instance: ' + str(component_class_instance))
          #executor = component_class.EXECUTOR_SPEC.executor_class() # Same
          executor = component_class_instance.executor_spec.executor_class()
          executor.Do(
              input_dict=input_dict,
              output_dict=output_dict,
              exec_properties=exec_properties,
          )

      import argparse
      _parser = argparse.ArgumentParser(prog='Evaluator', description="A TFX component to evaluate models trained by a TFX Trainer component.\n\n    The Evaluator component performs model evaluations in the TFX pipeline and\n    the resultant metrics can be viewed in a Jupyter notebook. It uses the\n    input examples generated from the\n    [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)\n    component to evaluate the models.\n\n    Specifically, it can provide:\n    - metrics computed on entire training and eval dataset\n    - tracking metrics over time\n    - model quality performance on different feature slices\n\n    ## Exporting the EvalSavedModel in Trainer\n\n    In order to setup Evaluator in a TFX pipeline, an EvalSavedModel needs to be\n    exported during training, which is a special SavedModel containing\n    annotations for the metrics, features, labels, and so on in your model.\n    Evaluator uses this EvalSavedModel to compute metrics.\n\n    As part of this, the Trainer component creates eval_input_receiver_fn,\n    analogous to the serving_input_receiver_fn, which will extract the features\n    and labels from the input data. As with serving_input_receiver_fn, there are\n    utility functions to help with this.\n\n    Please see https://www.tensorflow.org/tfx/model_analysis for more details.\n\n    Args:\n      examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen\n        component. @Ark-kun: Must have the eval split. _required_\n      model_exports: A Channel of 'ModelExportPath' type, usually produced by\n        Trainer component. Will be deprecated in the future for the `model`\n        parameter.\n      #model: Future replacement of the `model_exports` argument.\n      feature_slicing_spec:\n        [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)\n        instance that describes how Evaluator should slice the data.\n    Returns:\n      output: Channel of `ModelEvalPath` to store the evaluation results.\n\n    Either `model_exports` or `model` must be present in the input arguments.\n")
      _parser.add_argument("--examples", dest="examples_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--model-exports", dest="model_exports_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--feature-slicing-spec", dest="feature_slicing_spec", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = Evaluator(**_parsed_args)

      if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str):
          _outputs = [_outputs]

      _output_serializers = [

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    # Command-line wiring: input artifacts are passed as paths, the optional
    # proto parameter as an inline value, and the output as a writable path.
    args:
    - --examples
    - inputPath: examples
    - --model-exports
    - inputPath: model_exports
    - if:
        cond:
          isPresent: feature_slicing_spec
        then:
        - --feature-slicing-spec
        - inputValue: feature_slicing_spec
    - --output
    - outputPath: output
Oops, something went wrong.