Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK - Components - Creating graph components from python pipeline function #2273

1 change: 1 addition & 0 deletions sdk/python/kfp/components/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
from ._airflow_op import *
from ._components import *
from ._python_op import *
from ._python_to_graph_component import *
from ._component_store import *
147 changes: 147 additions & 0 deletions sdk/python/kfp/components/_python_to_graph_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Copyright 2018 Google LLC
Ark-kun marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = [
'create_graph_component_from_pipeline_func',
]


import inspect
from collections import OrderedDict
from typing import Callable

from . import _components
from ._structures import TaskSpec, ComponentSpec, OutputSpec, GraphInputArgument, TaskOutputArgument, GraphImplementation, GraphSpec
from ._naming import _make_name_unique_by_adding_index
from ._python_op import _extract_component_interface


def create_graph_component_from_pipeline_func(pipeline_func: Callable, output_component_file: str, embed_component_specs: bool = False) -> None:
'''Experimental! Creates graph component definition from a python pipeline function. The component file can be published for sharing.
Pipeline function is a function that only calls component functions and passes outputs to inputs.
This feature is experimental and lacks support for some of the DSL features like conditions and loops.
Only pipelines consisting of loaded components or python components are currently supported (no manually created ContainerOps or ResourceOps).

Args:
pipeline_func: Python function to convert
output_component_file: Path of the file where the component definition will be written. The `component.yaml` file can then be published for sharing.
embed_component_specs: Whether to embed component definitions or just reference them. Embedding makes the graph component self-contained. Default is False.

Example:

producer_op = load_component_from_file('producer/component.yaml')
processor_op = load_component_from_file('processor/component.yaml')

def pipeline1(pipeline_param_1: int):
producer_task = producer_op()
processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2'])

return OrderedDict([
('Pipeline output 1', producer_task.outputs['Output 1']),
('Pipeline output 2', processor_task.outputs['Output 2']),
])

create_graph_component_from_pipeline_func(pipeline1, output_component_file='pipeline.component.yaml')
'''
component_spec = create_graph_component_spec_from_pipeline_func(pipeline_func, embed_component_specs)
if output_component_file:
from pathlib import Path
from ._yaml_utils import dump_yaml
component_dict = component_spec.to_dict()
component_yaml = dump_yaml(component_dict)
Path(output_component_file).write_text(component_yaml)


def create_graph_component_spec_from_pipeline_func(pipeline_func: Callable, embed_component_specs: bool = False) -> ComponentSpec:

component_spec = _extract_component_interface(pipeline_func)
# Checking the function parameters - they should not have file passing annotations.
input_specs = component_spec.inputs or []
for input in input_specs:
if input._passing_style:
raise TypeError('Graph component function parameter "{}" cannot have file-passing annotation "{}".'.format(input.name, input._passing_style))

task_map = OrderedDict() #Preserving task order

def task_construction_handler(task: TaskSpec):
#Rewriting task ids so that they're same every time
task_id = task.component_ref.spec.name or "Task"
task_id = _make_name_unique_by_adding_index(task_id, task_map.keys(), ' ')
for output_ref in task.outputs.values():
output_ref.task_output.task_id = task_id
output_ref.task_output.task = None
task_map[task_id] = task
# Remove the component spec from component reference unless it will make the reference empty or unless explicitly asked by the user
if not embed_component_specs and any([task.component_ref.name, task.component_ref.url, task.component_ref.digest]):
task.component_ref.spec = None

return task #The handler is a transformation function, so it must pass the task through.

# Preparing the pipeline_func arguments
# TODO: The key should be original parameter name if different
pipeline_func_args = {input.name: GraphInputArgument(input_name=input.name) for input in input_specs}

try:
#Setting the handler to fix and catch the tasks.
_components._created_task_transformation_handler.append(task_construction_handler)

#Calling the pipeline_func with GraphInputArgument instances as arguments
pipeline_func_result = pipeline_func(**pipeline_func_args)
finally:
_components._created_task_transformation_handler.pop()


# Getting graph outputs
output_names = [output.name for output in (component_spec.outputs or [])]

if len(output_names) == 1 and output_names[0] == 'Output': # TODO: Check whether the NamedTuple syntax was used
Ark-kun marked this conversation as resolved.
Show resolved Hide resolved
pipeline_func_result = [pipeline_func_result]

if isinstance(pipeline_func_result, tuple) and hasattr(pipeline_func_result, '_asdict'): # collections.namedtuple and typing.NamedTuple
pipeline_func_result = pipeline_func_result._asdict()

if isinstance(pipeline_func_result, dict):
if output_names:
if set(output_names) != set(pipeline_func_result.keys()):
raise ValueError('Returned outputs do not match outputs specified in the function signature: {} = {}'.format(str(set(pipeline_func_result.keys())), str(set(output_names))))

if pipeline_func_result is None:
graph_output_value_map = {}
elif isinstance(pipeline_func_result, dict):
graph_output_value_map = OrderedDict(pipeline_func_result)
elif isinstance(pipeline_func_result, (list, tuple)):
if output_names:
if len(pipeline_func_result) != len(output_names):
raise ValueError('Expected {} values from pipeline function, but got {}.'.format(len(output_names), len(pipeline_func_result)))
graph_output_value_map = OrderedDict((name_value[0], name_value[1]) for name_value in zip(output_names, pipeline_func_result))
else:
graph_output_value_map = OrderedDict((output_value.task_output.output_name, output_value) for output_value in pipeline_func_result) # TODO: Fix possible name non-uniqueness (e.g. use task id as prefix or add index to non-unique names)
else:
raise TypeError('Pipeline must return outputs as tuple or OrderedDict.')

#Checking the pipeline_func output object types
for output_name, output_value in graph_output_value_map.items():
if not isinstance(output_value, TaskOutputArgument):
raise TypeError('Only TaskOutputArgument instances should be returned from graph component, but got "{output_name}" = "{}".'.format(output_name, str(output_value)))

if not component_spec.outputs and graph_output_value_map:
component_spec.outputs = [OutputSpec(name=output_name, type=output_value.task_output.type) for output_name, output_value in graph_output_value_map.items()]

component_spec.implementation = GraphImplementation(
graph=GraphSpec(
tasks=task_map,
output_values=graph_output_value_map,
)
)
return component_spec
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: Component with 0 inputs and 2 outputs
outputs:
- {name: Output 1}
- {name: Output 2}
implementation:
container:
image: busybox
command: [sh, -c, '
echo "Data 1" > $0
echo "Data 2" > $1
'
]
args:
- {outputPath: Output 1}
- {outputPath: Output 2}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: Component with 2 inputs and 0 outputs
inputs:
- {name: Input parameter}
- {name: Input artifact}
implementation:
container:
image: busybox
command: [sh, -c, '
echo "Input parameter = $0"
echo "Input artifact = $(< $1)"
'
]
args:
- {inputValue: Input parameter}
- {inputPath: Input artifact}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: Component with 2 inputs and 2 outputs
inputs:
- {name: Input parameter}
- {name: Input artifact}
outputs:
- {name: Output 1}
- {name: Output 2}
implementation:
container:
image: busybox
command: [sh, -c, '
mkdir -p $(dirname "$2")
mkdir -p $(dirname "$3")
echo "$0" > "$2"
cp "$1" "$3"
'
]
args:
- {inputValue: Input parameter}
- {inputPath: Input artifact}
- {outputPath: Output 1}
- {outputPath: Output 2}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import unittest
from collections import OrderedDict
from pathlib import Path

import kfp.components as comp
from kfp.components._python_to_graph_component import create_graph_component_spec_from_pipeline_func


class PythonPipelineToGraphComponentTestCase(unittest.TestCase):
def test_handle_creating_graph_component_from_pipeline_that_uses_container_components(self):
test_data_dir = Path(__file__).parent / 'test_data'
producer_op = comp.load_component_from_file(str(test_data_dir / 'component_with_0_inputs_and_2_outputs.component.yaml'))
processor_op = comp.load_component_from_file(str(test_data_dir / 'component_with_2_inputs_and_2_outputs.component.yaml'))
consumer_op = comp.load_component_from_file(str(test_data_dir / 'component_with_2_inputs_and_0_outputs.component.yaml'))

def pipeline1(pipeline_param_1: int):
producer_task = producer_op()
processor_task = processor_op(pipeline_param_1, producer_task.outputs['Output 2'])
consumer_task = consumer_op(processor_task.outputs['Output 1'], processor_task.outputs['Output 2'])

return OrderedDict([ # You can safely return normal dict in python 3.6+
('Pipeline output 1', producer_task.outputs['Output 1']),
('Pipeline output 2', processor_task.outputs['Output 2']),
])

graph_component = create_graph_component_spec_from_pipeline_func(pipeline1)

self.assertEqual(len(graph_component.inputs), 1)
self.assertListEqual([input.name for input in graph_component.inputs], ['pipeline_param_1']) #Relies on human name conversion function stability
self.assertListEqual([output.name for output in graph_component.outputs], ['Pipeline output 1', 'Pipeline output 2'])
self.assertEqual(len(graph_component.implementation.graph.tasks), 3)


if __name__ == '__main__':
unittest.main()