Components - TFX #2671
Merged
Commits (26)
1de4368 Added CsvExampleGen component (Ark-kun)
1751f99 Switched to using some processing code from the component class (Ark-kun)
0ccd7c3 Renamed output_examples to example_artifacts for consistency with the… (Ark-kun)
ace062e Fixed the docstring a bit (Ark-kun)
8e30a62 Added StatisticsGen (Ark-kun)
b8fd5a7 Added SchemaGen (Ark-kun)
35ab2e0 Fixed the input_dict construction (Ark-kun)
fcef473 Use None defaults (Ark-kun)
8a1d1e5 Switched to TFX container image (Ark-kun)
d6e6b52 Updated component definitions (Ark-kun)
fa7374c Fixed StatisticsGen and SchemaGen (Ark-kun)
a9e784e Printing component instance in CsvExampleGen (Ark-kun)
3a1159a Moved components to directories (Ark-kun)
5645997 Updated the sample TFX pipeline (Ark-kun)
eb8f281 Renamed ExamplesPath to Examples for data passing components (Ark-kun)
f84d7c9 Corrected output_component_file paths (Ark-kun)
1cc4a0f Added the Transform component (Ark-kun)
7cc3350 Added the Trainer component (Ark-kun)
9f5fe9c Added the BigQueryExampleGen component (Ark-kun)
91ec94d Added the ImportExampleGen component (Ark-kun)
4892bbf Added the Evaluator component (Ark-kun)
bda6978 Added the ExampleValidator component (Ark-kun)
093e2d2 Updated the sample (Ark-kun)
034cd32 Upgraded to TFX 0.15.0 (Ark-kun)
8fec5f1 Upgraded the sample to 0.15.0 (Ark-kun)
bbf11e3 Silence Flake8 for annotations (Ark-kun)
@@ -0,0 +1,130 @@
# flake8: noqa TODO

from kfp.components import InputPath, OutputPath


def Evaluator(
    examples_path: InputPath('Examples'),
    model_exports_path: InputPath('Model'),
    #model_path: InputPath('Model'),

    output_path: OutputPath('ModelEval'),

    feature_slicing_spec: 'JsonObject: evaluator_pb2.FeatureSlicingSpec' = None,
):
    """
    A TFX component to evaluate models trained by a TFX Trainer component.

    The Evaluator component performs model evaluations in the TFX pipeline and
    the resultant metrics can be viewed in a Jupyter notebook. It uses the
    input examples generated from the
    [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)
    component to evaluate the models.

    Specifically, it can provide:
      - metrics computed on entire training and eval dataset
      - tracking metrics over time
      - model quality performance on different feature slices

    ## Exporting the EvalSavedModel in Trainer

    In order to setup Evaluator in a TFX pipeline, an EvalSavedModel needs to be
    exported during training, which is a special SavedModel containing
    annotations for the metrics, features, labels, and so on in your model.
    Evaluator uses this EvalSavedModel to compute metrics.

    As part of this, the Trainer component creates eval_input_receiver_fn,
    analogous to the serving_input_receiver_fn, which will extract the features
    and labels from the input data. As with serving_input_receiver_fn, there are
    utility functions to help with this.

    Please see https://www.tensorflow.org/tfx/model_analysis for more details.

    Args:
      examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen
        component. @Ark-kun: Must have the eval split. _required_
      model_exports: A Channel of 'ModelExportPath' type, usually produced by
        Trainer component. Will be deprecated in the future for the `model`
        parameter.
      #model: Future replacement of the `model_exports` argument.
      feature_slicing_spec:
        [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
        instance that describes how Evaluator should slice the data.
    Returns:
      output: Channel of `ModelEvalPath` to store the evaluation results.

    Either `model_exports` or `model` must be present in the input arguments.
    """
    from tfx.components.evaluator.component import Evaluator
    component_class = Evaluator
    input_channels_with_splits = {'examples'}
    output_channels_with_splits = {}

    import json
    import os
    from google.protobuf import json_format, message
    from tfx.types import Artifact, channel_utils

    arguments = locals().copy()

    component_class_args = {}

    for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
        argument_value_obj = argument_value = arguments.get(name, None)
        if argument_value is None:
            continue
        parameter_type = execution_parameter.type
        if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple
            argument_value_obj = parameter_type()
            json_format.Parse(argument_value, argument_value_obj)
        component_class_args[name] = argument_value_obj

    for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
        artifact_path = arguments[name + '_path']
        artifacts = []
        if name in input_channels_with_splits:
            # Recovering splits
            splits = sorted(os.listdir(artifact_path))
            for split in splits:
                artifact = Artifact(type_name=channel_parameter.type_name)
                artifact.split = split
                artifact.uri = os.path.join(artifact_path, split) + '/'
                artifacts.append(artifact)
        else:
            artifact = Artifact(type_name=channel_parameter.type_name)
            artifact.uri = artifact_path + '/' # ?
            artifacts.append(artifact)
        component_class_args[name] = channel_utils.as_channel(artifacts)

    component_class_instance = component_class(**component_class_args)

    input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()}
    output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()}
    exec_properties = component_class_instance.exec_properties

    # Generating paths for output artifacts
    for name, artifacts in output_dict.items():
        base_artifact_path = arguments[name + '_path']
        for artifact in artifacts:
            artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is ''

    print('component instance: ' + str(component_class_instance))

    #executor = component_class.EXECUTOR_SPEC.executor_class() # Same
    executor = component_class_instance.executor_spec.executor_class()
    executor.Do(
        input_dict=input_dict,
        output_dict=output_dict,
        exec_properties=exec_properties,
    )


if __name__ == '__main__':
    import kfp
    kfp.components.func_to_container_op(
        Evaluator,
        base_image='tensorflow/tfx:0.15.0',
        output_component_file='component.yaml'
    )
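For reference, the wrapper above parses the feature_slicing_spec argument from JSON with json_format.Parse into an evaluator_pb2.FeatureSlicingSpec message. The following is a minimal sketch, not part of the diff, of how a caller could produce such a JSON value; it assumes the tfx package (0.15.0 here) is installed, and the slicing column trip_start_hour is only an illustrative feature name.

    # Hypothetical sketch, not part of this PR: build the JSON value for the
    # 'JsonObject: evaluator_pb2.FeatureSlicingSpec' input of the component.
    from google.protobuf import json_format
    from tfx.proto import evaluator_pb2

    # Slice evaluation metrics by one feature column (illustrative name).
    slicing_spec = evaluator_pb2.FeatureSlicingSpec(
        specs=[evaluator_pb2.SingleSlicingSpec(column_for_slicing=['trip_start_hour'])],
    )

    # MessageToJson emits the JSON that json_format.Parse inside the component
    # reads back into the proto (field names become lowerCamelCase, which
    # Parse also accepts).
    feature_slicing_spec_json = json_format.MessageToJson(slicing_spec)
    print(feature_slicing_spec_json)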
@@ -0,0 +1,232 @@
name: Evaluator
description: |
  A TFX component to evaluate models trained by a TFX Trainer component.

  The Evaluator component performs model evaluations in the TFX pipeline and
  the resultant metrics can be viewed in a Jupyter notebook. It uses the
  input examples generated from the
  [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)
  component to evaluate the models.

  Specifically, it can provide:
    - metrics computed on entire training and eval dataset
    - tracking metrics over time
    - model quality performance on different feature slices

  ## Exporting the EvalSavedModel in Trainer

  In order to setup Evaluator in a TFX pipeline, an EvalSavedModel needs to be
  exported during training, which is a special SavedModel containing
  annotations for the metrics, features, labels, and so on in your model.
  Evaluator uses this EvalSavedModel to compute metrics.

  As part of this, the Trainer component creates eval_input_receiver_fn,
  analogous to the serving_input_receiver_fn, which will extract the features
  and labels from the input data. As with serving_input_receiver_fn, there are
  utility functions to help with this.

  Please see https://www.tensorflow.org/tfx/model_analysis for more details.

  Args:
    examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen
      component. @Ark-kun: Must have the eval split. _required_
    model_exports: A Channel of 'ModelExportPath' type, usually produced by
      Trainer component. Will be deprecated in the future for the `model`
      parameter.
    #model: Future replacement of the `model_exports` argument.
    feature_slicing_spec:
      [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
      instance that describes how Evaluator should slice the data.
  Returns:
    output: Channel of `ModelEvalPath` to store the evaluation results.

  Either `model_exports` or `model` must be present in the input arguments.
inputs:
- name: examples
  type: Examples
- name: model_exports
  type: Model
- name: feature_slicing_spec
  type: 'JsonObject: evaluator_pb2.FeatureSlicingSpec'
  optional: true
outputs:
- name: output
  type: ModelEval
implementation:
  container:
    image: tensorflow/tfx:0.15.0
    command:
    - python3
    - -u
    - -c
    - |
      class OutputPath:
          '''When creating component from function, OutputPath should be used as function parameter annotation to tell the system that the function wants to output data by writing it into a file with the given path instead of returning the data from the function.'''
          def __init__(self, type=None):
              self.type = type

      class InputPath:
          '''When creating component from function, InputPath should be used as function parameter annotation to tell the system to pass the *data file path* to the function instead of passing the actual data.'''
          def __init__(self, type=None):
              self.type = type

      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def Evaluator(
          examples_path: InputPath('Examples'),
          model_exports_path: InputPath('Model'),
          #model_path: InputPath('Model'),
          output_path: OutputPath('ModelEval'),
          feature_slicing_spec: 'JsonObject: evaluator_pb2.FeatureSlicingSpec' = None,
      ):
          """
          A TFX component to evaluate models trained by a TFX Trainer component.

          The Evaluator component performs model evaluations in the TFX pipeline and
          the resultant metrics can be viewed in a Jupyter notebook. It uses the
          input examples generated from the
          [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)
          component to evaluate the models.

          Specifically, it can provide:
            - metrics computed on entire training and eval dataset
            - tracking metrics over time
            - model quality performance on different feature slices

          ## Exporting the EvalSavedModel in Trainer

          In order to setup Evaluator in a TFX pipeline, an EvalSavedModel needs to be
          exported during training, which is a special SavedModel containing
          annotations for the metrics, features, labels, and so on in your model.
          Evaluator uses this EvalSavedModel to compute metrics.

          As part of this, the Trainer component creates eval_input_receiver_fn,
          analogous to the serving_input_receiver_fn, which will extract the features
          and labels from the input data. As with serving_input_receiver_fn, there are
          utility functions to help with this.

          Please see https://www.tensorflow.org/tfx/model_analysis for more details.

          Args:
            examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen
              component. @Ark-kun: Must have the eval split. _required_
            model_exports: A Channel of 'ModelExportPath' type, usually produced by
              Trainer component. Will be deprecated in the future for the `model`
              parameter.
            #model: Future replacement of the `model_exports` argument.
            feature_slicing_spec:
              [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
              instance that describes how Evaluator should slice the data.
          Returns:
            output: Channel of `ModelEvalPath` to store the evaluation results.

          Either `model_exports` or `model` must be present in the input arguments.
          """
          from tfx.components.evaluator.component import Evaluator
          component_class = Evaluator
          input_channels_with_splits = {'examples'}
          output_channels_with_splits = {}

          import json
          import os
          from google.protobuf import json_format, message
          from tfx.types import Artifact, channel_utils

          arguments = locals().copy()

          component_class_args = {}

          for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
              argument_value_obj = argument_value = arguments.get(name, None)
              if argument_value is None:
                  continue
              parameter_type = execution_parameter.type
              if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # execution_parameter.type can also be a tuple
                  argument_value_obj = parameter_type()
                  json_format.Parse(argument_value, argument_value_obj)
              component_class_args[name] = argument_value_obj

          for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
              artifact_path = arguments[name + '_path']
              artifacts = []
              if name in input_channels_with_splits:
                  # Recovering splits
                  splits = sorted(os.listdir(artifact_path))
                  for split in splits:
                      artifact = Artifact(type_name=channel_parameter.type_name)
                      artifact.split = split
                      artifact.uri = os.path.join(artifact_path, split) + '/'
                      artifacts.append(artifact)
              else:
                  artifact = Artifact(type_name=channel_parameter.type_name)
                  artifact.uri = artifact_path + '/' # ?
                  artifacts.append(artifact)
              component_class_args[name] = channel_utils.as_channel(artifacts)

          component_class_instance = component_class(**component_class_args)

          input_dict = {name: channel.get() for name, channel in component_class_instance.inputs.get_all().items()}
          output_dict = {name: channel.get() for name, channel in component_class_instance.outputs.get_all().items()}
          exec_properties = component_class_instance.exec_properties

          # Generating paths for output artifacts
          for name, artifacts in output_dict.items():
              base_artifact_path = arguments[name + '_path']
              for artifact in artifacts:
                  artifact.uri = os.path.join(base_artifact_path, artifact.split) # Default split is ''

          print('component instance: ' + str(component_class_instance))

          #executor = component_class.EXECUTOR_SPEC.executor_class() # Same
          executor = component_class_instance.executor_spec.executor_class()
          executor.Do(
              input_dict=input_dict,
              output_dict=output_dict,
              exec_properties=exec_properties,
          )

      import argparse
      _parser = argparse.ArgumentParser(prog='Evaluator', description="A TFX component to evaluate models trained by a TFX Trainer component.\n\n The Evaluator component performs model evaluations in the TFX pipeline and\n the resultant metrics can be viewed in a Jupyter notebook. It uses the\n input examples generated from the\n [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)\n component to evaluate the models.\n\n Specifically, it can provide:\n - metrics computed on entire training and eval dataset\n - tracking metrics over time\n - model quality performance on different feature slices\n\n ## Exporting the EvalSavedModel in Trainer\n\n In order to setup Evaluator in a TFX pipeline, an EvalSavedModel needs to be\n exported during training, which is a special SavedModel containing\n annotations for the metrics, features, labels, and so on in your model.\n Evaluator uses this EvalSavedModel to compute metrics.\n\n As part of this, the Trainer component creates eval_input_receiver_fn,\n analogous to the serving_input_receiver_fn, which will extract the features\n and labels from the input data. As with serving_input_receiver_fn, there are\n utility functions to help with this.\n\n Please see https://www.tensorflow.org/tfx/model_analysis for more details.\n\n Args:\n examples: A Channel of 'ExamplesPath' type, usually produced by ExampleGen\n component. @Ark-kun: Must have the eval split. _required_\n model_exports: A Channel of 'ModelExportPath' type, usually produced by\n Trainer component. Will be deprecated in the future for the `model`\n parameter.\n #model: Future replacement of the `model_exports` argument.\n feature_slicing_spec:\n [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)\n instance that describes how Evaluator should slice the data.\n Returns:\n output: Channel of `ModelEvalPath` to store the evaluation results.\n\n Either `model_exports` or `model` must be present in the input arguments.\n")
      _parser.add_argument("--examples", dest="examples_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--model-exports", dest="model_exports_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--feature-slicing-spec", dest="feature_slicing_spec", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--output", dest="output_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = Evaluator(**_parsed_args)

      if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str):
          _outputs = [_outputs]

      _output_serializers = [
      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --examples
    - inputPath: examples
    - --model-exports
    - inputPath: model_exports
    - if:
        cond:
          isPresent: feature_slicing_spec
        then:
        - --feature-slicing-spec
        - inputValue: feature_slicing_spec
    - --output
    - outputPath: output
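As a usage note, not part of the diff, a component.yaml like the one above is meant to be loaded with the KFP SDK and wired into a pipeline together with the other TFX component wrappers from this PR. Below is a minimal sketch, assuming the generated file is available locally as component.yaml; the inspection relies on the component_spec attribute exposed by the kfp.components loader, and any upstream ExampleGen/Trainer wiring is omitted.

    # Hypothetical usage sketch, not part of this PR: load the generated
    # component.yaml and inspect the resulting task factory before using it
    # in a pipeline definition.
    from kfp import components

    evaluator_op = components.load_component_from_file('component.yaml')

    # The factory mirrors the declared interface: required 'examples' and
    # 'model_exports' inputs, an optional 'feature_slicing_spec', and an
    # 'output' ModelEval artifact.
    spec = evaluator_op.component_spec
    print(spec.name)
    print([i.name for i in spec.inputs])
    print([o.name for o in spec.outputs])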
Can we use model_path from the very beginning? I think TFX is migrating to that.
It's a bit non-trivial since the ComponentSpec.INPUTS uses the old names. I'll solve this issue in another PR.