Commit 0c600a2

fix(sdk.v2): Fix the bug where pipeline_parameters is not used. (kubeflow#5002)

* fix bug where pipeline parameters were not being used.

* address review comments
chensun authored Jan 19, 2021
1 parent 3b9fdff commit 0c600a2
Showing 5 changed files with 74 additions and 28 deletions.
2 changes: 1 addition & 1 deletion sdk/python/kfp/v2/compiler/compiler.py
@@ -193,7 +193,7 @@ def _create_pipeline_job(
A PipelineJob proto representing the compiled pipeline.
"""
runtime_config = compiler_utils.build_runtime_config_spec(
-      pipeline_root=pipeline_root)
+      pipeline_root=pipeline_root, pipeline_parameters=pipeline_parameters)
pipeline_job = pipeline_spec_pb2.PipelineJob(runtime_config=runtime_config)
pipeline_job.pipeline_spec.update(json_format.MessageToDict(pipeline_spec))

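Note: the one-line change above is the entire fix on the compiler side; it forwards pipeline_parameters into the runtime config. A minimal sketch of what compiler_utils.build_runtime_config_spec plausibly does with the new argument, reconstructed only from the call site above and the golden JSON further down, not copied from the KFP source:

# Editor's sketch, not the actual kfp.v2.compiler implementation.
from kfp.pipeline_spec import pipeline_spec_pb2


def build_runtime_config_spec_sketch(pipeline_root, pipeline_parameters=None):
  """Builds a RuntimeConfig proto carrying the root and the parameters."""
  runtime_config = pipeline_spec_pb2.PipelineJob.RuntimeConfig(
      gcs_output_directory=pipeline_root)
  for name, value in (pipeline_parameters or {}).items():
    # Each value fills one variant of the pipeline_spec_pb2.Value oneof,
    # which is why the compiled JSON below shows {"stringValue": ...}.
    if isinstance(value, str):
      runtime_config.parameters[name].string_value = value
    elif isinstance(value, int):
      runtime_config.parameters[name].int_value = value
    elif isinstance(value, float):
      runtime_config.parameters[name].double_value = value
    else:
      raise TypeError(
          'Unsupported type for parameter {}: {}'.format(name, type(value)))
  return runtime_config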
80 changes: 60 additions & 20 deletions sdk/python/kfp/v2/compiler/main.py
@@ -11,30 +11,37 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""KFP SDK compiler CLI tool."""

import argparse
-import kfp.dsl as dsl
-import kfp.v2.compiler as compiler
import json
import os
import shutil
import subprocess
import sys
import tempfile
from deprecated.sphinx import deprecated
+from typing import Any, Callable, List, Mapping, Optional

+import kfp.dsl as dsl
+import kfp.v2.compiler as compiler


-def parse_arguments():
+def parse_arguments() -> argparse.Namespace:
"""Parse command line arguments."""

parser = argparse.ArgumentParser()
parser.add_argument(
-      '--py', type=str, help='local absolute path to a py file.')
+      '--py', type=str, required=True, help='local absolute path to a py file.')
parser.add_argument(
'--function',
type=str,
help='The name of the function to compile if there are multiple.')
parser.add_argument(
-      '--pipeline-root', type=str, help='The root of the pipeline outputs.')
+      '--pipeline-root',
+      type=str,
+      required=True,
+      help='The root of the pipeline outputs.')
+  parser.add_argument(
+      '--pipeline-parameters',
+      type=json.loads,
+      help='The pipeline parameters in JSON dict format.')
parser.add_argument(
'--namespace', type=str, help='The namespace for the pipeline function')
parser.add_argument(
@@ -51,8 +58,21 @@ def parse_arguments():
  return args


-def _compile_pipeline_function(pipeline_funcs, function_name, pipeline_root,
-                               output_path, type_check):
+def _compile_pipeline_function(pipeline_funcs: List[Callable],
+                               function_name: Optional[str], pipeline_root: str,
+                               pipeline_parameters: Optional[Mapping[str, Any]],
+                               output_path: str, type_check: bool) -> None:
+  """Compiles a pipeline function.
+
+  Args:
+    pipeline_funcs: A list of pipeline functions.
+    function_name: The name of the pipeline function to compile if there were
+      multiple.
+    pipeline_root: The root output directory for pipeline runtime.
+    pipeline_parameters: The pipeline parameters as a dict of {name: value}.
+    output_path: The output path of the compiled result.
+    type_check: Whether to enable the type checking.
+  """
if len(pipeline_funcs) == 0:
raise ValueError(
'A function with @dsl.pipeline decorator is required in the py file.')
@@ -81,6 +101,7 @@ def _compile_pipeline_function(
compiler.Compiler().compile(
pipeline_func=pipeline_func,
pipeline_root=pipeline_root,
+      pipeline_parameters=pipeline_parameters,
output_path=output_path,
type_check=type_check)

@@ -90,7 +111,7 @@ class PipelineCollectorContext():
def __enter__(self):
pipeline_funcs = []

-    def add_pipeline(func):
+    def add_pipeline(func: Callable) -> Callable:
pipeline_funcs.append(func)
return func

@@ -102,14 +123,32 @@ def __exit__(self, *args):
dsl._pipeline._pipeline_decorator_handler = self.old_handler


-def compile_pyfile(pyfile, function_name, pipeline_root, output_path, type_check):
+def compile_pyfile(pyfile: str, function_name: Optional[str],
+                   pipeline_root: str,
+                   pipeline_parameters: Optional[Mapping[str, Any]],
+                   output_path: str, type_check: bool) -> None:
+  """Compiles a pipeline written in a .py file.
+
+  Args:
+    pyfile: The path to the .py file that contains the pipeline definition.
+    function_name: The name of the pipeline function.
+    pipeline_root: The root output directory for pipeline runtime.
+    pipeline_parameters: The pipeline parameters as a dict of {name: value}.
+    output_path: The output path of the compiled result.
+    type_check: Whether to enable the type checking.
+  """
sys.path.insert(0, os.path.dirname(pyfile))
try:
filename = os.path.basename(pyfile)
with PipelineCollectorContext() as pipeline_funcs:
__import__(os.path.splitext(filename)[0])
-    _compile_pipeline_function(pipeline_funcs, function_name, pipeline_root,
-                               output_path, type_check)
+    _compile_pipeline_function(
+        pipeline_funcs=pipeline_funcs,
+        function_name=function_name,
+        pipeline_root=pipeline_root,
+        pipeline_parameters=pipeline_parameters,
+        output_path=output_path,
+        type_check=type_check)
finally:
del sys.path[0]
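With the call above now fully keyword-based, compile_pyfile is easy to drive directly from Python as well. A hypothetical invocation (the paths, bucket, and 'text' parameter below are placeholders, not part of this commit):

# Hypothetical usage; all names below are illustrative.
compile_pyfile(
    pyfile='/tmp/my_pipeline.py',
    function_name=None,  # acceptable when the file defines a single pipeline
    pipeline_root='gs://my-bucket/pipeline-root',
    pipeline_parameters={'text': 'Hello KFP!'},
    output_path='/tmp/my_pipeline.json',
    type_check=True)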

@@ -119,9 +158,10 @@ def main():
if args.py is None:
raise ValueError('The --py option must be specified.')
compile_pyfile(
-      args.py,
-      args.function,
-      args.pipeline_root,
-      args.output,
-      not args.disable_type_check,
+      pyfile=args.py,
+      function_name=args.function,
+      pipeline_root=args.pipeline_root,
+      pipeline_parameters=args.pipeline_parameters,
+      output_path=args.output,
+      type_check=not args.disable_type_check,
)
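For completeness, this is how the new flag travels from the command line into main(). The sketch below mirrors the integration test in the next file, with illustrative file names; it relies on --pipeline-parameters being parsed by json.loads into a dict:

# Editor's sketch mirroring the integration test below.
import subprocess

subprocess.check_call([
    'dsl-compile-v2',
    '--py', 'my_pipeline.py',
    '--pipeline-root', 'gs://my-bucket/pipeline-root',
    # json.loads turns this string into {'text': 'Hello KFP!'}.
    '--pipeline-parameters', '{"text": "Hello KFP!"}',
    '--output', 'my_pipeline.json',
])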
6 changes: 3 additions & 3 deletions sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py
@@ -23,7 +23,7 @@

class CompilerCliTests(unittest.TestCase):

-  def _test_compile_py_to_json(self, file_base_name):
+  def _test_compile_py_to_json(self, file_base_name, additional_arguments=[]):
test_data_dir = os.path.join(os.path.dirname(__file__), 'test_data')
py_file = os.path.join(test_data_dir, '{}.py'.format(file_base_name))
tmpdir = tempfile.mkdtemp()
@@ -32,7 +32,7 @@ def _test_compile_py_to_json(self, file_base_name):
subprocess.check_call([
'dsl-compile-v2', '--py', py_file, '--pipeline-root', 'dummy_root',
'--output', target_json
-    ])
+    ] + additional_arguments)
with open(os.path.join(test_data_dir, file_base_name + '.json'),
'r') as f:
golden = json.load(f)
@@ -55,7 +55,7 @@ def test_two_step_pipeline_with_importer(self):
self._test_compile_py_to_json('two_step_pipeline_with_importer')

def test_simple_pipeline_without_importer(self):
-    self._test_compile_py_to_json('simple_pipeline_without_importer')
+    self._test_compile_py_to_json('simple_pipeline_without_importer', ['--pipeline-parameters', '{"text":"Hello KFP!"}'])

def test_pipeline_with_ontology(self):
self._test_compile_py_to_json('pipeline_with_ontology')
11 changes: 8 additions & 3 deletions sdk/python/kfp/v2/compiler_cli_tests/test_data/simple_pipeline_without_importer.json
@@ -83,6 +83,11 @@
"schemaVersion": "v2alpha1"
},
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
}
}
"gcsOutputDirectory": "dummy_root",
"parameters": {
"text": {
"stringValue": "Hello KFP!"
}
}
}
}
3 changes: 2 additions & 1 deletion sdk/python/kfp/v2/compiler_cli_tests/test_data/simple_pipeline_without_importer.py
@@ -55,7 +55,7 @@


@dsl.pipeline(name='simple-two-step-pipeline')
-def my_pipeline(text='Hello world!',):
+def my_pipeline(text='Hello world!'):
component_1 = component_op_1(text=text)
component_2 = component_op_2(
input_gcs_path=component_1.outputs['output_gcs_path'])
@@ -65,4 +65,5 @@ def my_pipeline(text='Hello world!',):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
pipeline_root='dummy_root',
+    pipeline_parameters={'text': 'Hello KFP!'},
output_path=__file__ + '.json')
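Note the interaction with the golden file above: the compile-time value 'Hello KFP!' overrides the function default 'Hello world!', which is exactly what the runtimeConfig in the golden JSON records. A quick sanity check one could run next to the compiled output (illustrative, not part of the commit; the file name follows the output_path=__file__ + '.json' convention above):

# Illustrative verification of the compiled result.
import json

with open('simple_pipeline_without_importer.py.json') as f:
  job = json.load(f)
assert job['runtimeConfig']['parameters']['text']['stringValue'] == 'Hello KFP!'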
