Skip to content

Commit

Permalink
fix(components): fixes issues with tfjob launcher component. (kubeflo…
Browse files Browse the repository at this point in the history
…w#4806)

* updated sample

* missed to update component
  • Loading branch information
NikeNano committed Feb 17, 2021
1 parent 12dcd24 commit dd31142
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 38 deletions.
28 changes: 14 additions & 14 deletions components/kubeflow/launcher/component.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
name: Kubeflow - Launch TFJob
description: Kubeflow TFJob launcher
inputs:
- {name: Name, type: String, description: 'TFJob name.'}
- {name: Namespace, type: String, default: kubeflow, description: 'TFJob namespace.'}
- {name: Version, type: String, default: v1, description: 'TFJob version.'}
- {name: ActiveDeadlineSeconds, type: Integer, default: -1, description: 'Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.'}
- {name: BackoffLimit, type: Integer, default: -1, description: 'Number of retries before marking this job as failed.'}
- {name: ttl Seconds After Finished, type: Integer, default: -1, description: 'Defines the TTL for cleaning up finished TFJobs.'}
- {name: CleanPodPolicy, type: String, default: Running, description: 'Defines the policy for cleaning up pods after the TFJob completes.'}
- {name: PS Spec, type: JSON, default: '{}', description: 'TFJob ps replicaSpecs.'}
- {name: Worker Spec, type: JSON, default: '{}', description: 'TFJob worker replicaSpecs.'}
- {name: Chief Spec, type: JSON, default: '{}', description: 'TFJob chief replicaSpecs.'}
- {name: Evaluator Spec, type: JSON, default: '{}', description: 'TFJob evaluator replicaSpecs.'}
- {name: Tfjob Timeout Minutes, type: Integer, default: 1440, description: 'Time in minutes to wait for the TFJob to complete.'}
- {name: Delete Finished Tfjob, type: Bool, default: 'True' , description: 'Whether to delete the tfjob after it is finished.'}
- {name: Name, type: String, description: 'TFJob name.'}
- {name: Namespace, type: String, default: kubeflow, description: 'TFJob namespace.'}
- {name: Version, type: String, default: v1, description: 'TFJob version.'}
- {name: ActiveDeadlineSeconds, type: Integer, default: -1, description: 'Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.'}
- {name: BackoffLimit, type: Integer, default: -1, description: 'Number of retries before marking this job as failed.'}
- {name: ttl Seconds After Finished, type: Integer, default: -1, description: 'Defines the TTL for cleaning up finished TFJobs.'}
- {name: CleanPodPolicy, type: String, default: Running, description: 'Defines the policy for cleaning up pods after the TFJob completes.'}
- {name: PS Spec, type: JsonObject, default: '{}', description: 'TFJob ps replicaSpecs.'}
- {name: Worker Spec, type: JsonObject, default: '{}', description: 'TFJob worker replicaSpecs.'}
- {name: Chief Spec, type: JsonObject, default: '{}', description: 'TFJob chief replicaSpecs.'}
- {name: Evaluator Spec, type: JsonObject, default: '{}', description: 'TFJob evaluator replicaSpecs.'}
- {name: Tfjob Timeout Minutes, type: Integer, default: 1440, description: 'Time in minutes to wait for the TFJob to complete.'}
- {name: Delete Finished Tfjob, type: Bool, default: 'True' , description: 'Whether to delete the tfjob after it is finished.'}
implementation:
container:
image: liuhougangxa/kubeflow-tfjob-launcher:latest
image: nikenano/launchernew:latest
command: [python, /ml/launch_tfjob.py]
args: [
--name, {inputValue: Name},
Expand Down
72 changes: 48 additions & 24 deletions components/kubeflow/launcher/sample.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
import json
from kfp import components
import kfp.dsl as dsl
from kfp import components
from kfp.dsl.types import Integer
from typing import NamedTuple

@dsl.pipeline(
name="Launch kubeflow tfjob",
description="An example to launch tfjob."
)
def mnist_train(
name="mnist",
namespace="kubeflow",
workerNum=3,
ttlSecondsAfterFinished=-1,
tfjobTimeoutMinutes=60,
deleteAfterDone=False):
tfjob_launcher_op = components.load_component_from_file("./component.yaml")
# tfjob_launcher_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/launcher/component.yaml')

def create_worker_spec(workerNum: int=0) -> NamedTuple(
'CreatWorkerSpec',
[
('worker_spec', dict),
]):
"""
Creates tf-job worker spec
"""

worker = {}

chief = {
"replicas": 1,
if workerNum > 0:
worker = {
"replicas": workerNum ,
"restartPolicy": "OnFailure",
"template": {
"spec": {
Expand All @@ -28,7 +29,7 @@ def mnist_train(
"/opt/model.py"
],
"args": [
"--tf-train-steps=6000"
"--tf-train-steps=60"
],
"image": "liuhougangxa/tf-estimator-mnist",
"name": "tensorflow",
Expand All @@ -37,10 +38,30 @@ def mnist_train(
}
}
}
worker = {}
if workerNum > 0:
worker = {
"replicas": workerNum,
from collections import namedtuple
worker_spec_output = namedtuple(
'MyWorkerOutput',
['worker_spec'])
return worker_spec_output(worker)

worker_spec_op = components.func_to_container_op(
create_worker_spec, base_image='tensorflow/tensorflow:1.11.0-py3')

@dsl.pipeline(
name="Launch kubeflow tfjob",
description="An example to launch tfjob."
)
def mnist_train(name: str="mnist",
namespace: str="kubeflow",
workerNum: int=3,
ttlSecondsAfterFinished: int=-1,
tfjobTimeoutMinutes: int=60,
deleteAfterDone =False):
tfjob_launcher_op = components.load_component_from_file("./component.yaml")
# tfjob_launcher_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/launcher/component.yaml')

chief = {
"replicas": 1,
"restartPolicy": "OnFailure",
"template": {
"spec": {
Expand All @@ -51,7 +72,7 @@ def mnist_train(
"/opt/model.py"
],
"args": [
"--tf-train-steps=6000"
"--tf-train-steps=60"
],
"image": "liuhougangxa/tf-estimator-mnist",
"name": "tensorflow",
Expand All @@ -60,16 +81,19 @@ def mnist_train(
}
}
}

worker_spec_create = worker_spec_op(workerNum)

tfjob_launcher_op(
name=name,
namespace=namespace,
ttl_seconds_after_finished=ttlSecondsAfterFinished,
worker_spec=worker,
worker_spec=worker_spec_create.outputs['worker_spec'],
chief_spec=chief,
tfjob_timeout_minutes=tfjobTimeoutMinutes,
delete_finished_tfjob=deleteAfterDone
)

if __name__ == "__main__":
import kfp.compiler as compiler
compiler.Compiler().compile(mnist_train, __file__ + ".tar.gz")
compiler.Compiler().compile(mnist_train, __file__ + ".tar.gz")

0 comments on commit dd31142

Please sign in to comment.