SDK/Components - Simplified _create_task_factory_from_component_spec function #662

Merged
166 changes: 30 additions & 136 deletions sdk/python/kfp/components/_components.py
@@ -186,156 +186,50 @@ def _make_name_unique_by_adding_index(name:str, collection, delimiter:str):
return unique_name


#Holds the transformation functions that are called each time TaskSpec instance is created from a component. If there are multiple handlers, the last one is used.
_created_task_transformation_handler = []
Contributor:

Can you add some comments about _created_task_transformation_handler? The only place it is populated seems to be line 195, so it is not clear why it needs to exist as a list.

Contributor Author:

The intent is to set it in the Pipeline context. I had that code in the PR, but then I decided to split that into its own PR. The intent will be more obvious there.
Python context managers that replace some value usually make the value holder a list so that the context is re-entrant, i.e. the code does not break if contexts happen to be nested. https://docs.python.org/3/library/contextlib.html#reentrant-cms

The high-level idea is as follows: Conceptually, what you get when you give arguments to a Component is a Task. But currently we need to get a ContainerOp instance. So there needs to be a TaskSpec -> ContainerOp transformation that's applied automatically when the pipeline is being compiled.
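For illustration, here is a minimal sketch of such a re-entrant context manager (hypothetical - this is the follow-up-PR idea being described, not code from this diff):

from contextlib import contextmanager

@contextmanager
def _task_transformation(handler):
    # Push the handler on entry and pop it on exit. Because the holder is a
    # list, nested contexts simply stack: the innermost handler (the last
    # element) is used, and unwinding restores the previous one.
    _created_task_transformation_handler.append(handler)
    try:
        yield handler
    finally:
        _created_task_transformation_handler.pop()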

Contributor Author:

What do you think would be the best wording for a comment?

Contributor Author:

In some new cases (the existing cases will continue to work), load_component will need to return TaskSpec objects instead of ContainerOp objects. So there needs to be a way to enable/disable the TaskSpec-to-ContainerOp conversion. _created_task_transformation_handler holds the transformation procedure, which can be changed or disabled.
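A hedged sketch of what that switch could look like from the caller's side (hypothetical usage of the internal list; the actual enable/disable API is deferred to the follow-up PR):

from kfp.components import _components

def call_factory_without_conversion(op_factory, **arguments):
    # Temporarily disable the TaskSpec -> ContainerOp transformation so that
    # the task factory returns the raw TaskSpec instead of a ContainerOp.
    saved_handlers = list(_components._created_task_transformation_handler)
    _components._created_task_transformation_handler.clear()
    try:
        return op_factory(**arguments)
    finally:
        _components._created_task_transformation_handler.extend(saved_handlers)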

Contributor:

Since some of the code is for graph components, would you hold this off until we decide the graph component priority? Or is it possible to decouple graph components from container components?

Contributor Author (@Ark-kun, Jan 12, 2019):

This code change is a foundation that is needed for multiple puzzle pieces that other team members and I are delivering this quarter. It's needed for the following efforts:

  • Debug a single component locally (container components for now, graph components - later)
  • Submit a run for a single component which is needed for component tests (container components for now, graph components - later)
  • Intermediate YAML API in the backend.
  • Passing the pipeline metadata (description) and input/output metadata (names, descriptions, types) to the UI. After discussing with @vicaire, @IronPan and @gaoning777 it was decided to skip the "pass information through Argo template metadata" short-term approach I suggested and either go with the intermediate YAML only or attach that intermediate yaml to the end of Argo YAML.
  • Implementing static type checking - All 6 cases (submission from UI, submission from Python, constant arguments in python pipeline, constant arguments in graph component, passing outputs to inputs when composing pipelines in Python, passing outputs to inputs when composing pipelines in graph component YAML)
  • Artifact support. Artifacts are involved when passing data between components which happens in a graph and requires the graph support.

The reason this affects so much is that many features depend on having the full information that TaskSpec carries, but that information is lost in the transition to ContainerOp.
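For example, a sketch built from the structures in this diff (component_spec is assumed to be an already-parsed ComponentSpec; the input name is made up):

component_ref = ComponentReference(name=component_spec.name)
component_ref._component_spec = component_spec  # the full spec stays reachable from the task

task = TaskSpec(
    component_ref=component_ref,
    arguments={'Input text': 'hello'},  # original (non-pythonic) input names
)
# task.component_ref._component_spec still carries input/output descriptions and
# types that a ContainerOp (image + command line + file_outputs) cannot represent.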

Contributor:

I see. Thanks @Ark-kun.

Contributor:

/lgtm

Contributor Author:

Just to clarify: The actual change that will make "component task factory function" output TaskSpec instead of ContainerOp in some scenarios is coming in the next PR.

This PR is just a refactoring that does not change the behavior.

BTW, this was the intent behind the whole _dsl_bridge.py file and the _task_object_factory variable - a bridge between the component structures and DSL structures like ContainerOp: https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/components/_dsl_bridge.py#L40 . It turned out the initial effort was insufficient, since only limited information was passed to that handler.
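To make the "bridge" idea concrete, a hypothetical sketch (op_factory is assumed to be a task factory returned by a load_component call): because _task_object_factory is a plain module-level variable, it can be swapped, e.g. in tests, to capture the fully resolved task instead of building a ContainerOp.

from kfp.components import _dsl_bridge

def _recording_factory(name, container_image, command=None, arguments=None, output_paths=None):
    # Hypothetical replacement with the post-PR factory signature; it records
    # the resolved task instead of constructing a dsl.ContainerOp.
    return dict(name=name, image=container_image, command=command,
                arguments=arguments, output_paths=output_paths)

original_factory = _dsl_bridge._task_object_factory
_dsl_bridge._task_object_factory = _recording_factory
try:
    resolved = op_factory(text='hello')  # now a plain dict, not a ContainerOp
finally:
    _dsl_bridge._task_object_factory = original_factory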



#TODO: Move to the dsl.Pipeline context class
from . import _dsl_bridge
_created_task_transformation_handler.append(_dsl_bridge.create_container_op_from_task)


#TODO: Refactor the function to make it shorter
def _create_task_factory_from_component_spec(component_spec:ComponentSpec, component_filename=None):
def _create_task_factory_from_component_spec(component_spec:ComponentSpec, component_filename=None, component_ref: ComponentReference = None):
name = component_spec.name or _default_component_name
description = component_spec.description

inputs_list = component_spec.inputs or [] #List[InputSpec]
outputs_list = component_spec.outputs or [] #List[OutputSpec]

inputs_dict = {port.name: port for port in inputs_list}

#Creating the name translation tables : Original <-> Pythonic
input_name_to_pythonic = {}
pythonic_name_to_input_name = {}

input_name_to_kubernetes = {}
output_name_to_kubernetes = {}
kubernetes_name_to_input_name = {}
kubernetes_name_to_output_name = {}

for io_port in inputs_list:
pythonic_name = _sanitize_python_function_name(io_port.name)
pythonic_name = _make_name_unique_by_adding_index(pythonic_name, pythonic_name_to_input_name, '_')
input_name_to_pythonic[io_port.name] = pythonic_name
pythonic_name_to_input_name[pythonic_name] = io_port.name

kubernetes_name = _sanitize_kubernetes_resource_name(io_port.name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_input_name, '-')
input_name_to_kubernetes[io_port.name] = kubernetes_name
kubernetes_name_to_input_name[kubernetes_name] = io_port.name

for io_port in outputs_list:
kubernetes_name = _sanitize_kubernetes_resource_name(io_port.name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_output_name, '-')
output_name_to_kubernetes[io_port.name] = kubernetes_name
kubernetes_name_to_output_name[kubernetes_name] = io_port.name

container_spec = component_spec.implementation.container
container_image = container_spec.image

file_outputs_from_def = OrderedDict()
if container_spec.file_outputs != None:
for param, path in container_spec.file_outputs.items():
output_key = output_name_to_kubernetes[param]
file_outputs_from_def[output_key] = path

def create_container_op_with_expanded_arguments(pythonic_input_argument_values):
file_outputs = file_outputs_from_def.copy()

def expand_command_part(arg): #input values with original names
#(Union[str,Mapping[str, Any]]) -> Union[str,List[str]]
if arg is None:
return None
if isinstance(arg, (str, int, float, bool)):
return str(arg)

if isinstance(arg, InputValuePlaceholder):
port_name = arg.input_name
input_value = pythonic_input_argument_values[input_name_to_pythonic[port_name]]
if input_value is not None:
return str(input_value)
else:
input_spec = inputs_dict[port_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
raise ValueError('No value provided for input {}'.format(port_name))

if isinstance(arg, InputPathPlaceholder):
port_name = arg.input_name
input_filename = _generate_input_file_name(port_name)
input_key = input_name_to_kubernetes[port_name]
input_value = pythonic_input_argument_values[input_name_to_pythonic[port_name]]
if input_value is not None:
return input_filename
else:
input_spec = inputs_dict[port_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
raise ValueError('No value provided for input {}'.format(port_name))

elif isinstance(arg, OutputPathPlaceholder):
port_name = arg.output_name
output_filename = _generate_output_file_name(port_name)
output_key = output_name_to_kubernetes[port_name]
if output_key in file_outputs:
if file_outputs[output_key] != output_filename:
raise ValueError('Conflicting output files specified for port {}: {} and {}'.format(port_name, file_outputs[output_key], output_filename))
else:
file_outputs[output_key] = output_filename

return output_filename

elif isinstance(arg, ConcatPlaceholder):
expanded_argument_strings = expand_argument_list(arg.items)
return ''.join(expanded_argument_strings)

elif isinstance(arg, IfPlaceholder):
arg = arg.if_structure
condition_result = expand_command_part(arg.condition)
from distutils.util import strtobool
condition_result_bool = condition_result and strtobool(condition_result) #Python gotcha: bool('False') == True; Need to use strtobool; Also need to handle None and []
result_node = arg.then_value if condition_result_bool else arg.else_value
if result_node is None:
return []
if isinstance(result_node, list):
expanded_result = expand_argument_list(result_node)
else:
expanded_result = expand_command_part(result_node)
return expanded_result

elif isinstance(arg, IsPresentPlaceholder):
pythonic_input_name = input_name_to_pythonic[arg.input_name]
argument_is_present = pythonic_input_argument_values[pythonic_input_name] is not None
return str(argument_is_present)
else:
raise TypeError('Unrecognized argument type: {}'.format(arg))

def expand_argument_list(argument_list):
expanded_list = []
if argument_list is not None:
for part in argument_list:
expanded_part = expand_command_part(part)
if expanded_part is not None:
if isinstance(expanded_part, list):
expanded_list.extend(expanded_part)
else:
expanded_list.append(str(expanded_part))
return expanded_list

expanded_command = expand_argument_list(container_spec.command)
expanded_args = expand_argument_list(container_spec.args)

#Working around Python's variable scoping. Do not write to variable from global scope as that makes the variable local.

file_outputs_to_pass = file_outputs
if file_outputs_to_pass == {}:
file_outputs_to_pass = None

from . import _dsl_bridge
return _dsl_bridge._task_object_factory(
name=name,
container_image=container_image,
command=expanded_command,
arguments=expanded_args,
file_outputs=file_outputs_to_pass,
if component_ref is None:
component_ref = ComponentReference(name=component_spec.name or component_filename or _default_component_name)
component_ref._component_spec = component_spec

def create_task_from_component_and_arguments(pythonic_arguments):
#Converting the argument names and not passing None arguments
valid_argument_types = (str, int, float, bool, GraphInputArgument, TaskOutputArgument) #Hack for passed PipelineParams. TODO: Remove the hack once they're no longer passed here.
arguments = {
pythonic_name_to_input_name[k]: (v if isinstance(v, valid_argument_types) else str(v))
for k, v in pythonic_arguments.items()
if v is not None
}
task = TaskSpec(
component_ref=component_ref,
arguments=arguments,
)
if _created_task_transformation_handler:
task = _created_task_transformation_handler[-1](task)
return task

import inspect
from . import _dynamic
@@ -346,7 +240,7 @@ def expand_argument_list(argument_list):
factory_function_parameters = input_parameters #Outputs are no longer part of the task factory function signature. The paths are always generated by the system.

return _dynamic.create_function_from_parameters(
create_container_op_with_expanded_arguments,
create_task_from_component_and_arguments,
factory_function_parameters,
documentation=description,
func_name=name,
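Since the thread above stresses that external behavior is unchanged by this refactoring, a short usage sketch may help (hypothetical example; load_component_from_text and the sanitized argument name are assumptions, not part of this PR):

import kfp.components as comp
import kfp.dsl as dsl

echo_component_text = '''
name: Echo text
inputs:
- {name: Text to echo}
implementation:
  container:
    image: alpine
    command: [echo, {inputValue: Text to echo}]
'''

# The loader routes through _create_task_factory_from_component_spec.
echo_op = comp.load_component_from_text(echo_component_text)

@dsl.pipeline(name='Echo pipeline')
def echo_pipeline(text='hello'):
    # 'Text to echo' is exposed under a pythonic name (assumed to be text_to_echo).
    # Calling the factory builds a TaskSpec, and the registered handler turns it
    # into a dsl.ContainerOp, exactly as before the refactoring.
    echo_task = echo_op(text_to_echo=text)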
128 changes: 125 additions & 3 deletions sdk/python/kfp/components/_dsl_bridge.py
@@ -12,9 +12,120 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import OrderedDict
from ._structures import ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
from ._components import _generate_output_file_name, _default_component_name


def create_container_op_from_task(task_spec: TaskSpec):
argument_values = task_spec.arguments
component_spec = task_spec.component_ref._component_spec

inputs_dict = {input_spec.name: input_spec for input_spec in component_spec.inputs or []}
container_spec = component_spec.implementation.container

output_paths = OrderedDict() #Preserving the order to make the kubernetes output names deterministic
unconfigurable_output_paths = container_spec.file_outputs or {}
for output in component_spec.outputs or []:
if output.name in unconfigurable_output_paths:
output_paths[output.name] = unconfigurable_output_paths[output.name]

def expand_command_part(arg): #input values with original names
#(Union[str,Mapping[str, Any]]) -> Union[str,List[str]]
if arg is None:
return None
if isinstance(arg, (str, int, float, bool)):
return str(arg)

if isinstance(arg, InputValuePlaceholder):
input_name = arg.input_name
input_value = argument_values.get(input_name, None)
if input_value is not None:
return str(input_value)
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
return None
else:
raise ValueError('No value provided for input {}'.format(input_name))

if isinstance(arg, InputPathPlaceholder):
input_name = arg.input_name
input_value = argument_values.get(input_name, None)
if input_value is not None:
raise ValueError('ContainerOp does not support input artifacts - input {}'.format(input_name))
#return input_value
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
raise ValueError('No value provided for input {}'.format(input_name))

elif isinstance(arg, OutputPathPlaceholder):
output_name = arg.output_name
output_filename = _generate_output_file_name(output_name)
if arg.output_name in output_paths:
if output_paths[output_name] != output_filename:
raise ValueError('Conflicting output files specified for port {}: {} and {}'.format(output_name, output_paths[output_name], output_filename))
else:
output_paths[output_name] = output_filename

return output_filename

elif isinstance(arg, ConcatPlaceholder):
expanded_argument_strings = expand_argument_list(arg.items)
return ''.join(expanded_argument_strings)

elif isinstance(arg, IfPlaceholder):
arg = arg.if_structure
condition_result = expand_command_part(arg.condition)
from distutils.util import strtobool
condition_result_bool = condition_result and strtobool(condition_result) #Python gotcha: bool('False') == True; Need to use strtobool; Also need to handle None and []
result_node = arg.then_value if condition_result_bool else arg.else_value
if result_node is None:
return []
if isinstance(result_node, list):
expanded_result = expand_argument_list(result_node)
else:
expanded_result = expand_command_part(result_node)
return expanded_result

elif isinstance(arg, IsPresentPlaceholder):
argument_is_present = argument_values.get(arg.input_name, None) is not None
return str(argument_is_present)
else:
raise TypeError('Unrecognized argument type: {}'.format(arg))

def expand_argument_list(argument_list):
expanded_list = []
if argument_list is not None:
for part in argument_list:
expanded_part = expand_command_part(part)
if expanded_part is not None:
if isinstance(expanded_part, list):
expanded_list.extend(expanded_part)
else:
expanded_list.append(str(expanded_part))
return expanded_list

expanded_command = expand_argument_list(container_spec.command)
expanded_args = expand_argument_list(container_spec.args)

return _task_object_factory(
name=component_spec.name or _default_component_name,
container_image=container_spec.image,
command=expanded_command,
arguments=expanded_args,
output_paths=output_paths,
)


_dummy_pipeline=None

def _create_task_object(name:str, container_image:str, command=None, arguments=None, file_outputs=None):
def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, output_paths=None):
from .. import dsl
global _dummy_pipeline
need_dummy = dsl.Pipeline._default_pipeline is None
@@ -23,12 +134,23 @@ def _create_task_object(name:str, container_image:str, command=None, arguments=N
_dummy_pipeline = dsl.Pipeline('dummy pipeline')
_dummy_pipeline.__enter__()

from ._components import _sanitize_kubernetes_resource_name, _make_name_unique_by_adding_index
output_name_to_kubernetes = {}
kubernetes_name_to_output_name = {}
for output_name in (output_paths or {}).keys():
kubernetes_name = _sanitize_kubernetes_resource_name(output_name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_output_name, '-')
output_name_to_kubernetes[output_name] = kubernetes_name
kubernetes_name_to_output_name[kubernetes_name] = output_name

output_paths_for_container_op = {output_name_to_kubernetes[name]: path for name, path in output_paths.items()}

task = dsl.ContainerOp(
name=name,
image=container_image,
command=command,
arguments=arguments,
file_outputs=file_outputs,
file_outputs=output_paths_for_container_op,
)

if need_dummy:
Expand All @@ -37,4 +159,4 @@ def _create_task_object(name:str, container_image:str, command=None, arguments=N
return task


_task_object_factory=_create_task_object
_task_object_factory=_create_container_op_from_resolved_task
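A small usage note on the dummy-pipeline workaround in _create_container_op_from_resolved_task: when no dsl.Pipeline context is active, a throwaway pipeline is entered and exited around ContainerOp creation, so the task factory can also be called outside a pipeline function. Sketch (echo_op is assumed from the earlier load_component sketch):

# Outside any @dsl.pipeline function there is no default pipeline, so the
# 'dummy pipeline' is entered, the ContainerOp is constructed, and the dummy
# context is left again - useful for quick interactive inspection.
standalone_task = echo_op(text_to_echo='hello')
print(type(standalone_task))  # expected: dsl.ContainerOp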