Extend compiler to support parallelFor with loop static params (#67)
fenglixa authored Apr 3, 2020
1 parent 910ba9b commit cf39681
Showing 7 changed files with 513 additions and 8 deletions.
69 changes: 69 additions & 0 deletions sdk/python/kfp_tekton/compiler/compiler.py
@@ -15,6 +15,8 @@
import json
import tarfile
import yaml
import copy
import itertools
import zipfile
from typing import Callable, Set, List, Text, Dict, Tuple, Any, Union, Optional

@@ -59,6 +61,59 @@ def my_pipeline(a: int = 1, b: str = "default value"):
```
"""

def _get_loop_task(self, task: Dict, op_name_to_for_loop_op):
"""Get the list of task references which will flatten the loop parameters defined in pipeline.
Args:
task: ops template in pipeline.
op_name_to_for_loop_op: a dictionary of ospgroup
"""
# Get all the params in the task
task_parms_list = []
for tp in task.get('params', []):
task_parms_list.append(tp)
# Get the loop values for each param
for tp in task_parms_list:
for loop_param in op_name_to_for_loop_op.values():
loop_args = loop_param.loop_args
if loop_args.name in tp['name']:
lpn = tp['name'].replace(loop_args.name, '').replace('-subvar-', '')
if lpn:
tp['loopvalue'] = [value[lpn] for value in loop_args.items_or_pipeline_param]
else:
tp['loopvalue'] = loop_args.items_or_pipeline_param
# Get the task params list
## Get the task params without loop values first
loop_value = [p['loopvalue'] for p in task_parms_list if p.get('loopvalue')]
task_params_without_loop = [p for p in task_parms_list if not p.get('loopvalue')]
## Get the task params with loop values
loop_params = [p for p in task_parms_list if p.get('loopvalue')]
for parm in loop_params:
del parm['loopvalue']
del parm['value']
value_iter = list(itertools.product(*loop_value))
value_iter_list = []
for values in value_iter:
opt = []
for value in values:
opt.append({"value": str(value)})
value_iter_list.append(opt)
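# merge the loop param names into each generated value combination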
{value[i].update(loop_params[i]) for i in range(len(loop_params)) for value in value_iter_list}
task_params_with_loop = value_iter_list
## combine task params
list(a.extend(task_params_without_loop) for a in task_params_with_loop)
task_parms_all = task_params_with_loop
# Get the task list based on the params list
task_list = []
del task['params']
task_old_name = task['name']
for i in range(len(task_parms_all)):
task['params'] = task_parms_all[i]
task['name'] = '%s-loop-items-%d' % (task_old_name, i)
task_list.append(copy.deepcopy(task))
del task['params']
return task_list

def _create_dag_templates(self, pipeline, op_transformers=None, params=None, op_to_templates_handler=None):
"""Create all groups and ops templates in the pipeline.
@@ -76,6 +131,7 @@ def _create_dag_templates(self, pipeline, op_transformers=None, params=None, op_
for op in pipeline.ops.values():
for transformer in op_transformers or []:
transformer(op)

tasks = []
for op in pipeline.ops.values():
tasks.extend(op_to_steps_handler(op))
@@ -148,6 +204,19 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
if op.timeout:
task['timeout'] = '%ds' % op.timeout

# process loop parameters; keep this section after the other task processing and ahead of generating the pipeline
root_group = pipeline.groups[0]
op_name_to_for_loop_op = self._get_for_loop_ops(root_group)
if op_name_to_for_loop_op:
for loop_param in op_name_to_for_loop_op.values():
if loop_param.items_is_pipeline_param is True:
raise NotImplementedError("dynamic params are not yet implemented")
include_loop_task_refs = []
for task in task_refs:
with_loop_task = self._get_loop_task(task, op_name_to_for_loop_op)
include_loop_task_refs.extend(with_loop_task)
task_refs = include_loop_task_refs

# generate the Tekton Pipeline document
pipeline_template = {
'apiVersion': tekton_api_version,
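For orientation, the following is a minimal, self-contained sketch (not the committed code) of the flattening that _get_loop_task performs: every static loop parameter contributes a list of values, itertools.product enumerates the combinations, and each combination becomes the param list of one generated task reference named '<task>-loop-items-<i>'. The parameter and task names below mirror the loop_static.yaml golden file; the real loop parameter names also embed a generated unique code, which is omitted here.

import itertools

# Static loop parameter values, keyed by the flattened parameter name (simplified).
loop_values = {'loop-item-param-subvar-A_a': [1, 10]}
# Params that do not come from the loop are shared by every generated task.
fixed_params = [{'name': 'my_pipe_param', 'value': '$(params.my_pipe_param)'}]

expanded = []
for i, combo in enumerate(itertools.product(*loop_values.values())):
    params = [{'name': n, 'value': str(v)} for n, v in zip(loop_values, combo)]
    expanded.append({'name': 'my-in-coop1-loop-items-%d' % i,
                     'params': params + fixed_params,
                     'taskRef': {'name': 'my-in-coop1'}})

# 'expanded' now holds one task reference per loop iteration, matching the
# my-in-coop1-loop-items-0 and my-in-coop1-loop-items-1 entries in loop_static.yaml.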
27 changes: 26 additions & 1 deletion sdk/python/tests/compiler/compiler_tests.py
@@ -17,6 +17,7 @@
import tempfile
import unittest
import yaml
import re

from kfp_tekton import compiler

@@ -62,6 +63,24 @@ def test_sidecar_workflow(self):
"""
from .testdata.sidecar import sidecar_pipeline
self._test_pipeline_workflow(sidecar_pipeline, 'sidecar.yaml')

def test_loop_static_workflow(self):
"""
Test compiling a pipeline with static loop params into a workflow.
"""
from .testdata.loop_static import pipeline
self._test_pipeline_workflow(
pipeline,
'loop_static.yaml',
normalize_compiler_output_function=lambda f: re.sub(
"loop-item-param-.*-subvar", "loop-item-param-subvar", f))

def test_withitem_nested_workflow(self):
"""
Test compiling a pipeline with nested withItem loops into a workflow.
"""
from .testdata.withitem_nested import pipeline
self._test_pipeline_workflow(pipeline, 'withitem_nested.yaml')

def test_pipelineparams_workflow(self):
"""
@@ -133,14 +152,20 @@ def test_pipeline_transformers_workflow(self):
from .testdata.pipeline_transformers import transform_pipeline
self._test_pipeline_workflow(transform_pipeline, 'pipeline_transformers.yaml')

def _test_pipeline_workflow(self, pipeline_function, pipeline_yaml, generate_pipelinerun=False):
def _test_pipeline_workflow(self,
pipeline_function,
pipeline_yaml,
generate_pipelinerun=False,
normalize_compiler_output_function=None):
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml)
temp_dir = tempfile.mkdtemp()
compiled_yaml_file = os.path.join(temp_dir, 'workflow.yaml')
try:
compiler.TektonCompiler().compile(pipeline_function, compiled_yaml_file, generate_pipelinerun=generate_pipelinerun)
with open(compiled_yaml_file, 'r') as f:
f = normalize_compiler_output_function(
f.read()) if normalize_compiler_output_function else f
compiled = list(yaml.safe_load_all(f))
if GENERATE_GOLDEN_YAML:
with open(golden_yaml_file, 'w') as f:
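Background on the normalization hook above: the KFP DSL names a loop argument 'loop-item-param-<code>-subvar-<variable>', and the generated <code> is not stable across compilations, so test_loop_static_workflow strips it before comparing against the golden loop_static.yaml. A minimal illustration, assuming a hypothetical code of 00000001:

import re

compiled_line = "- name: loop-item-param-00000001-subvar-A_a"  # hypothetical compiler output
golden_line = re.sub("loop-item-param-.*-subvar", "loop-item-param-subvar", compiled_line)
print(golden_line)  # - name: loop-item-param-subvar-A_a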
47 changes: 47 additions & 0 deletions sdk/python/tests/compiler/testdata/loop_static.py
@@ -0,0 +1,47 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.dsl as dsl


@dsl.pipeline(name='my-pipeline')
def pipeline(my_pipe_param='10'):
loop_args = [{'A_a': 1, 'B_b': 2}, {'A_a': 10, 'B_b': 20}]
with dsl.ParallelFor(loop_args) as item:
op1 = dsl.ContainerOp(
name="my-in-coop1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op1 %s %s" % (item.A_a, my_pipe_param)],
)

op2 = dsl.ContainerOp(
name="my-in-coop2",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op2 %s" % item.B_b],
)

op_out = dsl.ContainerOp(
name="my-out-cop",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo %s" % my_pipe_param],
)


if __name__ == '__main__':
# don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(pipeline, __file__.replace('.py', '.yaml'))
109 changes: 109 additions & 0 deletions sdk/python/tests/compiler/testdata/loop_static.yaml
@@ -0,0 +1,109 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: my-in-coop1
spec:
params:
- name: loop-item-param-subvar-A_a
- name: my_pipe_param
steps:
- args:
- echo op1 $(inputs.params.loop-item-param-subvar-A_a) $(inputs.params.my_pipe_param)
command:
- sh
- -c
image: library/bash:4.4.23
name: my-in-coop1
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: my-in-coop2
spec:
params:
- name: loop-item-param-subvar-B_b
steps:
- args:
- echo op2 $(inputs.params.loop-item-param-subvar-B_b)
command:
- sh
- -c
image: library/bash:4.4.23
name: my-in-coop2
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: my-out-cop
spec:
params:
- name: my_pipe_param
steps:
- args:
- echo $(inputs.params.my_pipe_param)
command:
- sh
- -c
image: library/bash:4.4.23
name: my-out-cop
---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "my_pipe_param",
"optional": true}], "name": "my-pipeline"}'
name: my-pipeline
spec:
params:
- default: '10'
name: my_pipe_param
tasks:
- name: my-in-coop1-loop-items-0
params:
- name: loop-item-param-subvar-A_a
value: '1'
- name: my_pipe_param
value: $(params.my_pipe_param)
taskRef:
name: my-in-coop1
- name: my-in-coop1-loop-items-1
params:
- name: loop-item-param-subvar-A_a
value: '10'
- name: my_pipe_param
value: $(params.my_pipe_param)
taskRef:
name: my-in-coop1
- name: my-in-coop2-loop-items-0
params:
- name: loop-item-param-subvar-B_b
value: '2'
taskRef:
name: my-in-coop2
- name: my-in-coop2-loop-items-1
params:
- name: loop-item-param-subvar-B_b
value: '20'
taskRef:
name: my-in-coop2
- name: my-out-cop-loop-items-0
params:
- name: my_pipe_param
value: $(params.my_pipe_param)
taskRef:
name: my-out-cop
67 changes: 67 additions & 0 deletions sdk/python/tests/compiler/testdata/withitem_nested.py
@@ -0,0 +1,67 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.dsl as dsl
from kfp.dsl import _for_loop

class Coder:
def __init__(self, ):
self._code_id = 0

def get_code(self, ):
self._code_id += 1
return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS)


dsl.ParallelFor._get_unique_id_code = Coder().get_code
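# The assignment above swaps in a deterministic counter for the loop unique-id code
# so the compiled output is reproducible and matches the golden withitem_nested.yaml.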


@dsl.pipeline(name='my-pipeline')
def pipeline(my_pipe_param: int = 10):
loop_args = [{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]
with dsl.ParallelFor(loop_args) as item:
op1 = dsl.ContainerOp(
name="my-in-coop1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op1 %s %s" % (item.a, my_pipe_param)],
)

with dsl.ParallelFor([100, 200, 300]) as inner_item:
op11 = dsl.ContainerOp(
name="my-inner-inner-coop",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op1 %s %s %s" % (item.a, inner_item, my_pipe_param)],
)

op2 = dsl.ContainerOp(
name="my-in-coop2",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo op2 %s" % item.b],
)

op_out = dsl.ContainerOp(
name="my-out-cop",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo %s" % my_pipe_param],
)


if __name__ == '__main__':
# don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(pipeline, __file__.replace('.py', '.yaml'))