compiler for IR (#4529)
* Compile IR proto in setup.py

* compile to IR

* Fix importer node logic and lint

* cleanup and lint

* merge, undo setup.py change

* cleanup and lint

* remove currently unused code

* format _component_bridge.py

* cleanup and format

* cleanup

* upgrade protobuf in test

* restructure and test

* address review comments

* fix bug

* avoid f-string formatting

* address review comments

* address review comments

* limit the primitive types to only int, double, and string.

* Fix test for python3.5

* use instance_schema instead of schema_title

* add v2 to setup.py

* address review comments

* move the tests closer to the code

* add more tests

* cleanup and linting

* add more tests

* fix bug in input parameter connection

* linting

* restructure tests

* fix python3.5 test failure

* support outputs.parameters placeholder

* remove pipeline decorator from v2.dsl
chensun committed Oct 14, 2020
1 parent b601832 commit 5020fd1
Showing 24 changed files with 1,616 additions and 2 deletions.
11 changes: 11 additions & 0 deletions sdk/python/kfp/components/_components.py
@@ -390,6 +390,7 @@ def _resolve_command_line_and_paths(
arguments: Mapping[str, str],
input_path_generator=_generate_input_file_name,
output_path_generator=_generate_output_file_name,
output_value_generator=None,
argument_serializer=serialize_value,
) -> _ResolvedCommandLineAndPaths:
"""Resolves the command line argument placeholders. Also produces the maps of the generated inpuit/output paths."""
@@ -399,6 +400,7 @@
raise TypeError('Only container components have command line to resolve')

inputs_dict = {input_spec.name: input_spec for input_spec in component_spec.inputs or []}
outputs_dict = {output_spec.name: output_spec for output_spec in component_spec.outputs or []}
container_spec = component_spec.implementation.container

output_paths = OrderedDict()  # Preserving the order to make the Kubernetes output names deterministic
@@ -455,6 +457,15 @@ def expand_command_part(arg) -> Union[str, List[str], None]:
else:
output_paths[output_name] = output_filename

# output_value_generator is only used by the v2 (experimental) compiler.
# TODO: maybe fork the file so that we can split the logic for v1 vs. v2.
if output_value_generator:
output_spec = outputs_dict[output_name]
# TODO: move the import to the top of the file (once fixed circular dependency).
from kfp.v2.dsl import type_utils
if type_utils.is_parameter_type(output_spec.type):
return output_value_generator(output_name)

return output_filename

elif isinstance(arg, ConcatPlaceholder):
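For context, a minimal sketch of how a caller might exercise the new output_value_generator hook. The parameter_placeholder helper and its placeholder format are hypothetical (not part of this commit), and accessing component_spec on the loaded component follows the v1 SDK convention:

from kfp import components
from kfp.components._components import _resolve_command_line_and_paths

echo_op = components.load_component_from_text("""
name: echo
inputs:
- {name: msg, type: String}
outputs:
- {name: count, type: Integer}
implementation:
  container:
    image: alpine
    args:
    - {inputValue: msg}
    - {outputPath: count}
""")

def parameter_placeholder(output_name: str) -> str:
  # Hypothetical placeholder format; the real v2 runtime format may differ.
  return "{{{{$.outputs.parameters['{}']}}}}".format(output_name)

resolved = _resolve_command_line_and_paths(
    component_spec=echo_op.component_spec,
    arguments={'msg': 'hello'},
    output_value_generator=parameter_placeholder,
)
# `count` is a parameter-typed output, so its outputPath placeholder resolves
# to the generated value placeholder instead of a local file path.
print(resolved.args)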
13 changes: 13 additions & 0 deletions sdk/python/kfp/v2/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15 changes: 15 additions & 0 deletions sdk/python/kfp/v2/compiler/__init__.py
@@ -0,0 +1,15 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp.v2.compiler.compiler import Compiler
230 changes: 230 additions & 0 deletions sdk/python/kfp/v2/compiler/compiler.py
@@ -0,0 +1,230 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""KFP DSL v2 compiler.
This is an experimental implementation of KFP compiler that compiles KFP
pipeline into Pipeline IR:
https://docs.google.com/document/d/1PUDuSQ8vmeKSBloli53mp7GIvzekaY7sggg6ywy35Dk/
"""

import inspect
from typing import Any, Callable, List, Optional

import kfp
from kfp.compiler._k8s_helper import sanitize_k8s_name
from kfp.components import _python_op
from kfp.components import structures
from kfp.v2 import dsl
from kfp.v2.compiler import importer_node
from kfp.v2.dsl import type_utils
from kfp.v2.proto import pipeline_spec_pb2

from google.protobuf import json_format


class Compiler(object):
"""Experimental DSL compiler that targets the PipelineSpec IR.
It compiles pipeline function into PipelineSpec json string.
PipelineSpec is the IR protobuf message that defines a pipeline:
https://github.com/kubeflow/pipelines/blob/237795539f7b85bac77435e2464367226ee19391/api/v2alpha1/pipeline_spec.proto#L8
In this initial implementation, we only support components authored through
Component yaml spec. And we don't support advanced features like conditions,
static and dynamic loops, etc.
Example:
How to use the compiler to construct pipeline_spec json:
@dsl.pipeline(
name='name',
description='description'
)
def my_pipeline(a: int = 1, b: str = "default value"):
...
kfp.v2.compiler.Compiler().compile(my_pipeline, 'path/to/pipeline.json')
"""

def _create_pipeline_spec(
self,
args: List[dsl.PipelineParam],
pipeline: dsl.Pipeline,
) -> pipeline_spec_pb2.PipelineSpec:
"""Creates the pipeline spec object.
Args:
args: The list of pipeline arguments.
pipeline: The instantiated pipeline object.
Returns:
A PipelineSpec proto representing the compiled pipeline.
Raises:
NotImplementedError: If an argument is of an unsupported type.
"""
pipeline_spec = pipeline_spec_pb2.PipelineSpec()
if not pipeline.name:
raise ValueError('Pipeline name is required.')
pipeline_spec.pipeline_info.name = pipeline.name
pipeline_spec.sdk_version = 'kfp-{}'.format(kfp.__version__)
pipeline_spec.schema_version = 'v2alpha1'

# Pipeline Parameters
for arg in args:
if arg.value is not None:
if isinstance(arg.value, int):
pipeline_spec.runtime_parameters[
arg.name].type = pipeline_spec_pb2.PrimitiveType.INT
pipeline_spec.runtime_parameters[
arg.name].default_value.int_value = arg.value
elif isinstance(arg.value, float):
pipeline_spec.runtime_parameters[
arg.name].type = pipeline_spec_pb2.PrimitiveType.DOUBLE
pipeline_spec.runtime_parameters[
arg.name].default_value.double_value = arg.value
elif isinstance(arg.value, str):
pipeline_spec.runtime_parameters[
arg.name].type = pipeline_spec_pb2.PrimitiveType.STRING
pipeline_spec.runtime_parameters[
arg.name].default_value.string_value = arg.value
else:
raise NotImplementedError(
'Unexpected parameter type for value: "{}".'.format(str(arg.value)))

deployment_config = pipeline_spec_pb2.PipelineDeploymentConfig()
importer_tasks = []

for op in pipeline.ops.values():
component_spec = op._metadata
task = pipeline_spec.tasks.add()
task.CopyFrom(op.task_spec)
deployment_config.executors[task.executor_label].container.CopyFrom(
op.container_spec)

# Check whether an importer node needs to be inserted.
for input_name in task.inputs.artifacts:
if not task.inputs.artifacts[input_name].producer_task:
artifact_type = type_utils.get_input_artifact_type_schema(
input_name, component_spec.inputs)

importer_task, importer_spec = importer_node.build_importer_spec(
task, input_name, artifact_type)
importer_tasks.append(importer_task)

task.inputs.artifacts[
input_name].producer_task = importer_task.task_info.name
task.inputs.artifacts[
input_name].output_artifact_key = importer_node.OUTPUT_KEY

deployment_config.executors[
importer_task.executor_label].importer.CopyFrom(importer_spec)

pipeline_spec.deployment_config.Pack(deployment_config)
pipeline_spec.tasks.extend(importer_tasks)

return pipeline_spec

def _create_pipeline(
self,
pipeline_func: Callable[..., Any],
pipeline_name: Optional[str] = None,
) -> pipeline_spec_pb2.PipelineSpec:
"""Creates a pipeline instance and constructs the pipeline spec from it.
Args:
pipeline_func: Pipeline function with @dsl.pipeline decorator.
pipeline_name: The name of the pipeline. Optional.
Returns:
The IR representation (pipeline spec) of the pipeline.
"""

# Create the arg list with no default values and call pipeline function.
# Assign type information to the PipelineParam
pipeline_meta = _python_op._extract_component_interface(pipeline_func)
pipeline_name = pipeline_name or pipeline_meta.name

args_list = []
signature = inspect.signature(pipeline_func)
for arg_name in signature.parameters:
arg_type = None
for pipeline_input in pipeline_meta.inputs or []:
if arg_name == pipeline_input.name:
arg_type = pipeline_input.type
break
args_list.append(
dsl.PipelineParam(
sanitize_k8s_name(arg_name, True), param_type=arg_type))

with dsl.Pipeline(pipeline_name) as dsl_pipeline:
pipeline_func(*args_list)

# Fill in the default values.
args_list_with_defaults = []
if pipeline_meta.inputs:
args_list_with_defaults = [
dsl.PipelineParam(
sanitize_k8s_name(input_spec.name, True),
value=input_spec.default) for input_spec in pipeline_meta.inputs
]

pipeline_spec = self._create_pipeline_spec(
args_list_with_defaults,
dsl_pipeline,
)

return pipeline_spec

def compile(self,
pipeline_func: Callable[..., Any],
output_path: str,
type_check: bool = True,
pipeline_name: str = None) -> None:
"""Compile the given pipeline function into workflow yaml.
Args:
pipeline_func: Pipeline function with @dsl.pipeline decorator.
output_path: The output pipeline spec .json file path. for example,
"~/a.json"
type_check: Whether to enable the type check or not, default: True.
pipeline_name: The name of the pipeline. Optional.
"""
type_check_old_value = kfp.TYPE_CHECK
try:
kfp.TYPE_CHECK = type_check
pipeline = self._create_pipeline(pipeline_func, pipeline_name)
self._write_pipeline(pipeline, output_path)
finally:
kfp.TYPE_CHECK = type_check_old_value

def _write_pipeline(self, pipeline_spec: pipeline_spec_pb2.PipelineSpec,
output_path: str) -> None:
"""Dump pipeline spec into json file.
Args:
pipeline_spec: IR pipeline spec.
ouput_path: The file path to be written.
Raises:
ValueError: if the specified output path doesn't end with the acceptable
extentions.
"""
json_text = json_format.MessageToJson(pipeline_spec)

if output_path.endswith('.json'):
with open(output_path, 'w') as json_file:
json_file.write(json_text)
else:
raise ValueError(
'The output path {} should end with ".json".'.format(output_path))
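To see what the runtime-parameter mapping in _create_pipeline_spec produces, here is a minimal sketch that builds the equivalent proto entries by hand and serializes them the same way _write_pipeline does. It assumes the generated pipeline_spec_pb2 module from this commit is importable; the pipeline name and default values are illustrative:

from google.protobuf import json_format
from kfp.v2.proto import pipeline_spec_pb2

spec = pipeline_spec_pb2.PipelineSpec()
spec.pipeline_info.name = 'demo'
spec.schema_version = 'v2alpha1'

# Equivalent of a pipeline signature `def demo(a: int = 1, b: str = 'hi')`:
spec.runtime_parameters['a'].type = pipeline_spec_pb2.PrimitiveType.INT
spec.runtime_parameters['a'].default_value.int_value = 1
spec.runtime_parameters['b'].type = pipeline_spec_pb2.PrimitiveType.STRING
spec.runtime_parameters['b'].default_value.string_value = 'hi'

# Same serialization path as Compiler._write_pipeline.
print(json_format.MessageToJson(spec))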
75 changes: 75 additions & 0 deletions sdk/python/kfp/v2/compiler/compiler_test.py
@@ -0,0 +1,75 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import tempfile
import unittest
from kfp import components
from kfp import dsl
from kfp.v2 import compiler


class CompilerTest(unittest.TestCase):

def test_compile_simple_pipeline(self):

tmpdir = tempfile.mkdtemp()
try:
producer_op = components.load_component_from_text("""
name: producer
inputs:
- {name: input_param, type: String}
outputs:
- {name: output_model, type: Model}
- {name: output_value, type: Integer}
implementation:
container:
image: gcr.io/my-project/my-image:tag
args:
- {inputValue: input_param}
- {outputPath: output_model}
- {outputPath: output_value}
""")

consumer_op = components.load_component_from_text("""
name: consumer
inputs:
- {name: input_model, type: Model}
- {name: input_value, type: Integer}
implementation:
container:
image: gcr.io/my-project/my-image:tag
args:
- {inputPath: input_model}
- {inputValue: input_value}
""")

@dsl.pipeline(name='two-step-pipeline')
def simple_pipeline(pipeline_input='Hello KFP!'):
producer = producer_op(input_param=pipeline_input)
consumer = consumer_op(
input_model=producer.outputs['output_model'],
input_value=producer.outputs['output_value'])

target_json_file = os.path.join(tmpdir, 'result.json')
compiler.Compiler().compile(simple_pipeline, target_json_file)

self.assertTrue(os.path.exists(target_json_file))
finally:
shutil.rmtree(tmpdir)


if __name__ == '__main__':
unittest.main()
