Adding multiple outputs into sdk with sample #1667

Merged: 6 commits, Aug 1, 2019
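In short, this change lets a function-based component declare more than one output by annotating its return type as a `NamedTuple`; each named field becomes a separate output that downstream steps can address by name. A condensed sketch of the pattern, lifted from the sample notebook added below (the image-building step with `compiler.build_python_component` is omitted here):

from typing import NamedTuple
import kfp.dsl as dsl

@dsl.python_component(
    name='product_sum',
    description='Calculates the product and the sum',
    base_image='tensorflow/tensorflow:1.11.0-py3'
)
def product_sum(a: float, b: float) -> NamedTuple(
        'output', [('product', float), ('sum', float)]):
    '''Returns the product and sum of two numbers.'''
    from collections import namedtuple
    output = namedtuple('output', ['product', 'sum'])
    return output(a * b, a + b)

# After compiler.build_python_component(...) turns this function into
# product_sum_op, each field becomes a named output on the resulting task:
#   task = product_sum_op(a, b)
#   task.outputs['product'], task.outputs['sum']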
210 changes: 210 additions & 0 deletions samples/notebooks/Multiple outputs - basics.ipynb
@@ -0,0 +1,210 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Multiple outputs example\n",
"This notebook is a simple example of how to make a component with multiple outputs using the Pipelines SDK.\n",
"\n",
"## Before running notebook:\n",
"\n",
"### Setup notebook server\n",
"This pipeline requires you to [setup a notebook server](https://www.kubeflow.org/docs/notebooks/setup/) in the Kubeflow UI. After you are setup, *upload the notebook in the Kubeflow UI* and then run it in the notebook server.\n",
"\n",
"### Create a GCS bucket\n",
"This pipeline requires a GCS bucket. If you haven't already, [create a GCS bucket](https://cloud.google.com/storage/docs/creating-buckets) to run the notebook. Make sure to create the storage bucket in the same project that you are running Kubeflow on to have the proper permissions by default. You can also create a GCS bucket by running `gsutil mb -p <project_name> gs://<bucket_name>`.\n",
"\n",
"### Upload the notebook in the Kubeflow UI\n",
"In order to run this pipeline, make sure to upload the notebook to your notebook server in the Kubeflow UI. You can clone this repo in the Jupyter notebook server by connecting to the notebook server and then selecting New > Terminal. In the terminal type `git clone https://github.com/kubeflow/pipelines.git`.\n",
"\n",
"### Install Kubeflow pipelines\n",
"Install the `kfp` package if you haven't already."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"!pip install kfp --upgrade"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup project info and imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"GCS_BUCKET = 'gs://[BUCKET-NAME]' # GCS bucket name\n",
"PROJECT_NAME = '[PROJECT-NAME]' # GCP project name\n",
"\n",
"STAGING_GCS_PATH = GCS_BUCKET + '/multiple-output-sample'\n",
"TARGET_IMAGE = 'gcr.io/%s/multi-output:latest' % PROJECT_NAME\n",
"\n",
"BASE_IMAGE = 'tensorflow/tensorflow:1.11.0-py3'\n",
"EXPERIMENT_NAME = 'Multiple Outputs Sample'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp \n",
"import kfp.dsl as dsl\n",
"from kfp import compiler\n",
"from typing import NamedTuple"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create component\n",
"In order to create a component with multiple outputs, use `NamedTuple` with the same syntax as below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Returns a*b and a+b\n",
"@dsl.python_component(\n",
" name='product_sum',\n",
" description='Calculates the product and the sum',\n",
" base_image=BASE_IMAGE\n",
")\n",
"def product_sum(a: float, b: float) -> NamedTuple(\n",
" 'output', [('product', float), ('sum', float)]):\n",
" '''Returns the product and sum of two numbers'''\n",
" from collections import namedtuple\n",
" \n",
" product_sum_output = namedtuple('output', ['product', 'sum'])\n",
" return product_sum_output(a*b, a+b)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"product_sum_op = compiler.build_python_component(\n",
" component_func=product_sum,\n",
" staging_gcs_path=STAGING_GCS_PATH,\n",
" base_image=BASE_IMAGE,\n",
" target_image=TARGET_IMAGE)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create and run pipeline\n",
"### Create pipeline\n",
"The pipeline parameters are specified in the `pipeline` function signature."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@dsl.pipeline(\n",
" name='Multiple Outputs Pipeline',\n",
" description='Sample pipeline to showcase multiple outputs'\n",
")\n",
"def pipeline(a=2.0, b=2.5, c=3.0):\n",
" prod_sum_task = product_sum_op(a, b)\n",
" prod_sum_task2 = product_sum_op(b, c)\n",
" prod_sum_task3 = product_sum_op(prod_sum_task.outputs['product'],\n",
" prod_sum_task2.outputs['sum'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_filename = 'multiple-outputs.pipelines.zip'\n",
"compiler.Compiler().compile(pipeline, pipeline_filename)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client = kfp.Client()\n",
"experiment = client.create_experiment(EXPERIMENT_NAME)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"arguments = {\n",
" 'a': 2.0,\n",
" 'b': 2.5,\n",
" 'c': 3.0,\n",
"}\n",
"run_name = 'multiple output run'\n",
"run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename,\n",
" params=arguments)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
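A convenient property of the sample above (not called out in the notebook) is that the decorated function stays an ordinary Python callable, since `dsl.python_component` only attaches metadata, so the multi-output logic can be smoke-tested locally before any container image is built. A minimal sketch, assuming the notebook's `product_sum` definition is in scope:

result = product_sum(2.0, 3.0)
assert result.product == 6.0 and result.sum == 5.0
print(result)  # output(product=6.0, sum=5.0)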
47 changes: 38 additions & 9 deletions sdk/python/kfp/compiler/_component_builder.py
@@ -169,7 +169,6 @@ def end(self):
line_sep = '\n'
return line_sep.join(self._code) + line_sep

#TODO: currently it supports single output, future support for multiple return values
def _func_to_entrypoint(component_func, python_version='python3'):
'''
args:
@@ -182,15 +181,27 @@ def _func_to_entrypoint(component_func, python_version='python3'):
annotations = fullargspec[6]
input_args = fullargspec[0]
inputs = {}
output = None
if 'return' in annotations.keys():
output = annotations['return']
output_is_named_tuple = hasattr(output, '_fields')

for key, value in annotations.items():
if key != 'return':
inputs[key] = value
if len(input_args) != len(inputs):
raise Exception('Some input arguments do not contain annotations.')
if 'return' in annotations and annotations['return'] not in [int, float, str, bool]:
if 'return' in annotations and annotations['return'] not in [int,
float, str, bool] and not output_is_named_tuple:
raise Exception('Output type not supported and supported types are [int, float, str, bool]')
if output_is_named_tuple:
types = output._field_types
for field in output._fields: #Make sure all elements are supported
if types[field] not in [int, float, str, bool]:
raise Exception('Output type not supported and supported types are [int, float, str, bool]')

# inputs is a dictionary with key of argument name and value of type class
# output is a type class, e.g. str and int.
# output is a type class, e.g. int, str, bool, float, NamedTuple.

# Follow the same indentation with the component source codes.
component_src = inspect.getsource(component_func)
@@ -204,12 +215,15 @@ def _func_to_entrypoint(component_func, python_version='python3'):
func_signature = 'def ' + new_func_name + '('
for input_arg in input_args:
func_signature += input_arg + ','
func_signature += '_output_file):'
func_signature = func_signature + '_output_files' if output_is_named_tuple else func_signature + '_output_file'
func_signature += '):'
codegen.writeline(func_signature)

# Call user function
codegen.indent()
call_component_func = 'output = ' + component_func.__name__ + '('
if output_is_named_tuple:
call_component_func = call_component_func.replace('output', 'outputs')
for input_arg in input_args:
call_component_func += inputs[input_arg].__name__ + '(' + input_arg + '),'
call_component_func = call_component_func.rstrip(',')
@@ -218,6 +232,9 @@ def _func_to_entrypoint(component_func, python_version='python3'):

# Serialize output
codegen.writeline('import os')
if output_is_named_tuple:
codegen.writeline('for _output_file, output in zip(_output_files, outputs):')
codegen.indent()
codegen.writeline('os.makedirs(os.path.dirname(_output_file))')
codegen.writeline('with open(_output_file, "w") as data:')
codegen.indent()
@@ -230,7 +247,10 @@ def _func_to_entrypoint(component_func, python_version='python3'):
codegen.writeline('parser = argparse.ArgumentParser(description="Parsing arguments")')
for input_arg in input_args:
codegen.writeline('parser.add_argument("' + input_arg + '", type=' + inputs[input_arg].__name__ + ')')
codegen.writeline('parser.add_argument("_output_file", type=str)')
if output_is_named_tuple:
codegen.writeline('parser.add_argument("_output_files", type=str, nargs=' + str(len(annotations['return']._fields)) + ')')
else:
codegen.writeline('parser.add_argument("_output_file", type=str)')
codegen.writeline('args = vars(parser.parse_args())')
codegen.writeline('')
codegen.writeline('if __name__ == "__main__":')
@@ -247,6 +267,8 @@ def _func_to_entrypoint(component_func, python_version='python3'):
if python_version == 'python2':
src_lines[start_line_num] = 'def ' + component_func.__name__ + '(' + ', '.join((inspect.getfullargspec(component_func).args)) + '):'
dedecorated_component_src = '\n'.join(src_lines[start_line_num:])
if output_is_named_tuple:
dedecorated_component_src = 'from typing import NamedTuple\n' + dedecorated_component_src

complete_component_code = dedecorated_component_src + '\n' + wrapper_code + '\n' + codegen.end()
return complete_component_code
@@ -422,17 +444,24 @@ def _generate_pythonop(component_func, target_image, target_component_file=None)
#TODO: Humanize the input/output names
input_names = inspect.getfullargspec(component_func)[0]

output_name = 'output'
return_ann = inspect.signature(component_func).return_annotation
output_is_named_tuple = hasattr(return_ann, '_fields')

output_names = ['output']
if output_is_named_tuple:
output_names = return_ann._fields

component_spec = ComponentSpec(
name=component_name,
description=component_description,
inputs=[InputSpec(name=input_name, type='str') for input_name in input_names], #TODO: Chnage type to actual type
outputs=[OutputSpec(name=output_name)],
inputs=[InputSpec(name=input_name, type='str') for input_name in input_names], #TODO: Change type to actual type
outputs=[OutputSpec(name=output_name, type='str') for output_name in output_names],
implementation=ContainerImplementation(
container=ContainerSpec(
image=target_image,
#command=['python3', program_file], #TODO: Include the command line
args=[InputValuePlaceholder(input_name) for input_name in input_names] + [OutputPathPlaceholder(output_name)],
args=[InputValuePlaceholder(input_name) for input_name in input_names] +
[OutputPathPlaceholder(output_name) for output_name in output_names],
)
)
)
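The multi-output branch in `_func_to_entrypoint` and `_generate_pythonop` leans on two attributes exposed by the functional `typing.NamedTuple` form: `_fields` (the ordered output names) and `_field_types` (the name-to-type mapping used for the supported-type check). A minimal illustration of that introspection with a hypothetical component; note that `_field_types` is available in the Python 3 versions current at the time and was later removed (Python 3.9) in favor of `__annotations__`:

import inspect
from typing import NamedTuple

def product_sum(a: float, b: float) -> NamedTuple(
        'output', [('product', float), ('sum', float)]):
    from collections import namedtuple
    out = namedtuple('output', ['product', 'sum'])
    return out(a * b, a + b)

return_ann = inspect.signature(product_sum).return_annotation
print(hasattr(return_ann, '_fields'))  # True, so the builder treats this as multi-output
print(return_ann._fields)              # ('product', 'sum'), one OutputSpec per field
print(return_ann._field_types)         # mapping of field name to type, e.g. {'product': float, 'sum': float}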
38 changes: 36 additions & 2 deletions sdk/python/tests/compiler/component_builder_test.py
@@ -25,6 +25,7 @@
from pathlib import Path
import inspect
from collections import OrderedDict
from typing import NamedTuple

GCS_BASE = 'gs://kfp-testing/'

@@ -131,6 +132,12 @@ def sample_component_func_two(a: str, b: int) -> float:
def sample_component_func_three() -> float:
return 1.0

def sample_component_func_four() -> NamedTuple(
'output', [('a', float), ('b', str)]):
from collections import namedtuple
output = namedtuple('output', ['a', 'b'])
return output(1.0, 'test')

class TestGenerator(unittest.TestCase):
def test_generate_dockerfile(self):
""" Test generate dockerfile """
@@ -201,7 +208,7 @@ def test_generate_requirement(self):
self.assertEqual(target_payload, golden_payload)
os.remove(temp_file)

def test_generate_entrypoint(self):
def test_func_to_entrypoint(self):
""" Test entrypoint generation """

# prepare
@@ -284,7 +291,34 @@ def wrapper_sample_component_func_three(_output_file):
'''
self.assertEqual(golden, generated_codes)

def test_generate_entrypoint_python2(self):
generated_codes = _func_to_entrypoint(component_func=sample_component_func_four)
golden = '''\
from typing import NamedTuple
def sample_component_func_four() -> NamedTuple(
'output', [('a', float), ('b', str)]):
from collections import namedtuple
output = namedtuple('output', ['a', 'b'])
return output(1.0, 'test')

def wrapper_sample_component_func_four(_output_files):
outputs = sample_component_func_four()
import os
for _output_file, output in zip(_output_files, outputs):
os.makedirs(os.path.dirname(_output_file))
with open(_output_file, "w") as data:
data.write(str(output))

import argparse
parser = argparse.ArgumentParser(description="Parsing arguments")
parser.add_argument("_output_files", type=str, nargs=2)
args = vars(parser.parse_args())

if __name__ == "__main__":
wrapper_sample_component_func_four(**args)
'''
self.assertEqual(golden, generated_codes)

def test_func_to_entrypoint_python2(self):
""" Test entrypoint generation for python2"""

# prepare
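As a quick, self-contained check of the argparse contract in the golden wrapper above (standard library only, paths are illustrative), two positional output paths are indeed collected into a single list by `nargs=2`:

import argparse

parser = argparse.ArgumentParser(description="Parsing arguments")
parser.add_argument("_output_files", type=str, nargs=2)
args = vars(parser.parse_args(['/tmp/out/a', '/tmp/out/b']))
print(args)  # {'_output_files': ['/tmp/out/a', '/tmp/out/b']}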