Skip to content

Commit

Permalink
Samples - Moved secret application to the pipeline definition (#536)
Browse files Browse the repository at this point in the history
* Samples - Moved use_gcp_secret application to the pipeline definition

* Also changed the notebooks

* Addressed the formatting feedback
  • Loading branch information
Ark-kun authored and k8s-ci-robot committed Dec 14, 2018
1 parent 0c0d8a2 commit dd24c80
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 36 deletions.
16 changes: 8 additions & 8 deletions samples/kubeflow-tf/kubeflow-training-classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', sc
'--output', transform_output,
],
file_outputs = {'transformed': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
)


def kubeflow_tf_training_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training'):
Expand All @@ -50,7 +50,7 @@ def kubeflow_tf_training_op(transformed_data_dir, schema: 'GcsUri[text/json]', l
'--job-dir', training_output,
],
file_outputs = {'train': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
)

def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'):
return dsl.ContainerOp(
Expand All @@ -66,7 +66,7 @@ def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]
'--output', prediction_output,
],
file_outputs = {'prediction': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
)

def confusion_matrix_op(predictions, output, step_name='confusionmatrix'):
return dsl.ContainerOp(
Expand All @@ -76,7 +76,7 @@ def confusion_matrix_op(predictions, output, step_name='confusionmatrix'):
'--predictions', predictions,
'--output', output,
]
).apply(gcp.use_gcp_secret('user-gcp-sa'))
)

@dsl.pipeline(
name='Pipeline TFJob',
Expand All @@ -97,10 +97,10 @@ def kubeflow_training(output, project,
# TODO: use the argo job name as the workflow
workflow = '{{workflow.name}}'

preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, '', '%s/%s/transformed' % (output, workflow))
training = kubeflow_tf_training_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, '', '%s/%s/train' % (output, workflow))
prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, '%s/%s/predict' % (output, workflow))
confusion_matrix = confusion_matrix_op(prediction.output, '%s/%s/confusionmatrix' % (output, workflow))
preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, '', '%s/%s/transformed' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa'))
training = kubeflow_tf_training_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, '', '%s/%s/train' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa'))
prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, '%s/%s/predict' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa'))
confusion_matrix = confusion_matrix_op(prediction.output, '%s/%s/confusionmatrix' % (output, workflow)).apply(gcp.use_gcp_secret('user-gcp-sa'))


if __name__ == '__main__':
Expand Down
30 changes: 15 additions & 15 deletions samples/notebooks/KubeFlow Pipeline Using TFX OSS Components.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@
" 'output': '/output.txt',\n",
" 'schema': '/output_schema.json',\n",
" }\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" )\n",
"\n",
"def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', preprocess_mode, preprocess_module: 'GcsUri[text/code/python]', transform_output: 'GcsUri[Directory]', step_name='preprocess'):\n",
" return dsl.ContainerOp(\n",
Expand All @@ -255,7 +255,7 @@
" '--output', transform_output,\n",
" ],\n",
" file_outputs = {'transformed': '/output.txt'}\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" )\n",
"\n",
"\n",
"def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target: str, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training'):\n",
Expand All @@ -273,7 +273,7 @@
" '--job-dir', training_output,\n",
" ],\n",
" file_outputs = {'train': '/output.txt'}\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" )\n",
"\n",
"def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', analyze_mode, analyze_slice_column, analysis_output: 'GcsUri', step_name='analysis'):\n",
" return dsl.ContainerOp(\n",
Expand All @@ -289,7 +289,7 @@
" '--output', analysis_output,\n",
" ],\n",
" file_outputs = {'analysis': '/output.txt'}\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" )\n",
"\n",
"\n",
"def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'):\n",
Expand All @@ -306,7 +306,7 @@
" '--output', prediction_output,\n",
" ],\n",
" file_outputs = {'prediction': '/output.txt'}\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" )\n",
"\n",
"def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='deploy'):\n",
" return dsl.ContainerOp(\n",
Expand All @@ -316,7 +316,7 @@
" '--model-path', model,\n",
" '--server-name', tf_server_name\n",
" ]\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" )\n",
"\n",
"\n",
"# The pipeline definition\n",
Expand Down Expand Up @@ -352,11 +352,11 @@
" validation = dataflow_tf_data_validation_op(train, evaluation, column_names, key_columns, project, validation_mode, validation_output)\n",
" schema = '%s/schema.json' % validation.outputs['output']\n",
"\n",
" preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, preprocess_module, transform_output)\n",
" training = tf_train_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, preprocess_module, training_output)\n",
" analysis = dataflow_tf_model_analyze_op(training.output, evaluation, schema, project, analyze_mode, analyze_slice_column, analysis_output)\n",
" prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, prediction_output)\n",
" deploy = kubeflow_deploy_op(training.output, tf_server_name)"
" preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, preprocess_module, transform_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" training = tf_train_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, preprocess_module, training_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" analysis = dataflow_tf_model_analyze_op(training.output, evaluation, schema, project, analyze_mode, analyze_slice_column, analysis_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, prediction_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" deploy = kubeflow_deploy_op(training.output, tf_server_name).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
{
Expand Down Expand Up @@ -744,10 +744,10 @@
" validation = dataflow_tf_data_validation_op(train, evaluation, column_names, key_columns, project, validation_mode, validation_output)\n",
" schema = '%s/schema.json' % validation.outputs['output']\n",
"\n",
" preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, preprocess_module, transform_output)\n",
" training = tf_train_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, preprocess_module, training_output)\n",
" analysis = dataflow_tf_model_analyze_op(training.output, evaluation, schema, project, analyze_mode, analyze_slice_column, analysis_output)\n",
" prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, prediction_output)\n",
" preprocess = dataflow_tf_transform_op(train, evaluation, schema, project, preprocess_mode, preprocess_module, transform_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" training = tf_train_op(preprocess.output, schema, learning_rate, hidden_layer_size, steps, target, preprocess_module, training_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" analysis = dataflow_tf_model_analyze_op(training.output, evaluation, schema, project, analyze_mode, analyze_slice_column, analysis_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" prediction = dataflow_tf_predict_op(evaluation, schema, target, training.output, predict_mode, project, prediction_output).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
" \n",
" # The new deployer. Note that the DeployerOp interface is similar to the function \"deploy_model\".\n",
" deploy = DeployerOp(gcp_project=project, model_dot_version=model, runtime='1.9', model_path=training.output).apply(gcp.use_gcp_secret('user-gcp-sa'))"
Expand Down
32 changes: 19 additions & 13 deletions samples/tfx/taxi-cab-classification-pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def dataflow_tf_data_validation_op(inference_data: 'GcsUri', validation_data: 'G
'schema': '/schema.txt',
'validation': '/output_validation_result.txt',
}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
)

def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', preprocess_mode, preprocess_module: 'GcsUri[text/code/python]', transform_output: 'GcsUri[Directory]', step_name='preprocess'):
return dsl.ContainerOp(
Expand All @@ -51,7 +51,7 @@ def dataflow_tf_transform_op(train_data: 'GcsUri', evaluation_data: 'GcsUri', sc
'--output', '%s/{{workflow.name}}/transformed' % transform_output,
],
file_outputs = {'transformed': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
)


def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate: float, hidden_layer_size: int, steps: int, target: str, preprocess_module: 'GcsUri[text/code/python]', training_output: 'GcsUri[Directory]', step_name='training'):
Expand All @@ -69,7 +69,7 @@ def tf_train_op(transformed_data_dir, schema: 'GcsUri[text/json]', learning_rate
'--job-dir', '%s/{{workflow.name}}/train' % training_output,
],
file_outputs = {'train': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
)

def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', project: 'GcpProject', analyze_mode, analyze_slice_column, analysis_output: 'GcsUri', step_name='analysis'):
return dsl.ContainerOp(
Expand All @@ -85,7 +85,7 @@ def dataflow_tf_model_analyze_op(model: 'TensorFlow model', evaluation_data: 'Gc
'--output', '%s/{{workflow.name}}/analysis' % analysis_output,
],
file_outputs = {'analysis': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
)


def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]', target: str, model: 'TensorFlow model', predict_mode, project: 'GcpProject', prediction_output: 'GcsUri', step_name='prediction'):
Expand All @@ -102,7 +102,7 @@ def dataflow_tf_predict_op(evaluation_data: 'GcsUri', schema: 'GcsUri[text/json]
'--output', '%s/{{workflow.name}}/predict' % prediction_output,
],
file_outputs = {'prediction': '/output.txt'}
).apply(gcp.use_gcp_secret('user-gcp-sa'))
)


def confusion_matrix_op(predictions: 'GcsUri', output: 'GcsUri', step_name='confusion_matrix'):
Expand Down Expand Up @@ -157,18 +157,24 @@ def taxi_cab_classification(
analyze_slice_column='trip_start_hour'):

tf_server_name = 'taxi-cab-classification-model-{{workflow.name}}'
validation = dataflow_tf_data_validation_op(train, evaluation, column_names, key_columns, project, mode, output)
validation = dataflow_tf_data_validation_op(train, evaluation, column_names,
key_columns, project, mode, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
preprocess = dataflow_tf_transform_op(train, evaluation, validation.outputs['schema'],
project, mode, preprocess_module, output)
project, mode, preprocess_module, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
training = tf_train_op(preprocess.output, validation.outputs['schema'], learning_rate,
hidden_layer_size, steps, 'tips', preprocess_module, output)
hidden_layer_size, steps, 'tips', preprocess_module, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
analysis = dataflow_tf_model_analyze_op(training.output, evaluation,
validation.outputs['schema'], project, mode, analyze_slice_column, output)
validation.outputs['schema'], project, mode, analyze_slice_column, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
prediction = dataflow_tf_predict_op(evaluation, validation.outputs['schema'], 'tips',
training.output, mode, project, output)
cm = confusion_matrix_op(prediction.output, output)
roc = roc_op(prediction.output, output)
deploy = kubeflow_deploy_op(training.output, tf_server_name)
training.output, mode, project, output
).apply(gcp.use_gcp_secret('user-gcp-sa'))
cm = confusion_matrix_op(prediction.output, output).apply(gcp.use_gcp_secret('user-gcp-sa'))
roc = roc_op(prediction.output, output).apply(gcp.use_gcp_secret('user-gcp-sa'))
deploy = kubeflow_deploy_op(training.output, tf_server_name).apply(gcp.use_gcp_secret('user-gcp-sa'))

if __name__ == '__main__':
import kfp.compiler as compiler
Expand Down

0 comments on commit dd24c80

Please sign in to comment.