diff --git a/sdk/python/kfp/v2/compiler/compiler.py b/sdk/python/kfp/v2/compiler/compiler.py index 5bff39132e8..989f5962d83 100644 --- a/sdk/python/kfp/v2/compiler/compiler.py +++ b/sdk/python/kfp/v2/compiler/compiler.py @@ -23,9 +23,11 @@ import kfp from kfp.compiler._k8s_helper import sanitize_k8s_name +from kfp.components import _python_op from kfp.v2 import dsl from kfp.v2.compiler import compiler_utils -from kfp.components import _python_op +from kfp.v2.dsl import component_spec as dsl_component_spec +from kfp.v2.dsl import dsl_utils from kfp.v2.dsl import importer_node from kfp.v2.dsl import type_utils from kfp.pipeline_spec import pipeline_spec_pb2 @@ -75,68 +77,95 @@ def _create_pipeline_spec( """ compiler_utils.validate_pipeline_name(pipeline.name) - pipeline_spec = pipeline_spec_pb2.PipelineSpec( - runtime_parameters=compiler_utils.build_runtime_parameter_spec(args)) + pipeline_spec = pipeline_spec_pb2.PipelineSpec() pipeline_spec.pipeline_info.name = pipeline.name pipeline_spec.sdk_version = 'kfp-{}'.format(kfp.__version__) - pipeline_spec.schema_version = 'v2alpha1' + # Schema version 2.0.0 is required for kfp-pipeline-spec>0.1.3.1 + pipeline_spec.schema_version = '2.0.0' + + pipeline_spec.root.CopyFrom( + dsl_component_spec.build_root_spec_from_pipeline_params(args)) deployment_config = pipeline_spec_pb2.PipelineDeploymentConfig() - importer_tasks = [] for op in pipeline.ops.values(): - component_spec = op._metadata - task = pipeline_spec.tasks.add() - task.CopyFrom(op.task_spec) - deployment_config.executors[task.executor_label].container.CopyFrom( + task_name = op.task_spec.task_info.name + component_name = op.task_spec.component_ref.name + executor_label = op.component_spec.executor_label + + pipeline_spec.root.dag.tasks[task_name].CopyFrom(op.task_spec) + pipeline_spec.components[component_name].CopyFrom(op.component_spec) + deployment_config.executors[executor_label].container.CopyFrom( op.container_spec) + task = pipeline_spec.root.dag.tasks[task_name] # A task may have explicit depdency on other tasks even though they may # not have inputs/outputs dependency. e.g.: op2.after(op1) if op.dependent_names: + op.dependent_names = [ + dsl_utils.sanitize_task_name(name) for name in op.dependent_names + ] task.dependent_tasks.extend(op.dependent_names) # Check if need to insert importer node for input_name in task.inputs.artifacts: - if not task.inputs.artifacts[input_name].producer_task: + if not task.inputs.artifacts[ + input_name].task_output_artifact.producer_task: type_schema = type_utils.get_input_artifact_type_schema( - input_name, component_spec.inputs) - - importer_task = importer_node.build_importer_task_spec( - dependent_task=task, + input_name, op._metadata.inputs) + + importer_name = importer_node.generate_importer_base_name( + dependent_task_name=task_name, input_name=input_name) + importer_task_spec = importer_node.build_importer_task_spec( + importer_name) + importer_comp_spec = importer_node.build_importer_component_spec( + importer_base_name=importer_name, input_name=input_name, input_type_schema=type_schema) - importer_tasks.append(importer_task) + importer_task_name = importer_task_spec.task_info.name + importer_comp_name = importer_task_spec.component_ref.name + importer_exec_label = importer_comp_spec.executor_label + pipeline_spec.root.dag.tasks[importer_task_name].CopyFrom( + importer_task_spec) + pipeline_spec.components[importer_comp_name].CopyFrom( + importer_comp_spec) task.inputs.artifacts[ - input_name].producer_task = importer_task.task_info.name + input_name].task_output_artifact.producer_task = ( + importer_task_name) task.inputs.artifacts[ - input_name].output_artifact_key = importer_node.OUTPUT_KEY + input_name].task_output_artifact.output_artifact_key = ( + importer_node.OUTPUT_KEY) # Retrieve the pre-built importer spec - importer_spec = op.importer_spec[input_name] - deployment_config.executors[ - importer_task.executor_label].importer.CopyFrom(importer_spec) + importer_spec = op.importer_specs[input_name] + deployment_config.executors[importer_exec_label].importer.CopyFrom( + importer_spec) - pipeline_spec.deployment_config.Pack(deployment_config) - pipeline_spec.tasks.extend(importer_tasks) + pipeline_spec.deployment_spec.update( + json_format.MessageToDict(deployment_config)) return pipeline_spec def _create_pipeline( self, pipeline_func: Callable[..., Any], + output_directory: str, pipeline_name: Optional[str] = None, - ) -> pipeline_spec_pb2.PipelineSpec: + pipeline_parameters_override: Optional[Mapping[str, Any]] = None, + ) -> pipeline_spec_pb2.PipelineJob: """Creates a pipeline instance and constructs the pipeline spec from it. Args: pipeline_func: Pipeline function with @dsl.pipeline decorator. pipeline_name: The name of the pipeline. Optional. + output_directory: The root of the pipeline outputs. + pipeline_parameters_override: The mapping from parameter names to values. + Optional. Returns: - The IR representation (pipeline spec) of the pipeline. + A PipelineJob proto representing the compiled pipeline. """ # Create the arg list with no default values and call pipeline function. @@ -174,26 +203,14 @@ def _create_pipeline( dsl_pipeline, ) - return pipeline_spec - - def _create_pipeline_job( - self, - pipeline_spec: pipeline_spec_pb2.PipelineSpec, - pipeline_root: str, - pipeline_parameters: Optional[Mapping[str, Any]] = None, - ) -> pipeline_spec_pb2.PipelineJob: - """Creates the pipeline job spec object. - - Args: - pipeline_spec: The pipeline spec object. - pipeline_root: The root of the pipeline outputs. - pipeline_parameters: The mapping from parameter names to values. Optional. - - Returns: - A PipelineJob proto representing the compiled pipeline. - """ + pipeline_parameters = { + arg.name: arg.value for arg in args_list_with_defaults + } + # Update pipeline parameters override if there were any. + pipeline_parameters.update(pipeline_parameters_override or {}) runtime_config = compiler_utils.build_runtime_config_spec( - pipeline_root=pipeline_root, pipeline_parameters=pipeline_parameters) + output_directory=output_directory, + pipeline_parameters=pipeline_parameters) pipeline_job = pipeline_spec_pb2.PipelineJob(runtime_config=runtime_config) pipeline_job.pipeline_spec.update(json_format.MessageToDict(pipeline_spec)) @@ -220,11 +237,11 @@ def compile(self, type_check_old_value = kfp.TYPE_CHECK try: kfp.TYPE_CHECK = type_check - pipeline = self._create_pipeline(pipeline_func, pipeline_name) - pipeline_job = self._create_pipeline_job( - pipeline_spec=pipeline, - pipeline_root=pipeline_root, - pipeline_parameters=pipeline_parameters) + pipeline_job = self._create_pipeline( + pipeline_func=pipeline_func, + output_directory=pipeline_root, + pipeline_name=pipeline_name, + pipeline_parameters_override=pipeline_parameters) self._write_pipeline(pipeline_job, output_path) finally: kfp.TYPE_CHECK = type_check_old_value diff --git a/sdk/python/kfp/v2/compiler/compiler_utils.py b/sdk/python/kfp/v2/compiler/compiler_utils.py index 35ee446d5e5..1785f220f91 100644 --- a/sdk/python/kfp/v2/compiler/compiler_utils.py +++ b/sdk/python/kfp/v2/compiler/compiler_utils.py @@ -20,62 +20,22 @@ from kfp.pipeline_spec import pipeline_spec_pb2 -def build_runtime_parameter_spec( - pipeline_params: List[dsl.PipelineParam] -) -> Mapping[str, pipeline_spec_pb2.PipelineSpec.RuntimeParameter]: - """Converts pipeine parameters to runtime parameters mapping. - - Args: - pipeline_params: The list of pipeline parameters. - - Returns: - A map of pipeline parameter name to its runtime parameter message. - """ - - def to_message(param: dsl.PipelineParam): - result = pipeline_spec_pb2.PipelineSpec.RuntimeParameter() - if param.param_type == 'Integer' or (param.param_type is None and - isinstance(param.value, int)): - - result.type = pipeline_spec_pb2.PrimitiveType.INT - if param.value is not None: - result.default_value.int_value = int(param.value) - elif param.param_type == 'Float' or (param.param_type is None and - isinstance(param.value, float)): - result.type = pipeline_spec_pb2.PrimitiveType.DOUBLE - if param.value is not None: - result.default_value.double_value = float(param.value) - elif param.param_type == 'String' or param.param_type is None: - result.type = pipeline_spec_pb2.PrimitiveType.STRING - if param.value is not None: - result.default_value.string_value = str(param.value) - else: - raise TypeError('Unsupported type "{}" for argument "{}".'.format( - param.param_type, param.name)) - return result - - return {param.name: to_message(param) for param in pipeline_params} - - def build_runtime_config_spec( - pipeline_root: str, + output_directory: str, pipeline_parameters: Optional[Mapping[str, Any]] = None, ) -> pipeline_spec_pb2.PipelineJob.RuntimeConfig: """Converts pipeine parameters to runtime parameters mapping. Args: - pipeline_root: The root of pipeline outputs. + output_directory: The root of pipeline outputs. pipeline_parameters: The mapping from parameter names to values. Optional. Returns: A pipeline job RuntimeConfig object. """ - def _get_value( - value: Optional[Union[int, float, - str]]) -> Optional[pipeline_spec_pb2.Value]: - if value is None: - return None + def _get_value(value: Union[int, float, str]) -> pipeline_spec_pb2.Value: + assert value is not None, 'None values should be filterd out.' result = pipeline_spec_pb2.Value() if isinstance(value, int): @@ -91,8 +51,10 @@ def _get_value( parameter_values = pipeline_parameters or {} return pipeline_spec_pb2.PipelineJob.RuntimeConfig( - gcs_output_directory=pipeline_root, - parameters={k: _get_value(v) for k, v in parameter_values.items()}) + gcs_output_directory=output_directory, + parameters={ + k: _get_value(v) for k, v in parameter_values.items() if v is not None + }) def validate_pipeline_name(name: str) -> None: diff --git a/sdk/python/kfp/v2/compiler/compiler_utils_test.py b/sdk/python/kfp/v2/compiler/compiler_utils_test.py index a4852a4da20..9378303dbb1 100644 --- a/sdk/python/kfp/v2/compiler/compiler_utils_test.py +++ b/sdk/python/kfp/v2/compiler/compiler_utils_test.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Tests for kfp.v2.compiler.compiler_utils.""" import unittest @@ -22,56 +23,6 @@ class CompilerUtilsTest(unittest.TestCase): - def test_build_runtime_parameter_spec(self): - pipeline_params = [ - dsl.PipelineParam(name='input1', param_type='Integer', value=99), - dsl.PipelineParam(name='input2', param_type='String', value='hello'), - dsl.PipelineParam(name='input3', param_type='Float', value=3.1415926), - dsl.PipelineParam(name='input4', param_type=None, value=None), - ] - expected_dict = { - 'runtimeParameters': { - 'input1': { - 'type': 'INT', - 'defaultValue': { - 'intValue': '99' - } - }, - 'input2': { - 'type': 'STRING', - 'defaultValue': { - 'stringValue': 'hello' - } - }, - 'input3': { - 'type': 'DOUBLE', - 'defaultValue': { - 'doubleValue': '3.1415926' - } - }, - 'input4': { - 'type': 'STRING' - } - } - } - expected_spec = pipeline_spec_pb2.PipelineSpec() - json_format.ParseDict(expected_dict, expected_spec) - - pipeline_spec = pipeline_spec_pb2.PipelineSpec( - runtime_parameters=compiler_utils.build_runtime_parameter_spec( - pipeline_params)) - self.maxDiff = None - self.assertEqual(expected_spec, pipeline_spec) - - def test_build_runtime_parameter_spec_with_unsupported_type_should_fail(self): - pipeline_params = [ - dsl.PipelineParam(name='input1', param_type='Dict'), - ] - - with self.assertRaisesRegexp( - TypeError, 'Unsupported type "Dict" for argument "input1"'): - compiler_utils.build_runtime_parameter_spec(pipeline_params) - def test_build_runtime_config_spec(self): expected_dict = { 'gcsOutputDirectory': 'gs://path', @@ -85,7 +36,7 @@ def test_build_runtime_config_spec(self): json_format.ParseDict(expected_dict, expected_spec) runtime_config = compiler_utils.build_runtime_config_spec( - 'gs://path', {'input1': 'test'}) + 'gs://path', {'input1': 'test', 'input2': None}) self.assertEqual(expected_spec, runtime_config) def test_validate_pipeline_name(self): diff --git a/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py b/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py index 33b20a86297..95464273a5d 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py +++ b/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py @@ -38,13 +38,9 @@ def _test_compile_py_to_json(self, file_base_name, additional_arguments = []): golden = json.load(f) # Correct the sdkVersion golden['pipelineSpec']['sdkVersion'] = 'kfp-{}'.format(kfp.__version__) - # Need to sort the list items before comparison - golden['pipelineSpec']['tasks'].sort(key=lambda x: x['executorLabel']) with open(os.path.join(test_data_dir, target_json), 'r') as f: compiled = json.load(f) - # Need to sort the list items before comparison - compiled['pipelineSpec']['tasks'].sort(key=lambda x: x['executorLabel']) self.maxDiff = None self.assertEqual(golden, compiled) diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_after.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_after.json index a108299b712..5bc4b246159 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_after.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_after.json @@ -1,108 +1,107 @@ { "pipelineSpec": { - "deploymentConfig": { - "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig", - "executors": { - "Print Text": { - "container": { - "command": [ - "sh", - "-c", - "set -e -x\necho \"$0\"\n", - "{{$.inputs.parameters['text']}}" - ], - "image": "alpine" - } - }, - "Print Text 3": { - "container": { - "image": "alpine", - "command": [ - "sh", - "-c", - "set -e -x\necho \"$0\"\n", - "{{$.inputs.parameters['text']}}" + "root": { + "dag": { + "tasks": { + "task-print-text-2": { + "inputs": { + "parameters": { + "text": { + "runtimeValue": { + "constantValue": { + "stringValue": "2nd task" + } + } + } + } + }, + "componentRef": { + "name": "comp-print-text" + }, + "taskInfo": { + "name": "task-print-text-2" + }, + "dependentTasks": [ + "task-print-text" + ] + }, + "task-print-text": { + "taskInfo": { + "name": "task-print-text" + }, + "inputs": { + "parameters": { + "text": { + "runtimeValue": { + "constantValue": { + "stringValue": "1st task" + } + } + } + } + }, + "componentRef": { + "name": "comp-print-text" + } + }, + "task-print-text-3": { + "taskInfo": { + "name": "task-print-text-3" + }, + "componentRef": { + "name": "comp-print-text" + }, + "inputs": { + "parameters": { + "text": { + "runtimeValue": { + "constantValue": { + "stringValue": "3rd task" + } + } + } + } + }, + "dependentTasks": [ + "task-print-text", + "task-print-text-2" ] - } - }, - "Print Text 2": { - "container": { - "command": [ - "sh", - "-c", - "set -e -x\necho \"$0\"\n", - "{{$.inputs.parameters['text']}}" - ], - "image": "alpine" } } } }, + "sdkVersion": "kfp-1.3.0", + "schemaVersion": "2.0.0", "pipelineInfo": { "name": "pipeline-with-after" }, - "sdkVersion": "kfp-1.1.1", - "tasks": [ - { - "executorLabel": "Print Text", - "taskInfo": { - "name": "Print Text" - }, - "inputs": { + "components": { + "comp-print-text": { + "inputDefinitions": { "parameters": { "text": { - "runtimeValue": { - "constantValue": { - "stringValue": "1st task" - } - } - } - } - } - }, - { - "inputs": { - "parameters": { - "text": { - "runtimeValue": { - "constantValue": { - "stringValue": "2nd task" - } - } + "type": "STRING" } } }, - "dependentTasks": [ - "Print Text" - ], - "taskInfo": { - "name": "Print Text 2" - }, - "executorLabel": "Print Text 2" - }, - { - "dependentTasks": [ - "Print Text", - "Print Text 2" - ], - "inputs": { - "parameters": { - "text": { - "runtimeValue": { - "constantValue": { - "stringValue": "3rd task" - } - } - } + "executorLabel": "exec-print-text" + } + }, + "deploymentSpec": { + "executors": { + "exec-print-text": { + "container": { + "image": "alpine", + "command": [ + "sh", + "-c", + "set -e -x\necho \"$0\"\n", + "{{$.inputs.parameters['text']}}" + ] } - }, - "executorLabel": "Print Text 3", - "taskInfo": { - "name": "Print Text 3" } } - ], - "schemaVersion": "v2alpha1" + } }, "runtimeConfig": { "gcsOutputDirectory": "dummy_root" diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_concat_placeholder.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_concat_placeholder.json index b97f7f2c2f7..ad6a903b149 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_concat_placeholder.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_concat_placeholder.json @@ -1,43 +1,60 @@ { "pipelineSpec": { - "pipelineInfo": { - "name": "one-step-pipeline-with-concat-placeholder" + "root": { + "dag": { + "tasks": { + "task-component-with-concat-placeholder": { + "taskInfo": { + "name": "task-component-with-concat-placeholder" + }, + "componentRef": { + "name": "comp-component-with-concat-placeholder" + }, + "inputs": { + "parameters": { + "input_prefix": { + "runtimeValue": { + "constantValue": { + "stringValue": "some prefix:" + } + } + } + } + } + } + } + } }, - "deploymentConfig": { - "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig", + "schemaVersion": "2.0.0", + "deploymentSpec": { "executors": { - "Component with concat placeholder": { + "exec-component-with-concat-placeholder": { "container": { - "image": "gcr.io/my-project/my-image", "args": [ "--arg0", "{{$.inputs.parameters['input_prefix']}}some value" - ] + ], + "image": "gcr.io/my-project/my-image" } } } }, - "schemaVersion": "v2alpha1", - "sdkVersion": "kfp-1.1.0-dev20201106", - "tasks": [ - { - "taskInfo": { - "name": "Component with concat placeholder" - }, - "inputs": { + "sdkVersion": "kfp-1.3.0", + "components": { + "comp-component-with-concat-placeholder": { + "executorLabel": "exec-component-with-concat-placeholder", + "inputDefinitions": { "parameters": { "input_prefix": { - "runtimeValue": { - "constantValue": { - "stringValue": "some prefix:" - } - } + "type": "STRING" } } - }, - "executorLabel": "Component with concat placeholder" + } } - ] + }, + "pipelineInfo": { + "name": "one-step-pipeline-with-concat-placeholder" + } }, "runtimeConfig": { "gcsOutputDirectory": "dummy_root" diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_if_placeholder.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_if_placeholder.json index 4f45cbb1583..f447cbd879f 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_if_placeholder.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_if_placeholder.json @@ -1,10 +1,57 @@ { "pipelineSpec": { - "deploymentConfig": { + "pipelineInfo": { + "name": "one-step-pipeline-with-if-placeholder" + }, + "sdkVersion": "kfp-1.3.0", + "root": { + "inputDefinitions": { + "artifacts": { + "input2": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input1": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input0": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + } + } + }, + "dag": { + "tasks": { + "task-component-with-optional-inputs": { + "inputs": { + "parameters": { + "required_input": { + "componentInputParameter": "input0" + }, + "optional_input_1": { + "componentInputParameter": "input1" + } + } + }, + "componentRef": { + "name": "comp-component-with-optional-inputs" + }, + "taskInfo": { + "name": "task-component-with-optional-inputs" + } + } + } + } + }, + "schemaVersion": "2.0.0", + "deploymentSpec": { "executors": { - "Component with optional inputs": { + "exec-component-with-optional-inputs": { "container": { - "image": "gcr.io/my-project/my-image", "args": [ "--arg0", "{{$.inputs.parameters['required_input']}}", @@ -12,50 +59,30 @@ "{{$.inputs.parameters['optional_input_1']}}", "--arg3", "default value" - ] + ], + "image": "gcr.io/my-project/my-image" } } - }, - "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig" - }, - "schemaVersion": "v2alpha1", - "pipelineInfo": { - "name": "one-step-pipeline-with-if-placeholder" - }, - "runtimeParameters": { - "input2": { - "type": "STRING" - }, - "input0": { - "type": "STRING" - }, - "input1": { - "type": "STRING" } }, - "tasks": [ - { - "executorLabel": "Component with optional inputs", - "taskInfo": { - "name": "Component with optional inputs" - }, - "inputs": { + "components": { + "comp-component-with-optional-inputs": { + "inputDefinitions": { "parameters": { "optional_input_1": { - "runtimeValue": { - "runtimeParameter": "input1" - } + "type": "STRING" }, "required_input": { - "runtimeValue": { - "runtimeParameter": "input0" - } + "type": "STRING" + }, + "optional_input_2": { + "type": "STRING" } } - } + }, + "executorLabel": "exec-component-with-optional-inputs" } - ], - "sdkVersion": "kfp-1.1.0-dev20201106" + } }, "runtimeConfig": { "gcsOutputDirectory": "dummy_root" diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_ontology.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_ontology.json index 2463c995305..3e5b504afbe 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_ontology.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_ontology.json @@ -1,107 +1,77 @@ { "pipelineSpec": { - "schemaVersion": "v2alpha1", - "tasks": [ - { - "executorLabel": "Ingestion", - "inputs": { - "parameters": { - "input_location": { - "runtimeValue": { - "runtimeParameter": "input_location" + "root": { + "dag": { + "tasks": { + "task-trainer": { + "taskInfo": { + "name": "task-trainer" + }, + "inputs": { + "parameters": { + "n_epochs": { + "componentInputParameter": "n_epochs" + }, + "optimizer": { + "componentInputParameter": "optimizer" + } + }, + "artifacts": { + "examples": { + "taskOutputArtifact": { + "outputArtifactKey": "examples", + "producerTask": "task-ingestion" + } + } } + }, + "componentRef": { + "name": "comp-trainer" } - } - }, - "outputs": { - "artifacts": { - "examples": { - "artifactType": { - "instanceSchema": "properties:\ntitle: kfp.Dataset\ntype: object\n" + }, + "task-ingestion": { + "inputs": { + "parameters": { + "input_location": { + "componentInputParameter": "input_location" + } } + }, + "componentRef": { + "name": "comp-ingestion" + }, + "taskInfo": { + "name": "task-ingestion" } } - }, - "taskInfo": { - "name": "Ingestion" } }, - { - "outputs": { - "artifacts": { - "model": { - "artifactType": { - "instanceSchema": "properties:\ntitle: kfp.Model\ntype: object\n" - } + "inputDefinitions": { + "artifacts": { + "input_location": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" } } }, - "executorLabel": "Trainer", - "inputs": { - "parameters": { - "optimizer": { - "runtimeValue": { - "runtimeParameter": "optimizer" - } - }, - "n_epochs": { - "runtimeValue": { - "runtimeParameter": "n_epochs" - } - } + "parameters": { + "n_epochs": { + "type": "INT" }, - "artifacts": { - "examples": { - "producerTask": "Ingestion", - "outputArtifactKey": "examples" - } + "optimizer": { + "type": "STRING" } - }, - "taskInfo": { - "name": "Trainer" - } - } - ], - "runtimeParameters": { - "input_location": { - "type": "STRING", - "defaultValue": { - "stringValue": "gs://test-bucket/pipeline_root" - } - }, - "optimizer": { - "type": "STRING", - "defaultValue": { - "stringValue": "sgd" - } - }, - "n_epochs": { - "type": "INT", - "defaultValue": { - "intValue": "200" } } }, "pipelineInfo": { "name": "two-step-pipeline-with-ontology" }, - "sdkVersion": "kfp-1.1.0-dev20201106", - "deploymentConfig": { + "schemaVersion": "2.0.0", + "deploymentSpec": { "executors": { - "Ingestion": { - "container": { - "args": [ - "--input-location", - "{{$.inputs.parameters['input_location']}}", - "--output-examples", - "{{$.outputs.artifacts['examples'].uri}}" - ], - "image": "gcr.io/my-project/my-ingestor" - } - }, - "Trainer": { + "exec-trainer": { "container": { - "image": "gcr.io/my-project/my-fancy-trainer", "args": [ "--input-examples", "{{$.inputs.artifacts['examples'].uri}}", @@ -111,14 +81,87 @@ "{{$.inputs.parameters['n_epochs']}}", "--output-model", "{{$.outputs.artifacts['model'].uri}}" - ] + ], + "image": "gcr.io/my-project/my-fancy-trainer" + } + }, + "exec-ingestion": { + "container": { + "args": [ + "--input-location", + "{{$.inputs.parameters['input_location']}}", + "--output-examples", + "{{$.outputs.artifacts['examples'].uri}}" + ], + "image": "gcr.io/my-project/my-ingestor" + } + } + } + }, + "components": { + "comp-ingestion": { + "inputDefinitions": { + "parameters": { + "input_location": { + "type": "STRING" + } + } + }, + "executorLabel": "exec-ingestion", + "outputDefinitions": { + "artifacts": { + "examples": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Dataset\ntype: object\n" + } + } } } }, - "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig" - } + "comp-trainer": { + "inputDefinitions": { + "parameters": { + "n_epochs": { + "type": "INT" + }, + "optimizer": { + "type": "STRING" + } + }, + "artifacts": { + "examples": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Dataset\ntype: object\n" + } + } + } + }, + "executorLabel": "exec-trainer", + "outputDefinitions": { + "artifacts": { + "model": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Model\ntype: object\n" + } + } + } + } + } + }, + "sdkVersion": "kfp-1.3.0" }, "runtimeConfig": { + "parameters": { + "optimizer": { + "stringValue": "sgd" + }, + "input_location": { + "stringValue": "gs://test-bucket/pipeline_root" + }, + "n_epochs": { + "stringValue": "200" + } + }, "gcsOutputDirectory": "dummy_root" } } \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_resource_spec.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_resource_spec.json index ac505b3d7f7..30dca3b3767 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_resource_spec.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_resource_spec.json @@ -1,20 +1,59 @@ { "pipelineSpec": { - "tasks": [ - { - "taskInfo": { - "name": "Ingestion" + "deploymentSpec": { + "executors": { + "exec-ingestion": { + "container": { + "args": [ + "--input-location", + "{{$.inputs.parameters['input_location']}}", + "--output-examples", + "{{$.outputs.artifacts['examples'].uri}}" + ], + "image": "gcr.io/my-project/my-ingestor" + } }, - "inputs": { + "exec-trainer": { + "container": { + "resources": { + "accelerator": { + "count": "1", + "type": "TPU_V3" + }, + "cpuLimit": 4.0, + "memoryLimit": 15.032385536 + }, + "image": "gcr.io/my-project/my-fancy-trainer", + "args": [ + "--input-examples", + "{{$.inputs.artifacts['examples'].uri}}", + "--optimizer", + "{{$.inputs.parameters['optimizer']}}", + "--n_epochs", + "{{$.inputs.parameters['n_epochs']}}", + "--output-model", + "{{$.outputs.artifacts['model'].uri}}" + ] + } + } + } + }, + "sdkVersion": "kfp-1.3.0", + "pipelineInfo": { + "name": "two-step-pipeline-with-resource-spec" + }, + "schemaVersion": "2.0.0", + "components": { + "comp-ingestion": { + "inputDefinitions": { "parameters": { "input_location": { - "runtimeValue": { - "runtimeParameter": "input_location" - } + "type": "STRING" } } }, - "outputs": { + "executorLabel": "exec-ingestion", + "outputDefinitions": { "artifacts": { "examples": { "artifactType": { @@ -22,11 +61,10 @@ } } } - }, - "executorLabel": "Ingestion" + } }, - { - "outputs": { + "comp-trainer": { + "outputDefinitions": { "artifacts": { "model": { "artifactType": { @@ -35,98 +73,103 @@ } } }, - "taskInfo": { - "name": "Trainer" - }, - "executorLabel": "Trainer", - "inputs": { + "executorLabel": "exec-trainer", + "inputDefinitions": { "artifacts": { "examples": { - "outputArtifactKey": "examples", - "producerTask": "Ingestion" + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Dataset\ntype: object\n" + } } }, "parameters": { + "n_epochs": { + "type": "INT" + }, "optimizer": { - "runtimeValue": { - "runtimeParameter": "optimizer" + "type": "STRING" + } + } + } + } + }, + "root": { + "dag": { + "tasks": { + "task-ingestion": { + "componentRef": { + "name": "comp-ingestion" + }, + "inputs": { + "parameters": { + "input_location": { + "componentInputParameter": "input_location" + } } }, - "n_epochs": { - "runtimeValue": { - "runtimeParameter": "n_epochs" + "taskInfo": { + "name": "task-ingestion" + } + }, + "task-trainer": { + "taskInfo": { + "name": "task-trainer" + }, + "componentRef": { + "name": "comp-trainer" + }, + "inputs": { + "artifacts": { + "examples": { + "taskOutputArtifact": { + "producerTask": "task-ingestion", + "outputArtifactKey": "examples" + } + } + }, + "parameters": { + "optimizer": { + "componentInputParameter": "optimizer" + }, + "n_epochs": { + "componentInputParameter": "n_epochs" + } } } } } - } - ], - "deploymentConfig": { - "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig", - "executors": { - "Ingestion": { - "container": { - "image": "gcr.io/my-project/my-ingestor", - "args": [ - "--input-location", - "{{$.inputs.parameters['input_location']}}", - "--output-examples", - "{{$.outputs.artifacts['examples'].uri}}" - ] + }, + "inputDefinitions": { + "parameters": { + "n_epochs": { + "type": "INT" + }, + "optimizer": { + "type": "STRING" } }, - "Trainer": { - "container": { - "args": [ - "--input-examples", - "{{$.inputs.artifacts['examples'].uri}}", - "--optimizer", - "{{$.inputs.parameters['optimizer']}}", - "--n_epochs", - "{{$.inputs.parameters['n_epochs']}}", - "--output-model", - "{{$.outputs.artifacts['model'].uri}}" - ], - "resources": { - "accelerator": { - "count": "1", - "type": "TPU_V3" - }, - "memoryLimit": 15.032385536, - "cpuLimit": 4.0 - }, - "image": "gcr.io/my-project/my-fancy-trainer" + "artifacts": { + "input_location": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } } } } - }, - "runtimeParameters": { + } + }, + "runtimeConfig": { + "parameters": { + "input_location": { + "stringValue": "gs://test-bucket/pipeline_root" + }, "optimizer": { - "type": "STRING", - "defaultValue": { - "stringValue": "sgd" - } + "stringValue": "sgd" }, "n_epochs": { - "type": "INT", - "defaultValue": { - "intValue": "200" - } - }, - "input_location": { - "defaultValue": { - "stringValue": "gs://test-bucket/pipeline_root" - }, - "type": "STRING" + "stringValue": "200" } }, - "pipelineInfo": { - "name": "two-step-pipeline-with-resource-spec" - }, - "schemaVersion": "v2alpha1", - "sdkVersion": "kfp-1.1.0-dev20201106" - }, - "runtimeConfig": { "gcsOutputDirectory": "dummy_root" } } \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_reused_component.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_reused_component.json index aa74c3c0f00..3fcd1febd4a 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_reused_component.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_reused_component.json @@ -1,159 +1,143 @@ { "pipelineSpec": { - "tasks": [ - { - "inputs": { - "parameters": { - "op1": { - "runtimeValue": { - "runtimeParameter": "a" - } - }, - "op2": { - "runtimeValue": { - "constantValue": { - "intValue": "3" - } - } - } + "sdkVersion": "kfp-1.3.0", + "deploymentSpec": { + "executors": { + "exec-add": { + "container": { + "image": "google/cloud-sdk:latest", + "command": [ + "sh", + "-c", + "set -e -x\necho \"$(($0+$1))\" | gsutil cp - \"$2\"\n", + "{{$.inputs.parameters['op1']}}", + "{{$.inputs.parameters['op2']}}", + "{{$.outputs.parameters['sum'].output_file}}" + ] } - }, - "outputs": { - "parameters": { - "sum": { - "type": "INT" - } + } + } + }, + "root": { + "inputDefinitions": { + "parameters": { + "b": { + "type": "INT" + }, + "a": { + "type": "INT" } - }, - "taskInfo": { - "name": "Add" - }, - "executorLabel": "Add" + } }, - { - "executorLabel": "Add 2", - "outputs": { - "parameters": { - "sum": { - "type": "INT" + "dag": { + "tasks": { + "task-add-2": { + "taskInfo": { + "name": "task-add-2" + }, + "componentRef": { + "name": "comp-add" + }, + "inputs": { + "parameters": { + "op2": { + "componentInputParameter": "b" + }, + "op1": { + "taskOutputParameter": { + "outputParameterKey": "sum", + "producerTask": "task-add" + } + } + } } - } - }, - "taskInfo": { - "name": "Add 2" - }, - "inputs": { - "parameters": { - "op1": { - "taskOutputParameter": { - "outputParameterKey": "sum", - "producerTask": "Add" + }, + "task-add": { + "inputs": { + "parameters": { + "op1": { + "componentInputParameter": "a" + }, + "op2": { + "runtimeValue": { + "constantValue": { + "intValue": "3" + } + } + } } }, - "op2": { - "runtimeValue": { - "runtimeParameter": "b" + "taskInfo": { + "name": "task-add" + }, + "componentRef": { + "name": "comp-add" + } + }, + "task-add-3": { + "inputs": { + "parameters": { + "op2": { + "runtimeValue": { + "constantValue": { + "intValue": "7" + } + } + }, + "op1": { + "taskOutputParameter": { + "outputParameterKey": "sum", + "producerTask": "task-add-2" + } + } } + }, + "taskInfo": { + "name": "task-add-3" + }, + "componentRef": { + "name": "comp-add" } } } - }, - { - "outputs": { + } + }, + "pipelineInfo": { + "name": "add-pipeline" + }, + "schemaVersion": "2.0.0", + "components": { + "comp-add": { + "executorLabel": "exec-add", + "outputDefinitions": { "parameters": { "sum": { "type": "INT" } } }, - "executorLabel": "Add 3", - "inputs": { + "inputDefinitions": { "parameters": { "op1": { - "taskOutputParameter": { - "outputParameterKey": "sum", - "producerTask": "Add 2" - } + "type": "INT" }, "op2": { - "runtimeValue": { - "constantValue": { - "intValue": "7" - } - } + "type": "INT" } } - }, - "taskInfo": { - "name": "Add 3" } } - ], - "pipelineInfo": { - "name": "add-pipeline" - }, - "deploymentConfig": { - "executors": { - "Add": { - "container": { - "image": "google/cloud-sdk:latest", - "command": [ - "sh", - "-c", - "set -e -x\necho \"$(($0+$1))\" | gsutil cp - \"$2\"\n", - "{{$.inputs.parameters['op1']}}", - "{{$.inputs.parameters['op2']}}", - "{{$.outputs.parameters['sum'].output_file}}" - ] - } - }, - "Add 3": { - "container": { - "command": [ - "sh", - "-c", - "set -e -x\necho \"$(($0+$1))\" | gsutil cp - \"$2\"\n", - "{{$.inputs.parameters['op1']}}", - "{{$.inputs.parameters['op2']}}", - "{{$.outputs.parameters['sum'].output_file}}" - ], - "image": "google/cloud-sdk:latest" - } - }, - "Add 2": { - "container": { - "image": "google/cloud-sdk:latest", - "command": [ - "sh", - "-c", - "set -e -x\necho \"$(($0+$1))\" | gsutil cp - \"$2\"\n", - "{{$.inputs.parameters['op1']}}", - "{{$.inputs.parameters['op2']}}", - "{{$.outputs.parameters['sum'].output_file}}" - ] - } - } - }, - "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig" - }, - "runtimeParameters": { + } + }, + "runtimeConfig": { + "parameters": { "b": { - "defaultValue": { - "intValue": "5" - }, - "type": "INT" + "stringValue": "5" }, "a": { - "defaultValue": { - "intValue": "2" - }, - "type": "INT" + "stringValue": "2" } }, - "schemaVersion": "v2alpha1", - "sdkVersion": "kfp-1.1.1" - }, - "runtimeConfig": { "gcsOutputDirectory": "dummy_root" } -} +} \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_various_io_types.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_various_io_types.json index 32692ce9093..d8fddf9e23e 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_various_io_types.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_various_io_types.json @@ -1,138 +1,219 @@ { "pipelineSpec": { - "runtimeParameters": { - "input8": { - "type": "STRING", - "defaultValue": { - "stringValue": "gs://path2" - } - }, - "input3": { - "type": "STRING" - }, - "input7": { - "type": "STRING", - "defaultValue": { - "stringValue": "arbitrary value" - } - }, - "input5": { - "type": "STRING", - "defaultValue": { - "stringValue": "gs://bucket/metrics" + "root": { + "inputDefinitions": { + "artifacts": { + "input1": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input7": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input5": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input6": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input4": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input3": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input8": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + } } }, - "input4": { - "type": "STRING" - }, - "input6": { - "defaultValue": { - "stringValue": "gs://bucket/dataset" - }, - "type": "STRING" - }, - "input1": { - "type": "STRING" - } - }, - "schemaVersion": "v2alpha1", - "tasks": [ - { - "taskInfo": { - "name": "upstream" - }, - "executorLabel": "upstream", - "outputs": { - "parameters": { - "output_1": { - "type": "INT" + "dag": { + "tasks": { + "task-importer-task-upstream-input-3": { + "taskInfo": { + "name": "task-importer-task-upstream-input-3" + }, + "componentRef": { + "name": "comp-importer-task-upstream-input-3" } }, - "artifacts": { - "output_3": { - "artifactType": { - "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" - } + "task-importer-task-upstream-input-7": { + "taskInfo": { + "name": "task-importer-task-upstream-input-7" }, - "output_2": { - "artifactType": { - "instanceSchema": "properties:\ntitle: kfp.Model\ntype: object\n" - } + "componentRef": { + "name": "comp-importer-task-upstream-input-7" } - } - }, - "inputs": { - "parameters": { - "input_1": { - "runtimeValue": { - "runtimeParameter": "input1" - } + }, + "task-importer-task-upstream-input-4": { + "taskInfo": { + "name": "task-importer-task-upstream-input-4" }, - "input_2": { - "runtimeValue": { - "constantValue": { - "doubleValue": 3.1415926 + "componentRef": { + "name": "comp-importer-task-upstream-input-4" + } + }, + "task-importer-task-upstream-input-8": { + "taskInfo": { + "name": "task-importer-task-upstream-input-8" + }, + "componentRef": { + "name": "comp-importer-task-upstream-input-8" + } + }, + "task-upstream": { + "componentRef": { + "name": "comp-upstream" + }, + "taskInfo": { + "name": "task-upstream" + }, + "inputs": { + "parameters": { + "input_1": { + "componentInputParameter": "input1" + }, + "input_2": { + "runtimeValue": { + "constantValue": { + "doubleValue": 3.1415926 + } + } + } + }, + "artifacts": { + "input_7": { + "taskOutputArtifact": { + "producerTask": "task-importer-task-upstream-input-7", + "outputArtifactKey": "result" + } + }, + "input_6": { + "taskOutputArtifact": { + "outputArtifactKey": "result", + "producerTask": "task-importer-task-upstream-input-6" + } + }, + "input_4": { + "taskOutputArtifact": { + "producerTask": "task-importer-task-upstream-input-4", + "outputArtifactKey": "result" + } + }, + "input_3": { + "taskOutputArtifact": { + "producerTask": "task-importer-task-upstream-input-3", + "outputArtifactKey": "result" + } + }, + "input_5": { + "taskOutputArtifact": { + "outputArtifactKey": "result", + "producerTask": "task-importer-task-upstream-input-5" + } + }, + "input_8": { + "taskOutputArtifact": { + "producerTask": "task-importer-task-upstream-input-8", + "outputArtifactKey": "result" + } } } } }, - "artifacts": { - "input_3": { - "outputArtifactKey": "result", - "producerTask": "upstream_input_3_importer" + "task-importer-task-upstream-input-5": { + "componentRef": { + "name": "comp-importer-task-upstream-input-5" }, - "input_4": { - "producerTask": "upstream_input_4_importer", - "outputArtifactKey": "result" + "taskInfo": { + "name": "task-importer-task-upstream-input-5" + } + }, + "task-downstream": { + "taskInfo": { + "name": "task-downstream" }, - "input_8": { - "producerTask": "upstream_input_8_importer", - "outputArtifactKey": "result" + "componentRef": { + "name": "comp-downstream" }, - "input_7": { - "producerTask": "upstream_input_7_importer", - "outputArtifactKey": "result" - }, - "input_5": { - "producerTask": "upstream_input_5_importer", - "outputArtifactKey": "result" + "inputs": { + "parameters": { + "input_a": { + "taskOutputParameter": { + "outputParameterKey": "output_1", + "producerTask": "task-upstream" + } + } + }, + "artifacts": { + "input_b": { + "taskOutputArtifact": { + "outputArtifactKey": "output_2", + "producerTask": "task-upstream" + } + }, + "input_c": { + "taskOutputArtifact": { + "outputArtifactKey": "output_3", + "producerTask": "task-upstream" + } + } + } + } + }, + "task-importer-task-upstream-input-6": { + "componentRef": { + "name": "comp-importer-task-upstream-input-6" }, - "input_6": { - "producerTask": "upstream_input_6_importer", - "outputArtifactKey": "result" + "taskInfo": { + "name": "task-importer-task-upstream-input-6" } } } - }, - { - "executorLabel": "downstream", - "taskInfo": { - "name": "downstream" - }, - "inputs": { + } + }, + "components": { + "comp-importer-task-upstream-input-8": { + "inputDefinitions": { "parameters": { - "input_a": { - "taskOutputParameter": { - "outputParameterKey": "output_1", - "producerTask": "upstream" - } + "input_8": { + "type": "STRING" } - }, + } + }, + "outputDefinitions": { "artifacts": { - "input_b": { - "producerTask": "upstream", - "outputArtifactKey": "output_2" - }, - "input_c": { - "producerTask": "upstream", - "outputArtifactKey": "output_3" + "result": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } } } - } + }, + "executorLabel": "exec-importer-task-upstream-input-8" }, - { - "executorLabel": "upstream_input_8_importer", - "outputs": { + "comp-importer-task-upstream-input-5": { + "inputDefinitions": { + "parameters": { + "input_5": { + "type": "STRING" + } + } + }, + "outputDefinitions": { "artifacts": { "result": { "artifactType": { @@ -141,16 +222,11 @@ } } }, - "taskInfo": { - "name": "upstream_input_8_importer" - } + "executorLabel": "exec-importer-task-upstream-input-5" }, - { - "taskInfo": { - "name": "upstream_input_7_importer" - }, - "executorLabel": "upstream_input_7_importer", - "outputs": { + "comp-importer-task-upstream-input-4": { + "executorLabel": "exec-importer-task-upstream-input-4", + "outputDefinitions": { "artifacts": { "result": { "artifactType": { @@ -158,13 +234,46 @@ } } } + }, + "inputDefinitions": { + "parameters": { + "input_4": { + "type": "STRING" + } + } } }, - { - "taskInfo": { - "name": "upstream_input_3_importer" + "comp-downstream": { + "inputDefinitions": { + "parameters": { + "input_a": { + "type": "INT" + } + }, + "artifacts": { + "input_c": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input_b": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Model\ntype: object\n" + } + } + } + }, + "executorLabel": "exec-downstream" + }, + "comp-importer-task-upstream-input-3": { + "inputDefinitions": { + "parameters": { + "input_3": { + "type": "STRING" + } + } }, - "outputs": { + "outputDefinitions": { "artifacts": { "result": { "artifactType": { @@ -173,14 +282,18 @@ } } }, - "executorLabel": "upstream_input_3_importer" + "executorLabel": "exec-importer-task-upstream-input-3" }, - { - "executorLabel": "upstream_input_4_importer", - "taskInfo": { - "name": "upstream_input_4_importer" + "comp-importer-task-upstream-input-7": { + "inputDefinitions": { + "parameters": { + "input_7": { + "type": "STRING" + } + } }, - "outputs": { + "executorLabel": "exec-importer-task-upstream-input-7", + "outputDefinitions": { "artifacts": { "result": { "artifactType": { @@ -190,14 +303,63 @@ } } }, - { - "taskInfo": { - "name": "upstream_input_5_importer" + "comp-upstream": { + "executorLabel": "exec-upstream", + "inputDefinitions": { + "artifacts": { + "input_4": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input_6": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input_3": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input_8": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input_7": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + }, + "input_5": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + } + }, + "parameters": { + "input_1": { + "type": "STRING" + }, + "input_2": { + "type": "DOUBLE" + } + } }, - "executorLabel": "upstream_input_5_importer", - "outputs": { + "outputDefinitions": { + "parameters": { + "output_1": { + "type": "INT" + } + }, "artifacts": { - "result": { + "output_2": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Model\ntype: object\n" + } + }, + "output_3": { "artifactType": { "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" } @@ -205,12 +367,16 @@ } } }, - { - "taskInfo": { - "name": "upstream_input_6_importer" + "comp-importer-task-upstream-input-6": { + "executorLabel": "exec-importer-task-upstream-input-6", + "inputDefinitions": { + "parameters": { + "input_6": { + "type": "STRING" + } + } }, - "executorLabel": "upstream_input_6_importer", - "outputs": { + "outputDefinitions": { "artifacts": { "result": { "artifactType": { @@ -220,15 +386,14 @@ } } } - ], - "sdkVersion": "kfp-1.1.0-alpha.1", + }, "pipelineInfo": { "name": "pipeline-with-various-types" }, - "deploymentConfig": { - "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig", + "schemaVersion": "2.0.0", + "deploymentSpec": { "executors": { - "upstream_input_3_importer": { + "exec-importer-task-upstream-input-3": { "importer": { "artifactUri": { "runtimeParameter": "input3" @@ -238,47 +403,17 @@ } } }, - "upstream_input_4_importer": { - "importer": { - "artifactUri": { - "runtimeParameter": "input4" - }, - "typeSchema": { - "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" - } - } - }, - "downstream": { + "exec-downstream": { "container": { - "image": "gcr.io/image", "args": [ "{{$.inputs.parameters['input_a']}}", "{{$.inputs.artifacts['input_b'].uri}}", "{{$.inputs.artifacts['input_c'].path}}" - ] + ], + "image": "gcr.io/image" } }, - "upstream_input_6_importer": { - "importer": { - "artifactUri": { - "runtimeParameter": "input6" - }, - "typeSchema": { - "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" - } - } - }, - "upstream_input_8_importer": { - "importer": { - "typeSchema": { - "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" - }, - "artifactUri": { - "runtimeParameter": "input8" - } - } - }, - "upstream_input_5_importer": { + "exec-importer-task-upstream-input-5": { "importer": { "typeSchema": { "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" @@ -290,19 +425,18 @@ } } }, - "upstream_input_7_importer": { + "exec-importer-task-upstream-input-4": { "importer": { + "artifactUri": { + "runtimeParameter": "input4" + }, "typeSchema": { "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" - }, - "artifactUri": { - "runtimeParameter": "input7" } } }, - "upstream": { + "exec-upstream": { "container": { - "image": "gcr.io/image", "args": [ "{{$.inputs.parameters['input_1']}}", "{{$.inputs.parameters['input_2']}}", @@ -315,13 +449,59 @@ "{{$.outputs.parameters['output_1'].output_file}}", "{{$.outputs.artifacts['output_2'].uri}}", "{{$.outputs.artifacts['output_3'].path}}" - ] + ], + "image": "gcr.io/image" + } + }, + "exec-importer-task-upstream-input-6": { + "importer": { + "typeSchema": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + }, + "artifactUri": { + "runtimeParameter": "input6" + } + } + }, + "exec-importer-task-upstream-input-8": { + "importer": { + "artifactUri": { + "runtimeParameter": "input8" + }, + "typeSchema": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + } + }, + "exec-importer-task-upstream-input-7": { + "importer": { + "typeSchema": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + }, + "artifactUri": { + "runtimeParameter": "input7" + } } } } - } + }, + "sdkVersion": "kfp-1.3.0" }, "runtimeConfig": { + "parameters": { + "input5": { + "stringValue": "gs://bucket/metrics" + }, + "input8": { + "stringValue": "gs://path2" + }, + "input6": { + "stringValue": "gs://bucket/dataset" + }, + "input7": { + "stringValue": "arbitrary value" + } + }, "gcsOutputDirectory": "dummy_root" } } \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/simple_pipeline_without_importer.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/simple_pipeline_without_importer.json index 932963fe669..d3726fa069c 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/simple_pipeline_without_importer.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/simple_pipeline_without_importer.json @@ -3,50 +3,59 @@ "pipelineInfo": { "name": "simple-two-step-pipeline" }, - "tasks": [ - { - "inputs": { - "parameters": { - "text": { - "runtimeValue": { - "runtimeParameter": "text" + "schemaVersion": "2.0.0", + "root": { + "dag": { + "tasks": { + "task-write-to-gcs": { + "taskInfo": { + "name": "task-write-to-gcs" + }, + "inputs": { + "parameters": { + "text": { + "componentInputParameter": "text" + } } + }, + "componentRef": { + "name": "comp-write-to-gcs" } - } - }, - "taskInfo": { - "name": "Write to GCS" - }, - "executorLabel": "Write to GCS", - "outputs": { - "artifacts": { - "output_gcs_path": { - "artifactType": { - "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + }, + "task-read-from-gcs": { + "inputs": { + "artifacts": { + "input_gcs_path": { + "taskOutputArtifact": { + "outputArtifactKey": "output_gcs_path", + "producerTask": "task-write-to-gcs" + } + } } + }, + "componentRef": { + "name": "comp-read-from-gcs" + }, + "taskInfo": { + "name": "task-read-from-gcs" } } } }, - { - "taskInfo": { - "name": "Read from GCS" - }, - "executorLabel": "Read from GCS", - "inputs": { - "artifacts": { - "input_gcs_path": { - "producerTask": "Write to GCS", - "outputArtifactKey": "output_gcs_path" + "inputDefinitions": { + "artifacts": { + "text": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" } } } } - ], - "deploymentConfig": { - "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig", + }, + "sdkVersion": "kfp-1.3.0", + "deploymentSpec": { "executors": { - "Read from GCS": { + "exec-read-from-gcs": { "container": { "command": [ "sh", @@ -57,7 +66,7 @@ "image": "google/cloud-sdk:slim" } }, - "Write to GCS": { + "exec-write-to-gcs": { "container": { "image": "google/cloud-sdk:slim", "command": [ @@ -71,23 +80,46 @@ } } }, - "sdkVersion": "kfp-1.1.0-dev20201106", - "runtimeParameters": { - "text": { - "defaultValue": { - "stringValue": "Hello world!" + "components": { + "comp-read-from-gcs": { + "executorLabel": "exec-read-from-gcs", + "inputDefinitions": { + "artifacts": { + "input_gcs_path": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + } + } + } + }, + "comp-write-to-gcs": { + "inputDefinitions": { + "parameters": { + "text": { + "type": "STRING" + } + } }, - "type": "STRING" + "outputDefinitions": { + "artifacts": { + "output_gcs_path": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + } + } + }, + "executorLabel": "exec-write-to-gcs" } - }, - "schemaVersion": "v2alpha1" + } }, "runtimeConfig": { - "gcsOutputDirectory": "dummy_root", "parameters": { "text": { "stringValue": "Hello KFP!" } - } - } -} + }, + "gcsOutputDirectory": "dummy_root" + } +} \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/two_step_pipeline_with_importer.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/two_step_pipeline_with_importer.json index d916e9b33bc..e51f0e6d99d 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/two_step_pipeline_with_importer.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/two_step_pipeline_with_importer.json @@ -1,122 +1,116 @@ { "pipelineSpec": { - "sdkVersion": "kfp-1.1.0-dev20201106", - "tasks": [ - { - "outputs": { - "artifacts": { - "model_output": { - "artifactType": { - "instanceSchema": "properties:\ntitle: kfp.Model\ntype: object\n" + "sdkVersion": "kfp-1.3.0", + "root": { + "dag": { + "tasks": { + "task-trainer": { + "componentRef": { + "name": "comp-trainer" + }, + "taskInfo": { + "name": "task-trainer" + }, + "inputs": { + "parameters": { + "num_epochs": { + "componentInputParameter": "epochs" + }, + "train_optimizer": { + "componentInputParameter": "optimizer" + } + }, + "artifacts": { + "input_location": { + "taskOutputArtifact": { + "producerTask": "task-importer-task-trainer-input-location", + "outputArtifactKey": "result" + } + } } } }, - "parameters": { - "model_config": { - "type": "STRING" - } - } - }, - "taskInfo": { - "name": "Trainer" - }, - "inputs": { - "artifacts": { - "input_location": { - "producerTask": "Trainer_input_location_importer", - "outputArtifactKey": "result" - } - }, - "parameters": { - "train_optimizer": { - "runtimeValue": { - "runtimeParameter": "optimizer" - } + "task-serving": { + "taskInfo": { + "name": "task-serving" }, - "num_epochs": { - "runtimeValue": { - "runtimeParameter": "epochs" - } - } - } - }, - "executorLabel": "Trainer" - }, - { - "taskInfo": { - "name": "Serving" - }, - "executorLabel": "Serving", - "inputs": { - "parameters": { - "model_cfg": { - "taskOutputParameter": { - "producerTask": "Trainer", - "outputParameterKey": "model_config" + "componentRef": { + "name": "comp-serving" + }, + "inputs": { + "parameters": { + "model_cfg": { + "taskOutputParameter": { + "producerTask": "task-trainer", + "outputParameterKey": "model_config" + } + } + }, + "artifacts": { + "model": { + "taskOutputArtifact": { + "producerTask": "task-trainer", + "outputArtifactKey": "model_output" + } + } } } }, - "artifacts": { - "model": { - "outputArtifactKey": "model_output", - "producerTask": "Trainer" + "task-importer-task-trainer-input-location": { + "taskInfo": { + "name": "task-importer-task-trainer-input-location" + }, + "componentRef": { + "name": "comp-importer-task-trainer-input-location" } } } }, - { - "taskInfo": { - "name": "Trainer_input_location_importer" - }, - "outputs": { - "artifacts": { - "result": { - "artifactType": { - "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" - } + "inputDefinitions": { + "artifacts": { + "input_gcs": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" } } }, - "executorLabel": "Trainer_input_location_importer" - } - ], - "runtimeParameters": { - "epochs": { - "type": "INT", - "defaultValue": { - "intValue": "200" - } - }, - "optimizer": { - "type": "STRING", - "defaultValue": { - "stringValue": "sgd" - } - }, - "input_gcs": { - "type": "STRING", - "defaultValue": { - "stringValue": "gs://test-bucket/pipeline_root" + "parameters": { + "epochs": { + "type": "INT" + }, + "optimizer": { + "type": "STRING" + } } } }, "pipelineInfo": { "name": "two-step-pipeline-with-importer" }, - "deploymentConfig": { - "@type": "type.googleapis.com/ml_pipelines.PipelineDeploymentConfig", + "deploymentSpec": { "executors": { - "Trainer_input_location_importer": { + "exec-importer-task-trainer-input-location": { "importer": { - "typeSchema": { - "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" - }, "artifactUri": { "runtimeParameter": "input_gcs" + }, + "typeSchema": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" } } }, - "Trainer": { + "exec-serving": { + "container": { + "args": [ + "--model-to-serve", + "{{$.inputs.artifacts['model'].uri}}", + "--model-config", + "{{$.inputs.parameters['model_cfg']}}" + ], + "image": "gcr.io/my-project/my-server" + } + }, + "exec-trainer": { "container": { "image": "gcr.io/my-project/my-training", "args": [ @@ -132,23 +126,95 @@ "{{$.outputs.parameters['model_config'].output_file}}" ] } + } + } + }, + "components": { + "comp-serving": { + "inputDefinitions": { + "parameters": { + "model_cfg": { + "type": "STRING" + } + }, + "artifacts": { + "model": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Model\ntype: object\n" + } + } + } }, - "Serving": { - "container": { - "image": "gcr.io/my-project/my-server", - "args": [ - "--model-to-serve", - "{{$.inputs.artifacts['model'].uri}}", - "--model-config", - "{{$.inputs.parameters['model_cfg']}}" - ] + "executorLabel": "exec-serving" + }, + "comp-trainer": { + "executorLabel": "exec-trainer", + "outputDefinitions": { + "artifacts": { + "model_output": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Model\ntype: object\n" + } + } + }, + "parameters": { + "model_config": { + "type": "STRING" + } + } + }, + "inputDefinitions": { + "parameters": { + "num_epochs": { + "type": "INT" + }, + "train_optimizer": { + "type": "STRING" + } + }, + "artifacts": { + "input_location": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + } + } + } + }, + "comp-importer-task-trainer-input-location": { + "executorLabel": "exec-importer-task-trainer-input-location", + "inputDefinitions": { + "parameters": { + "input_location": { + "type": "STRING" + } + } + }, + "outputDefinitions": { + "artifacts": { + "result": { + "artifactType": { + "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n" + } + } } } } }, - "schemaVersion": "v2alpha1" + "schemaVersion": "2.0.0" }, "runtimeConfig": { + "parameters": { + "epochs": { + "stringValue": "200" + }, + "optimizer": { + "stringValue": "sgd" + }, + "input_gcs": { + "stringValue": "gs://test-bucket/pipeline_root" + } + }, "gcsOutputDirectory": "dummy_root" } } \ No newline at end of file diff --git a/sdk/python/kfp/v2/dsl/component_bridge.py b/sdk/python/kfp/v2/dsl/component_bridge.py index e32d4246f03..608594a2958 100644 --- a/sdk/python/kfp/v2/dsl/component_bridge.py +++ b/sdk/python/kfp/v2/dsl/component_bridge.py @@ -17,13 +17,15 @@ from typing import Any, Mapping, Optional from kfp import dsl -from kfp.components._naming import _sanitize_python_function_name -from kfp.components._naming import generate_unique_name_conversion_table -from kfp.dsl import types from kfp.components import _structures as structures from kfp.components._components import _default_component_name from kfp.components._components import _resolve_command_line_and_paths +from kfp.components._naming import _sanitize_python_function_name +from kfp.components._naming import generate_unique_name_conversion_table +from kfp.dsl import types +from kfp.v2.dsl import component_spec as dsl_component_spec from kfp.v2.dsl import container_op +from kfp.v2.dsl import dsl_utils from kfp.v2.dsl import importer_node from kfp.v2.dsl import type_utils from kfp.pipeline_spec import pipeline_spec_pb2 @@ -49,7 +51,7 @@ def create_container_op_from_component_and_arguments( pipeline_task_spec = pipeline_spec_pb2.PipelineTaskSpec() # Keep track of auto-injected importer spec. - importer_spec = {} + importer_specs = {} # Check types of the reference arguments and serialize PipelineParams arguments = arguments.copy() @@ -68,27 +70,29 @@ def create_container_op_from_component_and_arguments( if argument_value.op_name: pipeline_task_spec.inputs.parameters[ input_name].task_output_parameter.producer_task = ( - argument_value.op_name) + dsl_utils.sanitize_task_name(argument_value.op_name)) pipeline_task_spec.inputs.parameters[ input_name].task_output_parameter.output_parameter_key = ( argument_value.name) else: pipeline_task_spec.inputs.parameters[ - input_name].runtime_value.runtime_parameter = argument_value.name + input_name].component_input_parameter = argument_value.name else: if argument_value.op_name: - pipeline_task_spec.inputs.artifacts[input_name].producer_task = ( - argument_value.op_name) pipeline_task_spec.inputs.artifacts[ - input_name].output_artifact_key = ( + input_name].task_output_artifact.producer_task = ( + dsl_utils.sanitize_task_name(argument_value.op_name)) + pipeline_task_spec.inputs.artifacts[ + input_name].task_output_artifact.output_artifact_key = ( argument_value.name) else: # argument_value.op_name could be none, in which case an importer node # will be inserted later. - pipeline_task_spec.inputs.artifacts[input_name].producer_task = '' + pipeline_task_spec.inputs.artifacts[ + input_name].task_output_artifact.producer_task = '' type_schema = type_utils.get_input_artifact_type_schema( input_name, component_spec.inputs) - importer_spec[input_name] = importer_node.build_importer_spec( + importer_specs[input_name] = importer_node.build_importer_spec( input_type_schema=type_schema, pipeline_param_name=argument_value.name) elif isinstance(argument_value, str): @@ -99,10 +103,11 @@ def create_container_op_from_component_and_arguments( argument_value) else: # An importer node with constant value artifact_uri will be inserted. - pipeline_task_spec.inputs.artifacts[input_name].producer_task = '' + pipeline_task_spec.inputs.artifacts[ + input_name].task_output_artifact.producer_task = '' type_schema = type_utils.get_input_artifact_type_schema( input_name, component_spec.inputs) - importer_spec[input_name] = importer_node.build_importer_spec( + importer_specs[input_name] = importer_node.build_importer_spec( input_type_schema=type_schema, constant_value=argument_value) elif isinstance(argument_value, int): pipeline_task_spec.inputs.parameters[ @@ -119,15 +124,6 @@ def create_container_op_from_component_and_arguments( 'Input argument supports only the following types: PipelineParam' ', str, int, float. Got: "{}".'.format(argument_value)) - for output in component_spec.outputs or []: - if type_utils.is_parameter_type(output.type): - pipeline_task_spec.outputs.parameters[ - output.name].type = type_utils.get_parameter_type(output.type) - else: - pipeline_task_spec.outputs.artifacts[ - output.name].artifact_type.instance_schema = ( - type_utils.get_artifact_type_schema(output.type)) - inputs_dict = { input_spec.name: input_spec for input_spec in component_spec.inputs or [] } @@ -149,15 +145,15 @@ def _input_artifact_path_placeholder(input_key: str) -> str: raise TypeError( 'Input "{}" with type "{}" cannot be paired with InputPathPlaceholder.' .format(input_key, inputs_dict[input_key].type)) - elif input_key in importer_spec: + elif input_key in importer_specs: raise TypeError( 'Input "{}" with type "{}" is not connected to any upstream output. ' 'However it is used with InputPathPlaceholder. ' 'If you want to import an existing artifact using a system-connected ' 'importer node, use InputUriPlaceholder instead. ' 'Or if you just want to pass a string parameter, use string type and ' - 'InputValuePlaceholder instead.' - .format(input_key, inputs_dict[input_key].type)) + 'InputValuePlaceholder instead.'.format(input_key, + inputs_dict[input_key].type)) else: return "{{{{$.inputs.artifacts['{}'].path}}}}".format(input_key) @@ -201,12 +197,6 @@ def _resolve_output_path_placeholder(output_key: str) -> str: container_spec = component_spec.implementation.container - pipeline_container_spec = ( - pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec()) - pipeline_container_spec.image = container_spec.image - pipeline_container_spec.command.extend(resolved_cmd.command) - pipeline_container_spec.args.extend(resolved_cmd.args) - output_uris_and_paths = resolved_cmd.output_uris.copy() output_uris_and_paths.update(resolved_cmd.output_paths) input_uris_and_paths = resolved_cmd.input_uris.copy() @@ -230,12 +220,20 @@ def _resolve_output_path_placeholder(output_key: str) -> str: ) # task.name is unique at this point. - pipeline_task_spec.task_info.name = task.name - pipeline_task_spec.executor_label = task.name + pipeline_task_spec.task_info.name = (dsl_utils.sanitize_task_name(task.name)) + pipeline_task_spec.component_ref.name = ( + dsl_utils.sanitize_component_name(component_spec.name)) task.task_spec = pipeline_task_spec - task.importer_spec = importer_spec - task.container_spec = pipeline_container_spec + task.importer_specs = importer_specs + task.component_spec = dsl_component_spec.build_component_spec_from_structure( + component_spec) + task.container_spec = ( + pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec( + image=container_spec.image, + command=resolved_cmd.command, + args=resolved_cmd.args)) + dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = old_warn_value component_meta = copy.copy(component_spec) diff --git a/sdk/python/kfp/v2/dsl/component_spec.py b/sdk/python/kfp/v2/dsl/component_spec.py new file mode 100644 index 00000000000..a3c820158ef --- /dev/null +++ b/sdk/python/kfp/v2/dsl/component_spec.py @@ -0,0 +1,86 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Functions for creating IR ComponentSpec instance.""" + +from typing import List + +from kfp.components import _structures as structures +from kfp.v2 import dsl +from kfp.v2.dsl import dsl_utils +from kfp.v2.dsl import type_utils +from kfp.pipeline_spec import pipeline_spec_pb2 + + +def build_component_spec_from_structure( + component_spec: structures.ComponentSpec, +) -> pipeline_spec_pb2.ComponentSpec: + """Builds an IR ComponentSpec instance from structures.ComponentSpec. + + Args: + component_spec: The structure component spec. + + Returns: + An instance of IR ComponentSpec. + """ + result = pipeline_spec_pb2.ComponentSpec() + result.executor_label = dsl_utils.sanitize_executor_label(component_spec.name) + + for input_spec in component_spec.inputs or []: + if type_utils.is_parameter_type(input_spec.type): + result.input_definitions.parameters[ + input_spec.name].type = type_utils.get_parameter_type(input_spec.type) + else: + result.input_definitions.artifacts[ + input_spec.name].artifact_type.instance_schema = ( + type_utils.get_artifact_type_schema(input_spec.type)) + + for output_spec in component_spec.outputs or []: + if type_utils.is_parameter_type(output_spec.type): + result.output_definitions.parameters[ + output_spec.name].type = type_utils.get_parameter_type( + output_spec.type) + else: + result.output_definitions.artifacts[ + output_spec.name].artifact_type.instance_schema = ( + type_utils.get_artifact_type_schema(output_spec.type)) + + return result + + +def build_root_spec_from_pipeline_params( + pipeline_params: List[dsl.PipelineParam], +) -> pipeline_spec_pb2.ComponentSpec: + """Builds the root component spec instance from pipeline params. + + This is useful when building the component spec for a pipeline (aka piipeline + root). Such a component spec doesn't need output_definitions, and its + implementation field will be filled in later. + + Args: + pipeline_params: The list of pipeline params. + + Returns: + An instance of IR ComponentSpec. + """ + result = pipeline_spec_pb2.ComponentSpec() + for param in pipeline_params or []: + if type_utils.is_parameter_type(param.param_type): + result.input_definitions.parameters[ + param.name].type = type_utils.get_parameter_type(param.param_type) + else: + result.input_definitions.artifacts[ + param.name].artifact_type.instance_schema = ( + type_utils.get_artifact_type_schema(param.param_type)) + + return result diff --git a/sdk/python/kfp/v2/dsl/component_spec_test.py b/sdk/python/kfp/v2/dsl/component_spec_test.py new file mode 100644 index 00000000000..17828767a7c --- /dev/null +++ b/sdk/python/kfp/v2/dsl/component_spec_test.py @@ -0,0 +1,125 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for kfp.v2.dsl.component_spec.""" + +import unittest + +from kfp.components import _structures as structures +from kfp.v2 import dsl +from kfp.v2.dsl import component_spec as dsl_component_spec +from kfp.pipeline_spec import pipeline_spec_pb2 + +from google.protobuf import json_format + + +class ComponentSpecTest(unittest.TestCase): + + def test_build_component_spec_from_structure(self): + structure_component_spec = structures.ComponentSpec( + name='component1', + description='component1 desc', + inputs=[ + structures.InputSpec( + name='input1', description='input1 desc', type='Dataset'), + structures.InputSpec( + name='input2', description='input2 desc', type='String'), + structures.InputSpec( + name='input3', description='input3 desc', type='Integer'), + ], + outputs=[ + structures.OutputSpec( + name='output1', description='output1 desc', type='Model') + ]) + expected_dict = { + 'inputDefinitions': { + 'artifacts': { + 'input1': { + 'artifactType': { + 'instanceSchema': + 'properties:\ntitle: kfp.Dataset\ntype: object\n' + } + } + }, + 'parameters': { + 'input2': { + 'type': 'STRING' + }, + 'input3': { + 'type': 'INT' + } + } + }, + 'outputDefinitions': { + 'artifacts': { + 'output1': { + 'artifactType': { + 'instanceSchema': + 'properties:\ntitle: kfp.Model\ntype: object\n' + } + } + } + }, + 'executorLabel': 'exec-component1' + } + expected_spec = pipeline_spec_pb2.ComponentSpec() + json_format.ParseDict(expected_dict, expected_spec) + + component_spec = ( + dsl_component_spec.build_component_spec_from_structure( + structure_component_spec)) + + self.assertEqual(expected_spec, component_spec) + + def test_build_root_spec_from_pipeline_params(self): + pipeline_params = [ + dsl.PipelineParam(name='input1', param_type='Dataset'), + dsl.PipelineParam(name='input2', param_type='Integer'), + dsl.PipelineParam(name='input3', param_type='String'), + dsl.PipelineParam(name='input4', param_type='Float'), + ] + expected_dict = { + 'inputDefinitions': { + 'artifacts': { + 'input1': { + 'artifactType': { + 'instanceSchema': + 'properties:\ntitle: kfp.Dataset\ntype: object\n' + } + } + }, + 'parameters': { + 'input2': { + 'type': 'INT' + }, + 'input3': { + 'type': 'STRING' + }, + 'input4': { + 'type': 'DOUBLE' + } + } + } + } + expected_spec = pipeline_spec_pb2.ComponentSpec() + json_format.ParseDict(expected_dict, expected_spec) + + component_spec = ( + dsl_component_spec.build_root_spec_from_pipeline_params( + pipeline_params)) + + self.assertEqual(expected_spec, component_spec) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/v2/dsl/dsl_utils.py b/sdk/python/kfp/v2/dsl/dsl_utils.py new file mode 100644 index 00000000000..6575d85072c --- /dev/null +++ b/sdk/python/kfp/v2/dsl/dsl_utils.py @@ -0,0 +1,41 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Utilities functions KFP DSL.""" + +import re + + +def sanitize_component_name(name: str): + """Sanitizes component name.""" + return 'comp-{}'.format(_sanitize_name(name)) + + +def sanitize_task_name(name: str): + """Sanitizes task name.""" + return 'task-{}'.format(_sanitize_name(name)) + + +def sanitize_executor_label(label: str): + """Sanitizes executor label.""" + return 'exec-{}'.format(_sanitize_name(label)) + + +def _sanitize_name(name: str): + """Sanitizes name to comply with IR naming convention. + + The sanitized name contains only lower case alphanumeric characters and + dashes. + """ + return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', + name.lower())).lstrip('-').rstrip('-') diff --git a/sdk/python/kfp/v2/dsl/dsl_utils_test.py b/sdk/python/kfp/v2/dsl/dsl_utils_test.py new file mode 100644 index 00000000000..12c4e314540 --- /dev/null +++ b/sdk/python/kfp/v2/dsl/dsl_utils_test.py @@ -0,0 +1,37 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for kfp.v2.dsl.dsl_utils.""" + +import unittest + +from kfp.v2.dsl import dsl_utils + + +class DslUtilsTest(unittest.TestCase): + + def test_sanitize_component_name(self): + self.assertEqual('comp-my-component', + dsl_utils.sanitize_component_name('My component')) + + def test_sanitize_executor_label(self): + self.assertEqual('exec-my-component', + dsl_utils.sanitize_executor_label('My component')) + + def test_sanitize_task_name(self): + self.assertEqual('task-my-component-1', + dsl_utils.sanitize_task_name('My component 1')) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/v2/dsl/importer_node.py b/sdk/python/kfp/v2/dsl/importer_node.py index 714cf92acc1..44293d06a79 100644 --- a/sdk/python/kfp/v2/dsl/importer_node.py +++ b/sdk/python/kfp/v2/dsl/importer_node.py @@ -13,6 +13,9 @@ # limitations under the License. """Utility function for building Importer Node spec.""" +from typing import Optional + +from kfp.v2.dsl import dsl_utils from kfp.pipeline_spec import pipeline_spec_pb2 OUTPUT_KEY = 'result' @@ -20,8 +23,8 @@ def build_importer_spec( input_type_schema: str, - pipeline_param_name: str = None, - constant_value: str = None + pipeline_param_name: Optional[str] = None, + constant_value: Optional[str] = None ) -> pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec: """Builds an importer executor spec. @@ -42,6 +45,7 @@ def build_importer_spec( 'constant_value.') importer_spec = pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec() importer_spec.type_schema.instance_schema = input_type_schema + # TODO: subject to IR change on artifact_uri message type. if pipeline_param_name: importer_spec.artifact_uri.runtime_parameter = pipeline_param_name elif constant_value: @@ -50,27 +54,62 @@ def build_importer_spec( def build_importer_task_spec( - dependent_task: pipeline_spec_pb2.PipelineTaskSpec, - input_name: str, - input_type_schema: str, + importer_base_name: str, ) -> pipeline_spec_pb2.PipelineTaskSpec: """Builds an importer task spec. Args: + importer_base_name: The base name of the importer node. + + Returns: + An importer node task spec. + """ + result = pipeline_spec_pb2.PipelineTaskSpec() + result.task_info.name = dsl_utils.sanitize_task_name(importer_base_name) + result.component_ref.name = dsl_utils.sanitize_component_name( + importer_base_name) + + return result + + +def build_importer_component_spec( + importer_base_name: str, + input_name: str, + input_type_schema: str, +) -> pipeline_spec_pb2.ComponentSpec: + """Builds an importer component spec. + + Args: + importer_base_name: The base name of the importer node. dependent_task: The task requires importer node. input_name: The name of the input artifact needs to be imported. input_type_schema: The type of the input artifact. Returns: - An importer node task spec. + An importer node component spec. """ - dependent_task_name = dependent_task.task_info.name + result = pipeline_spec_pb2.ComponentSpec() + result.executor_label = dsl_utils.sanitize_executor_label(importer_base_name) + result.input_definitions.parameters[ + input_name].type = pipeline_spec_pb2.PrimitiveType.STRING + result.output_definitions.artifacts[ + OUTPUT_KEY].artifact_type.instance_schema = input_type_schema + + return result + + +def generate_importer_base_name(dependent_task_name: str, + input_name: str) -> str: + """Generates the base name of an importer node. - task_spec = pipeline_spec_pb2.PipelineTaskSpec() - task_spec.task_info.name = '{}_{}_importer'.format(dependent_task_name, - input_name) - task_spec.outputs.artifacts[OUTPUT_KEY].artifact_type.instance_schema = ( - input_type_schema) - task_spec.executor_label = task_spec.task_info.name + The base name is formed by connecting the dependent task name and the input + artifact name. It's used to form task name, component ref, and executor label. - return task_spec + Args: + dependent_task_name: The name of the task requires importer node. + input_name: The name of the input artifact needs to be imported. + + Returns: + A base importer node name. + """ + return 'importer-{}-{}'.format(dependent_task_name, input_name) diff --git a/sdk/python/kfp/v2/dsl/importer_node_test.py b/sdk/python/kfp/v2/dsl/importer_node_test.py index 62a93590b01..23c2170a0e6 100644 --- a/sdk/python/kfp/v2/dsl/importer_node_test.py +++ b/sdk/python/kfp/v2/dsl/importer_node_test.py @@ -20,45 +20,20 @@ class ImporterNodeTest(unittest.TestCase): - def test_build_importer_task(self): - dependent_task = { - 'taskInfo': { - 'name': 'task1' - }, - 'inputs': { - 'artifacts': { - 'input1': { - 'producerTask': '', - } - } - }, - 'executorLabel': 'task1_input1_importer' - } - dependent_task_spec = pb.PipelineTaskSpec() - json_format.ParseDict(dependent_task, dependent_task_spec) - + def test_build_importer_task_spec(self): expected_task = { 'taskInfo': { - 'name': 'task1_input1_importer' + 'name': 'task-importer-task0-input1' }, - 'outputs': { - 'artifacts': { - 'result': { - 'artifactType': { - 'instanceSchema': 'title: kfp.Artifact' - } - } - } + 'componentRef': { + 'name': 'comp-importer-task0-input1' }, - 'executorLabel': 'task1_input1_importer' } expected_task_spec = pb.PipelineTaskSpec() json_format.ParseDict(expected_task, expected_task_spec) task_spec = importer_node.build_importer_task_spec( - dependent_task=dependent_task_spec, - input_name='input1', - input_type_schema='title: kfp.Artifact') + importer_base_name='importer-task0-input1') self.maxDiff = None self.assertEqual(expected_task_spec, task_spec) @@ -100,7 +75,7 @@ def test_build_importer_spec_from_constant_value(self): self.assertEqual(expected_importer_spec, importer_spec) def test_build_importer_spec_with_invalid_inputs_should_fail(self): - with self.assertRaisesRegexp( + with self.assertRaisesRegex( AssertionError, 'importer spec should be built using either pipeline_param_name or ' 'constant_value'): @@ -109,12 +84,49 @@ def test_build_importer_spec_with_invalid_inputs_should_fail(self): pipeline_param_name='param1', constant_value='some_uri') - with self.assertRaisesRegexp( + with self.assertRaisesRegex( AssertionError, 'importer spec should be built using either pipeline_param_name or ' 'constant_value'): importer_node.build_importer_spec(input_type_schema='title: kfp.Artifact') + def test_build_importer_component_spec(self): + expected_importer_component = { + 'inputDefinitions': { + 'parameters': { + 'input1': { + 'type': 'STRING' + } + } + }, + 'outputDefinitions': { + 'artifacts': { + 'result': { + 'artifactType': { + 'instanceSchema': 'title: kfp.Artifact' + } + } + } + }, + 'executorLabel': 'exec-importer-task0-input1' + } + expected_importer_comp_spec = pb.ComponentSpec() + json_format.ParseDict(expected_importer_component, + expected_importer_comp_spec) + importer_comp_spec = importer_node.build_importer_component_spec( + importer_base_name='importer-task0-input1', + input_name='input1', + input_type_schema='title: kfp.Artifact') + + self.maxDiff = None + self.assertEqual(expected_importer_comp_spec, importer_comp_spec) + + def test_generate_importer_base_name(self): + self.assertEqual( + 'importer-task0-input1', + importer_node.generate_importer_base_name( + dependent_task_name='task0', input_name='input1')) + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/setup.py b/sdk/python/setup.py index d06b0487185..714ab3b3b1c 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -38,13 +38,13 @@ 'Deprecated', 'strip-hints', 'docstring-parser>=0.7.3', - 'kfp-pipeline-spec>=0.1.0, <0.2.0', + 'kfp-pipeline-spec>=0.1.4, <0.2.0', 'fire>=0.3.1' ] TESTS_REQUIRE = [ 'mock', - 'kfp-pipeline-spec>=0.1.0, <0.2.0', + 'kfp-pipeline-spec>=0.1.4, <0.2.0', ]