Skip to content

Commit

Permalink
[Sample] Add a notebook sample under parameterized_tfx_oss (#2729)
Browse files Browse the repository at this point in the history
* add notebook sample

* Update

* remove my kfp endpoint.

* simplify import paths.
  • Loading branch information
Jiaxiao Zheng authored and k8s-ci-robot committed Dec 18, 2019
1 parent 5349e7f commit 3fa7556
Showing 1 changed file with 343 additions and 0 deletions.
343 changes: 343 additions & 0 deletions samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# TFX pipeline example - Chicago Taxi tips prediction\n",
"\n",
"## Overview\n",
"[Tensorflow Extended (TFX)](https://github.com/tensorflow/tfx) is a Google-production-scale machine\n",
"learning platform based on TensorFlow. It provides a configuration framework to express ML pipelines\n",
"consisting of TFX components, which brings the user large-scale ML task orchestration, artifact lineage, as well as the power of various [TFX libraries](https://www.tensorflow.org/resources/libraries-extensions). Kubeflow Pipelines can be used as the orchestrator supporting the \n",
"execution of a TFX pipeline.\n",
"\n",
"This sample demonstrates how to author a ML pipeline in TFX and run it on a KFP deployment. \n",
"\n",
"## Permission\n",
"\n",
"This pipeline requires Google Cloud Storage permission to run. \n",
"If KFP was deployed through K8S marketplace, please follow instructions in [the guideline](https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials)\n",
"to make sure the service account has `storage.admin` role."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!python3 -m pip install pip --upgrade --quiet --user\n",
"!python3 -m pip install kfp --upgrade --quiet --user\n",
"!python3 -m pip install tfx --upgrade --quiet --user"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example we'll need a very recent version of TFX SDK to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n",
"\n",
"## RuntimeParameter in TFX DSL\n",
"Currently, TFX DSL only supports parameterizing field in the `PARAMETERS` section of `ComponentSpec`, see [here](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/types/component_spec.py#L126). This prevents runtime-parameterizing the pipeline topology. Also, if the declared type of the field is a protobuf, the user needs to pass in a dictionary with exactly the same names for each field, and specify one or more value as `RuntimeParameter` objects. In other word, the dictionary should be able to be passed in to [`ParseDict()` method](https://github.com/protocolbuffers/protobuf/blob/04a11fc91668884d1793bff2a0f72ee6ce4f5edd/python/google/protobuf/json_format.py#L433) and produce the correct pb message."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!python3 -m pip install --quiet --index-url https://test.pypi.org/simple/ tfx==0.16.0.dev20191212 --user"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from typing import Optional, Text\n",
"\n",
"import kfp\n",
"from kfp import dsl\n",
"\n",
"from tfx.components import Evaluator\n",
"from tfx.components import CsvExampleGen\n",
"from tfx.components import ExampleValidator\n",
"from tfx.components import ModelValidator\n",
"from tfx.components import Pusher\n",
"from tfx.components import SchemaGen\n",
"from tfx.components import StatisticsGen\n",
"from tfx.components import Trainer\n",
"from tfx.components import Transform\n",
"from tfx.orchestration import data_types\n",
"from tfx.orchestration import pipeline\n",
"from tfx.orchestration.kubeflow import kubeflow_dag_runner\n",
"from tfx.proto import pusher_pb2\n",
"from tfx.utils.dsl_utils import external_input"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# In TFX MLMD schema, pipeline name is used as the unique id of each pipeline.\n",
"# Assigning workflow ID as part of pipeline name allows the user to bypass\n",
"# some schema checks which are redundant for experimental pipelines.\n",
"pipeline_name = 'taxi_pipeline_with_parameters_' + kfp.dsl.RUN_ID_PLACEHOLDER\n",
"\n",
"# Path of pipeline data root, should be a GCS path.\n",
"# Note that when running on KFP, the pipeline root is always a runtime parameter.\n",
"pipeline_root = os.path.join('gs://my-bucket', 'tfx_taxi_simple',\n",
" kfp.dsl.RUN_ID_PLACEHOLDER)\n",
"\n",
"# Location of input data, should be a GCS path under which there is a csv file.\n",
"data_root_param = data_types.RuntimeParameter(\n",
" name='data-root',\n",
" default='gs://ml-pipeline-playground/tfx_taxi_simple/data',\n",
" ptype=Text,\n",
")\n",
"\n",
"# Path to the module file, GCS path.\n",
"# Module file is one of the recommended way to provide customized logic for component\n",
"# includeing Trainer and Transformer.\n",
"# See https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/components/trainer/component.py#L38\n",
"taxi_module_file_param = data_types.RuntimeParameter(\n",
" name='module-file',\n",
" default='gs://ml-pipeline-playground/tfx_taxi_simple/modules/tfx_taxi_utils_1205.py',\n",
" ptype=Text,\n",
")\n",
"\n",
"# Number of epochs in training.\n",
"train_steps = data_types.RuntimeParameter(\n",
" name='train-steps',\n",
" default=10,\n",
" ptype=int,\n",
")\n",
"\n",
"# Number of epochs in evaluation.\n",
"eval_steps = data_types.RuntimeParameter(\n",
" name='eval-steps',\n",
" default=5,\n",
" ptype=int,\n",
")\n",
"\n",
"# Column name for slicing.\n",
"slicing_column = data_types.RuntimeParameter(\n",
" name='slicing-column',\n",
" default='trip_start_hour',\n",
" ptype=Text,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## TFX Components\n",
"\n",
"Please refer to the [official guide](https://www.tensorflow.org/tfx/guide#tfx_pipeline_components) for the detailed explanation and purpose of each TFX component."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The input data location is parameterized by _data_root_param\n",
"examples = external_input(data_root_param)\n",
"example_gen = CsvExampleGen(input=examples)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"statistics_gen = StatisticsGen(input_data=example_gen.outputs['examples'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"infer_schema = SchemaGen(\n",
" stats=statistics_gen.outputs['statistics'], infer_feature_shape=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"validate_stats = ExampleValidator(\n",
" stats=statistics_gen.outputs['statistics'],\n",
" schema=infer_schema.outputs['schema'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The module file used in Transform and Trainer component is paramterized by\n",
"# _taxi_module_file_param.\n",
"transform = Transform(\n",
" input_data=example_gen.outputs['examples'],\n",
" schema=infer_schema.outputs['schema'],\n",
" module_file=taxi_module_file_param)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The numbers of steps in train_args are specified as RuntimeParameter with\n",
"# name 'train-steps' and 'eval-steps', respectively.\n",
"trainer = Trainer(\n",
" module_file=taxi_module_file_param,\n",
" transformed_examples=transform.outputs['transformed_examples'],\n",
" schema=infer_schema.outputs['schema'],\n",
" transform_output=transform.outputs['transform_graph'],\n",
" train_args={'num_steps': train_steps},\n",
" eval_args={'num_steps': eval_steps})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The name of slicing column is specified as a RuntimeParameter.\n",
"model_analyzer = Evaluator(\n",
" examples=example_gen.outputs['examples'],\n",
" model_exports=trainer.outputs['model'],\n",
" feature_slicing_spec=dict(specs=[{\n",
" 'column_for_slicing': [slicing_column]\n",
" }]))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model_validator = ModelValidator(\n",
" examples=example_gen.outputs['examples'], model=trainer.outputs['model'])\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Currently we use this hack to ensure push_destination can\n",
"# be correctly parameterized and interpreted.\n",
"# pipeline root will be specified as a dsl.PipelineParam with the name\n",
"# pipeline-root, see:\n",
"# https://github.com/tensorflow/tfx/blob/1c670e92143c7856f67a866f721b8a9368ede385/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L226\n",
"pipeline_root_param = dsl.PipelineParam(name='pipeline-root')\n",
"pusher = Pusher(\n",
" model_export=trainer.outputs['model'],\n",
" model_blessing=model_validator.outputs['blessing'],\n",
" push_destination=pusher_pb2.PushDestination(\n",
" filesystem=pusher_pb2.PushDestination.Filesystem(\n",
" base_directory=os.path.join(\n",
" str(pipeline_root_param), 'model_serving'))))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create the DSL pipeline object.\n",
"# This pipeline obj carries the business logic of the pipeline, but no runner-specific information\n",
"# was included.\n",
"dsl_pipeline = pipeline.Pipeline(\n",
" pipeline_name=pipeline_name,\n",
" pipeline_root=pipeline_root,\n",
" components=[\n",
" example_gen, statistics_gen, infer_schema, validate_stats, transform,\n",
" trainer, model_analyzer, model_validator, pusher\n",
" ],\n",
" enable_cache=True,\n",
" beam_pipeline_args=['--direct_num_workers=%d' % 4],\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Specify a TFX docker image. For the full list of tags please see:\n",
"# https://hub.docker.com/r/tensorflow/tfx/tags\n",
"tfx_image = 'tensorflow/tfx:0.16.0.dev20191205'\n",
"config = kubeflow_dag_runner.KubeflowDagRunnerConfig(\n",
" kubeflow_metadata_config=kubeflow_dag_runner\n",
" .get_default_kubeflow_metadata_config(),\n",
" tfx_image=tfx_image)\n",
"kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(config=config)\n",
"# KubeflowDagRunner compiles the DSL pipeline object into KFP pipeline package.\n",
"# By default it is named <pipeline_name>.tar.gz\n",
"kfp_runner.run(dsl_pipeline)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run_result = kfp.Client(\n",
" host='<KFP end point>'\n",
").create_run_from_pipeline_package(\n",
" pipeline_name + '.tar.gz', \n",
" arguments={\n",
" 'pipeline-root': '<gcs path to root>' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
" 'module-file': '<gcs path to the module file>', # delete this line to use default module file.\n",
" 'data-root': '<gcs path to the data>' # delete this line to use default data.\n",
"})"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}

0 comments on commit 3fa7556

Please sign in to comment.