From 2b7a9d17fa48672eb3df32ce5f53625024f5ec9b Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Fri, 17 Jan 2020 15:54:23 -0800
Subject: [PATCH] Samples - Updated the Data passing in python tutorial

---
 .../Data passing in python components.ipynb | 118 ++++++++++++++----
 1 file changed, 95 insertions(+), 23 deletions(-)

diff --git a/samples/tutorials/Data passing in python components.ipynb b/samples/tutorials/Data passing in python components.ipynb
index 0a8f2a31499..a90010111d1 100644
--- a/samples/tutorials/Data passing in python components.ipynb
+++ b/samples/tutorials/Data passing in python components.ipynb
@@ -5,7 +5,7 @@
   "metadata": {},
   "source": [
    "# Data passing tutorial\n",
-   "Data passing is the most importnt aspect of Pipelines.\n",
+   "Data passing is the most important aspect of Pipelines.\n",
    "\n",
    "In Kubeflow Pipelines, the pipeline authors compose pipelines by creating component instances (tasks) and connecting them together.\n",
    "\n",
@@ -25,8 +25,18 @@
   "metadata": {},
   "outputs": [],
   "source": [
-   "# Install Kubeflow Pipelines SDK\n",
-   "!PIP_DISABLE_PIP_VERSION_CHECK=1 pip3 install 'kfp>=0.1.31.1' --quiet"
+   "# Put your KFP cluster endpoint URL here if working from GCP notebooks (or local notebooks). ('https://xxxxx.notebooks.googleusercontent.com/')\n",
+   "kfp_endpoint='https://XXXXX.notebooks.googleusercontent.com/'"
+  ]
+ },
+ {
+  "cell_type": "code",
+  "execution_count": null,
+  "metadata": {},
+  "outputs": [],
+  "source": [
+   "# Install Kubeflow Pipelines SDK. Add the --user argument if you get permission errors.\n",
+   "!PIP_DISABLE_PIP_VERSION_CHECK=1 pip3 install 'kfp>=0.1.32.2' --quiet"
   ]
  },
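Every run-submission call that this patch touches goes through the `kfp_endpoint` value introduced in the new cell above. A minimal sketch of the pattern, assuming a reachable KFP deployment (the URL is a placeholder, and `list_experiments` is just a cheap connectivity check):

```python
import kfp

# Placeholder endpoint; substitute your cluster's inverse-proxy URL.
kfp_endpoint = 'https://XXXXX.notebooks.googleusercontent.com/'

# Outside the cluster the endpoint must be passed explicitly;
# inside the cluster, kfp.Client() can discover the API server on its own.
client = kfp.Client(host=kfp_endpoint)
print(client.list_experiments())
```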
@@ -38,17 +48,29 @@
    "from typing import NamedTuple\n",
    "\n",
    "import kfp\n",
-   "from kfp.components import InputPath, InputTextFile, InputBinaryFile, OutputPath, OutputTextFile, OutputBinaryFile\n",
+   "from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile\n",
    "from kfp.components import func_to_container_op"
   ]
  },
+ {
+  "cell_type": "code",
+  "execution_count": null,
+  "metadata": {},
+  "outputs": [],
+  "source": [
+   "# Initializing the client\n",
+   "client = kfp.Client()\n",
+   "\n",
+   "# ! Use kfp.Client(host='https://xxxxx.notebooks.googleusercontent.com/') if working from GCP notebooks (or local notebooks)"
+  ]
+ },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Small data\n",
    "\n",
-   "Small data is the data that you'll be comfortable passing as program's command-line argument. Smal data size should not exceed few kilobytes.\n",
+   "Small data is the data that you'll be comfortable passing as a program's command-line argument. Small data size should not exceed a few kilobytes.\n",
    "\n",
    "Some examples of typical types of small data are: number, URL, small string (e.g. column name).\n",
@@ -71,15 +93,15 @@
   "outputs": [],
   "source": [
    "@func_to_container_op\n",
-   "def consume_one_argument(text: str):\n",
+   "def print_small_text(text: str):\n",
    "    '''Print small text'''\n",
    "    print(text)\n",
    "\n",
    "def constant_to_consumer_pipeline():\n",
    "    '''Pipeline that passes small constant string to consumer'''\n",
-   "    consume_task = consume_one_argument('Hello world') # Passing constant as argument to consumer\n",
+   "    consume_task = print_small_text('Hello world') # Passing constant as argument to consumer\n",
    "\n",
-   "kfp.Client().create_run_from_pipeline_func(constant_to_consumer_pipeline, arguments={})"
+   "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(constant_to_consumer_pipeline, arguments={})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
@@ -90,7 +112,7 @@
   "metadata": {},
   "outputs": [],
   "source": [
    "def pipeline_parameter_to_consumer_pipeline(text: str):\n",
    "    '''Pipeline that passes small pipeline parameter string to consumer'''\n",
-   "    consume_task = consume_one_argument(text) # Passing pipeline parameter as argument to consumer\n",
+   "    consume_task = print_small_text(text) # Passing pipeline parameter as argument to consumer\n",
    "\n",
-   "kfp.Client().create_run_from_pipeline_func(\n",
+   "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
    "    pipeline_parameter_to_consumer_pipeline,\n",
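The renames above (`consume_one_argument` becoming `print_small_text`) keep the small-data story consistent: values of up to a few kilobytes travel as command-line strings, and the type annotations on the component function are what drive serialization and deserialization. A minimal sketch of that mechanism; the component and pipeline below are illustrative, not part of the patch:

```python
from kfp.components import func_to_container_op

# Illustrative component: the int annotations tell KFP to serialize the
# arguments to command-line strings and deserialize them back to ints.
@func_to_container_op
def add_numbers(a: int, b: int) -> int:
    return a + b

def add_pipeline(a: int = 1, b: int = 2):
    add_task = add_numbers(a, b)
    # At pipeline-compilation time this is a placeholder, not the value:
    print(add_task.output)
```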
@@ -119,10 +141,10 @@
    "    '''Pipeline that passes small data from producer to consumer'''\n",
    "    produce_task = produce_one_small_output()\n",
    "    # Passing producer task output as argument to consumer\n",
-   "    consume_task1 = consume_one_argument(produce_task.output) # task.output only works for single-output components\n",
-   "    consume_task2 = consume_one_argument(produce_task.outputs['output']) # task.outputs[...] always works\n",
+   "    consume_task1 = print_small_text(produce_task.output) # task.output only works for single-output components\n",
+   "    consume_task2 = print_small_text(produce_task.outputs['output']) # task.outputs[...] always works\n",
    "\n",
-   "kfp.Client().create_run_from_pipeline_func(task_output_to_consumer_pipeline, arguments={})"
+   "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(task_output_to_consumer_pipeline, arguments={})"
   ]
  },
@@ -157,7 +179,7 @@
    "    consume_task3 = consume_two_arguments(produce2_task.outputs['text'], produce2_task.outputs['number'])\n",
    "\n",
    "\n",
-   "kfp.Client().create_run_from_pipeline_func(producers_to_consumers_pipeline, arguments={})"
+   "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(producers_to_consumers_pipeline, arguments={})"
   ]
  },
@@ -184,9 +206,10 @@
    "def processing_pipeline(text: str = \"Hello world\"):\n",
    "    truncate_task = truncate_text(text, max_length=5)\n",
    "    get_item_task = get_item_from_list(list=[3, 1, truncate_task.output, 1, 5, 9, 2, 6, 7], index=2)\n",
+   "    print_small_text(get_item_task.output)\n",
    "\n",
    "\n",
-   "kfp.Client().create_run_from_pipeline_func(processing_pipeline, arguments={})"
+   "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(processing_pipeline, arguments={})"
   ]
  },
@@ -233,16 +256,17 @@
    "\n",
    "# Reading bigger data\n",
    "@func_to_container_op\n",
-   "def print_text(text_path: InputPath(str)):\n",
+   "def print_text(text_path: InputPath()): # The \"text\" input is untyped so that any data can be printed\n",
    "    '''Print text'''\n",
    "    with open(text_path, 'r') as reader:\n",
    "        for line in reader:\n",
    "            print(line, end = '')\n",
    "\n",
    "def print_repeating_lines_pipeline():\n",
-   "    print_text(repeat_line(line='Hello', count=5).output) # Don't forget .output !\n",
+   "    repeat_lines_task = repeat_line(line='Hello', count=5000)\n",
+   "    print_text(repeat_lines_task.output) # Don't forget .output !\n",
    "\n",
-   "kfp.Client().create_run_from_pipeline_func(print_repeating_lines_pipeline, arguments={})"
+   "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(print_repeating_lines_pipeline, arguments={})"
   ]
  },
@@ -265,12 +289,10 @@
    "            with open(even_lines_path, 'w') as even_writer:\n",
    "                while True:\n",
    "                    line = reader.readline()\n",
-   "                    print(line)\n",
    "                    if line == \"\":\n",
    "                        break\n",
    "                    odd_writer.write(line)\n",
    "                    line = reader.readline()\n",
-   "                    print(line)\n",
    "                    if line == \"\":\n",
    "                        break\n",
    "                    even_writer.write(line)\n",
@@ -281,7 +303,7 @@
    "    print_text(split_text_task.outputs['odd_lines'])\n",
    "    print_text(split_text_task.outputs['even_lines'])\n",
    "\n",
-   "kfp.Client().create_run_from_pipeline_func(text_splitting_pipeline, arguments={})"
+   "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(text_splitting_pipeline, arguments={})"
   ]
  },
@@ -301,12 +323,10 @@
    "def split_text_lines2(source_file: InputTextFile(str), odd_lines_file: OutputTextFile(str), even_lines_file: OutputTextFile(str)):\n",
    "    while True:\n",
    "        line = source_file.readline()\n",
-   "        print(line)\n",
    "        if line == \"\":\n",
    "            break\n",
    "        odd_lines_file.write(line)\n",
    "        line = source_file.readline()\n",
-   "        print(line)\n",
    "        if line == \"\":\n",
    "            break\n",
    "        even_lines_file.write(line)\n",
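The file-based hunks above all lean on the same mechanism: `InputPath`/`OutputPath` (and the `*TextFile` variants) make KFP pass data by file rather than by command-line value, which also covers data that is not text. A rough sketch under the assumption that pickled Python objects are an acceptable payload; both components are illustrative and not part of the patch:

```python
from kfp.components import InputPath, OutputPath, func_to_container_op

# Illustrative producer: writes an arbitrary Python object as a pickle file.
@func_to_container_op
def produce_object(obj_path: OutputPath('pickle')):
    import pickle  # Imported inside the function so the component is self-contained
    with open(obj_path, 'wb') as writer:
        pickle.dump({'rows': [1, 2, 3]}, writer)

# Illustrative consumer: reads the pickled object back and prints it.
@func_to_container_op
def print_object(obj_path: InputPath('pickle')):
    import pickle
    with open(obj_path, 'rb') as reader:
        print(pickle.load(reader))

def object_passing_pipeline():
    print_object(produce_object().output)
```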
"metadata": {}, + "source": [ + "### Example: Pipeline that generates then sums many numbers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Writing many numbers\n", + "@func_to_container_op\n", + "def write_numbers(numbers_path: OutputPath(str), start: int = 0, count: int = 10):\n", + " with open(numbers_path, 'w') as writer:\n", + " for i in range(start, count):\n", + " writer.write(str(i) + '\\n')\n", + "\n", + "\n", + "# Reading and summing many numbers\n", + "@func_to_container_op\n", + "def sum_numbers(numbers_path: InputPath(str)) -> int:\n", + " sum = 0\n", + " with open(numbers_path, 'r') as reader:\n", + " for line in reader:\n", + " sum = sum + int(line)\n", + " return sum\n", + "\n", + "\n", + "\n", + "# Pipeline to sum 100000 numbers\n", + "def sum_pipeline(count: 'Integer' = 100000):\n", + " numbers_task = write_numbers(count=count)\n", + " print_text(numbers_task.output)\n", + "\n", + " sum_task = sum_numbers(numbers_task.outputs['numbers'])\n", + " print_text(sum_task.output)\n", + "\n", + "\n", + "# Running the pipeline\n", + "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(sum_pipeline, arguments={})" ] } ],