Skip to content

Commit

Permalink
fix(samples): Update resource_spec, retry, secret samples to v2 pipel…
Browse files Browse the repository at this point in the history
…ines (kubeflow#9876)

* Update resource_spec, retry, secret samples to v2 pipelines

* Update resource_spec, retry, secret samples to v2 pipelines
  • Loading branch information
junggil authored Mar 16, 2024
1 parent 54e15de commit a9a433c
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 133 deletions.
24 changes: 13 additions & 11 deletions samples/core/resource_spec/resource_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp.deprecated import components
from kfp.deprecated import dsl
from kfp import dsl


@components.create_component_from_func
@dsl.component
def training_op(n: int) -> int:
# quickly allocate a lot of memory to verify memory is enough
a = [i for i in range(n)]
Expand All @@ -25,19 +24,22 @@ def training_op(n: int) -> int:

@dsl.pipeline(
name='pipeline-with-resource-spec',
description='A pipeline with resource specification.'
)
description='A pipeline with resource specification.')
def my_pipeline(n: int = 11234567):
# For units of these resource limits,
# refer to https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes
# 11234567 roughly needs 400Mi+ memory.
training_task = training_op(n=n).set_cpu_request('1').set_cpu_limit(
'1'
).set_memory_request('512Mi').set_memory_limit('512Mi')
#
# Note, with v2 python components, there's a larger memory overhead caused
# by installing KFP SDK in the component, so we had to increase memory limit to 650M.
training_task = training_op(n=n).set_cpu_limit('1').set_memory_limit('650M')

# TODO(gkcalat): enable requests once SDK implements the feature
# training_task = training_task.set_cpu_request('1').set_memory_request('650M')

# TODO(Bobgy): other resource specs like cpu requests, memory requests and
# GPU limits are not available yet: https://github.com/kubeflow/pipelines/issues/6354.
# There are other resource spec you can set.
# For example, to use TPU, add the following:
# .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'tpu-v3')
# .set_gpu_limit(1)

# Disable cache for KFP v1 mode.
training_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
19 changes: 4 additions & 15 deletions samples/core/resource_spec/resource_spec_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.deprecated as kfp
from .resource_spec import my_pipeline
from .resource_spec_v2 import my_pipeline as my_pipeline_v2
from kfp import dsl
from resource_spec import my_pipeline
from kfp.samples.test.utils import run_pipeline_func, TestCase


Expand All @@ -24,23 +23,13 @@ def EXPECTED_OOM(run_id, run, **kwargs):


run_pipeline_func([
TestCase(
pipeline_func=my_pipeline_v2,
mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
),
TestCase(
pipeline_func=my_pipeline_v2,
mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
arguments={'n': 21234567},
verify_func=EXPECTED_OOM,
),
TestCase(
pipeline_func=my_pipeline,
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
mode=dsl.PipelineExecutionMode.V2_ENGINE,
),
TestCase(
pipeline_func=my_pipeline,
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
mode=dsl.PipelineExecutionMode.V2_ENGINE,
arguments={'n': 21234567},
verify_func=EXPECTED_OOM,
),
Expand Down
50 changes: 0 additions & 50 deletions samples/core/resource_spec/resource_spec_v2.py

This file was deleted.

28 changes: 15 additions & 13 deletions samples/core/resource_spec/runtime_resource_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp.deprecated import dsl, components, compiler
from kfp import dsl, compiler
from typing import NamedTuple

@components.create_component_from_func
@dsl.component
def training_op(n: int) -> int:
# quickly allocate a lot of memory to verify memory is enough
a = [i for i in range(n)]
return len(a)

@components.create_component_from_func
def generate_resource_request() -> NamedTuple('output', [('memory', str), ('cpu', str)]):
@dsl.component
def generate_resource_request() -> NamedTuple('output', memory=str, cpu=str):
'''Returns the memory and cpu request'''
from collections import namedtuple

resource_output = namedtuple('output', ['memory', 'cpu'])
resource_output = NamedTuple('output', memory=str, cpu=str)
return resource_output('500Mi', '200m')

@dsl.pipeline(
Expand All @@ -35,13 +33,17 @@ def generate_resource_request() -> NamedTuple('output', [('memory', str), ('cpu'
)
def resource_request_pipeline(n: int = 11234567):
resource_task = generate_resource_request()
traning_task = training_op(n)\
.set_memory_limit(resource_task.outputs['memory'])\
.set_cpu_limit(resource_task.outputs['cpu'])\
.set_cpu_request('200m')

# Disable cache for KFP v1 mode.
traning_task.execution_options.caching_strategy.max_cache_staleness = 'P0D'
# TODO: support PipelineParameterChannel for resource input
# TypeError: expected string or bytes-like object, got 'PipelineParameterChannel'
# traning_task = training_op(n=n)\
# .set_memory_limit(resource_task.outputs['memory'])\
# .set_cpu_limit(resource_task.outputs['cpu'])\
# .set_cpu_request('200m')
traning_task = training_op(n=n)\
.set_memory_limit('500Mi')\
.set_cpu_limit('200m')\
.set_cpu_request('200m')

if __name__ == '__main__':
compiler.Compiler().compile(resource_request_pipeline, __file__ + '.yaml')
31 changes: 15 additions & 16 deletions samples/core/resource_spec/runtime_resource_request_gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp.deprecated import dsl, components, compiler
from kfp import dsl, compiler
from typing import NamedTuple


@dsl.component(base_image='pytorch/pytorch:1.7.1-cuda11.0-cudnn8-runtime')
def training_job():
import torch
use_cuda = torch.cuda.is_available()
Expand All @@ -24,19 +25,13 @@ def training_job():
raise ValueError('GPU not available')


training_comp = components.create_component_from_func(
training_job,
base_image='pytorch/pytorch:1.7.1-cuda11.0-cudnn8-runtime',
packages_to_install=[]
)

@components.create_component_from_func
def generate_resource_constraints_request() -> NamedTuple('output', [('gpu_vendor', str), ('nbr_gpus', str), ('constrain_type', str), ('constrain_value', str)]):
@dsl.component
def generate_resource_constraints_request() -> NamedTuple('output', nbr_gpus=str, accelerator=str):
"""Returns the gpu resource and constraints settings"""
from collections import namedtuple
output = namedtuple('output', ['gpu_vendor', 'nbr_gpu', 'constrain_type', 'constrain_value'])
output = NamedTuple('output', nbr_gpus=str, accelerator=str)

return output('1', 'NVIDIA_TESLA_K80')

return output( 'nvidia.com/gpu', '1', 'cloud.google.com/gke-accelerator', 'nvidia-tesla-p4')

@dsl.pipeline(
name='Runtime resource request pipeline',
Expand All @@ -45,10 +40,14 @@ def generate_resource_constraints_request() -> NamedTuple('output', [('gpu_vendo
def resource_constraint_request_pipeline():
resource_constraints_task = generate_resource_constraints_request()

traning_task = training_comp().set_gpu_limit(resource_constraints_task.outputs['nbr_gpus'], resource_constraints_task.outputs['gpu_vendor'])\
.add_node_selector_constraint(resource_constraints_task.outputs['constrain_type'], resource_constraints_task.outputs['constrain_value'])
# Disable cache for KFP v1 mode.
traning_task.execution_options.caching_strategy.max_cache_staleness = 'P0D'
# TODO: support PipelineParameterChannel for .set_accelerator_type
# TypeError: expected string or bytes-like object, got 'PipelineParameterChannel'
# traning_task = training_job()\
# .set_accelerator_limit(resource_constraints_task.outputs['nbr_gpus'])\
# .set_accelerator_type(resource_constraints_task.outputs['accelerator'])\
traning_task = training_job()\
.set_accelerator_limit(resource_constraints_task.outputs['nbr_gpus'])\
.set_accelerator_type('NVIDIA_TESLA_K80')

if __name__ == '__main__':
compiler.Compiler().compile(resource_constraint_request_pipeline, __file__ + '.yaml')
7 changes: 3 additions & 4 deletions samples/core/resource_spec/runtime_resource_request_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.deprecated as kfp
from .runtime_resource_request import resource_request_pipeline
from runtime_resource_request import resource_request_pipeline
from kfp.samples.test.utils import run_pipeline_func, TestCase


Expand All @@ -25,11 +24,11 @@ def EXPECTED_OOM(run_id, run, **kwargs):
run_pipeline_func([
TestCase(
pipeline_func=resource_request_pipeline,
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
),
TestCase(
pipeline_func=resource_request_pipeline,
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
arguments={'n': 21234567},
verify_func=EXPECTED_OOM,
),
Expand Down
11 changes: 5 additions & 6 deletions samples/core/retry/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
# limitations under the License.


from kfp.deprecated import dsl, compiler
import kfp.deprecated.components as comp
from kfp import dsl, compiler


@comp.create_component_from_func
def random_failure_op(exit_codes):
@dsl.component
def random_failure_op(exit_codes: str):
"""A component that fails randomly."""
import random
import sys
Expand All @@ -34,8 +33,8 @@ def random_failure_op(exit_codes):
description='The pipeline includes two steps which fail randomly. It shows how to use ContainerOp(...).set_retry(...).'
)
def retry_sample_pipeline():
op1 = random_failure_op('0,1,2,3').set_retry(10)
op2 = random_failure_op('0,1').set_retry(5)
op1 = random_failure_op(exit_codes='0,1,2,3').set_retry(10)
op2 = random_failure_op(exit_codes='0,1').set_retry(5)


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions samples/core/retry/retry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.deprecated as kfp
import kfp
from kfp.samples.test.utils import TestCase, relative_path, run_pipeline_func

run_pipeline_func([
TestCase(
pipeline_file=relative_path(__file__, 'retry.py'),
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
),
])
26 changes: 12 additions & 14 deletions samples/core/secret/secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
# limitations under the License.


from kfp.deprecated import dsl, compiler, components
from kfp import dsl, compiler
from kfp.components import load_component_from_text


# Accessing GCS using the Google Cloud SDK command-line programs
gcs_list_items_op = components.load_component_from_text('''
gcs_list_items_op = load_component_from_text(text='''
name: GCS - List items
inputs:
- {name: Uri}
- {name: url, type: STRING}
implementation:
container:
image: 'google/cloud-sdk:279.0.0'
Expand All @@ -35,11 +36,15 @@
fi
gcloud auth list
gsutil ls "$0"
- {inputValue: Uri}
- {inputValue: url}
''')


# Accessing GCS using the Google Cloud Python library
@dsl.component(
base_image='python:3.7',
packages_to_install=['google-cloud-storage==1.31.2']
)
def gcs_list_buckets():
from google.cloud import storage
storage_client = storage.Client()
Expand All @@ -49,23 +54,16 @@ def gcs_list_buckets():
print(bucket.name)


gcs_list_buckets_op = components.create_component_from_func(
gcs_list_buckets,
base_image='python:3.7',
packages_to_install=['google-cloud-storage==1.31.2'],
)


@dsl.pipeline(
name='secret-pipeline',
description='A pipeline to demonstrate mounting and use of secretes.'
)
def secret_op_pipeline(
url='gs://ml-pipeline/sample-data/shakespeare/shakespeare1.txt'):
url:str='gs://ml-pipeline/sample-data/shakespeare/shakespeare1.txt'):
"""A pipeline that uses secret to access cloud hosted resouces."""

gcs_list_items_task = gcs_list_items_op(url)
gcs_list_buckets_task = gcs_list_buckets_op()
gcs_list_items_task = gcs_list_items_op(url=url)
gcs_list_buckets_task = gcs_list_buckets()

if __name__ == '__main__':
compiler.Compiler().compile(secret_op_pipeline, __file__ + '.yaml')
4 changes: 2 additions & 2 deletions samples/core/secret/secret_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.deprecated as kfp
import kfp
from kfp.samples.test.utils import TestCase, relative_path, run_pipeline_func

run_pipeline_func([
TestCase(
pipeline_file=relative_path(__file__, 'secret.py'),
mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
),
])

0 comments on commit a9a433c

Please sign in to comment.