Skip to content

Commit

Permalink
feat(components): KFP - Run component (#5338)
Browse files Browse the repository at this point in the history
* Components - KFP - Run component

This can be useful for dynamically running components or pipelines.

* Added sample pipeline

* Printing the run URL
  • Loading branch information
Ark-kun committed Jul 15, 2021
1 parent 4cf420d commit e996cdb
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 0 deletions.
20 changes: 20 additions & 0 deletions components/kfp/Run_component/_samples/sample_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from kfp import components

# Value passed as `host=` to kfp.Client when this sample is run as a script.
# It is left as None here; set it to the KFP API endpoint to target a specific server.
kfp_endpoint = None


# Load the "Run component or pipeline" component definition from its component.yaml.
# The URL contains a specific commit hash rather than a branch name.
run_component_or_pipeline_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/2d13b358690c761f64e5b59b70624de8f1f52a29/components/kfp/Run_component/component.yaml')


def my_pipeline():
    """Sample pipeline with one step that runs another component by URL.

    The step receives the URL of the Calculate_hash component definition and
    the arguments to run it with (``data='Hello world'``).
    """
    calculate_hash_component_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/68a367de3d1cc435637b0b4e78dcb42600fbbc37/components/basics/Calculate_hash/component.yaml'
    component_arguments = {'data': 'Hello world'}

    run_component_or_pipeline_op(
        component_url=calculate_hash_component_url,
        arguments=component_arguments,
    )


if __name__ == '__main__':
    # Script entry point: submit the sample pipeline as a run, with no pipeline arguments.
    import kfp

    client = kfp.Client(host=kfp_endpoint)
    client.create_run_from_pipeline_func(my_pipeline, arguments={})
49 changes: 49 additions & 0 deletions components/kfp/Run_component/component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import NamedTuple


def run_component_or_pipeline(
    component_url: 'Url',
    arguments: dict,
    endpoint: str = None,
    wait_timeout_seconds: float = None,
) -> NamedTuple('Outputs', [
    ('run_id', str),
    ('run_object', 'JsonObject'),  # kfp.ApiRunDetails
]):
    """Load a component from a URL, submit it as a KFP run and wait for it to finish.

    Args:
        component_url: URL of the component definition to load and run.
        arguments: Arguments passed to the submitted run.
        endpoint: KFP API endpoint. When empty, it is built from the
            ML_PIPELINE_SERVICE_HOST and ML_PIPELINE_SERVICE_PORT
            environment variables.
        wait_timeout_seconds: Timeout forwarded to ``wait_for_run_completion``.

    Returns:
        A ``(run_id, run_object)`` tuple: the run ID as a string and the
        run object returned by the wait call, serialized as indented JSON text.

    Raises:
        KeyError: If ``endpoint`` is empty and one of the two environment
            variables is not set.
    """
    # Imports live in the function body: the function source is copied as-is
    # into the generated component.yaml and has to run on its own there.
    import json
    import os

    import kfp
    from kfp_server_api import ApiClient

    print('Loading component...')
    component_op = kfp.components.load_component_from_url(component_url)
    print('Loading component done.')

    print('Submitting run...')
    if not endpoint:
        # No endpoint given: build one from the service host/port environment variables.
        service_host = os.environ['ML_PIPELINE_SERVICE_HOST']
        service_port = os.environ['ML_PIPELINE_SERVICE_PORT']
        endpoint = f'http://{service_host}:{service_port}'
    client = kfp.Client(host=endpoint)
    submitted_run = client.create_run_from_pipeline_func(component_op, arguments=arguments)
    run_id = str(submitted_run.run_id)
    print(f'Submitted run: {run_id}')
    # Print a link to the run details page, built from the endpoint and the run ID.
    print(f'{endpoint.rstrip("/")}/#/runs/details/{run_id}')

    print('Waiting for the run to finish...')
    finished_run = submitted_run.wait_for_run_completion(wait_timeout_seconds)
    print('Run has finished.')

    # sanitize_for_serialization uses correct field names and properly converts datetime values
    serializable_run = ApiClient().sanitize_for_serialization(finished_run)
    run_object_json = json.dumps(serializable_run, indent=4)
    return (run_id, run_object_json)


if __name__ == '__main__':
    # Running this file as a script builds the component from the function
    # and writes its definition to component.yaml.
    from kfp.components import create_component_from_func

    component_annotations = {
        "author": "Alexey Volkov <alexey.volkov@ark-kun.com>",
    }
    run_component_or_pipeline_op = create_component_from_func(
        run_component_or_pipeline,
        output_component_file='component.yaml',
        base_image='python:3.9',
        packages_to_install=['kfp==1.4.0'],
        annotations=component_annotations,
    )
119 changes: 119 additions & 0 deletions components/kfp/Run_component/component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Component interface: display name, author annotation, inputs and outputs.
name: Run component or pipeline
metadata:
  annotations:
    author: Alexey Volkov <alexey.volkov@ark-kun.com>
# component_url and arguments are required; endpoint and wait_timeout_seconds are optional.
inputs:
  - name: component_url
    type: Url
  - name: arguments
    type: JsonObject
  - name: endpoint
    type: String
    optional: true
  - name: wait_timeout_seconds
    type: Float
    optional: true
outputs:
  - name: run_id
    type: String
  - name: run_object
    type: JsonObject
# Container implementation: runs the embedded Python program in the python:3.9 image.
implementation:
container:
image: python:3.9
command:
# Stage 1: install the pinned kfp package with pip (falling back to a --user install
# if the first attempt fails), then execute the remaining command words via "$0" "$@".
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'kfp==1.4.0' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'kfp==1.4.0' --user) && "$0" "$@"
# Stage 2: write the program text (received as $0) to a temporary file and run it
# with python3, forwarding the remaining arguments.
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
# Embedded program: run_component_or_pipeline without signature annotations, the two
# output serializers, and an argparse wrapper that calls the function and writes each
# output to its file path. This text is the program itself; keep it in sync with component.py.
- |
def run_component_or_pipeline(
component_url,
arguments,
endpoint = None,
wait_timeout_seconds = None,
):
import json
import os
import kfp
from kfp_server_api import ApiClient
print('Loading component...')
op = kfp.components.load_component_from_url(component_url)
print('Loading component done.')
print('Submitting run...')
if not endpoint:
endpoint = 'http://' + os.environ['ML_PIPELINE_SERVICE_HOST'] + ':' + os.environ['ML_PIPELINE_SERVICE_PORT']
create_run_result = kfp.Client(host=endpoint).create_run_from_pipeline_func(op, arguments=arguments)
run_id = str(create_run_result.run_id)
print('Submitted run: ' + run_id)
run_url = f'{endpoint.rstrip("/")}/#/runs/details/{run_id}'
print(run_url)
print('Waiting for the run to finish...')
run_object = create_run_result.wait_for_run_completion(wait_timeout_seconds)
print('Run has finished.')
# sanitize_for_serialization uses correct field names and properly converts datetime values
run_dict = ApiClient().sanitize_for_serialization(run_object)
return (
run_id,
json.dumps(run_dict, indent=4),
)
def _serialize_json(obj) -> str:
if isinstance(obj, str):
return obj
import json
def default_serializer(obj):
if hasattr(obj, 'to_struct'):
return obj.to_struct()
else:
raise TypeError("Object of type '%s' is not JSON serializable and does not have .to_struct() method." % obj.__class__.__name__)
return json.dumps(obj, default=default_serializer, sort_keys=True)
def _serialize_str(str_value: str) -> str:
if not isinstance(str_value, str):
raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value))))
return str_value
import json
import argparse
_parser = argparse.ArgumentParser(prog='Run component or pipeline', description='')
_parser.add_argument("--component-url", dest="component_url", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--arguments", dest="arguments", type=json.loads, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--endpoint", dest="endpoint", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--wait-timeout-seconds", dest="wait_timeout_seconds", type=float, required=False, default=argparse.SUPPRESS)
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = run_component_or_pipeline(**_parsed_args)
_output_serializers = [
_serialize_str,
_serialize_json,
]
import os
for idx, output_file in enumerate(_output_files):
try:
os.makedirs(os.path.dirname(output_file))
except OSError:
pass
with open(output_file, 'w') as f:
f.write(_output_serializers[idx](_outputs[idx]))
# Command-line arguments for the embedded program: the two required inputs are always
# passed; --endpoint and --wait-timeout-seconds are passed only when the input is present;
# ----output-paths receives the file paths for run_id and run_object, in that order.
args:
- --component-url
- {inputValue: component_url}
- --arguments
- {inputValue: arguments}
- if:
cond: {isPresent: endpoint}
then:
- --endpoint
- {inputValue: endpoint}
- if:
cond: {isPresent: wait_timeout_seconds}
then:
- --wait-timeout-seconds
- {inputValue: wait_timeout_seconds}
- '----output-paths'
- {outputPath: run_id}
- {outputPath: run_object}

0 comments on commit e996cdb

Please sign in to comment.