From 3fa755641e63ed42103733baf0dfe00bbd9df7a6 Mon Sep 17 00:00:00 2001 From: Jiaxiao Zheng Date: Tue, 17 Dec 2019 20:32:05 -0800 Subject: [PATCH] [Sample] Add a notebook sample under parameterized_tfx_oss (#2729) * add notebook sample * Update * remove my kfp endpoint. * simplify import paths. --- .../taxi_pipeline_notebook.ipynb | 343 ++++++++++++++++++ 1 file changed, 343 insertions(+) create mode 100644 samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb diff --git a/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb b/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb new file mode 100644 index 00000000000..3d581df0021 --- /dev/null +++ b/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb @@ -0,0 +1,343 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# TFX pipeline example - Chicago Taxi tips prediction\n", + "\n", + "## Overview\n", + "[TensorFlow Extended (TFX)](https://github.com/tensorflow/tfx) is a Google-production-scale machine\n", + "learning platform based on TensorFlow. It provides a configuration framework to express ML pipelines\n", + "consisting of TFX components, which brings the user large-scale ML task orchestration, artifact lineage, as well as the power of various [TFX libraries](https://www.tensorflow.org/resources/libraries-extensions). Kubeflow Pipelines can be used as the orchestrator supporting the \n", + "execution of a TFX pipeline.\n", + "\n", + "This sample demonstrates how to author an ML pipeline in TFX and run it on a KFP deployment. \n", + "\n", + "## Permission\n", + "\n", + "This pipeline requires Google Cloud Storage permission to run. \n", + "If KFP was deployed through K8S marketplace, please follow instructions in [the guideline](https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials)\n", + "to make sure the service account has `storage.admin` role."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!python3 -m pip install pip --upgrade --quiet --user\n", + "!python3 -m pip install kfp --upgrade --quiet --user\n", + "!python3 -m pip install tfx --upgrade --quiet --user" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this example we'll need a very recent version of TFX SDK to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n", + "\n", + "## RuntimeParameter in TFX DSL\n", + "Currently, TFX DSL only supports parameterizing fields in the `PARAMETERS` section of `ComponentSpec`, see [here](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/types/component_spec.py#L126). This prevents runtime-parameterizing the pipeline topology. Also, if the declared type of the field is a protobuf, the user needs to pass in a dictionary with exactly the same names for each field, and specify one or more values as `RuntimeParameter` objects. In other words, the dictionary should be able to be passed to the [`ParseDict()` method](https://github.com/protocolbuffers/protobuf/blob/04a11fc91668884d1793bff2a0f72ee6ce4f5edd/python/google/protobuf/json_format.py#L433) and produce the correct pb message."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!python3 -m pip install --quiet --index-url https://test.pypi.org/simple/ tfx==0.16.0.dev20191212 --user" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from typing import Optional, Text\n", + "\n", + "import kfp\n", + "from kfp import dsl\n", + "\n", + "from tfx.components import Evaluator\n", + "from tfx.components import CsvExampleGen\n", + "from tfx.components import ExampleValidator\n", + "from tfx.components import ModelValidator\n", + "from tfx.components import Pusher\n", + "from tfx.components import SchemaGen\n", + "from tfx.components import StatisticsGen\n", + "from tfx.components import Trainer\n", + "from tfx.components import Transform\n", + "from tfx.orchestration import data_types\n", + "from tfx.orchestration import pipeline\n", + "from tfx.orchestration.kubeflow import kubeflow_dag_runner\n", + "from tfx.proto import pusher_pb2\n", + "from tfx.utils.dsl_utils import external_input" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# In TFX MLMD schema, pipeline name is used as the unique id of each pipeline.\n", + "# Assigning workflow ID as part of pipeline name allows the user to bypass\n", + "# some schema checks which are redundant for experimental pipelines.\n", + "pipeline_name = 'taxi_pipeline_with_parameters_' + kfp.dsl.RUN_ID_PLACEHOLDER\n", + "\n", + "# Path of pipeline data root, should be a GCS path.\n", + "# Note that when running on KFP, the pipeline root is always a runtime parameter.\n", + "pipeline_root = os.path.join('gs://my-bucket', 'tfx_taxi_simple',\n", + " kfp.dsl.RUN_ID_PLACEHOLDER)\n", + "\n", + "# Location of input data, should be a GCS path under which there is a csv file.\n", + "data_root_param = data_types.RuntimeParameter(\n", + " name='data-root',\n", + " 
default='gs://ml-pipeline-playground/tfx_taxi_simple/data',\n", + " ptype=Text,\n", + ")\n", + "\n", + "# Path to the module file, GCS path.\n", + "# Module file is one of the recommended ways to provide customized logic for components\n", + "# including Trainer and Transform.\n", + "# See https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/components/trainer/component.py#L38\n", + "taxi_module_file_param = data_types.RuntimeParameter(\n", + " name='module-file',\n", + " default='gs://ml-pipeline-playground/tfx_taxi_simple/modules/tfx_taxi_utils_1205.py',\n", + " ptype=Text,\n", + ")\n", + "\n", + "# Number of steps in training.\n", + "train_steps = data_types.RuntimeParameter(\n", + " name='train-steps',\n", + " default=10,\n", + " ptype=int,\n", + ")\n", + "\n", + "# Number of steps in evaluation.\n", + "eval_steps = data_types.RuntimeParameter(\n", + " name='eval-steps',\n", + " default=5,\n", + " ptype=int,\n", + ")\n", + "\n", + "# Column name for slicing.\n", + "slicing_column = data_types.RuntimeParameter(\n", + " name='slicing-column',\n", + " default='trip_start_hour',\n", + " ptype=Text,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## TFX Components\n", + "\n", + "Please refer to the [official guide](https://www.tensorflow.org/tfx/guide#tfx_pipeline_components) for the detailed explanation and purpose of each TFX component."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# The input data location is parameterized by data_root_param\n", + "examples = external_input(data_root_param)\n", + "example_gen = CsvExampleGen(input=examples)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "statistics_gen = StatisticsGen(input_data=example_gen.outputs['examples'])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "infer_schema = SchemaGen(\n", + " stats=statistics_gen.outputs['statistics'], infer_feature_shape=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "validate_stats = ExampleValidator(\n", + " stats=statistics_gen.outputs['statistics'],\n", + " schema=infer_schema.outputs['schema'])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# The module file used in the Transform and Trainer components is parameterized by\n", + "# taxi_module_file_param.\n", + "transform = Transform(\n", + " input_data=example_gen.outputs['examples'],\n", + " schema=infer_schema.outputs['schema'],\n", + " module_file=taxi_module_file_param)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# The numbers of steps in train_args are specified as RuntimeParameter with\n", + "# names 'train-steps' and 'eval-steps', respectively.\n", + "trainer = Trainer(\n", + " module_file=taxi_module_file_param,\n", + " transformed_examples=transform.outputs['transformed_examples'],\n", + " schema=infer_schema.outputs['schema'],\n", + " transform_output=transform.outputs['transform_graph'],\n", + " train_args={'num_steps': train_steps},\n", + " eval_args={'num_steps': eval_steps})" + ] + }, + { + "cell_type": "code", + "execution_count":
null, + "metadata": {}, + "outputs": [], + "source": [ + "# The name of slicing column is specified as a RuntimeParameter.\n", + "model_analyzer = Evaluator(\n", + " examples=example_gen.outputs['examples'],\n", + " model_exports=trainer.outputs['model'],\n", + " feature_slicing_spec=dict(specs=[{\n", + " 'column_for_slicing': [slicing_column]\n", + " }]))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "model_validator = ModelValidator(\n", + " examples=example_gen.outputs['examples'], model=trainer.outputs['model'])\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Currently we use this hack to ensure push_destination can\n", + "# be correctly parameterized and interpreted.\n", + "# pipeline root will be specified as a dsl.PipelineParam with the name\n", + "# pipeline-root, see:\n", + "# https://github.com/tensorflow/tfx/blob/1c670e92143c7856f67a866f721b8a9368ede385/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L226\n", + "pipeline_root_param = dsl.PipelineParam(name='pipeline-root')\n", + "pusher = Pusher(\n", + " model_export=trainer.outputs['model'],\n", + " model_blessing=model_validator.outputs['blessing'],\n", + " push_destination=pusher_pb2.PushDestination(\n", + " filesystem=pusher_pb2.PushDestination.Filesystem(\n", + " base_directory=os.path.join(\n", + " str(pipeline_root_param), 'model_serving'))))\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create the DSL pipeline object.\n", + "# This pipeline obj carries the business logic of the pipeline, but no runner-specific information\n", + "# was included.\n", + "dsl_pipeline = pipeline.Pipeline(\n", + " pipeline_name=pipeline_name,\n", + " pipeline_root=pipeline_root,\n", + " components=[\n", + " example_gen, statistics_gen, infer_schema, validate_stats, transform,\n", + " trainer, 
model_analyzer, model_validator, pusher\n", + " ],\n", + " enable_cache=True,\n", + " beam_pipeline_args=['--direct_num_workers=%d' % 4],\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Specify a TFX docker image. For the full list of tags please see:\n", + "# https://hub.docker.com/r/tensorflow/tfx/tags\n", + "tfx_image = 'tensorflow/tfx:0.16.0.dev20191205'\n", + "config = kubeflow_dag_runner.KubeflowDagRunnerConfig(\n", + " kubeflow_metadata_config=kubeflow_dag_runner\n", + " .get_default_kubeflow_metadata_config(),\n", + " tfx_image=tfx_image)\n", + "kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(config=config)\n", + "# KubeflowDagRunner compiles the DSL pipeline object into KFP pipeline package.\n", + "# By default it is named .tar.gz\n", + "kfp_runner.run(dsl_pipeline)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "run_result = kfp.Client(\n", + " host=''\n", + ").create_run_from_pipeline_package(\n", + " pipeline_name + '.tar.gz', \n", + " arguments={\n", + " 'pipeline-root': '' + kfp.dsl.RUN_ID_PLACEHOLDER,\n", + " 'module-file': '', # delete this line to use default module file.\n", + " 'data-root': '' # delete this line to use default data.\n", + "})" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.5.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}