SDK - Compiler - Make it possible to create more portable pipelines #2271

Merged
Changes from 1 commit
5 changes: 3 additions & 2 deletions sdk/python/kfp/_client.py
@@ -25,6 +25,7 @@
 from datetime import datetime
 from typing import Mapping, Callable
 
+import kfp
 import kfp_server_api
 
 from kfp.compiler import compiler
@@ -318,7 +319,7 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, para
       IPython.display.display(IPython.display.HTML(html))
     return response.run
 
-  def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapping[str, str], run_name=None, experiment_name=None):
+  def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapping[str, str], run_name=None, experiment_name=None, pipeline_conf: kfp.dsl.PipelineConf = None):
     '''Runs pipeline on KFP-enabled Kubernetes cluster.
     This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.
@@ -333,7 +334,7 @@ def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapp
     run_name = run_name or pipeline_name + ' ' + datetime.now().strftime('%Y-%m-%d %H-%M-%S')
     try:
       (_, pipeline_package_path) = tempfile.mkstemp(suffix='.zip')
-      compiler.Compiler().compile(pipeline_func, pipeline_package_path)
+      compiler.Compiler().compile(pipeline_func, pipeline_package_path, pipeline_conf=pipeline_conf)
       return self.create_run_from_pipeline_package(pipeline_package_path, arguments, run_name, experiment_name)
     finally:
       os.remove(pipeline_package_path)
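A minimal usage sketch of the new argument (the pipeline function, image, and secret name below are illustrative, not part of this diff): pipeline-level settings are built into a kfp.dsl.PipelineConf outside the pipeline body and passed in at submission time, so the same pipeline function can be reused across clusters.

```python
import kfp
from kfp import dsl
from kubernetes import client as k8s_client

@dsl.pipeline(name='portable-pipeline')  # hypothetical pipeline, for illustration only
def portable_pipeline(message: str = 'hello'):
    dsl.ContainerOp(name='echo', image='alpine', command=['echo', message])

# Cluster-specific configuration lives outside the pipeline definition.
conf = dsl.PipelineConf()
conf.set_image_pull_secrets([k8s_client.V1LocalObjectReference(name='my-registry-secret')])
conf.set_timeout(3600)  # whole-pipeline timeout, in seconds

kfp.Client().create_run_from_pipeline_func(
    portable_pipeline,
    arguments={'message': 'hi'},
    pipeline_conf=conf,  # new keyword argument added by this change
)
```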
6 changes: 4 additions & 2 deletions sdk/python/kfp/_runners.py
@@ -20,9 +20,10 @@
 from typing import Mapping, Callable
 
 from . import Client
+from . import dsl
 
 
-def run_pipeline_func_on_cluster(pipeline_func: Callable, arguments: Mapping[str, str], run_name : str = None, experiment_name : str = None, kfp_client : Client = None):
+def run_pipeline_func_on_cluster(pipeline_func: Callable, arguments: Mapping[str, str], run_name : str = None, experiment_name : str = None, kfp_client : Client = None, pipeline_conf: dsl.PipelineConf = None):
   '''Runs pipeline on KFP-enabled Kubernetes cluster.
   This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.
 
@@ -32,6 +33,7 @@ def run_pipeline_func_on_cluster(pipeline_func: Callable, arguments: Mapping[str
     run_name: Optional. Name of the run to be shown in the UI.
     experiment_name: Optional. Name of the experiment to add the run to.
     kfp_client: Optional. An instance of kfp.Client configured for the desired KFP cluster.
+    pipeline_conf: Optional. kfp.dsl.PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options.
   '''
   kfp_client = kfp_client or Client()
-  return kfp_client.create_run_from_pipeline_func(pipeline_func, arguments, run_name, experiment_name)
+  return kfp_client.create_run_from_pipeline_func(pipeline_func, arguments, run_name, experiment_name, pipeline_conf)
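The convenience runner simply forwards the new argument to Client.create_run_from_pipeline_func. A short sketch, reusing the hypothetical portable_pipeline and conf from the example above and assuming run_pipeline_func_on_cluster is re-exported at the kfp package level:

```python
import kfp

# Equivalent to kfp.Client().create_run_from_pipeline_func(..., pipeline_conf=conf)
kfp.run_pipeline_func_on_cluster(
    portable_pipeline,
    arguments={'message': 'hi'},
    pipeline_conf=conf,
)
```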
41 changes: 24 additions & 17 deletions sdk/python/kfp/compiler/compiler.py
@@ -608,7 +608,7 @@ def _create_volumes(self, pipeline):
     volumes.sort(key=lambda x: x['name'])
     return volumes
 
-  def _create_pipeline_workflow(self, args, pipeline, op_transformers=None):
+  def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeline_conf=None):
     """Create workflow for the pipeline."""
 
     # Input Parameters
@@ -650,17 +650,17 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None):
       }
     }
     # set ttl after workflow finishes
-    if pipeline.conf.ttl_seconds_after_finished >= 0:
-      workflow['spec']['ttlSecondsAfterFinished'] = pipeline.conf.ttl_seconds_after_finished
+    if pipeline_conf.ttl_seconds_after_finished >= 0:
+      workflow['spec']['ttlSecondsAfterFinished'] = pipeline_conf.ttl_seconds_after_finished
 
-    if len(pipeline.conf.image_pull_secrets) > 0:
+    if len(pipeline_conf.image_pull_secrets) > 0:
       image_pull_secrets = []
-      for image_pull_secret in pipeline.conf.image_pull_secrets:
+      for image_pull_secret in pipeline_conf.image_pull_secrets:
         image_pull_secrets.append(K8sHelper.convert_k8s_obj_to_json(image_pull_secret))
       workflow['spec']['imagePullSecrets'] = image_pull_secrets
 
-    if pipeline.conf.timeout:
-      workflow['spec']['activeDeadlineSeconds'] = pipeline.conf.timeout
+    if pipeline_conf.timeout:
+      workflow['spec']['activeDeadlineSeconds'] = pipeline_conf.timeout
 
     if exit_handler:
       workflow['spec']['onExit'] = exit_handler.name
@@ -688,13 +688,13 @@ def _validate_exit_handler_helper(group, exiting_op_names, handler_exists):
 
     return _validate_exit_handler_helper(pipeline.groups[0], [], False)
 
-  def _sanitize_and_inject_artifact(self, pipeline: dsl.Pipeline):
+  def _sanitize_and_inject_artifact(self, pipeline: dsl.Pipeline, pipeline_conf=None):
     """Sanitize operator/param names and inject pipeline artifact location."""
 
     # Sanitize operator names and param names
     sanitized_ops = {}
     # pipeline level artifact location
-    artifact_location = pipeline.conf.artifact_location
+    artifact_location = pipeline_conf.artifact_location
 
     for op in pipeline.ops.values():
       # inject pipeline level artifact location into if the op does not have
@@ -732,7 +732,9 @@ def create_workflow(self,
                       pipeline_func: Callable,
                       pipeline_name: Text=None,
                       pipeline_description: Text=None,
-                      params_list: List[dsl.PipelineParam]=None) -> Dict[Text, Any]:
+                      params_list: List[dsl.PipelineParam]=None,
+                      pipeline_conf: dsl.PipelineConf = None,
+                      ) -> Dict[Text, Any]:
     """ Create workflow spec from pipeline function and specified pipeline
     params/metadata. Currently, the pipeline params are either specified in
     the signature of the pipeline function or by passing a list of
@@ -742,6 +744,7 @@
     :param pipeline_name:
     :param pipeline_description:
     :param params_list: list of pipeline params to append to the pipeline.
+    :param pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options.
     :return: workflow dict.
     """
     params_list = params_list or []
@@ -778,8 +781,10 @@ def create_workflow(self,
     with dsl.Pipeline(pipeline_name) as dsl_pipeline:
       pipeline_func(*args_list)
 
+    pipeline_conf = pipeline_conf or dsl_pipeline.conf # Configuration passed to the compiler is overriding. Unfortunately, it's not trivial to detect whether the dsl_pipeline.conf was ever modified.
+
     self._validate_exit_handler(dsl_pipeline)
-    self._sanitize_and_inject_artifact(dsl_pipeline)
+    self._sanitize_and_inject_artifact(dsl_pipeline, pipeline_conf)
 
     # Fill in the default values.
     args_list_with_defaults = []
@@ -802,12 +807,14 @@
                                                  default=param.value) for param in params_list]
 
     op_transformers = [add_pod_env]
-    op_transformers.extend(dsl_pipeline.conf.op_transformers)
+    op_transformers.extend(pipeline_conf.op_transformers)
 
     workflow = self._create_pipeline_workflow(
         args_list_with_defaults,
         dsl_pipeline,
-        op_transformers)
+        op_transformers,
+        pipeline_conf,
+    )
 
     from ._data_passing_rewriter import fix_big_data_passing
     workflow = fix_big_data_passing(workflow)
@@ -817,11 +824,11 @@
 
     return workflow
 
-  def _compile(self, pipeline_func):
+  def _compile(self, pipeline_func, pipeline_conf: dsl.PipelineConf = None):
     """Compile the given pipeline function into workflow."""
-    return self.create_workflow(pipeline_func=pipeline_func)
+    return self.create_workflow(pipeline_func=pipeline_func, pipeline_conf=pipeline_conf)
 
-  def compile(self, pipeline_func, package_path, type_check=True):
+  def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: dsl.PipelineConf = None):
     """Compile the given pipeline function into workflow yaml.
 
     Args:
@@ -833,7 +840,7 @@ def compile(self, pipeline_func, package_path, type_check=True):
     type_check_old_value = kfp.TYPE_CHECK
     try:
       kfp.TYPE_CHECK = type_check
-      workflow = self._compile(pipeline_func)
+      workflow = self._compile(pipeline_func, pipeline_conf)
       yaml.Dumper.ignore_aliases = lambda *args : True
       yaml_text = yaml.dump(workflow, default_flow_style=False)
 
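Taken together, the compiler changes mean a conf passed in explicitly takes precedence over whatever was set on dsl_pipeline.conf inside the pipeline body (see the `pipeline_conf = pipeline_conf or dsl_pipeline.conf` line above). A sketch of compiling one hypothetical pipeline for two targets, assuming the usual PipelineConf setters:

```python
from kfp import compiler, dsl

@dsl.pipeline(name='portable-pipeline')  # illustrative pipeline, not part of this diff
def portable_pipeline():
    dsl.ContainerOp(name='echo', image='alpine', command=['echo', 'hello'])

# One conf per target cluster; the pipeline function itself stays unchanged.
on_prem_conf = dsl.PipelineConf()
on_prem_conf.set_ttl_seconds_after_finished(600)

cloud_conf = dsl.PipelineConf()
cloud_conf.set_timeout(7200)

compiler.Compiler().compile(portable_pipeline, 'pipeline_on_prem.zip', pipeline_conf=on_prem_conf)
compiler.Compiler().compile(portable_pipeline, 'pipeline_cloud.zip', pipeline_conf=cloud_conf)

# create_workflow returns the Argo workflow dict directly, without writing a package file.
workflow = compiler.Compiler().create_workflow(portable_pipeline, pipeline_conf=cloud_conf)
```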
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/__init__.py
@@ -14,7 +14,7 @@
 
 
 from ._pipeline_param import PipelineParam, match_serialized_pipelineparam
-from ._pipeline import Pipeline, pipeline, get_pipeline_conf
+from ._pipeline import Pipeline, pipeline, get_pipeline_conf, PipelineConf
 from ._container_op import ContainerOp, InputArgumentPath, UserContainer, Sidecar
 from ._resource_op import ResourceOp
 from ._volume_op import (
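With PipelineConf re-exported here, callers can construct the configuration object directly instead of calling get_pipeline_conf() inside a pipeline body. A small sketch, assuming the setter that corresponds to the ttl_seconds_after_finished attribute used in the compiler above:

```python
from kfp.dsl import PipelineConf

conf = PipelineConf()
conf.set_ttl_seconds_after_finished(3600)  # delete finished workflows after an hour
```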