Skip to content

Commit

Permalink
Samples - Updated the Data passing in python tutorial (kubeflow#2868)
Browse files Browse the repository at this point in the history
* Samples - Updated the Data passing in python tutorial

* Fixed the review issues

* Added missing kfp_endpoint

* Stopped shadowing the system list type

* Added added a bit more infor about size limitations

* Changed the package installation
  • Loading branch information
Ark-kun authored and Jeffwan committed Dec 9, 2020
1 parent 9084556 commit 150e245
Showing 1 changed file with 88 additions and 28 deletions.
116 changes: 88 additions & 28 deletions samples/tutorials/Data passing in python components.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"metadata": {},
"source": [
"# Data passing tutorial\n",
"Data passing is the most importnt aspect of Pipelines.\n",
"Data passing is the most important aspect of Pipelines.\n",
"\n",
"In Kubeflow Pipelines, the pipeline authors compose pipelines by creating component instances (tasks) and connecting them together.\n",
"\n",
Expand All @@ -25,8 +25,18 @@
"metadata": {},
"outputs": [],
"source": [
"# Install Kubeflow Pipelines SDK\n",
"!PIP_DISABLE_PIP_VERSION_CHECK=1 pip3 install 'kfp>=0.1.31.1' --quiet"
"# Put your KFP cluster endpoint URL here if working from GCP notebooks (or local notebooks). ('https://xxxxx.notebooks.googleusercontent.com/')\n",
"kfp_endpoint='https://XXXXX.{pipelines|notebooks}.googleusercontent.com/'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install Kubeflow Pipelines SDK. Add the --user argument if you get permission errors.\n",
"!PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install 'kfp>=0.1.32.2' --quiet --user"
]
},
{
Expand All @@ -38,7 +48,7 @@
"from typing import NamedTuple\n",
"\n",
"import kfp\n",
"from kfp.components import InputPath, InputTextFile, InputBinaryFile, OutputPath, OutputTextFile, OutputBinaryFile\n",
"from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile\n",
"from kfp.components import func_to_container_op"
]
},
Expand All @@ -52,7 +62,7 @@
"\n",
"Some examples of typical types of small data are: number, URL, small string (e.g. column name).\n",
"\n",
"Small lists, dictionaries and JSON structures are fine, but keep an eye on the size and consider switching to file-based data passing methods taht are more suitable for bigger data.\n",
"Small lists, dictionaries and JSON structures are fine, but keep an eye on the size and consider switching to file-based data passing methods that are more suitable for bigger data (more than several kilobytes) or binary data.\n",
"\n",
"All small data outputs will be at some point serialized to strings and all small data input values will be at some point deserialized from strings (passed as command-line arguments). There are built-in serializers and deserializers for several common types (e.g. `str`, `int`, `float`, `bool`, `list`, `dict`). All other types of data need to be serialized manually before returning the data. Make sure to properly specify type annotations, otherwise there would be no automatic deserialization and the component function will receive strings instead of deserialized objects."
]
Expand All @@ -71,15 +81,15 @@
"outputs": [],
"source": [
"@func_to_container_op\n",
"def consume_one_argument(text: str):\n",
"def print_small_text(text: str):\n",
" '''Print small text'''\n",
" print(text)\n",
"\n",
"def constant_to_consumer_pipeline():\n",
"    '''Pipeline that passes small constant string to consumer'''\n",
" consume_task = consume_one_argument('Hello world') # Passing constant as argument to consumer\n",
" consume_task = print_small_text('Hello world') # Passing constant as argument to consumer\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(constant_to_consumer_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(constant_to_consumer_pipeline, arguments={})"
]
},
{
Expand All @@ -90,9 +100,9 @@
"source": [
"def pipeline_parameter_to_consumer_pipeline(text: str):\n",
"    '''Pipeline that passes small pipeline parameter string to consumer'''\n",
" consume_task = consume_one_argument(text) # Passing pipeline parameter as argument to consumer\n",
" consume_task = print_small_text(text) # Passing pipeline parameter as argument to consumer\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(\n",
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
" pipeline_parameter_to_consumer_pipeline,\n",
" arguments={'text': 'Hello world'}\n",
")"
Expand All @@ -119,10 +129,10 @@
" '''Pipeline that passes small data from producer to consumer'''\n",
" produce_task = produce_one_small_output()\n",
" # Passing producer task output as argument to consumer\n",
" consume_task1 = consume_one_argument(produce_task.output) # task.output only works for single-output components\n",
" consume_task2 = consume_one_argument(produce_task.outputs['output']) # task.outputs[...] always works\n",
" consume_task1 = print_small_text(produce_task.output) # task.output only works for single-output components\n",
" consume_task2 = print_small_text(produce_task.outputs['output']) # task.outputs[...] always works\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(task_output_to_consumer_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(task_output_to_consumer_pipeline, arguments={})"
]
},
{
Expand Down Expand Up @@ -157,7 +167,7 @@
" consume_task3 = consume_two_arguments(produce2_task.outputs['text'], produce2_task.outputs['number'])\n",
"\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(producers_to_consumers_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(producers_to_consumers_pipeline, arguments={})"
]
},
{
Expand All @@ -174,19 +184,20 @@
"outputs": [],
"source": [
"@func_to_container_op\n",
"def get_item_from_list(list: list, index: int) -> str:\n",
" return list[index]\n",
"def get_item_from_list(list_of_strings: list, index: int) -> str:\n",
" return list_of_strings[index]\n",
"\n",
"@func_to_container_op\n",
"def truncate_text(text: str, max_length: int) -> str:\n",
" return text[0:max_length]\n",
"\n",
"def processing_pipeline(text: str = \"Hello world\"):\n",
" truncate_task = truncate_text(text, max_length=5)\n",
" get_item_task = get_item_from_list(list=[3, 1, truncate_task.output, 1, 5, 9, 2, 6, 7], index=2)\n",
" get_item_task = get_item_from_list(list_of_strings=[3, 1, truncate_task.output, 1, 5, 9, 2, 6, 7], index=2)\n",
" print_small_text(get_item_task.output)\n",
"\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(processing_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(processing_pipeline, arguments={})"
]
},
{
Expand Down Expand Up @@ -233,16 +244,17 @@
"\n",
"# Reading bigger data\n",
"@func_to_container_op\n",
"def print_text(text_path: InputPath(str)):\n",
"def print_text(text_path: InputPath()): # The \"text\" input is untyped so that any data can be printed\n",
" '''Print text'''\n",
" with open(text_path, 'r') as reader:\n",
" for line in reader:\n",
" print(line, end = '')\n",
"\n",
"def print_repeating_lines_pipeline():\n",
" print_text(repeat_line(line='Hello', count=5).output) # Don't forget .output !\n",
" repeat_lines_task = repeat_line(line='Hello', count=5000)\n",
" print_text(repeat_lines_task.output) # Don't forget .output !\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(print_repeating_lines_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(print_repeating_lines_pipeline, arguments={})"
]
},
{
Expand All @@ -265,12 +277,10 @@
" with open(even_lines_path, 'w') as even_writer:\n",
" while True:\n",
" line = reader.readline()\n",
" print(line)\n",
" if line == \"\":\n",
" break\n",
" odd_writer.write(line)\n",
" line = reader.readline()\n",
" print(line)\n",
" if line == \"\":\n",
" break\n",
" even_writer.write(line)\n",
Expand All @@ -281,7 +291,7 @@
" print_text(split_text_task.outputs['odd_lines'])\n",
" print_text(split_text_task.outputs['even_lines'])\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(text_splitting_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(text_splitting_pipeline, arguments={})"
]
},
{
Expand All @@ -301,12 +311,10 @@
"def split_text_lines2(source_file: InputTextFile(str), odd_lines_file: OutputTextFile(str), even_lines_file: OutputTextFile(str)):\n",
" while True:\n",
" line = source_file.readline()\n",
" print(line)\n",
" if line == \"\":\n",
" break\n",
" odd_lines_file.write(line)\n",
" line = source_file.readline()\n",
" print(line)\n",
" if line == \"\":\n",
" break\n",
" even_lines_file.write(line)\n",
Expand All @@ -317,7 +325,59 @@
" print_text(split_text_task.outputs['odd_lines']).set_display_name('Odd lines')\n",
" print_text(split_text_task.outputs['even_lines']).set_display_name('Even lines')\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(text_splitting_pipeline2, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(text_splitting_pipeline2, arguments={})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example: Pipeline that generates then sums many numbers"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Writing many numbers\n",
"@func_to_container_op\n",
"def write_numbers(numbers_path: OutputPath(str), start: int = 0, count: int = 10):\n",
" with open(numbers_path, 'w') as writer:\n",
" for i in range(start, count):\n",
" writer.write(str(i) + '\\n')\n",
"\n",
"\n",
"# Reading and summing many numbers\n",
"@func_to_container_op\n",
"def sum_numbers(numbers_path: InputPath(str)) -> int:\n",
" sum = 0\n",
" with open(numbers_path, 'r') as reader:\n",
" for line in reader:\n",
" sum = sum + int(line)\n",
" return sum\n",
"\n",
"\n",
"\n",
"# Pipeline to sum 100000 numbers\n",
"def sum_pipeline(count: 'Integer' = 100000):\n",
" numbers_task = write_numbers(count=count)\n",
" print_text(numbers_task.output)\n",
"\n",
" sum_task = sum_numbers(numbers_task.outputs['numbers'])\n",
" print_text(sum_task.output)\n",
"\n",
"\n",
"# Running the pipeline\n",
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(sum_pipeline, arguments={})"
]
}
],
Expand All @@ -342,4 +402,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
}
}

0 comments on commit 150e245

Please sign in to comment.