Skip to content

Commit

Permalink
Fix kubeflow#340: handle big data passing with multi dependencies (ku…
Browse files Browse the repository at this point in the history
  • Loading branch information
fenglixa authored Nov 11, 2020
1 parent 4545ff9 commit cdd5625
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 0 deletions.
1 change: 1 addition & 0 deletions sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ def big_data_passing_tasks(task: dict, pipelinerun_template: dict,
task_name, task_parma.get('name'))
task['taskSpec'] = replace_big_data_placeholder(
task_spec, placeholder, workspaces_parameter)
task_spec = task.get('taskSpec', {})
# Handle the case of an input artifact that does not depend on the output of other tasks
for task_artifact in task_artifacts:
if (task_name, task_artifact.get('name')) not in inputs_tasks:
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ def test_big_data_workflow(self):
from .testdata.big_data_passing import file_passing_pipelines
self._test_pipeline_workflow(file_passing_pipelines, 'big_data_passing.yaml')

def test_create_component_from_func_workflow(self):
    """
    Test compiling a workflow whose components are built with create_component_from_func.
    """
    from .testdata.create_component_from_func import test_pipeline
    yaml_file = 'create_component_from_func.yaml'
    self._test_pipeline_workflow(test_pipeline, yaml_file)

def test_katib_workflow(self):
"""
Test compiling a katib workflow.
Expand Down
64 changes: 64 additions & 0 deletions sdk/python/tests/compiler/testdata/create_component_from_func.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import kfp
import os
from kfp.components import create_component_from_func, OutputPath

# Directory containing this file; every component path below is resolved against it.
cwd = os.path.dirname(__file__)
filesystem_component_root = 'components/filesystem'

# The shared filesystem components are addressed five directory levels above this file.
_filesystem_components_dir = os.path.join(
    cwd, '../../../../../', filesystem_component_root)

get_subdir_op = kfp.components.load_component_from_file(
    os.path.join(_filesystem_components_dir, 'get_subdirectory/component.yaml'))
list_item_op_1 = kfp.components.load_component_from_file(
    os.path.join(_filesystem_components_dir, 'list_items/component.yaml'))

# This component definition is loaded from a YAML file stored next to this module.
list_item_op_2 = kfp.components.load_component_from_file(
    os.path.join(cwd, 'create_component_from_func_component.yaml'))


def test_pipeline():
    # Pipeline under test: a producer task writes a directory, a second task
    # extracts a sub-directory from it, and two listing tasks consume the results.
    # The last task takes the outputs of two different upstream tasks.
    #
    # NOTE(review): documented with comments rather than a docstring, and the
    # nested component function is left exactly as written, because its source
    # text shows up verbatim in the expected create_component_from_func.yaml.
    # Confirm whether a docstring here would also leak into the compiled metadata.
    @create_component_from_func
    def produce_dir_with_files_python_op(output_dir_path: OutputPath(), num_files: int = 10):
        import os
        os.makedirs(os.path.join(output_dir_path, 'subdir'), exist_ok=True)
        for i in range(num_files):
            file_path = os.path.join(output_dir_path, 'subdir', str(i) + '.txt')
            with open(file_path, 'w') as f:
                f.write(str(i))

    # Producer: overrides the default of 10 files with 15.
    produce_dir_python_task = produce_dir_with_files_python_op(num_files=15)

    get_subdir_task = get_subdir_op(
        # Takes the producer's output directory and selects its "subdir" sub-path.
        directory=produce_dir_python_task.output,
        subpath="subdir"
    )

    list_items_task_1 = list_item_op_1(
        # Single upstream dependency: the extracted sub-directory.
        directory=get_subdir_task.output
    )

    list_items_task_2 = list_item_op_2(
        # Two upstream dependencies: the extracted sub-directory and the producer's
        # original output directory (the multi-dependency data passing case).
        directory1=get_subdir_task.output,
        directory2=produce_dir_python_task.output
    )


# kfp-tekton entry point: when run as a script, compile the pipeline above into a
# YAML file that sits next to this module and shares its base name.
if __name__ == '__main__':
    from kfp_tekton.compiler import TektonCompiler

    # Swap only the file extension. A plain str.replace('.py', '.yaml') would also
    # rewrite any other '.py' occurrence in the path (e.g. a '.pyenv' directory).
    output_yaml = os.path.splitext(__file__)[0] + '.yaml'
    TektonCompiler().compile(test_pipeline, output_yaml)
173 changes: 173 additions & 0 deletions sdk/python/tests/compiler/testdata/create_component_from_func.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"name": "Test pipeline"}'
sidecar.istio.io/inject: 'false'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"get-subdirectory": [["Subdir", "$(workspaces.get-subdirectory.path)/get-subdirectory-Subdir"]],
"list-items": [["Items", "$(results.items.path)"]], "list-items-2": [["Items",
"$(results.items.path)"]], "produce-dir-with-files-python-op": [["output_dir",
"$(workspaces.produce-dir-with-files-python-op.path)/produce-dir-with-files-python-op-output_dir"]]}'
tekton.dev/input_artifacts: '{"get-subdirectory": [{"name": "produce-dir-with-files-python-op-output_dir",
"parent_task": "produce-dir-with-files-python-op"}], "list-items": [{"name":
"get-subdirectory-Subdir", "parent_task": "get-subdirectory"}], "list-items-2":
[{"name": "get-subdirectory-Subdir", "parent_task": "get-subdirectory"}, {"name":
"produce-dir-with-files-python-op-output_dir", "parent_task": "produce-dir-with-files-python-op"}]}'
tekton.dev/output_artifacts: '{"get-subdirectory": [{"key": "artifacts/$PIPELINERUN/get-subdirectory/Subdir.tgz",
"name": "get-subdirectory-Subdir", "path": "/tmp/outputs/Subdir/data"}], "list-items":
[{"key": "artifacts/$PIPELINERUN/list-items/Items.tgz", "name": "list-items-Items",
"path": "/tmp/outputs/Items/data"}], "list-items-2": [{"key": "artifacts/$PIPELINERUN/list-items-2/Items.tgz",
"name": "list-items-2-Items", "path": "/tmp/outputs/Items/data"}], "produce-dir-with-files-python-op":
[{"key": "artifacts/$PIPELINERUN/produce-dir-with-files-python-op/output_dir.tgz",
"name": "produce-dir-with-files-python-op-output_dir", "path": "/tmp/outputs/output_dir/data"}]}'
name: test-pipeline
spec:
pipelineSpec:
tasks:
- name: produce-dir-with-files-python-op
taskSpec:
steps:
- args:
- --num-files
- '15'
- --output-dir
- $(workspaces.produce-dir-with-files-python-op.path)/produce-dir-with-files-python-op-output_dir
command:
- python3
- -u
- -c
- "def _make_parent_dirs_and_return_path(file_path: str):\n import os\n\
\ os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return\
\ file_path\n\ndef produce_dir_with_files_python_op(output_dir_path, num_files\
\ = 10):\n import os\n os.makedirs(os.path.join(output_dir_path,\
\ 'subdir'), exist_ok=True)\n for i in range(num_files):\n file_path\
\ = os.path.join(output_dir_path, 'subdir', str(i) + '.txt')\n \
\ with open(file_path, 'w') as f:\n f.write(str(i))\n\nimport\
\ argparse\n_parser = argparse.ArgumentParser(prog='Produce dir with files\
\ python op', description='')\n_parser.add_argument(\"--num-files\", dest=\"\
num_files\", type=int, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"\
--output-dir\", dest=\"output_dir_path\", type=_make_parent_dirs_and_return_path,\
\ required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\
\n_outputs = produce_dir_with_files_python_op(**_parsed_args)\n"
image: python:3.7
name: main
workspaces:
- name: produce-dir-with-files-python-op
timeout: 0s
workspaces:
- name: produce-dir-with-files-python-op
workspace: test-pipeline
- name: get-subdirectory
runAfter:
- produce-dir-with-files-python-op
taskSpec:
steps:
- command:
- sh
- -ex
- -c
- 'mkdir -p "$(dirname "$2")"
cp -r "$0/$1" "$2"
'
- $(workspaces.get-subdirectory.path)/produce-dir-with-files-python-op-output_dir
- subdir
- $(workspaces.get-subdirectory.path)/get-subdirectory-Subdir
image: alpine
name: main
workspaces:
- name: get-subdirectory
timeout: 0s
workspaces:
- name: get-subdirectory
workspace: test-pipeline
- name: list-items
runAfter:
- get-subdirectory
taskSpec:
results:
- description: /tmp/outputs/Items/data
name: items
steps:
- command:
- sh
- -ex
- -c
- 'mkdir -p "$(dirname "$1")"
#ls --almost-all --recursive "$0" > "$1"
ls -A -R "$0" > "$1"
'
- $(workspaces.list-items.path)/get-subdirectory-Subdir
- $(results.items.path)
image: alpine
name: main
workspaces:
- name: list-items
timeout: 0s
workspaces:
- name: list-items
workspace: test-pipeline
- name: list-items-2
runAfter:
- get-subdirectory
- produce-dir-with-files-python-op
taskSpec:
results:
- description: /tmp/outputs/Items/data
name: items
steps:
- command:
- sh
- -ex
- -c
- 'mkdir -p "$(dirname "$2")"
ls -A -R "$0" > "$2"
ls -A -R "$1" >> "$2"
'
- $(workspaces.list-items-2.path)/get-subdirectory-Subdir
- $(workspaces.list-items-2.path)/produce-dir-with-files-python-op-output_dir
- $(results.items.path)
image: alpine
name: main
workspaces:
- name: list-items-2
timeout: 0s
workspaces:
- name: list-items-2
workspace: test-pipeline
workspaces:
- name: test-pipeline
timeout: 0s
workspaces:
- name: test-pipeline
volumeClaimTemplate:
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 2Gi
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: List items 2
description: Recursively list directory contents.
inputs:
- {name: Directory1, type: Directory}
- {name: Directory2, type: Directory}
outputs:
- {name: Items}
implementation:
container:
image: alpine
command:
- sh
- -ex
- -c
- |
mkdir -p "$(dirname "$2")"
ls -A -R "$0" > "$2"
ls -A -R "$1" >> "$2"
- inputPath: Directory1
- inputPath: Directory2
- outputPath: Items

0 comments on commit cdd5625

Please sign in to comment.