Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK - Make it easier to compile and submit a pipeline run #1484

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions sdk/python/kfp/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import json
import os
import tarfile
import tempfile
import zipfile
import yaml
from datetime import datetime
from typing import Mapping, Callable

import kfp_server_api

Expand Down Expand Up @@ -255,6 +257,43 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, para
IPython.display.display(IPython.display.HTML(html))
return response.run

def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapping[str, str], run_name=None, experiment_name=None):
'''Runs pipeline on KFP-enabled Kubernetes cluster.
This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.

Args:
pipeline_func: A function that describes a pipeline by calling components and composing them into execution graph.
arguments: Arguments to the pipeline function provided as a dict.
run_name: Optional. Name of the run to be shown in the UI.
experiment_name: Optional. Name of the experiment to add the run to.
'''

class RunPipelineResult:
def __init__(self, client, run_info):
self._client = client
self.run_info = run_info
self.run_id = run_info.id

def wait_for_run_completion(self, timeout=None):
timeout = timeout or datetime.datetime.max - datetime.datetime.min
return self._client.wait_for_run_completion(timeout)

def __str__(self):
return '<RunPipelineResult(run_id={})>'.format(self.run_id)

#TODO: Check arguments against the pipeline function
pipeline_name = pipeline_func.__name__
experiment_name = experiment_name or 'Default'
run_name = run_name or pipeline_name + ' ' + datetime.now().strftime('%Y-%m-%d %H-%M-%S')
experiment = self.create_experiment(name=experiment_name)
try:
(_, pipeline_package_path) = tempfile.mkstemp(suffix='.zip')
kfp.compiler.Compiler().compile(pipeline_func, pipeline_package_path)
run_info = self.run_pipeline(experiment.id, run_name, pipeline_package_path, arguments)
return RunPipelineResult(self, run_info)
finally:
os.remove(pipeline_package_path)

def list_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None):
"""List runs.
Args:
Expand Down