Skip to content

Commit

Permalink
Samples - Updated the Data passing in python tutorial
Browse files Browse the repository at this point in the history
  • Loading branch information
Ark-kun committed Jan 18, 2020
1 parent c0153a0 commit 2b7a9d1
Showing 1 changed file with 95 additions and 23 deletions.
118 changes: 95 additions & 23 deletions samples/tutorials/Data passing in python components.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"metadata": {},
"source": [
"# Data passing tutorial\n",
"Data passing is the most importnt aspect of Pipelines.\n",
"Data passing is the most important aspect of Pipelines.\n",
"\n",
"In Kubeflow Pipelines, the pipeline authors compose pipelines by creating component instances (tasks) and connecting them together.\n",
"\n",
Expand All @@ -25,8 +25,18 @@
"metadata": {},
"outputs": [],
"source": [
"# Install Kubeflow Pipelines SDK\n",
"!PIP_DISABLE_PIP_VERSION_CHECK=1 pip3 install 'kfp>=0.1.31.1' --quiet"
"# Put your KFP cluster endpoint URL here if working from GCP notebooks (or local notebooks). ('https://xxxxx.notebooks.googleusercontent.com/')\n",
"kfp_endpoint='https://XXXXX.notebooks.googleusercontent.com/'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install Kubeflow Pipelines SDK. Add the --user argument if you get permission errors.\n",
"!PIP_DISABLE_PIP_VERSION_CHECK=1 pip3 install 'kfp>=0.1.32.2' --quiet"
]
},
{
Expand All @@ -38,17 +48,29 @@
"from typing import NamedTuple\n",
"\n",
"import kfp\n",
"from kfp.components import InputPath, InputTextFile, InputBinaryFile, OutputPath, OutputTextFile, OutputBinaryFile\n",
"from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile\n",
"from kfp.components import func_to_container_op"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Initializing the client\n",
"client = kfp.Client()\n",
"\n",
"# ! Use kfp.Client(host='https://xxxxx.notebooks.googleusercontent.com/') if working from GCP notebooks (or local notebooks)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Small data\n",
"\n",
"Small data is the data that you'll be comfortable passing as program's command-line argument. Small data size should not exceed few kilobytes.\n",
"Small data is the data that you'll be comfortable passing as program's command-line argument. Small data size should not exceed a few kilobytes.\n",
"\n",
"Some examples of typical types of small data are: number, URL, small string (e.g. column name).\n",
"\n",
Expand All @@ -71,15 +93,15 @@
"outputs": [],
"source": [
"@func_to_container_op\n",
"def consume_one_argument(text: str):\n",
"def print_small_text(text: str):\n",
" '''Print small text'''\n",
" print(text)\n",
"\n",
"def constant_to_consumer_pipeline():\n",
"    '''Pipeline that passes small constant string to consumer'''\n",
" consume_task = consume_one_argument('Hello world') # Passing constant as argument to consumer\n",
" consume_task = print_small_text('Hello world') # Passing constant as argument to consumer\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(constant_to_consumer_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(constant_to_consumer_pipeline, arguments={})"
]
},
{
Expand All @@ -90,7 +112,7 @@
"source": [
"def pipeline_parameter_to_consumer_pipeline(text: str):\n",
"    '''Pipeline that passes small pipeline parameter string to consumer'''\n",
" consume_task = consume_one_argument(text) # Passing pipeline parameter as argument to consumer\n",
" consume_task = print_small_text(text) # Passing pipeline parameter as argument to consumer\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(\n",
" pipeline_parameter_to_consumer_pipeline,\n",
Expand Down Expand Up @@ -119,10 +141,10 @@
" '''Pipeline that passes small data from producer to consumer'''\n",
" produce_task = produce_one_small_output()\n",
" # Passing producer task output as argument to consumer\n",
" consume_task1 = consume_one_argument(produce_task.output) # task.output only works for single-output components\n",
" consume_task2 = consume_one_argument(produce_task.outputs['output']) # task.outputs[...] always works\n",
" consume_task1 = print_small_text(produce_task.output) # task.output only works for single-output components\n",
" consume_task2 = print_small_text(produce_task.outputs['output']) # task.outputs[...] always works\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(task_output_to_consumer_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(task_output_to_consumer_pipeline, arguments={})"
]
},
{
Expand Down Expand Up @@ -157,7 +179,7 @@
" consume_task3 = consume_two_arguments(produce2_task.outputs['text'], produce2_task.outputs['number'])\n",
"\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(producers_to_consumers_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(producers_to_consumers_pipeline, arguments={})"
]
},
{
Expand All @@ -184,9 +206,10 @@
"def processing_pipeline(text: str = \"Hello world\"):\n",
" truncate_task = truncate_text(text, max_length=5)\n",
" get_item_task = get_item_from_list(list=[3, 1, truncate_task.output, 1, 5, 9, 2, 6, 7], index=2)\n",
" print_small_text(get_item_task.output)\n",
"\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(processing_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(processing_pipeline, arguments={})"
]
},
{
Expand Down Expand Up @@ -233,16 +256,17 @@
"\n",
"# Reading bigger data\n",
"@func_to_container_op\n",
"def print_text(text_path: InputPath(str)):\n",
"def print_text(text_path: InputPath()): # The \"text\" input is untyped so that any data can be printed\n",
" '''Print text'''\n",
" with open(text_path, 'r') as reader:\n",
" for line in reader:\n",
" print(line, end = '')\n",
"\n",
"def print_repeating_lines_pipeline():\n",
" print_text(repeat_line(line='Hello', count=5).output) # Don't forget .output !\n",
" repeat_lines_task = repeat_line(line='Hello', count=5000)\n",
" print_text(repeat_lines_task.output) # Don't forget .output !\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(print_repeating_lines_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(print_repeating_lines_pipeline, arguments={})"
]
},
{
Expand All @@ -265,12 +289,10 @@
" with open(even_lines_path, 'w') as even_writer:\n",
" while True:\n",
" line = reader.readline()\n",
" print(line)\n",
" if line == \"\":\n",
" break\n",
" odd_writer.write(line)\n",
" line = reader.readline()\n",
" print(line)\n",
" if line == \"\":\n",
" break\n",
" even_writer.write(line)\n",
Expand All @@ -281,7 +303,7 @@
" print_text(split_text_task.outputs['odd_lines'])\n",
" print_text(split_text_task.outputs['even_lines'])\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(text_splitting_pipeline, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(text_splitting_pipeline, arguments={})"
]
},
{
Expand All @@ -301,12 +323,10 @@
"def split_text_lines2(source_file: InputTextFile(str), odd_lines_file: OutputTextFile(str), even_lines_file: OutputTextFile(str)):\n",
" while True:\n",
" line = source_file.readline()\n",
" print(line)\n",
" if line == \"\":\n",
" break\n",
" odd_lines_file.write(line)\n",
" line = source_file.readline()\n",
" print(line)\n",
" if line == \"\":\n",
" break\n",
" even_lines_file.write(line)\n",
Expand All @@ -317,7 +337,59 @@
" print_text(split_text_task.outputs['odd_lines']).set_display_name('Odd lines')\n",
" print_text(split_text_task.outputs['even_lines']).set_display_name('Even lines')\n",
"\n",
"kfp.Client().create_run_from_pipeline_func(text_splitting_pipeline2, arguments={})"
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(text_splitting_pipeline2, arguments={})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example: Pipeline that generates then sums many numbers"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Writing many numbers\n",
"@func_to_container_op\n",
"def write_numbers(numbers_path: OutputPath(str), start: int = 0, count: int = 10):\n",
" with open(numbers_path, 'w') as writer:\n",
" for i in range(start, count):\n",
" writer.write(str(i) + '\\n')\n",
"\n",
"\n",
"# Reading and summing many numbers\n",
"@func_to_container_op\n",
"def sum_numbers(numbers_path: InputPath(str)) -> int:\n",
" sum = 0\n",
" with open(numbers_path, 'r') as reader:\n",
" for line in reader:\n",
" sum = sum + int(line)\n",
" return sum\n",
"\n",
"\n",
"\n",
"# Pipeline to sum 100000 numbers\n",
"def sum_pipeline(count: 'Integer' = 100000):\n",
" numbers_task = write_numbers(count=count)\n",
" print_text(numbers_task.output)\n",
"\n",
" sum_task = sum_numbers(numbers_task.outputs['numbers'])\n",
" print_text(sum_task.output)\n",
"\n",
"\n",
"# Running the pipeline\n",
"kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(sum_pipeline, arguments={})"
]
}
],
Expand Down

0 comments on commit 2b7a9d1

Please sign in to comment.