Skip to content

Commit

Permalink
fix(sdk): handle the node selectors (kubeflow#916)
Browse files Browse the repository at this point in the history
Handle the node selector info from PipelineConf in
compiler. Node selector info in Op level could overide
PipelinConf.
settings.

Signed-off-by: Yihong Wang <yh.wang@ibm.com>
  • Loading branch information
yhwang authored Apr 12, 2022
1 parent f083d6f commit eac02f0
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 1 deletion.
11 changes: 10 additions & 1 deletion sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import textwrap
import uuid
import zipfile
import copy
from collections import defaultdict
from distutils.util import strtobool
import collections
Expand Down Expand Up @@ -1263,8 +1264,16 @@ def get_when_task(input_task_when, depended_conditions):
task_spec["taskPodTemplate"]["affinity"] = convert_k8s_obj_to_json(op.affinity)
if op.tolerations:
task_spec["taskPodTemplate"]['tolerations'] = op.tolerations
# process pipeline level first
if pipeline_conf and hasattr(pipeline_conf, 'default_pod_node_selector') \
and len(pipeline_conf.default_pod_node_selector) > 0:
task_spec["taskPodTemplate"]['nodeSelector'] = copy.deepcopy(pipeline_conf.default_pod_node_selector)
# process op level and it may oeverride the pipeline level conf
if op.node_selector:
task_spec["taskPodTemplate"]['nodeSelector'] = op.node_selector
if task_spec["taskPodTemplate"].get('nodeSelector'):
task_spec["taskPodTemplate"]['nodeSelector'].update(op.node_selector)
else:
task_spec["taskPodTemplate"]['nodeSelector'] = op.node_selector
if bool(task_spec["taskPodTemplate"]):
task_run_spec.append(task_spec)
if len(task_run_spec) > 0:
Expand Down
14 changes: 14 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,20 @@ def test_node_selector_workflow(self):
from .testdata.node_selector import node_selector_pipeline
self._test_pipeline_workflow(node_selector_pipeline, 'node_selector.yaml', skip_noninlined=True)

def test_node_selector_from_pipeline_workflow(self):
"""
Test compiling a node selector workflow. node selector is from pipeline conf
"""
from .testdata.node_selector_from_pipeline import node_selector_pipeline
self._test_pipeline_workflow(node_selector_pipeline, 'node_selector_from_pipeline.yaml', skip_noninlined=True)

def test_node_selector_from_pipeline_override_workflow(self):
"""
Test compiling a node selector workflow. node selector from pipeline conf is override by op conf
"""
from .testdata.node_selector_from_pipeline_override import node_selector_pipeline
self._test_pipeline_workflow(node_selector_pipeline, 'node_selector_from_pipeline_override.yaml', skip_noninlined=True)

def test_pipeline_transformers_workflow(self):
"""
Test compiling a pipeline_transformers workflow with pod annotations and labels.
Expand Down
46 changes: 46 additions & 0 deletions sdk/python/tests/compiler/testdata/node_selector_from_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl, components


def echo_op():
return components.load_component_from_text("""
name: echo
description: echo
implementation:
container:
image: busybox
command:
- sh
- -c
args:
- echo
- Found my node
""")()


@dsl.pipeline(
name='node-selector',
description='A pipeline with Node Selector'
)
def node_selector_pipeline(
):
dsl.get_pipeline_conf().set_default_pod_node_selector('kubernetes.io/os', 'linux')
echo_op()


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(node_selector_pipeline, __file__.replace('.py', '.yaml'))
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: node-selector
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"echo": []}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with Node Selector",
"name": "node-selector"}'
spec:
pipelineSpec:
tasks:
- name: echo
taskSpec:
steps:
- name: main
args:
- echo
- Found my node
command:
- sh
- -c
image: busybox
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "echo", "outputs":
[], "version": "echo@sha256=e351a00f59eb0eb8d614bb41816020b768770ba7513a5b3c0c7ea5c2efa9c6d6"}'
tekton.dev/template: ''
timeout: 525600m
taskRunSpecs:
- pipelineTaskName: echo
taskPodTemplate:
nodeSelector:
kubernetes.io/os: linux
timeout: 525600m
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl, components


def echo_op():
return components.load_component_from_text("""
name: echo
description: echo
implementation:
container:
image: busybox
command:
- sh
- -c
args:
- echo
- Found my node
""")()


@dsl.pipeline(
name='node-selector',
description='A pipeline with Node Selector'
)
def node_selector_pipeline(
):
dsl.get_pipeline_conf().set_default_pod_node_selector('kubernetes.io/os', 'linux')
echo_op().add_node_selector_constraint(
label_name='kubernetes.io/os',
value='windows')


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(node_selector_pipeline, __file__.replace('.py', '.yaml'))
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: node-selector
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"echo": []}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline with Node Selector",
"name": "node-selector"}'
spec:
pipelineSpec:
tasks:
- name: echo
taskSpec:
steps:
- name: main
args:
- echo
- Found my node
command:
- sh
- -c
image: busybox
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "echo", "outputs":
[], "version": "echo@sha256=e351a00f59eb0eb8d614bb41816020b768770ba7513a5b3c0c7ea5c2efa9c6d6"}'
tekton.dev/template: ''
timeout: 525600m
taskRunSpecs:
- pipelineTaskName: echo
taskPodTemplate:
nodeSelector:
kubernetes.io/os: windows
timeout: 525600m

0 comments on commit eac02f0

Please sign in to comment.