diff --git a/sdk/README.md b/sdk/README.md index 789a9198d71..7aebbddafe6 100644 --- a/sdk/README.md +++ b/sdk/README.md @@ -81,3 +81,20 @@ Here we update the `Compiler` of the KFP SDK to generate `Tekton` YAML for a bas ## Test Kubeflow Pipelines with Tekton Please [refer to the instructions here](./python/tests/README.md) as you work on a PR test sample Kubeflow Pipelines in their test data folder to ensure your PR is improving the number of successful samples + +## Compile Kubeflow Pipelines as Tekton pipelineRun + +By default, Tekton pipelineRun is generated by the `tkn` CLI so that users can interactively change their pipeline parameters during each execution. However, `tkn` CLI is lagging several important features when generating pipelineRun. Therefore, we added support for generating pipelineRun using `dsl-compile-tekton` with all the latest kfp-tekton compiler features. The comparison between Tekton pipeline and Argo workflow is described in our [design docs](https://docs.google.com/document/d/1oXOdiItI4GbEe_qzyBmMAqfLBjfYX1nM94WHY3EPa94/edit#heading=h.f38y0bqkxo87). + +Compiling Kubeflow Pipelines into Tekton pipelineRun is currently under the experimental stage. [Here](https://github.com/tektoncd/pipeline/blob/master/docs/pipelineruns.md) is the list of supported features in pipelineRun. + +As of today, the below pipelineRun features are available within `dsl-compile-tekton`: +- Affinity +- Node Selector +- Tolerations + +To compile Kubeflow Pipelines as Tekton pipelineRun, simply add the `--generate-pipelinerun` as part of your `dsl-compile-tekton`commands. e.g. +- `dsl-compile-tekton --py sdk/python/tests/compiler/testdata/tolerations.py --output pipeline.yaml --generate-pipelinerun` + +## Troubleshooting +- Please be aware that defined Affinity, Node Selector, and Tolerations are applied to all the tasks in the same pipeline because there's only one podTemplate allowed in each pipeline. \ No newline at end of file diff --git a/sdk/python/kfp_tekton/compiler/_op_to_template.py b/sdk/python/kfp_tekton/compiler/_op_to_template.py index 739ce31e243..79724868db2 100644 --- a/sdk/python/kfp_tekton/compiler/_op_to_template.py +++ b/sdk/python/kfp_tekton/compiler/_op_to_template.py @@ -152,21 +152,6 @@ def _op_to_template(op: BaseOp): # NOTE: the following features are still under development # ********************************************************** - # node selector - if processed_op.node_selector: - raise NotImplementedError("'nodeSelector' is not (yet) implemented") - template['nodeSelector'] = processed_op.node_selector - - # tolerations - if processed_op.tolerations: - raise NotImplementedError("'tolerations' is not (yet) implemented") - template['tolerations'] = processed_op.tolerations - - # affinity - if processed_op.affinity: - raise NotImplementedError("'affinity' is not (yet) implemented") - template['affinity'] = convert_k8s_obj_to_json(processed_op.affinity) - # metadata if processed_op.pod_annotations or processed_op.pod_labels: template.setdefault('metadata', {}) # Tekton change, don't wipe out existing metadata diff --git a/sdk/python/kfp_tekton/compiler/compiler.py b/sdk/python/kfp_tekton/compiler/compiler.py index ce09f74f1c5..b5b3081627f 100644 --- a/sdk/python/kfp_tekton/compiler/compiler.py +++ b/sdk/python/kfp_tekton/compiler/compiler.py @@ -26,6 +26,7 @@ from kfp.compiler.compiler import Compiler from kfp.components.structures import InputSpec from kfp.dsl._metadata import _extract_pipeline_metadata +from kfp.compiler._k8s_helper import convert_k8s_obj_to_json from .. import tekton_api_version @@ -136,7 +137,7 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli task['timeout'] = '%ds' % op.timeout # generate the Tekton Pipeline document - pipeline = { + pipeline_template = { 'apiVersion': tekton_api_version, 'kind': 'Pipeline', 'metadata': { @@ -149,32 +150,48 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli } # append Task and Pipeline documents - workflow = tasks + [pipeline] + workflow = tasks + [pipeline_template] # Generate pipelinerun if generate-pipelinerun flag is enabled + # The base templete is generated first and then insert optional parameters. if self.generate_pipelinerun: pipelinerun = { 'apiVersion': tekton_api_version, 'kind': 'PipelineRun', 'metadata': { - 'name': pipeline['metadata']['name'] + '-run' + 'name': pipeline_template['metadata']['name'] + '-run' }, 'spec': { 'params': [{ 'name': p['name'], 'value': p['default'] - } for p in pipeline['spec']['params'] + } for p in pipeline_template['spec']['params'] ], 'pipelineRef': { - 'name': pipeline['metadata']['name'] + 'name': pipeline_template['metadata']['name'] } } } + + pod_template = {} + for task in task_refs: + op = pipeline.ops.get(task['name']) + if op.affinity: + pod_template['affinity'] = convert_k8s_obj_to_json(op.affinity) + if op.tolerations: + pod_template['tolerations'] = pod_template.get('tolerations', []) + op.tolerations + if op.node_selector: + pod_template['nodeSelector'] = op.node_selector + + if pod_template: + pipelinerun['spec']['podtemplate'] = pod_template + # add workflow level timeout to pipeline run if pipeline_conf.timeout: pipelinerun['spec']['timeout'] = '%ds' % pipeline_conf.timeout + workflow = workflow + [pipelinerun] return workflow # Tekton change, from return type Dict[Text, Any] to List[Dict[Text, Any]] diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index b4b23eb1fac..9ab393d6428 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -98,6 +98,27 @@ def test_timeout_workflow(self): from .testdata.timeout import timeout_sample_pipeline self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout.yaml') + def test_tolerations_workflow(self): + """ + Test compiling a tolerations workflow. + """ + from .testdata.tolerations import tolerations + self._test_pipeline_workflow(tolerations, 'tolerations.yaml', generate_pipelinerun=True) + + def test_affinity_workflow(self): + """ + Test compiling a affinity workflow. + """ + from .testdata.affinity import affinity_pipeline + self._test_pipeline_workflow(affinity_pipeline, 'affinity.yaml', generate_pipelinerun=True) + + def test_node_selector_workflow(self): + """ + Test compiling a node selector workflow. + """ + from .testdata.node_selector import node_selector_pipeline + self._test_pipeline_workflow(node_selector_pipeline, 'node_selector.yaml', generate_pipelinerun=True) + def _test_pipeline_workflow(self, pipeline_function, pipeline_yaml, generate_pipelinerun=False): test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml) diff --git a/sdk/python/tests/compiler/testdata/affinity.py b/sdk/python/tests/compiler/testdata/affinity.py new file mode 100644 index 00000000000..4e611de6993 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/affinity.py @@ -0,0 +1,45 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kubernetes.client import V1Affinity, V1NodeSelector, V1NodeSelectorRequirement, V1NodeSelectorTerm, V1NodeAffinity +from kfp.dsl import ContainerOp +from kfp import dsl + + +def some_op(): + return dsl.ContainerOp( + name='sleep', + image='busybox', + command=['sleep 1'], + ) + +@dsl.pipeline( + name='affinity', + description='A pipeline with affinity' +) +def affinity_pipeline( +): + """A pipeline with affinity""" + affinity = V1Affinity( + node_affinity=V1NodeAffinity( + required_during_scheduling_ignored_during_execution=V1NodeSelector( + node_selector_terms=[V1NodeSelectorTerm( + match_expressions=[V1NodeSelectorRequirement( + key='beta.kubernetes.io/instance-type', operator='In', values=['p2.xlarge'])])]))) + some_op().add_affinity(affinity) + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(affinity_pipeline, __file__.replace('.py', '.yaml'), generate_pipelinerun=True) diff --git a/sdk/python/tests/compiler/testdata/affinity.yaml b/sdk/python/tests/compiler/testdata/affinity.yaml new file mode 100644 index 00000000000..2226c2d28b6 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/affinity.yaml @@ -0,0 +1,58 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: sleep +spec: + steps: + - command: + - sleep 1 + image: busybox + name: sleep +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with affinity", + "name": "affinity"}' + name: affinity +spec: + params: [] + tasks: + - name: sleep + params: [] + taskRef: + name: sleep +--- +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: affinity-run +spec: + params: [] + pipelineRef: + name: affinity + podtemplate: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: beta.kubernetes.io/instance-type + operator: In + values: + - p2.xlarge diff --git a/sdk/python/tests/compiler/testdata/node_selector.py b/sdk/python/tests/compiler/testdata/node_selector.py new file mode 100644 index 00000000000..1a6ebf9f58d --- /dev/null +++ b/sdk/python/tests/compiler/testdata/node_selector.py @@ -0,0 +1,38 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kfp.dsl import ContainerOp +from kfp import dsl + + +def some_op(): + return dsl.ContainerOp( + name='sleep', + image='busybox', + command=['sleep 1'], + ) + +@dsl.pipeline( + name='node_selector', + description='A pipeline with Node Selector' +) +def node_selector_pipeline( +): + """A pipeline with Node Selector""" + some_op().add_node_selector_constraint('accelerator', 'nvidia-tesla-k80') + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(node_selector_pipeline, __file__.replace('.py', '.yaml'), generate_pipelinerun=True) diff --git a/sdk/python/tests/compiler/testdata/node_selector.yaml b/sdk/python/tests/compiler/testdata/node_selector.yaml new file mode 100644 index 00000000000..9305ea58e59 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/node_selector.yaml @@ -0,0 +1,51 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: sleep +spec: + steps: + - command: + - sleep 1 + image: busybox + name: sleep +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with Node Selector", + "name": "node_selector"}' + name: node-selector +spec: + params: [] + tasks: + - name: sleep + params: [] + taskRef: + name: sleep +--- +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: node-selector-run +spec: + params: [] + pipelineRef: + name: node-selector + podtemplate: + nodeSelector: + accelerator: nvidia-tesla-k80 diff --git a/sdk/python/tests/compiler/testdata/tolerations.py b/sdk/python/tests/compiler/testdata/tolerations.py new file mode 100644 index 00000000000..1b3b34321a7 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/tolerations.py @@ -0,0 +1,40 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kubernetes.client import V1Toleration +from kfp.dsl import ContainerOp +from kfp import dsl + +@dsl.pipeline( + name='tolerations', + description='A pipeline with tolerations' +) +def tolerations( +): + """A pipeline with tolerations""" + op1 = dsl.ContainerOp( + name='download', + image='busybox', + command=['sh', '-c'], + arguments=['sleep 10; wget localhost:5678 -O /tmp/results.txt'], + file_outputs={'downloaded': '/tmp/results.txt'})\ + .add_toleration(V1Toleration(effect='NoSchedule', + key='gpu', + operator='Equal', + value='run')) + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(tolerations, __file__.replace('.py', '.yaml'), generate_pipelinerun=True) diff --git a/sdk/python/tests/compiler/testdata/tolerations.yaml b/sdk/python/tests/compiler/testdata/tolerations.yaml new file mode 100644 index 00000000000..4f0ad1508a4 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/tolerations.yaml @@ -0,0 +1,60 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: download +spec: + results: + - description: /tmp/results.txt + name: downloaded + steps: + - args: + - sleep 10; wget localhost:5678 -O $(results.downloaded.path) + command: + - sh + - -c + image: busybox + name: download +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with tolerations", + "name": "tolerations"}' + name: tolerations +spec: + params: [] + tasks: + - name: download + params: [] + taskRef: + name: download +--- +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: tolerations-run +spec: + params: [] + pipelineRef: + name: tolerations + podtemplate: + tolerations: + - effect: NoSchedule + key: gpu + operator: Equal + value: run