Add more k8s resource support (#65)
* add more k8s resource support

* add test main functions as requested

* add description on why we need pipelinerun
Tomcli authored Mar 30, 2020
1 parent 82eb831 commit f088d2a
Showing 10 changed files with 352 additions and 20 deletions.
17 changes: 17 additions & 0 deletions sdk/README.md
@@ -81,3 +81,20 @@ Here we update the `Compiler` of the KFP SDK to generate `Tekton` YAML for a bas
## Test Kubeflow Pipelines with Tekton

Please [refer to the instructions here](./python/tests/README.md) when working on a PR to test the sample Kubeflow Pipelines in the test data folder and to ensure your PR improves the number of successful samples.
## Compile Kubeflow Pipelines as Tekton pipelineRun

By default, a Tekton pipelineRun is generated by the `tkn` CLI so that users can interactively change their pipeline parameters during each execution. However, the `tkn` CLI is lagging behind on several important features when generating pipelineRun. Therefore, we added support for generating pipelineRun using `dsl-compile-tekton`, with all the latest kfp-tekton compiler features. The comparison between Tekton pipeline and Argo workflow is described in our [design docs](https://docs.google.com/document/d/1oXOdiItI4GbEe_qzyBmMAqfLBjfYX1nM94WHY3EPa94/edit#heading=h.f38y0bqkxo87).

Compiling Kubeflow Pipelines into a Tekton pipelineRun is currently at an experimental stage. [Here](https://github.com/tektoncd/pipeline/blob/master/docs/pipelineruns.md) is the list of supported features in pipelineRun.

As of today, the pipelineRun features below are available within `dsl-compile-tekton` (a combined usage sketch follows the list):
- Affinity
- Node Selector
- Tolerations
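
For illustration, here is a hypothetical pipeline, a composite of the test data added in this commit rather than an official sample, that attaches all three features to a `ContainerOp` (the pipeline name and the selector/toleration values are placeholders):

```python
from kubernetes.client import (V1Affinity, V1NodeAffinity, V1NodeSelector,
                               V1NodeSelectorRequirement, V1NodeSelectorTerm,
                               V1Toleration)
from kfp import dsl


@dsl.pipeline(name='combined', description='Affinity, node selector, and tolerations together')
def combined_pipeline():
    # Node affinity requiring a specific instance type (placeholder values).
    affinity = V1Affinity(
        node_affinity=V1NodeAffinity(
            required_during_scheduling_ignored_during_execution=V1NodeSelector(
                node_selector_terms=[V1NodeSelectorTerm(
                    match_expressions=[V1NodeSelectorRequirement(
                        key='beta.kubernetes.io/instance-type',
                        operator='In',
                        values=['p2.xlarge'])])])))
    op = dsl.ContainerOp(name='sleep', image='busybox', command=['sleep 1'])
    op.add_affinity(affinity)
    op.add_toleration(V1Toleration(effect='NoSchedule', key='gpu',
                                   operator='Equal', value='run'))
    op.add_node_selector_constraint('accelerator', 'nvidia-tesla-k80')
```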

To compile Kubeflow Pipelines as a Tekton pipelineRun, simply add the `--generate-pipelinerun` flag to your `dsl-compile-tekton` command (an equivalent programmatic invocation is sketched after the example), e.g.
- `dsl-compile-tekton --py sdk/python/tests/compiler/testdata/tolerations.py --output pipeline.yaml --generate-pipelinerun`
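
The CLI flag maps onto the `generate_pipelinerun` argument of the Python compiler API, as used by the test data in this commit; a minimal sketch, where `my_pipeline` stands in for any `@dsl.pipeline`-decorated function:

```python
from kfp_tekton.compiler import TektonCompiler

# Compile a pipeline function into a multi-document Tekton YAML that also
# includes a PipelineRun (same intent as --generate-pipelinerun on the CLI).
TektonCompiler().compile(my_pipeline, 'pipeline.yaml', generate_pipelinerun=True)
```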

## Troubleshooting

- Please be aware that the defined Affinity, Node Selector, and Tolerations are applied to all tasks in the same pipeline, because only one podTemplate is allowed per pipeline.
15 changes: 0 additions & 15 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
@@ -152,21 +152,6 @@ def _op_to_template(op: BaseOp):
    # NOTE: the following features are still under development
    # **********************************************************

    # node selector
    if processed_op.node_selector:
        raise NotImplementedError("'nodeSelector' is not (yet) implemented")
        template['nodeSelector'] = processed_op.node_selector

    # tolerations
    if processed_op.tolerations:
        raise NotImplementedError("'tolerations' is not (yet) implemented")
        template['tolerations'] = processed_op.tolerations

    # affinity
    if processed_op.affinity:
        raise NotImplementedError("'affinity' is not (yet) implemented")
        template['affinity'] = convert_k8s_obj_to_json(processed_op.affinity)

    # metadata
    if processed_op.pod_annotations or processed_op.pod_labels:
        template.setdefault('metadata', {})  # Tekton change, don't wipe out existing metadata
27 changes: 22 additions & 5 deletions sdk/python/kfp_tekton/compiler/compiler.py
@@ -26,6 +26,7 @@
from kfp.compiler.compiler import Compiler
from kfp.components.structures import InputSpec
from kfp.dsl._metadata import _extract_pipeline_metadata
from kfp.compiler._k8s_helper import convert_k8s_obj_to_json

from .. import tekton_api_version

@@ -136,7 +137,7 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
                task['timeout'] = '%ds' % op.timeout

        # generate the Tekton Pipeline document
        pipeline = {
        pipeline_template = {
            'apiVersion': tekton_api_version,
            'kind': 'Pipeline',
            'metadata': {
@@ -149,32 +150,48 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
        }

        # append Task and Pipeline documents
        workflow = tasks + [pipeline]
        workflow = tasks + [pipeline_template]

        # Generate pipelinerun if generate-pipelinerun flag is enabled
        # The base template is generated first and then the optional parameters are inserted.
        if self.generate_pipelinerun:
            pipelinerun = {
                'apiVersion': tekton_api_version,
                'kind': 'PipelineRun',
                'metadata': {
                    'name': pipeline['metadata']['name'] + '-run'
                    'name': pipeline_template['metadata']['name'] + '-run'
                },
                'spec': {
                    'params': [{
                        'name': p['name'],
                        'value': p['default']
                    } for p in pipeline['spec']['params']
                    } for p in pipeline_template['spec']['params']
                    ],
                    'pipelineRef': {
                        'name': pipeline['metadata']['name']
                        'name': pipeline_template['metadata']['name']
                    }
                }
            }


            pod_template = {}
            for task in task_refs:
                op = pipeline.ops.get(task['name'])
                if op.affinity:
                    pod_template['affinity'] = convert_k8s_obj_to_json(op.affinity)
                if op.tolerations:
                    pod_template['tolerations'] = pod_template.get('tolerations', []) + op.tolerations
                if op.node_selector:
                    pod_template['nodeSelector'] = op.node_selector

            if pod_template:
                pipelinerun['spec']['podtemplate'] = pod_template

            # add workflow level timeout to pipeline run
            if pipeline_conf.timeout:
                pipelinerun['spec']['timeout'] = '%ds' % pipeline_conf.timeout


            workflow = workflow + [pipelinerun]

        return workflow  # Tekton change, from return type Dict[Text, Any] to List[Dict[Text, Any]]
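
Since `_create_pipeline_workflow` now returns a list of documents (the Tasks, the Pipeline, and optionally a PipelineRun), here is a minimal sketch of how such a list could be serialized into the multi-document golden YAML files shown below, assuming PyYAML is available; this is not necessarily the exact code path the compiler uses:

```python
import yaml


def dump_workflow(workflow, output_path):
    # Each dict in the list becomes one `---`-separated YAML document,
    # matching the layout of the golden files under testdata/.
    with open(output_path, 'w') as f:
        yaml.safe_dump_all(workflow, f, default_flow_style=False)
```
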
21 changes: 21 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -98,6 +98,27 @@ def test_timeout_workflow(self):
        from .testdata.timeout import timeout_sample_pipeline
        self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout.yaml')

    def test_tolerations_workflow(self):
        """
        Test compiling a tolerations workflow.
        """
        from .testdata.tolerations import tolerations
        self._test_pipeline_workflow(tolerations, 'tolerations.yaml', generate_pipelinerun=True)

    def test_affinity_workflow(self):
        """
        Test compiling an affinity workflow.
        """
        from .testdata.affinity import affinity_pipeline
        self._test_pipeline_workflow(affinity_pipeline, 'affinity.yaml', generate_pipelinerun=True)

    def test_node_selector_workflow(self):
        """
        Test compiling a node selector workflow.
        """
        from .testdata.node_selector import node_selector_pipeline
        self._test_pipeline_workflow(node_selector_pipeline, 'node_selector.yaml', generate_pipelinerun=True)

    def _test_pipeline_workflow(self, pipeline_function, pipeline_yaml, generate_pipelinerun=False):
        test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
        golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml)
45 changes: 45 additions & 0 deletions sdk/python/tests/compiler/testdata/affinity.py
@@ -0,0 +1,45 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kubernetes.client import V1Affinity, V1NodeSelector, V1NodeSelectorRequirement, V1NodeSelectorTerm, V1NodeAffinity
from kfp.dsl import ContainerOp
from kfp import dsl


def some_op():
    return dsl.ContainerOp(
        name='sleep',
        image='busybox',
        command=['sleep 1'],
    )

@dsl.pipeline(
    name='affinity',
    description='A pipeline with affinity'
)
def affinity_pipeline(
):
    """A pipeline with affinity"""
    affinity = V1Affinity(
        node_affinity=V1NodeAffinity(
            required_during_scheduling_ignored_during_execution=V1NodeSelector(
                node_selector_terms=[V1NodeSelectorTerm(
                    match_expressions=[V1NodeSelectorRequirement(
                        key='beta.kubernetes.io/instance-type', operator='In', values=['p2.xlarge'])])])))
    some_op().add_affinity(affinity)

if __name__ == '__main__':
    # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
    from kfp_tekton.compiler import TektonCompiler
    TektonCompiler().compile(affinity_pipeline, __file__.replace('.py', '.yaml'), generate_pipelinerun=True)
58 changes: 58 additions & 0 deletions sdk/python/tests/compiler/testdata/affinity.yaml
@@ -0,0 +1,58 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: sleep
spec:
  steps:
  - command:
    - sleep 1
    image: busybox
    name: sleep
---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  annotations:
    pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with affinity",
      "name": "affinity"}'
  name: affinity
spec:
  params: []
  tasks:
  - name: sleep
    params: []
    taskRef:
      name: sleep
---
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
  name: affinity-run
spec:
  params: []
  pipelineRef:
    name: affinity
  podtemplate:
    affinity:
      nodeAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
          - matchExpressions:
            - key: beta.kubernetes.io/instance-type
              operator: In
              values:
              - p2.xlarge
38 changes: 38 additions & 0 deletions sdk/python/tests/compiler/testdata/node_selector.py
@@ -0,0 +1,38 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp.dsl import ContainerOp
from kfp import dsl


def some_op():
    return dsl.ContainerOp(
        name='sleep',
        image='busybox',
        command=['sleep 1'],
    )

@dsl.pipeline(
    name='node_selector',
    description='A pipeline with Node Selector'
)
def node_selector_pipeline(
):
    """A pipeline with Node Selector"""
    some_op().add_node_selector_constraint('accelerator', 'nvidia-tesla-k80')

if __name__ == '__main__':
    # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
    from kfp_tekton.compiler import TektonCompiler
    TektonCompiler().compile(node_selector_pipeline, __file__.replace('.py', '.yaml'), generate_pipelinerun=True)
51 changes: 51 additions & 0 deletions sdk/python/tests/compiler/testdata/node_selector.yaml
@@ -0,0 +1,51 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: sleep
spec:
  steps:
  - command:
    - sleep 1
    image: busybox
    name: sleep
---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  annotations:
    pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with Node Selector",
      "name": "node_selector"}'
  name: node-selector
spec:
  params: []
  tasks:
  - name: sleep
    params: []
    taskRef:
      name: sleep
---
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
  name: node-selector-run
spec:
  params: []
  pipelineRef:
    name: node-selector
  podtemplate:
    nodeSelector:
      accelerator: nvidia-tesla-k80
40 changes: 40 additions & 0 deletions sdk/python/tests/compiler/testdata/tolerations.py
@@ -0,0 +1,40 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kubernetes.client import V1Toleration
from kfp.dsl import ContainerOp
from kfp import dsl

@dsl.pipeline(
    name='tolerations',
    description='A pipeline with tolerations'
)
def tolerations(
):
    """A pipeline with tolerations"""
    op1 = dsl.ContainerOp(
        name='download',
        image='busybox',
        command=['sh', '-c'],
        arguments=['sleep 10; wget localhost:5678 -O /tmp/results.txt'],
        file_outputs={'downloaded': '/tmp/results.txt'})\
        .add_toleration(V1Toleration(effect='NoSchedule',
                                     key='gpu',
                                     operator='Equal',
                                     value='run'))

if __name__ == '__main__':
    # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
    from kfp_tekton.compiler import TektonCompiler
    TektonCompiler().compile(tolerations, __file__.replace('.py', '.yaml'), generate_pipelinerun=True)