fix(sdk): Fix big data passing with multiple types of volume templates (kubeflow#1006)

* fix big data passing with multiple types of volume templates

* fix lint and license

* fix multi input step bug

* refactor update volume code

* add missing tests
Tomcli authored Jul 19, 2022
1 parent a9f1483 commit 26b4455
Showing 7 changed files with 512 additions and 15 deletions.
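
The scenario behind the fix: when an op both mounts a user-defined volume (for example a secret added via add_volume/add_volume_mount) and consumes an input through inputPath (so the compiler must add its own emptyDir volume for big data passing), the compiler previously rebuilt the task's stepTemplate.volumeMounts and volumes from scratch, so one set of volumes clobbered the other. The sketch below is condensed from the new big_data_multi_volumes_1.py test data further down; it is illustrative only and omits the compile call.

from kfp import dsl
from kfp.components import load_component_from_text
from kubernetes.client import V1Volume, V1VolumeMount, V1SecretVolumeSource

print_ref_op = load_component_from_text("""
name: print-big
inputs:
- {name: input_text, type: String}
outputs:
- {name: output_value, type: String}
implementation:
  container:
    image: alpine:3.6
    command:
    - sh
    - -c
    - |
      set -e
      cat $0 > $1
    - {inputPath: input_text}
    - {outputPath: output_value}
""")


@dsl.pipeline(name='big-data')
def big_data():
    op = print_ref_op('literal')
    # The user-defined secret volume and the compiler's emptyDir volume for the
    # big-data input path now coexist instead of one overwriting the other.
    op.add_volume(V1Volume(name='volume-big',
                           secret=V1SecretVolumeSource(secret_name='secret-big')))
    op.container.add_volume_mount(
        V1VolumeMount(name='volume-big', mount_path='/volume-big'))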
28 changes: 18 additions & 10 deletions sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
@@ -20,7 +20,7 @@
 from typing import List, Optional, Set
 
 from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name
-from kfp_tekton.compiler._op_to_template import _get_base_step, _add_mount_path, _prepend_steps
+from kfp_tekton.compiler._op_to_template import _get_base_step, _add_mount_path, _prepend_steps, _update_volumes
 from os import environ as env
 
 BIG_DATA_MIDPATH = "artifacts/$ORIG_PR_NAME"
@@ -707,21 +707,29 @@ def input_artifacts_tasks(template: dict, artifact: dict) -> dict:
     volume_template = []
     mounted_param_paths = []
     copy_inputs_step = _get_base_step('copy-inputs')
+    has_copy_input = False
+    copy_input_index = 0
     if 'raw' in artifact:
-        copy_inputs_step['command'].append('set -exo pipefail\necho -n "%s" > %s\n' % (
-            artifact['raw']['data'], artifact['path']))
+        for step_index in range(len(template['taskSpec']['steps'])):
+            if template['taskSpec']['steps'][step_index].get('name') == 'copy-inputs':
+                copy_input_index = step_index
+                has_copy_input = True
+        if has_copy_input:
+            template['taskSpec']['steps'][copy_input_index]['command'][-1] = \
+                template['taskSpec']['steps'][copy_input_index]['command'][-1] + \
+                'echo -n "%s" > %s\n' % (artifact['raw']['data'], artifact['path'])
+        else:
+            copy_inputs_step['command'].append('set -exo pipefail\necho -n "%s" > %s\n' % (
+                artifact['raw']['data'], artifact['path']))
     mount_path = artifact['path'].rsplit("/", 1)[0]
     if mount_path not in mounted_param_paths:
         _add_mount_path(artifact['name'], artifact['path'], mount_path,
                         volume_mount_step_template, volume_template,
                         mounted_param_paths)
-    template['taskSpec']['steps'] = _prepend_steps(
-        [copy_inputs_step], template['taskSpec']['steps'])
-    # _update_volumes(template, volume_mount_step_template, volume_template)
-    if volume_mount_step_template:
-        template['taskSpec']['stepTemplate'] = {}
-        template['taskSpec']['stepTemplate']['volumeMounts'] = volume_mount_step_template
-        template['taskSpec']['volumes'] = volume_template
+    if not has_copy_input:
+        template['taskSpec']['steps'] = _prepend_steps(
+            [copy_inputs_step], template['taskSpec']['steps'])
+    _update_volumes(template['taskSpec'], volume_mount_step_template, volume_template)
     return template


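One consequence of the new has_copy_input check: when several raw input artifacts land on the same task, the extra echo command is appended to the existing copy-inputs step's script instead of prepending a second copy-inputs step. A rough, illustrative sketch of the merged step dict (the paths and messages are hypothetical; the busybox/sh -ec shape matches the compiled YAML further down):

# Illustrative only: the rough shape of a copy-inputs step after two raw input
# artifacts are routed to the same task.
copy_inputs_step = {
    'name': 'copy-inputs',
    'image': 'busybox',
    'command': [
        'sh', '-ec',
        'set -exo pipefail\n'
        'echo -n "hello" > /tmp/inputs/msg_one/data\n'   # written when the step is first created
        'echo -n "world" > /tmp/inputs/msg_two/data\n',  # appended to command[-1] by the new code path
    ],
}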
12 changes: 7 additions & 5 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
@@ -212,16 +212,18 @@ def _add_mount_path(name: str,
     mounted_param_paths.append(mount_path)
 
 
-def _update_volumes(template: Dict[Text, Any],
+def _update_volumes(template_spec: Dict[Text, Any],
                     volume_mount_step_template: List[Dict[Text, Any]],
                     volume_template: List[Dict[Text, Any]]):
     """
     Update the list of volumes and volumeMounts on a given template
     """
     if volume_mount_step_template:
-        template['spec']['stepTemplate'] = {}
-        template['spec']['stepTemplate']['volumeMounts'] = volume_mount_step_template
-        template['spec']['volumes'] = volume_template
+        template_spec.setdefault('stepTemplate', {})
+        template_spec['stepTemplate'].setdefault('volumeMounts', [])
+        template_spec['stepTemplate']['volumeMounts'].extend(volume_mount_step_template)
+        template_spec.setdefault('volumes', [])
+        template_spec['volumes'].extend(volume_template)
 
 
 def _prepend_steps(prep_steps: List[Dict[Text, Any]], original_steps: List[Dict[Text, Any]]):
@@ -521,7 +523,7 @@ def _op_to_template(op: BaseOp,
                                                       artifact_items[op.name])
     if mounted_param_paths:
         template['spec']['steps'].append(copy_results_step)
-    _update_volumes(template, volume_mount_step_template, volume_template)
+    _update_volumes(template['spec'], volume_mount_step_template, volume_template)
 
     # metadata
     if processed_op.pod_annotations or processed_op.pod_labels:
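The refactored _update_volumes now merges into whatever stepTemplate.volumeMounts and volumes the spec already carries instead of replacing them, which is what keeps user-defined volumes alive next to the compiler's big-data volumes. A self-contained sketch of that merge behavior (it mirrors the body shown above rather than importing the SDK; the sample data is taken from the compiled YAML below):

from typing import Any, Dict, List


def update_volumes(template_spec: Dict[str, Any],
                   volume_mount_step_template: List[Dict[str, Any]],
                   volume_template: List[Dict[str, Any]]) -> None:
    # Mirrors the updated _update_volumes body: append rather than overwrite.
    if volume_mount_step_template:
        template_spec.setdefault('stepTemplate', {})
        template_spec['stepTemplate'].setdefault('volumeMounts', [])
        template_spec['stepTemplate']['volumeMounts'].extend(volume_mount_step_template)
        template_spec.setdefault('volumes', [])
        template_spec['volumes'].extend(volume_template)


# A task spec that already carries a user-defined secret volume...
task_spec = {
    'volumes': [{'name': 'volume-big', 'secret': {'secretName': 'secret-big'}}],
}
# ...still keeps it after the compiler adds its big-data emptyDir volume.
update_volumes(task_spec,
               [{'name': 'input-text', 'mountPath': '/tmp/inputs/input_text'}],
               [{'name': 'input-text', 'emptyDir': {}}])
assert len(task_spec['volumes']) == 2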
14 changes: 14 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -120,6 +120,20 @@ def test_parallel_join_workflow(self):
         from .testdata.parallel_join import download_and_join
         self._test_pipeline_workflow(download_and_join, 'parallel_join.yaml', skip_noninlined=True)
 
+    def test_big_data_multi_volumes_1_workflow(self):
+        """
+        Test compiling a big data pipeline with multiple types of volumes workflow.
+        """
+        from .testdata.big_data_multi_volumes_1 import big_data
+        self._test_pipeline_workflow(big_data, 'big_data_multi_volumes_1.yaml', skip_noninlined=True)
+
+    def test_big_data_multi_volumes_2_workflow(self):
+        """
+        Test compiling a big data pipeline with multiple types of volumes workflow.
+        """
+        from .testdata.big_data_multi_volumes_2 import big_data
+        self._test_pipeline_workflow(big_data, 'big_data_multi_volumes_2.yaml', skip_noninlined=True)
+
     def test_recur_cond_workflow(self):
         """
         Test compiling a recurive condition workflow.
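To run only the two new cases locally, a keyword filter like the following should work (assuming pytest and the sdk/python test requirements are installed; the exact invocation is not prescribed by this commit):

import pytest

# Compile-and-compare tests for the new multi-volume test data only.
pytest.main(["-k", "big_data_multi_volumes",
             "sdk/python/tests/compiler/compiler_tests.py"])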
106 changes: 106 additions & 0 deletions sdk/python/tests/compiler/testdata/big_data_multi_volumes_1.py
@@ -0,0 +1,106 @@
# Copyright 2022 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from kfp import dsl
from kfp.components import load_component_from_text
from kfp_tekton.compiler import TektonCompiler
from kubernetes.client import V1Volume, V1VolumeMount, V1SecretVolumeSource


def PrintOp(name: str, msg: str = None):
    if msg is None:
        msg = name
    print_op = load_component_from_text(
        """
        name: %s
        inputs:
        - {name: input_text, type: String, description: 'Represents an input parameter.'}
        outputs:
        - {name: output_value, type: String, description: 'Represents an output parameter.'}
        implementation:
          container:
            image: alpine:3.6
            command:
            - sh
            - -c
            - |
              set -e
              echo $0 > $1
            - {inputValue: input_text}
            - {outputPath: output_value}
        """ % name
    )
    return print_op(msg)


def PrintRefOp(name: str, msg: str = None):
    if msg is None:
        msg = name
    print_op = load_component_from_text(
        """
        name: %s
        inputs:
        - {name: input_text, type: String, description: 'Represents an input parameter.'}
        outputs:
        - {name: output_value, type: String, description: 'Represents an output parameter.'}
        implementation:
          container:
            image: alpine:3.6
            command:
            - sh
            - -c
            - |
              set -e
              cat $0 > $1
            - {inputPath: input_text}
            - {outputPath: output_value}
        """ % name
    )
    return print_op(msg)


def add_volume_and_mount(op: dsl.ContainerOp) -> dsl.ContainerOp:
    suffix = op.name[op.name.index('-'):]
    return op.add_volume(
        V1Volume(
            name='volume' + suffix,
            secret=V1SecretVolumeSource(secret_name='secret' + suffix),
        )
    ).container.add_volume_mount(
        V1VolumeMount(
            name='volume' + suffix,
            mount_path='/volume' + suffix,
        )
    )


@dsl.pipeline(name='big-data')
def big_data():
    # literal -> small
    PrintOp(
        'print-sm',
        'literal',
    ).apply(add_volume_and_mount)

    # literal -> big
    PrintRefOp(
        'print-big',
        'literal',
    ).apply(add_volume_and_mount)


if __name__ == '__main__':
    from kfp_tekton.compiler import TektonCompiler
    TektonCompiler().compile(big_data, __file__.replace('.py', '.yaml'))
120 changes: 120 additions & 0 deletions sdk/python/tests/compiler/testdata/big_data_multi_volumes_1.yaml
@@ -0,0 +1,120 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
  name: big-data
  annotations:
    tekton.dev/output_artifacts: '{"print-big": [{"key": "artifacts/$PIPELINERUN/print-big/output_value.tgz",
      "name": "print-big-output_value", "path": "/tmp/outputs/output_value/data"}],
      "print-sm": [{"key": "artifacts/$PIPELINERUN/print-sm/output_value.tgz", "name":
      "print-sm-output_value", "path": "/tmp/outputs/output_value/data"}]}'
    tekton.dev/input_artifacts: '{}'
    tekton.dev/artifact_bucket: mlpipeline
    tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
    tekton.dev/artifact_endpoint_scheme: http://
    tekton.dev/artifact_items: '{"print-big": [["output_value", "$(results.output-value.path)"]],
      "print-sm": [["output_value", "$(results.output-value.path)"]]}'
    sidecar.istio.io/inject: "false"
    pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
    pipelines.kubeflow.org/pipeline_spec: '{"name": "big-data"}'
spec:
  pipelineSpec:
    tasks:
    - name: print-sm
      taskSpec:
        steps:
        - name: main
          command:
          - sh
          - -c
          - |
            set -e
            echo $0 > $1
          - literal
          - $(results.output-value.path)
          image: alpine:3.6
          volumeMounts:
          - mountPath: /volume-sm
            name: volume-sm
        results:
        - name: output-value
          type: string
          description: /tmp/outputs/output_value/data
        volumes:
        - name: volume-sm
          secret:
            secretName: secret-sm
        metadata:
          labels:
            pipelines.kubeflow.org/pipelinename: ''
            pipelines.kubeflow.org/generation: ''
            pipelines.kubeflow.org/cache_enabled: "true"
          annotations:
            pipelines.kubeflow.org/component_spec_digest: '{"name": "print-sm", "outputs":
              [{"description": "Represents an output parameter.", "name": "output_value",
              "type": "String"}], "version": "print-sm@sha256=e88f5ee1607e92e4cf859907b2f81bdc766fc69cff62010f2f93d4c850a4d429"}'
            tekton.dev/template: ''
      timeout: 525600m
    - name: print-big
      taskSpec:
        steps:
        - image: busybox
          name: copy-inputs
          command:
          - sh
          - -ec
          - |
            set -exo pipefail
            echo -n "literal" > /tmp/inputs/input_text/data
        - name: main
          command:
          - sh
          - -c
          - |
            set -e
            cat $0 > $1
          - /tmp/inputs/input_text/data
          - $(results.output-value.path)
          image: alpine:3.6
          volumeMounts:
          - mountPath: /volume-big
            name: volume-big
        results:
        - name: output-value
          type: string
          description: /tmp/outputs/output_value/data
        volumes:
        - name: volume-big
          secret:
            secretName: secret-big
        - name: input-text
          emptyDir: {}
        metadata:
          labels:
            pipelines.kubeflow.org/pipelinename: ''
            pipelines.kubeflow.org/generation: ''
            pipelines.kubeflow.org/cache_enabled: "true"
          annotations:
            pipelines.kubeflow.org/component_spec_digest: '{"name": "print-big", "outputs":
              [{"description": "Represents an output parameter.", "name": "output_value",
              "type": "String"}], "version": "print-big@sha256=e85f43e32a0210be43f84f43434e61113ce4770f37f5346acc41523a270e72d4"}'
            tekton.dev/template: ''
        stepTemplate:
          volumeMounts:
          - name: input-text
            mountPath: /tmp/inputs/input_text
      timeout: 525600m
  timeout: 525600m