Skip to content

Commit

Permalink
Fix the issue when loop/graph inside a graph (kubeflow#650)
Browse files Browse the repository at this point in the history
  • Loading branch information
pugangxa authored Jul 2, 2021
1 parent 2cfc78d commit 791fe53
Show file tree
Hide file tree
Showing 6 changed files with 603 additions and 3 deletions.
4 changes: 1 addition & 3 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,7 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies, pipeline_
self.loops_pipeline[group_name]['task_list'].append(sanitize_k8s_name(condition_op.name))
if op.groups:
for condition_op in op.groups:
# graph task should be a nested cr and need be appended to task list.
if condition_op.type == 'graph':
self.loops_pipeline[group_name]['task_list'].append(sanitize_k8s_name(condition_op.name))
self.loops_pipeline[group_name]['task_list'].append(sanitize_k8s_name(condition_op.name))
self.loops_pipeline[group_name]['spec']['name'] = group_name
self.loops_pipeline[group_name]['spec']['taskRef'] = {
"apiVersion": "custom.tekton.dev/v1alpha1",
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ def test_cond_recur_workflow(self):
from .testdata.cond_recur import condition_and_recur
self._test_pipeline_workflow(condition_and_recur, 'cond_recur.yaml')

def test_loop_in_recur_workflow(self):
"""
Test compiling a conditional recursive workflow.
"""
from .testdata.loop_in_recursion import flipcoin
self._test_pipeline_workflow(flipcoin, 'loop_in_recursion.yaml')

def test_recur_nested_workflow(self):
"""
Test compiling a nested recursive workflow.
Expand Down
90 changes: 90 additions & 0 deletions sdk/python/tests/compiler/testdata/loop_in_recursion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp_tekton.compiler import TektonCompiler


class Coder:
def empty(self):
return ""


TektonCompiler._get_unique_id_code = Coder.empty


def flip_coin_op():
"""Flip a coin and output heads or tails randomly."""
return dsl.ContainerOp(
name='Flip coin',
image='python:alpine3.6',
command=['sh', '-c'],
arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 '
'else \'tails\'; print(result)" | tee /tmp/output'],
file_outputs={'output': '/tmp/output'}
)


def print_op(msg):
"""Print a message."""
return dsl.ContainerOp(
name='Print',
image='alpine:3.6',
command=['echo', msg],
)


@dsl._component.graph_component
def flip_component(flip_result, maxVal, my_pipe_param):
with dsl.Condition(flip_result == 'heads'):
print_flip = print_op(flip_result)
flipA = flip_coin_op().after(print_flip)
loop_args = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]
with dsl.ParallelFor(loop_args) as item:
op1 = dsl.ContainerOp(
name="my-in-coop1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op1 %s %s" % (item.a, my_pipe_param)],
)

with dsl.ParallelFor([100, 200, 300]) as inner_item:
op11 = dsl.ContainerOp(
name="my-inner-inner-coop",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op1 %s %s %s" % (item.a, inner_item, my_pipe_param)],
)

op2 = dsl.ContainerOp(
name="my-in-coop2",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op2 %s" % item.b],
)
flip_component(flipA.output, maxVal, my_pipe_param)


@dsl.pipeline(
name='loop in recursion pipeline',
description='shows how to use graph_component and recursion.'
)
def flipcoin(maxVal=12, my_pipe_param: int = 10):
flip_out = flip_coin_op()
flip_loop = flip_component(flip_out.output, maxVal, my_pipe_param)
print_op('cool, it is over. %s' % flip_out.output).after(flip_loop)


if __name__ == '__main__':
TektonCompiler().compile(flipcoin, __file__.replace('.py', '.yaml'))
112 changes: 112 additions & 0 deletions sdk/python/tests/compiler/testdata/loop_in_recursion.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "shows how to use graph_component
and recursion.", "inputs": [{"default": "12", "name": "maxVal", "optional":
true}, {"default": "10", "name": "my_pipe_param", "optional": true, "type":
"Integer"}], "name": "loop in recursion pipeline"}'
sidecar.istio.io/inject: 'false'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"flip-coin": [["output", "$(results.output.path)"]],
"flip-coin-2": [["output", "$(results.output.path)"]], "my-in-coop1": [], "my-in-coop2":
[], "my-inner-inner-coop": [], "print": [], "print-2": []}'
tekton.dev/input_artifacts: '{"print": [{"name": "flip-coin-output", "parent_task":
"flip-coin"}], "print-2": [{"name": "flip-coin-output", "parent_task": "flip-coin"}]}'
tekton.dev/output_artifacts: '{"flip-coin": [{"key": "artifacts/$PIPELINERUN/flip-coin/output.tgz",
"name": "flip-coin-output", "path": "/tmp/output"}], "flip-coin-2": [{"key":
"artifacts/$PIPELINERUN/flip-coin-2/output.tgz", "name": "flip-coin-2-output",
"path": "/tmp/output"}]}'
name: loop-in-recursion-pipeline
spec:
params:
- name: maxVal
value: '12'
- name: my_pipe_param
value: '10'
pipelineSpec:
params:
- default: '12'
name: maxVal
- default: '10'
name: my_pipe_param
tasks:
- name: flip-coin
taskSpec:
metadata:
annotations:
tekton.dev/template: ''
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
results:
- description: /tmp/output
name: output
steps:
- args:
- python -c "import random; result = 'heads' if random.randint(0,1) == 0
else 'tails'; print(result)" | tee $(results.output.path)
command:
- sh
- -c
image: python:alpine3.6
name: main
timeout: 0s
- name: print-2
params:
- name: flip-coin-output
value: $(tasks.flip-coin.results.output)
runAfter:
- loop-in-recursion-pipeline-graph-flip-component-1
taskSpec:
metadata:
annotations:
tekton.dev/template: ''
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
params:
- name: flip-coin-output
steps:
- command:
- echo
- cool, it is over. $(inputs.params.flip-coin-output)
image: alpine:3.6
name: main
timeout: 0s
- name: loop-in-recursion-pipeline-graph-flip-component-1
params:
- name: flip-coin-output
value: $(tasks.flip-coin.results.output)
- name: just_one_iteration
value:
- '1'
- name: maxVal
value: $(params.maxVal)
- name: my_pipe_param
value: $(params.my_pipe_param)
runAfter:
- flip-coin
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: loop-in-recursion-pipeline-graph-flip-component-1
timeout: 0s
112 changes: 112 additions & 0 deletions sdk/python/tests/compiler/testdata/loop_in_recursion_noninlined.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "shows how to use graph_component
and recursion.", "inputs": [{"default": "12", "name": "maxVal", "optional":
true}, {"default": "10", "name": "my_pipe_param", "optional": true, "type":
"Integer"}], "name": "loop in recursion pipeline"}'
sidecar.istio.io/inject: 'false'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"flip-coin": [["output", "$(results.output.path)"]],
"flip-coin-2": [["output", "$(results.output.path)"]], "my-in-coop1": [], "my-in-coop2":
[], "my-inner-inner-coop": [], "print": [], "print-2": []}'
tekton.dev/input_artifacts: '{"print": [{"name": "flip-coin-output", "parent_task":
"flip-coin"}], "print-2": [{"name": "flip-coin-output", "parent_task": "flip-coin"}]}'
tekton.dev/output_artifacts: '{"flip-coin": [{"key": "artifacts/$PIPELINERUN/flip-coin/output.tgz",
"name": "flip-coin-output", "path": "/tmp/output"}], "flip-coin-2": [{"key":
"artifacts/$PIPELINERUN/flip-coin-2/output.tgz", "name": "flip-coin-2-output",
"path": "/tmp/output"}]}'
name: loop-in-recursion-pipeline
spec:
params:
- name: maxVal
value: '12'
- name: my_pipe_param
value: '10'
pipelineSpec:
params:
- default: '12'
name: maxVal
- default: '10'
name: my_pipe_param
tasks:
- name: flip-coin
taskSpec:
metadata:
annotations:
tekton.dev/template: ''
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
results:
- description: /tmp/output
name: output
steps:
- args:
- python -c "import random; result = 'heads' if random.randint(0,1) == 0
else 'tails'; print(result)" | tee $(results.output.path)
command:
- sh
- -c
image: python:alpine3.6
name: main
timeout: 0s
- name: print-2
params:
- name: flip-coin-output
value: $(tasks.flip-coin.results.output)
runAfter:
- loop-in-recursion-pipeline-graph-flip-component-1
taskSpec:
metadata:
annotations:
tekton.dev/template: ''
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
params:
- name: flip-coin-output
steps:
- command:
- echo
- cool, it is over. $(inputs.params.flip-coin-output)
image: alpine:3.6
name: main
timeout: 0s
- name: loop-in-recursion-pipeline-graph-flip-component-1
params:
- name: flip-coin-output
value: $(tasks.flip-coin.results.output)
- name: just_one_iteration
value:
- '1'
- name: maxVal
value: $(params.maxVal)
- name: my_pipe_param
value: $(params.my_pipe_param)
runAfter:
- flip-coin
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: loop-in-recursion-pipeline-graph-flip-component-1
timeout: 0s
Loading

0 comments on commit 791fe53

Please sign in to comment.