From dd31142a57053e0b6f1416a3ecb4c8a94faa27f9 Mon Sep 17 00:00:00 2001 From: Niklas Hansson Date: Wed, 17 Feb 2021 23:14:30 +0100 Subject: [PATCH] fix(components): fixes issues with tfjob launcher component. (#4806) * updated sample * missed to update component --- components/kubeflow/launcher/component.yaml | 28 ++++---- components/kubeflow/launcher/sample.py | 72 ++++++++++++++------- 2 files changed, 62 insertions(+), 38 deletions(-) diff --git a/components/kubeflow/launcher/component.yaml b/components/kubeflow/launcher/component.yaml index 3bddf5ec497..1c184012901 100644 --- a/components/kubeflow/launcher/component.yaml +++ b/components/kubeflow/launcher/component.yaml @@ -1,22 +1,22 @@ name: Kubeflow - Launch TFJob description: Kubeflow TFJob launcher inputs: -- {name: Name, type: String, description: 'TFJob name.'} -- {name: Namespace, type: String, default: kubeflow, description: 'TFJob namespace.'} -- {name: Version, type: String, default: v1, description: 'TFJob version.'} -- {name: ActiveDeadlineSeconds, type: Integer, default: -1, description: 'Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.'} -- {name: BackoffLimit, type: Integer, default: -1, description: 'Number of retries before marking this job as failed.'} -- {name: ttl Seconds After Finished, type: Integer, default: -1, description: 'Defines the TTL for cleaning up finished TFJobs.'} -- {name: CleanPodPolicy, type: String, default: Running, description: 'Defines the policy for cleaning up pods after the TFJob completes.'} -- {name: PS Spec, type: JSON, default: '{}', description: 'TFJob ps replicaSpecs.'} -- {name: Worker Spec, type: JSON, default: '{}', description: 'TFJob worker replicaSpecs.'} -- {name: Chief Spec, type: JSON, default: '{}', description: 'TFJob chief replicaSpecs.'} -- {name: Evaluator Spec, type: JSON, default: '{}', description: 'TFJob evaluator replicaSpecs.'} -- {name: Tfjob Timeout Minutes, type: Integer, default: 1440, description: 'Time in minutes to wait for the TFJob to complete.'} -- {name: Delete Finished Tfjob, type: Bool, default: 'True' , description: 'Whether to delete the tfjob after it is finished.'} +- {name: Name, type: String, description: 'TFJob name.'} +- {name: Namespace, type: String, default: kubeflow, description: 'TFJob namespace.'} +- {name: Version, type: String, default: v1, description: 'TFJob version.'} +- {name: ActiveDeadlineSeconds, type: Integer, default: -1, description: 'Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.'} +- {name: BackoffLimit, type: Integer, default: -1, description: 'Number of retries before marking this job as failed.'} +- {name: ttl Seconds After Finished, type: Integer, default: -1, description: 'Defines the TTL for cleaning up finished TFJobs.'} +- {name: CleanPodPolicy, type: String, default: Running, description: 'Defines the policy for cleaning up pods after the TFJob completes.'} +- {name: PS Spec, type: JsonObject, default: '{}', description: 'TFJob ps replicaSpecs.'} +- {name: Worker Spec, type: JsonObject, default: '{}', description: 'TFJob worker replicaSpecs.'} +- {name: Chief Spec, type: JsonObject, default: '{}', description: 'TFJob chief replicaSpecs.'} +- {name: Evaluator Spec, type: JsonObject, default: '{}', description: 'TFJob evaluator replicaSpecs.'} +- {name: Tfjob Timeout Minutes, type: Integer, default: 1440, description: 'Time in minutes to wait for the TFJob to complete.'} +- {name: Delete Finished Tfjob, type: Bool, default: 'True' , description: 'Whether to delete the tfjob after it is finished.'} implementation: container: - image: liuhougangxa/kubeflow-tfjob-launcher:latest + image: nikenano/launchernew:latest command: [python, /ml/launch_tfjob.py] args: [ --name, {inputValue: Name}, diff --git a/components/kubeflow/launcher/sample.py b/components/kubeflow/launcher/sample.py index a4368190e9d..dbff3bf39fa 100644 --- a/components/kubeflow/launcher/sample.py +++ b/components/kubeflow/launcher/sample.py @@ -1,23 +1,24 @@ import json -from kfp import components import kfp.dsl as dsl +from kfp import components +from kfp.dsl.types import Integer +from typing import NamedTuple -@dsl.pipeline( - name="Launch kubeflow tfjob", - description="An example to launch tfjob." -) -def mnist_train( - name="mnist", - namespace="kubeflow", - workerNum=3, - ttlSecondsAfterFinished=-1, - tfjobTimeoutMinutes=60, - deleteAfterDone=False): - tfjob_launcher_op = components.load_component_from_file("./component.yaml") - # tfjob_launcher_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/launcher/component.yaml') + +def create_worker_spec(workerNum: int=0) -> NamedTuple( + 'CreatWorkerSpec', + [ + ('worker_spec', dict), + ]): + """ + Creates tf-job worker spec + """ + + worker = {} - chief = { - "replicas": 1, + if workerNum > 0: + worker = { + "replicas": workerNum , "restartPolicy": "OnFailure", "template": { "spec": { @@ -28,7 +29,7 @@ def mnist_train( "/opt/model.py" ], "args": [ - "--tf-train-steps=6000" + "--tf-train-steps=60" ], "image": "liuhougangxa/tf-estimator-mnist", "name": "tensorflow", @@ -37,10 +38,30 @@ def mnist_train( } } } - worker = {} - if workerNum > 0: - worker = { - "replicas": workerNum, + from collections import namedtuple + worker_spec_output = namedtuple( + 'MyWorkerOutput', + ['worker_spec']) + return worker_spec_output(worker) + +worker_spec_op = components.func_to_container_op( + create_worker_spec, base_image='tensorflow/tensorflow:1.11.0-py3') + +@dsl.pipeline( + name="Launch kubeflow tfjob", + description="An example to launch tfjob." +) +def mnist_train(name: str="mnist", + namespace: str="kubeflow", + workerNum: int=3, + ttlSecondsAfterFinished: int=-1, + tfjobTimeoutMinutes: int=60, + deleteAfterDone =False): + tfjob_launcher_op = components.load_component_from_file("./component.yaml") + # tfjob_launcher_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/launcher/component.yaml') + + chief = { + "replicas": 1, "restartPolicy": "OnFailure", "template": { "spec": { @@ -51,7 +72,7 @@ def mnist_train( "/opt/model.py" ], "args": [ - "--tf-train-steps=6000" + "--tf-train-steps=60" ], "image": "liuhougangxa/tf-estimator-mnist", "name": "tensorflow", @@ -60,11 +81,14 @@ def mnist_train( } } } + + worker_spec_create = worker_spec_op(workerNum) + tfjob_launcher_op( name=name, namespace=namespace, ttl_seconds_after_finished=ttlSecondsAfterFinished, - worker_spec=worker, + worker_spec=worker_spec_create.outputs['worker_spec'], chief_spec=chief, tfjob_timeout_minutes=tfjobTimeoutMinutes, delete_finished_tfjob=deleteAfterDone @@ -72,4 +96,4 @@ def mnist_train( if __name__ == "__main__": import kfp.compiler as compiler - compiler.Compiler().compile(mnist_train, __file__ + ".tar.gz") + compiler.Compiler().compile(mnist_train, __file__ + ".tar.gz") \ No newline at end of file