Skip to content

Commit

Permalink
Updated the "TFX Taxi Cab Classification Pipeline" sample (#1115)
Browse files Browse the repository at this point in the history
Modernized the sample pipeline code.
  • Loading branch information
Ark-kun authored and k8s-ci-robot committed Apr 19, 2019
1 parent 07faa08 commit abfdd29
Showing 1 changed file with 94 additions and 146 deletions.
240 changes: 94 additions & 146 deletions samples/tfx/taxi-cab-classification-pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
# Copyright 2018 Google LLC
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,128 +14,22 @@
# limitations under the License.


import kfp.dsl as dsl
import kfp.gcp as gcp
import datetime

def dataflow_tf_data_validation_op(inference_data: 'GcsUri', validation_data: 'GcsUri', column_names: 'GcsUri[text/json]', key_columns, project: 'GcpProject', mode, validation_output: 'GcsUri[Directory]', step_name='validation'):
    """Create the pipeline step that runs the `ml-pipeline-dataflow-tfdv` image.

    Every argument except `step_name` is forwarded to the container as the
    value of a command-line flag. The `--output` flag is set to
    `<validation_output>/{{workflow.name}}/validation`.

    Returns:
        A `dsl.ContainerOp` named `step_name` with two file outputs:
        `schema` (read from `/schema.txt`) and `validation` (read from
        `/output_validation_result.txt`).
    """
    output_dir = '%s/{{workflow.name}}/validation' % validation_output
    container_args = [
        '--csv-data-for-inference', inference_data,
        '--csv-data-to-validate', validation_data,
        '--column-names', column_names,
        '--key-columns', key_columns,
        '--project', project,
        '--mode', mode,
        '--output', output_dir,
    ]
    produced_files = {
        'schema': '/schema.txt',
        'validation': '/output_validation_result.txt',
    }
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:e20fad3e161e88226c83437271adb063221459b9',
        arguments=container_args,
        file_outputs=produced_files,
    )

def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', preprocess_mode, preprocess_module: 'GcsUri[text/code/python]', transform_output: 'GcsUri[Directory]', step_name='preprocess'):
    """Create the pipeline step that runs the `ml-pipeline-dataflow-tft` image.

    Every argument except `step_name` is forwarded to the container as the
    value of a command-line flag. The `--output` flag is set to
    `<transform_output>/{{workflow.name}}/transformed`.

    Returns:
        A `dsl.ContainerOp` named `step_name` whose single file output,
        `transformed`, is read from `/output.txt`.
    """
    output_dir = '%s/{{workflow.name}}/transformed' % transform_output
    container_args = [
        '--train', train_data,
        '--eval', evaluation_data,
        '--schema', schema,
        '--project', project,
        '--mode', preprocess_mode,
        '--preprocessing-module', preprocess_module,
        '--output', output_dir,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:e20fad3e161e88226c83437271adb063221459b9',
        arguments=container_args,
        file_outputs={'transformed': '/output.txt'},
    )


def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target: str, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training'):
    """Create the pipeline step that runs the `ml-pipeline-kubeflow-tf-trainer` image.

    Every argument except `step_name` is forwarded to the container as the
    value of a command-line flag. The `--job-dir` flag is set to
    `<training_output>/{{workflow.name}}/train`.

    Returns:
        A `dsl.ContainerOp` named `step_name` whose single file output,
        `train`, is read from `/output.txt`.
    """
    job_dir = '%s/{{workflow.name}}/train' % training_output
    container_args = [
        '--transformed-data-dir', transformed_data_dir,
        '--schema', schema,
        '--learning-rate', learning_rate,
        '--hidden-layer-size', hidden_layer_size,
        '--steps', steps,
        '--target', target,
        '--preprocessing-module', preprocess_module,
        '--job-dir', job_dir,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:e20fad3e161e88226c83437271adb063221459b9',
        arguments=container_args,
        file_outputs={'train': '/output.txt'},
    )

def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', analyze_mode, analyze_slice_column, analysis_output: 'GcsUri', step_name='analysis'):
    """Create the pipeline step that runs the `ml-pipeline-dataflow-tfma` image.

    Every argument except `step_name` is forwarded to the container as the
    value of a command-line flag. The `--output` flag is set to
    `<analysis_output>/{{workflow.name}}/analysis`.

    Returns:
        A `dsl.ContainerOp` named `step_name` whose single file output,
        `analysis`, is read from `/output.txt`.
    """
    output_dir = '%s/{{workflow.name}}/analysis' % analysis_output
    container_args = [
        '--model', model,
        '--eval', evaluation_data,
        '--schema', schema,
        '--project', project,
        '--mode', analyze_mode,
        '--slice-columns', analyze_slice_column,
        '--output', output_dir,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:e20fad3e161e88226c83437271adb063221459b9',
        arguments=container_args,
        file_outputs={'analysis': '/output.txt'},
    )


def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'):
    """Create the pipeline step that runs the `ml-pipeline-dataflow-tf-predict` image.

    Every argument except `step_name` is forwarded to the container as the
    value of a command-line flag. The `--output` flag is set to
    `<prediction_output>/{{workflow.name}}/predict`.

    Returns:
        A `dsl.ContainerOp` named `step_name` whose single file output,
        `prediction`, is read from `/output.txt`.
    """
    output_dir = '%s/{{workflow.name}}/predict' % prediction_output
    container_args = [
        '--data', evaluation_data,
        '--schema', schema,
        '--target', target,
        '--model', model,
        '--mode', predict_mode,
        '--project', project,
        '--output', output_dir,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:e20fad3e161e88226c83437271adb063221459b9',
        arguments=container_args,
        file_outputs={'prediction': '/output.txt'},
    )


def confusion_matrix_op(predictions: 'GcsUri', output: 'GcsUri', step_name='confusion_matrix'):
    """Create the pipeline step that runs the `ml-pipeline-local-confusion-matrix` image.

    The container receives `predictions` via `--predictions`, an output
    location of `<output>/{{workflow.name}}/confusionmatrix` via `--output`,
    and a fixed `--target_lambda` expression given as source text.

    Returns:
        A `dsl.ContainerOp` named `step_name`; no file outputs are declared.
    """
    output_dir = '%s/{{workflow.name}}/confusionmatrix' % output
    # Passed to the container as text; the expression compares `target`
    # against 20% of `fare`.
    target_lambda = """lambda x: (x['target'] > x['fare'] * 0.2)"""
    container_args = [
        '--output', output_dir,
        '--predictions', predictions,
        '--target_lambda', target_lambda,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:e20fad3e161e88226c83437271adb063221459b9',
        arguments=container_args,
    )


def roc_op(predictions: 'GcsUri', output: 'GcsUri', step_name='roc'):
    """Create the pipeline step that runs the `ml-pipeline-local-roc` image.

    The container receives `predictions` via `--predictions`, an output
    location of `<output>/{{workflow.name}}/roc` via `--output`, and a fixed
    `--target_lambda` expression given as source text.

    Returns:
        A `dsl.ContainerOp` named `step_name`; no file outputs are declared.
    """
    output_dir = '%s/{{workflow.name}}/roc' % output
    # Passed to the container as text; the expression yields 1 when `target`
    # exceeds 20% of `fare`, otherwise 0.
    target_lambda = """lambda x: 1 if (x['target'] > x['fare'] * 0.2) else 0"""
    container_args = [
        '--output', output_dir,
        '--predictions', predictions,
        '--target_lambda', target_lambda,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-local-roc:e20fad3e161e88226c83437271adb063221459b9',
        arguments=container_args,
    )


def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='deploy'):
    """Create the pipeline step that runs the `ml-pipeline-kubeflow-deployer` image.

    The container receives `<model>/export/export` via `--model-export-path`
    and `tf_server_name` via `--server-name`.

    Returns:
        A `dsl.ContainerOp` named `step_name`; no file outputs are declared.
    """
    export_path = '%s/export/export' % model
    container_args = [
        '--model-export-path', export_path,
        '--server-name', tf_server_name,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:e20fad3e161e88226c83437271adb063221459b9',
        arguments=container_args,
    )
import kfp
from kfp import components
from kfp import dsl
from kfp import gcp


# Every component definition is fetched from the same commit of
# kubeflow/pipelines. Keeping the hash in one place means an upgrade cannot
# leave some components on an older revision than others.
_COMPONENTS_COMMIT = '785d474699cffb7463986b9abc4b1fbe03796cb6'
_COMPONENTS_ROOT_URL = (
    'https://raw.githubusercontent.com/kubeflow/pipelines/'
    + _COMPONENTS_COMMIT
    + '/components/'
)


def _load_pinned_component(relative_dir):
    """Load `<relative_dir>/component.yaml` from the pinned commit.

    Args:
        relative_dir: Directory of the component below `components/` in the
            kubeflow/pipelines repository, e.g. `'dataflow/tfdv'`.

    Returns:
        Whatever `components.load_component_from_url` returns for that URL.
    """
    return components.load_component_from_url(
        _COMPONENTS_ROOT_URL + relative_dir + '/component.yaml'
    )


dataflow_tf_data_validation_op = _load_pinned_component('dataflow/tfdv')
dataflow_tf_transform_op = _load_pinned_component('dataflow/tft')
tf_train_op = _load_pinned_component('kubeflow/dnntrainer')
dataflow_tf_model_analyze_op = _load_pinned_component('dataflow/tfma')
dataflow_tf_predict_op = _load_pinned_component('dataflow/predict')

confusion_matrix_op = _load_pinned_component('local/confusion_matrix')
roc_op = _load_pinned_component('local/roc')

kubeflow_deploy_op = _load_pinned_component('kubeflow/deployer')


@dsl.pipeline(
Expand All @@ -154,28 +48,82 @@ def taxi_cab_classification(
learning_rate=0.1,
hidden_layer_size='1500',
steps=3000,
analyze_slice_column='trip_start_hour'):

tf_server_name = 'taxi-cab-classification-model-{{workflow.uid}}'
validation = dataflow_tf_data_validation_op(train, evaluation, column_names,
key_columns, project, mode, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
preprocess = dataflow_tf_transform_op(train, evaluation, validation.outputs['schema'],
project, mode, preprocess_module, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
training = tf_train_op(preprocess.output, validation.outputs['schema'], learning_rate,
hidden_layer_size, steps, 'tips', preprocess_module, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
analysis = dataflow_tf_model_analyze_op(training.output, evaluation,
validation.outputs['schema'], project, mode, analyze_slice_column, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
prediction = dataflow_tf_predict_op(evaluation, validation.outputs['schema'], 'tips',
training.output, mode, project, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
cm = confusion_matrix_op(prediction.output, output).apply(gcp.use_gcp_secret('user-gcp-sa'))
roc = roc_op(prediction.output, output).apply(gcp.use_gcp_secret('user-gcp-sa'))
deploy = kubeflow_deploy_op(training.output, tf_server_name).apply(gcp.use_gcp_secret('user-gcp-sa'))
analyze_slice_column='trip_start_hour'
):
# Body of the taxi_cab_classification pipeline. Each step below is created
# from one of the loaded components, and every step has
# gcp.use_gcp_secret('user-gcp-sa') applied to it.

# Shared output location handed to every step: the `output` pipeline argument
# followed by the literal `{{workflow.uid}}` and `{{pod.name}}` placeholders.
# NOTE(review): the placeholders are presumably substituted by the workflow
# engine at run time, giving each pod its own directory - confirm.
output_template = str(output) + '/{{workflow.uid}}/{{pod.name}}/data'
# Lambda source passed as text to the metric steps. Both compare `target`
# against 20% of `fare`; the first yields a boolean, the second 1 or 0.
target_lambda = """lambda x: (x['target'] > x['fare'] * 0.2)"""
target_class_lambda = """lambda x: 1 if (x['target'] > x['fare'] * 0.2) else 0"""

# Name passed to the deploy step as `server_name`.
tf_server_name = 'taxi-cab-classification-model-{{workflow.uid}}'

# Validation step; its `schema` output feeds the preprocess, training,
# analysis and prediction steps below.
validation = dataflow_tf_data_validation_op(
inference_data=train,
validation_data=evaluation,
column_names=column_names,
key_columns=key_columns,
gcp_project=project,
run_mode=mode,
validation_output=output_template
).apply(gcp.use_gcp_secret('user-gcp-sa'))

# Preprocess step; consumes the train/evaluation inputs and the schema.
preprocess = dataflow_tf_transform_op(
training_data_file_pattern=train,
evaluation_data_file_pattern=evaluation,
schema=validation.outputs['schema'],
gcp_project=project,
run_mode=mode,
preprocessing_module=preprocess_module,
transformed_data_dir=output_template
).apply(gcp.use_gcp_secret('user-gcp-sa'))

# Training step; reads the preprocess output and uses 'tips' as the target.
training = tf_train_op(
transformed_data_dir=preprocess.output,
schema=validation.outputs['schema'],
learning_rate=learning_rate,
hidden_layer_size=hidden_layer_size,
steps=steps,
target='tips',
preprocessing_module=preprocess_module,
training_output_dir=output_template
).apply(gcp.use_gcp_secret('user-gcp-sa'))

# Analysis step; takes the training output as `model` together with the
# evaluation data and the slice column.
analysis = dataflow_tf_model_analyze_op(
model=training.output,
evaluation_data=evaluation,
schema=validation.outputs['schema'],
gcp_project=project,
run_mode=mode,
slice_columns=analyze_slice_column,
analysis_results_dir=output_template
).apply(gcp.use_gcp_secret('user-gcp-sa'))

# Prediction step; takes the training output as `model` and the evaluation
# data, with 'tips' as the target column.
prediction = dataflow_tf_predict_op(
data_file_pattern=evaluation,
schema=validation.outputs['schema'],
target_column='tips',
model=training.output,
run_mode=mode,
gcp_project=project,
predictions_dir=output_template
).apply(gcp.use_gcp_secret('user-gcp-sa'))

# Confusion-matrix step over the prediction output.
# NOTE(review): this component takes `predictions=` while roc_op below takes
# `predictions_dir=` - confirm both names against the component.yaml files.
cm = confusion_matrix_op(
predictions=prediction.output,
target_lambda=target_lambda,
output_dir=output_template
).apply(gcp.use_gcp_secret('user-gcp-sa'))

# ROC step over the same prediction output, using the 1/0 lambda.
roc = roc_op(
predictions_dir=prediction.output,
target_lambda=target_class_lambda,
output_dir=output_template
).apply(gcp.use_gcp_secret('user-gcp-sa'))

# Deploy step; `model_dir` is the training output with '/export/export'
# appended.
deploy = kubeflow_deploy_op(
model_dir=str(training.output) + '/export/export',
server_name=tf_server_name
).apply(gcp.use_gcp_secret('user-gcp-sa'))


if __name__ == '__main__':
    # Import the submodule explicitly: a bare `import kfp` only exposes
    # `kfp.compiler` if something else has already imported it.
    import kfp.compiler

    # Compile the pipeline once into an archive next to this script
    # (`<this file>.zip`).
    kfp.compiler.Compiler().compile(taxi_cab_classification, __file__ + '.zip')

0 comments on commit abfdd29

Please sign in to comment.