Pass meta to containerop and pipeline #905

Merged
merged 66 commits on Mar 6, 2019
Changes from 54 commits
Commits
66 commits
592ca36
add core types and type checking function
gaoning777 Mar 1, 2019
3a3fc17
fix unit test bug
gaoning777 Mar 1, 2019
db45632
avoid defining dynamic classes
gaoning777 Mar 1, 2019
dcc3baf
typo fix
gaoning777 Mar 1, 2019
89c0907
add component metadata format
gaoning777 Mar 1, 2019
e55515e
add a construct for the component decorator
gaoning777 Mar 1, 2019
eed41fa
add default values for the meta classes
gaoning777 Mar 1, 2019
d74d4d8
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 1, 2019
18d5459
add input/output types to the metadata
gaoning777 Mar 1, 2019
9218ffe
add from_dict in TypeMeta
gaoning777 Mar 1, 2019
d28d555
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 1, 2019
aac35b1
small fix
gaoning777 Mar 1, 2019
5ccc857
add unit tests
gaoning777 Mar 1, 2019
3081d47
use python struct for the openapi schema
gaoning777 Mar 1, 2019
dcf2ecc
Merge branch 'add-core-types-and-checking' into add-component-metadata
gaoning777 Mar 1, 2019
a2c89de
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 1, 2019
b0a7314
add default in parameter
gaoning777 Mar 1, 2019
c51ccf7
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 1, 2019
b69b807
add default value
gaoning777 Mar 1, 2019
57232e4
remove the str restriction for the param default
gaoning777 Mar 1, 2019
eb7462a
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 1, 2019
04b86f2
bug fix
gaoning777 Mar 1, 2019
622ad68
add pipelinemeta
gaoning777 Mar 1, 2019
8834507
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 1, 2019
ac49e68
add pipeline metadata
gaoning777 Mar 2, 2019
e05e5e9
ignore annotation if it is not str/BaseType/dict
gaoning777 Mar 2, 2019
10ceb09
update param name in the check_type functions
gaoning777 Mar 4, 2019
6084865
Merge branch 'add-core-types-and-checking' into add-component-metadata
gaoning777 Mar 4, 2019
987e22f
remove default values for non-primitive types in the function signature
gaoning777 Mar 4, 2019
a37084b
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 4, 2019
1c4a96e
pass metadata from component decorator and task factory to containerOp
gaoning777 Mar 4, 2019
f140361
pass pipeline metadata to Pipeline
gaoning777 Mar 4, 2019
8961e22
Merge branch 'master' into pass-meta-to-containerop
gaoning777 Mar 4, 2019
4e577e3
fix unit test
gaoning777 Mar 5, 2019
3d625b3
typo in the comments
gaoning777 Mar 5, 2019
dd2601f
Merge branch 'master' into add-component-metadata
gaoning777 Mar 5, 2019
03c09d9
Merge branch 'master' into add-component-metadata
gaoning777 Mar 5, 2019
9c233b3
move the metadata classes to a separate module
gaoning777 Mar 5, 2019
ad66a1a
Merge branch 'master' into add-python-meta
gaoning777 Mar 5, 2019
25f6b33
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 5, 2019
8ddfbb2
fix unit test
gaoning777 Mar 5, 2019
29e0424
Merge branch 'add-python-meta' into pass-meta-to-containerop
gaoning777 Mar 5, 2019
a1f30a1
small change
gaoning777 Mar 5, 2019
637b921
add __eq__ to meta classes
gaoning777 Mar 5, 2019
40224da
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 5, 2019
0112b20
Merge branch 'add-python-meta' into pass-meta-to-containerop
gaoning777 Mar 5, 2019
2759d63
nothing
gaoning777 Mar 5, 2019
2392815
fix unit test
gaoning777 Mar 5, 2019
1621f0c
Merge branch 'add-component-metadata' into add-python-meta
gaoning777 Mar 5, 2019
36f89a0
Merge branch 'add-python-meta' into pass-meta-to-containerop
gaoning777 Mar 5, 2019
e51ecf7
unit test python component
gaoning777 Mar 5, 2019
f3f02c4
unit test python pipeline
gaoning777 Mar 5, 2019
8312845
fix bug: duplicate variable of args
gaoning777 Mar 5, 2019
ba764d3
Merge branch 'add-python-meta' into pass-meta-to-containerop
gaoning777 Mar 5, 2019
31d7b33
fix unit tests
gaoning777 Mar 5, 2019
7bf701a
move python_component and _component decorator in _component file
gaoning777 Mar 5, 2019
356cfc9
Merge branch 'add-python-meta' into pass-meta-to-containerop
gaoning777 Mar 5, 2019
094cf77
Merge branch 'master' into add-python-meta
gaoning777 Mar 6, 2019
16c2f8f
remove the print
gaoning777 Mar 6, 2019
58c0d91
Merge branch 'add-python-meta' into pass-meta-to-containerop
gaoning777 Mar 6, 2019
b74c527
change parameter default value to None
gaoning777 Mar 6, 2019
e1d4ae5
add functools wraps around _component decorator
gaoning777 Mar 6, 2019
803c038
TypeMeta accept both str and dict
gaoning777 Mar 6, 2019
37cdb04
Merge branch 'master' into pass-meta-to-containerop
gaoning777 Mar 6, 2019
3cc1cae
fix indent, add unit test for type as strings
gaoning777 Mar 6, 2019
b06abd3
do not set default value for the name field in ParameterMeta, Compone…
gaoning777 Mar 6, 2019
2 changes: 1 addition & 1 deletion sdk/python/kfp/compiler/compiler.py
@@ -516,7 +516,7 @@ def _compile(self, pipeline_func):
if pipeline_func not in registered_pipeline_functions:
raise ValueError('Please use a function with @dsl.pipeline decorator.')

pipeline_name, _ = dsl.Pipeline.get_pipeline_functions()[pipeline_func]
pipeline_name = dsl.Pipeline.get_pipeline_functions()[pipeline_func].name
pipeline_name = K8sHelper.sanitize_k8s_name(pipeline_name)

# Create the arg list with no default values and call pipeline function.
17 changes: 15 additions & 2 deletions sdk/python/kfp/components/_dsl_bridge.py
@@ -16,7 +16,7 @@
from typing import Mapping
from ._structures import ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
from ._components import _generate_output_file_name, _default_component_name

from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta, _annotation_to_typemeta

def create_container_op_from_task(task_spec: TaskSpec):
argument_values = task_spec.arguments
@@ -125,12 +125,13 @@ def expand_argument_list(argument_list):
arguments=expanded_args,
output_paths=output_paths,
env=container_spec.env,
component_spec=component_spec,
)


_dummy_pipeline=None

def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, output_paths=None, env : Mapping[str, str]=None):
def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, output_paths=None, env : Mapping[str, str]=None, component_spec=None):
from .. import dsl
global _dummy_pipeline
need_dummy = dsl.Pipeline._default_pipeline is None
@@ -150,6 +151,16 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma

output_paths_for_container_op = {output_name_to_kubernetes[name]: path for name, path in output_paths.items()}

# Construct the ComponentMeta
component_meta = ComponentMeta(name=component_spec.name, description=component_spec.description)
# Inputs
if component_spec.inputs is not None:
for input in component_spec.inputs:
component_meta.inputs.append(ParameterMeta(name=input.name, description=input.description, param_type=_annotation_to_typemeta(input.type), default=input.default))
if component_spec.outputs is not None:
for output in component_spec.outputs:
component_meta.outputs.append(ParameterMeta(name=output.name, description=output.description, param_type=_annotation_to_typemeta(output.type)))

task = dsl.ContainerOp(
name=name,
image=container_image,
@@ -162,6 +173,8 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma
for name, value in env.items():
task.add_env_variable(k8s_client.V1EnvVar(name=name, value=value))

task._set_metadata(component_meta)

if need_dummy:
_dummy_pipeline.__exit__()

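For orientation, here is a minimal, hedged sketch of what the new `component_spec` plumbing amounts to: each input of the component spec is folded into a `ComponentMeta`, which is later attached to the `ContainerOp` through `_set_metadata`. The `FakeInputSpec` class below is a hypothetical stand-in, not the real `kfp.components` structure.

```python
from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta

# Hypothetical stand-in for one input of a component spec; the real objects
# live in kfp.components._structures and carry more fields.
class FakeInputSpec:
    def __init__(self, name, description='', type_name='', default=''):
        self.name = name
        self.description = description
        self.type_name = type_name
        self.default = default

spec_inputs = [FakeInputSpec('model_uri', 'where the model lives', 'GCSPath', 'gs://bucket/model')]

# Mirrors the loop in _create_container_op_from_resolved_task: every spec input
# becomes a ParameterMeta hanging off the ComponentMeta.
component_meta = ComponentMeta(name='train-op', description='toy component')
for spec_input in spec_inputs:
    component_meta.inputs.append(ParameterMeta(
        name=spec_input.name,
        description=spec_input.description,
        param_type=TypeMeta(name=spec_input.type_name),
        default=spec_input.default))

print(component_meta.to_dict())
```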
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/__init__.py
@@ -17,4 +17,4 @@
from ._pipeline import Pipeline, pipeline, get_pipeline_conf
from ._container_op import ContainerOp
from ._ops_group import OpsGroup, ExitHandler, Condition
from ._python_component import python_component
from ._python_component import python_component
12 changes: 11 additions & 1 deletion sdk/python/kfp/dsl/_container_op.py
@@ -16,10 +16,10 @@
from . import _pipeline
from . import _pipeline_param
from ._pipeline_param import _extract_pipelineparams
from ._metadata import ComponentMeta
import re
from typing import Dict


class ContainerOp(object):
"""Represents an op implemented by a docker container image."""

@@ -64,6 +64,7 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
self.pod_annotations = {}
self.pod_labels = {}
self.num_retries = 0
self._metadata = ComponentMeta()

self.argument_inputs = _extract_pipelineparams([str(arg) for arg in (command or []) + (arguments or [])])

@@ -296,3 +297,12 @@ def set_retry(self, num_retries: int):

def __repr__(self):
return str({self.__class__.__name__: self.__dict__})

def _set_metadata(self, metadata):
'''_set_metadata passes the metadata to the ContainerOp object.
Args:
metadata (ComponentMeta): component metadata
'''
if not isinstance(metadata, ComponentMeta):
raise ValueError('_set_metadata is expecting ComponentMeta.')
self._metadata = metadata
126 changes: 126 additions & 0 deletions sdk/python/kfp/dsl/_metadata.py
@@ -0,0 +1,126 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List
from abc import ABCMeta, abstractmethod
from ._types import BaseType, _check_valid_type_dict, _str_to_dict, _instance_to_dict

class BaseMeta(object):
__metaclass__ = ABCMeta
def __init__(self):
pass

@abstractmethod
def to_dict(self):
pass

def serialize(self):
import yaml
return yaml.dump(self.to_dict())

def __eq__(self, other):
return self.__dict__ == other.__dict__

class TypeMeta(BaseMeta):
def __init__(self,
name: str = '',
properties: Dict = None):
self.name = name
self.properties = {} if properties is None else properties

def to_dict(self):
return {self.name: self.properties}

@staticmethod
def from_dict(json_dict):
if not _check_valid_type_dict(json_dict):
raise ValueError(str(json_dict) + ' is not a valid type dictionary')
type_meta = TypeMeta()
type_meta.name, type_meta.properties = list(json_dict.items())[0]
return type_meta

class ParameterMeta(BaseMeta):
def __init__(self,
name: str = '',
description: str = '',
param_type: TypeMeta = None,
default = ''):
self.name = name
self.description = description
self.param_type = TypeMeta() if param_type is None else param_type
self.default = default

def to_dict(self):
return {'name': self.name,
'description': self.description,
'type': self.param_type.to_dict(),
'default': self.default}

class ComponentMeta(BaseMeta):
def __init__(
self,
name: str = '',
description: str = '',
inputs: List[ParameterMeta] = None,
outputs: List[ParameterMeta] = None
):
self.name = name
self.description = description
self.inputs = [] if inputs is None else inputs
self.outputs = [] if outputs is None else outputs

def to_dict(self):
return {'name': self.name,
'description': self.description,
'inputs': [ input.to_dict() for input in self.inputs ],
'outputs': [ output.to_dict() for output in self.outputs ]
}

# Add a pipeline-level metadata class here.
# If one day we combine the component and pipeline yaml, ComponentMeta and PipelineMeta will become one, too.
class PipelineMeta(BaseMeta):
def __init__(
self,
name: str = '',
description: str = '',
inputs: List[ParameterMeta] = None
):
self.name = name
self.description = description
self.inputs = [] if inputs is None else inputs

def to_dict(self):
return {'name': self.name,
'description': self.description,
'inputs': [ input.to_dict() for input in self.inputs ]
}

def _annotation_to_typemeta(annotation):
'''_annotation_to_typemeta converts an annotation to an instance of TypeMeta
Args:
annotation(BaseType/str/dict): input/output annotations
Returns:
TypeMeta
'''
if isinstance(annotation, BaseType):
arg_type = TypeMeta.from_dict(_instance_to_dict(annotation))
elif isinstance(annotation, str):
arg_type = TypeMeta.from_dict(_str_to_dict(annotation))
elif isinstance(annotation, dict):
if not _check_valid_type_dict(annotation):
raise ValueError('Annotation ' + str(annotation) + ' is not a valid type dictionary.')
arg_type = TypeMeta.from_dict(annotation)
else:
return TypeMeta()
return arg_type
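As a quick, hedged sketch of how the classes in this new module compose (the component name 'add' and the 'Integer' type are illustrative only, not values taken from the PR):

```python
from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta

# Metadata for a toy component with two typed inputs and one output.
meta = ComponentMeta(
    name='add',
    description='adds two numbers',
    inputs=[ParameterMeta(name='a', description='left operand',
                          param_type=TypeMeta(name='Integer'), default=0),
            ParameterMeta(name='b', description='right operand',
                          param_type=TypeMeta(name='Integer'), default=0)],
    outputs=[ParameterMeta(name='sum', description='a plus b',
                           param_type=TypeMeta(name='Integer'))])

print(meta.to_dict())    # nested dict form
print(meta.serialize())  # same structure dumped as YAML (serialize() relies on PyYAML)

# __eq__ on BaseMeta compares __dict__, so structurally identical metadata is equal.
assert TypeMeta(name='Integer') == TypeMeta(name='Integer', properties={})
```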
43 changes: 40 additions & 3 deletions sdk/python/kfp/dsl/_pipeline.py
@@ -14,6 +14,7 @@


from . import _container_op
from ._metadata import PipelineMeta, ParameterMeta, TypeMeta, _annotation_to_typemeta
from . import _ops_group
import sys

@@ -32,7 +33,33 @@ def my_pipeline(a: PipelineParam, b: PipelineParam):
```
"""
def _pipeline(func):
Pipeline.add_pipeline(name, description, func)
import inspect
fullargspec = inspect.getfullargspec(func)
args = fullargspec.args
annotations = fullargspec.annotations

# defaults
arg_defaults = {}
if fullargspec.defaults:
for arg, default in zip(reversed(fullargspec.args), reversed(fullargspec.defaults)):
arg_defaults[arg] = default

# Construct the PipelineMeta
pipeline_meta = PipelineMeta(name=name, description=description)
# Inputs
for arg in args:
arg_type = TypeMeta()
arg_default = arg_defaults[arg] if arg in arg_defaults else ''
if arg in annotations:
arg_type = _annotation_to_typemeta(annotations[arg])
pipeline_meta.inputs.append(ParameterMeta(name=arg, description='', param_type=arg_type, default=arg_default))

#TODO: add descriptions to the metadata
#docstring parser:
# https://github.com/rr-/docstring_parser
# https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py

Pipeline.add_pipeline(pipeline_meta, func)
return func

return _pipeline
@@ -93,9 +120,9 @@ def get_pipeline_functions():
return Pipeline._pipeline_functions

@staticmethod
def add_pipeline(name, description, func):
def add_pipeline(pipeline_meta, func):
"""Add a pipeline function (decorated with @pipeline)."""
Pipeline._pipeline_functions[func] = (name, description)
Pipeline._pipeline_functions[func] = pipeline_meta

def __init__(self, name: str):
"""Create a new instance of Pipeline.
@@ -109,6 +136,7 @@ def __init__(self, name: str):
self.groups = [_ops_group.OpsGroup('pipeline', name=name)]
self.group_id = 0
self.conf = PipelineConf()
self._metadata = PipelineMeta()

def __enter__(self):
if Pipeline._default_pipeline:
@@ -163,4 +191,13 @@ def get_next_group_id(self):
self.group_id += 1
return self.group_id

def _set_metadata(self, metadata):
'''_set_metadata passes the metadata to the Pipeline object.
Args:
metadata (PipelineMeta): pipeline metadata
'''
if not isinstance(metadata, PipelineMeta):
raise ValueError('_set_metadata is expecting PipelineMeta.')
self._metadata = metadata


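The reversed `zip` in the decorator above is how trailing defaults get matched to the right parameters, since `inspect.getfullargspec` only reports defaults for the last few arguments. A kfp-free sketch of just that signature-inspection step, using a hypothetical pipeline function:

```python
import inspect

def my_pipeline(project: str, rounds: 'Integer' = 10, tag: str = 'latest'):
    pass

spec = inspect.getfullargspec(my_pipeline)

# spec.defaults only covers the trailing parameters, so walk both lists from the end.
arg_defaults = {}
if spec.defaults:
    for arg, default in zip(reversed(spec.args), reversed(spec.defaults)):
        arg_defaults[arg] = default

print(arg_defaults)      # {'tag': 'latest', 'rounds': 10}
print(spec.annotations)  # project/tag annotate to str, rounds to the string 'Integer'
```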
48 changes: 48 additions & 0 deletions sdk/python/kfp/dsl/_python_component.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ._metadata import ComponentMeta, ParameterMeta, TypeMeta, _annotation_to_typemeta

def python_component(name, description=None, base_image=None, target_component_file: str = None):
"""Decorator for Python component functions.
This decorator adds the metadata to the function object itself.
@@ -47,3 +49,49 @@ def _python_component(func):
return func

return _python_component

def component(func):
"""Decorator for component functions that use ContainerOp.
This is useful for enabling type checking in the DSL compiler.

Usage:
```python
@dsl.component
def foobar(model: TFModel(), step: MLStep()):
return dsl.ContainerOp()
"""
def _component(*args, **kargs):
import inspect
fullargspec = inspect.getfullargspec(func)
annotations = fullargspec.annotations

# defaults
arg_defaults = {}
if fullargspec.defaults:
for arg, default in zip(reversed(fullargspec.args), reversed(fullargspec.defaults)):
arg_defaults[arg] = default

# Construct the ComponentMeta
component_meta = ComponentMeta(name=func.__name__, description='')
# Inputs
for arg in fullargspec.args:
arg_type = TypeMeta()
arg_default = arg_defaults[arg] if arg in arg_defaults else ''
if arg in annotations:
arg_type = _annotation_to_typemeta(annotations[arg])
component_meta.inputs.append(ParameterMeta(name=arg, description='', param_type=arg_type, default=arg_default))
# Outputs
for output in annotations['return']:
arg_type = _annotation_to_typemeta(annotations['return'][output])
component_meta.outputs.append(ParameterMeta(name=output, description='', param_type=arg_type))

#TODO: add descriptions to the metadata
#docstring parser:
# https://github.com/rr-/docstring_parser
# https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py

container_op = func(*args, **kargs)
container_op._set_metadata(component_meta)
return container_op

return _component
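The output handling above relies on a convention: the decorated function's `return` annotation is a dict mapping output names to types. A hedged, standalone sketch of that convention with a hypothetical `my_op` (no kfp import, so the body is a no-op rather than a real `dsl.ContainerOp`):

```python
import inspect

def my_op(model_path: 'GCSPath', epochs: 'Integer' = 5) -> {'accuracy': 'Float', 'model': 'GCSPath'}:
    pass  # the real function would build and return a dsl.ContainerOp

annotations = inspect.getfullargspec(my_op).annotations

# Inputs come from the parameter annotations, outputs from the 'return' dict.
inputs = {name: t for name, t in annotations.items() if name != 'return'}
outputs = dict(annotations.get('return', {}))

print(inputs)   # {'model_path': 'GCSPath', 'epochs': 'Integer'}
print(outputs)  # {'accuracy': 'Float', 'model': 'GCSPath'}
```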
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/_types.py
@@ -144,7 +144,7 @@ def _str_to_dict(payload):
return json_dict

def _check_dict_types(checked_type, expected_type):
'''_check_type_types checks the type consistency.
'''_check_dict_types checks the type consistency.
Args:
checked_type (dict): A dict that describes a type from the upstream component output
expected_type (dict): A dict that describes a type from the downstream component input
1 change: 0 additions & 1 deletion sdk/python/tests/dsl/container_op_tests.py
@@ -16,7 +16,6 @@
from kfp.dsl import Pipeline, PipelineParam, ContainerOp
import unittest


class TestContainerOp(unittest.TestCase):

def test_basic(self):
4 changes: 4 additions & 0 deletions sdk/python/tests/dsl/main.py
@@ -21,6 +21,8 @@
import container_op_tests
import ops_group_tests
import type_tests
import python_component_tests
import metadata_tests

if __name__ == '__main__':
suite = unittest.TestSuite()
@@ -29,6 +31,8 @@
suite.addTests(unittest.defaultTestLoader.loadTestsFromModule(container_op_tests))
suite.addTests(unittest.defaultTestLoader.loadTestsFromModule(ops_group_tests))
suite.addTests(unittest.defaultTestLoader.loadTestsFromModule(type_tests))
suite.addTests(unittest.defaultTestLoader.loadTestsFromModule(python_component_tests))
suite.addTests(unittest.defaultTestLoader.loadTestsFromModule(metadata_tests))
runner = unittest.TextTestRunner()
if not runner.run(suite).wasSuccessful():
sys.exit(1)