Skip to content

Commit

Permalink
feat(sdk): adds support for ephemeral-storage in container-op (#4504)
Browse files Browse the repository at this point in the history
* add type annotation to container class

* add support for ephemeral storage in container op
  • Loading branch information
munagekar authored Sep 20, 2020
1 parent 5613db0 commit 0653e7c
Showing 1 changed file with 52 additions and 32 deletions.
84 changes: 52 additions & 32 deletions sdk/python/kfp/dsl/_container_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def _wrapped(*args, **kwargs):
def _create_getter_setter(prop):
"""Create a tuple of getter and setter methods for a property in `Container`."""
def _getter(self):
return getattr(self._container, prop)
return getattr(self._container, prop)
def _setter(self, value):
return setattr(self._container, prop, value)
return _getter, _setter
Expand Down Expand Up @@ -166,17 +166,17 @@ def __init__(self, image: str, command: List[str], args: List[str],
super(Container, self).__init__(
image=image, command=command, args=args, **kwargs)

def _validate_memory_string(self, memory_string):
"""Validate a given string is valid for memory request or limit."""
def _validate_size_string(self, size_string):
"""Validate a given string is valid for memory/ephemeral-storage request or limit."""

if isinstance(memory_string, _pipeline_param.PipelineParam):
if memory_string.value:
memory_string = memory_string.value
if isinstance(size_string, _pipeline_param.PipelineParam):
if size_string.value:
size_string = size_string.value
else:
return

if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
memory_string) is None:
size_string) is None:
raise ValueError(
'Invalid memory string. Should be an integer, or integer followed '
'by one of "E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki"')
Expand Down Expand Up @@ -218,7 +218,7 @@ def _validate_positive_number(self, str_value, param_name):
if int_value <= 0:
raise ValueError('{} must be positive integer.'.format(param_name))

def add_resource_limit(self, resource_name, value):
def add_resource_limit(self, resource_name, value) -> 'Container':
"""Add the resource limit of the container.
Args:
Expand All @@ -231,7 +231,7 @@ def add_resource_limit(self, resource_name, value):
self.resources.limits.update({resource_name: value})
return self

def add_resource_request(self, resource_name, value):
def add_resource_request(self, resource_name, value) -> 'Container':
"""Add the resource request of the container.
Args:
Expand All @@ -244,28 +244,48 @@ def add_resource_request(self, resource_name, value):
self.resources.requests.update({resource_name: value})
return self

def set_memory_request(self, memory):
def set_memory_request(self, memory) -> 'Container':
"""Set memory request (minimum) for this operator.
Args:
memory: a string which can be a number or a number followed by one of
"E", "P", "T", "G", "M", "K".
"""

self._validate_memory_string(memory)
self._validate_size_string(memory)
return self.add_resource_request("memory", memory)

def set_memory_limit(self, memory):
def set_memory_limit(self, memory) -> 'Container':
"""Set memory limit (maximum) for this operator.
Args:
memory: a string which can be a number or a number followed by one of
"E", "P", "T", "G", "M", "K".
"""
self._validate_memory_string(memory)
self._validate_size_string(memory)
return self.add_resource_limit("memory", memory)

def set_cpu_request(self, cpu):
def set_ephemeral_storage_request(self, size) -> 'Container':
"""Set ephemeral-storage request (minimum) for this operator.
Args:
size: a string which can be a number or a number followed by one of
"E", "P", "T", "G", "M", "K".
"""
self._validate_size_string(size)
return self.add_resource_request("ephemeral-storage", size)

def set_ephemeral_storage_limit(self, size) -> 'Container':
"""Set ephemeral-storage request (maximum) for this operator.
Args:
size: a string which can be a number or a number followed by one of
"E", "P", "T", "G", "M", "K".
"""
self._validate_size_string(size)
return self.add_resource_limit("ephemeral-storage", size)

def set_cpu_request(self, cpu) -> 'Container':
"""Set cpu request (minimum) for this operator.
Args:
Expand All @@ -275,7 +295,7 @@ def set_cpu_request(self, cpu):
self._validate_cpu_string(cpu)
return self.add_resource_request("cpu", cpu)

def set_cpu_limit(self, cpu):
def set_cpu_limit(self, cpu) -> 'Container':
"""Set cpu limit (maximum) for this operator.
Args:
Expand All @@ -285,7 +305,7 @@ def set_cpu_limit(self, cpu):
self._validate_cpu_string(cpu)
return self.add_resource_limit("cpu", cpu)

def set_gpu_limit(self, gpu, vendor="nvidia"):
def set_gpu_limit(self, gpu, vendor="nvidia") -> 'Container':
"""Set gpu limit for the operator. This function add '<vendor>.com/gpu' into resource limit.
Note that there is no need to add GPU request. GPUs are only supposed to be specified in
the limits section. See https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/.
Expand All @@ -302,7 +322,7 @@ def set_gpu_limit(self, gpu, vendor="nvidia"):

return self.add_resource_limit("%s.com/gpu" % vendor, gpu)

def add_volume_mount(self, volume_mount):
def add_volume_mount(self, volume_mount) -> 'Container':
"""Add volume to the container
Args:
Expand All @@ -319,7 +339,7 @@ def add_volume_mount(self, volume_mount):
volume_mount)
return self

def add_volume_devices(self, volume_device):
def add_volume_devices(self, volume_device) -> 'Container':
"""
Add a block device to be used by the container.
Expand All @@ -337,7 +357,7 @@ def add_volume_devices(self, volume_device):
volume_device)
return self

def add_env_variable(self, env_variable):
def add_env_variable(self, env_variable) -> 'Container':
"""Add environment variable to the container.
Args:
Expand All @@ -353,7 +373,7 @@ def add_env_variable(self, env_variable):
self.env = create_and_append(self.env, env_variable)
return self

def add_env_from(self, env_from):
def add_env_from(self, env_from) -> 'Container':
"""Add a source to populate environment variables int the container.
Args:
Expand All @@ -369,7 +389,7 @@ def add_env_from(self, env_from):
self.env_from = create_and_append(self.env_from, env_from)
return self

def set_image_pull_policy(self, image_pull_policy):
def set_image_pull_policy(self, image_pull_policy) -> 'Container':
"""Set image pull policy for the container.
Args:
Expand All @@ -383,7 +403,7 @@ def set_image_pull_policy(self, image_pull_policy):
self.image_pull_policy = image_pull_policy
return self

def add_port(self, container_port):
def add_port(self, container_port) -> 'Container':
"""Add a container port to the container.
Args:
Expand All @@ -399,7 +419,7 @@ def add_port(self, container_port):
self.ports = create_and_append(self.ports, container_port)
return self

def set_security_context(self, security_context):
def set_security_context(self, security_context) -> 'Container':
"""Set security configuration to be applied on the container.
Args:
Expand All @@ -415,7 +435,7 @@ def set_security_context(self, security_context):
self.security_context = security_context
return self

def set_stdin(self, stdin=True):
def set_stdin(self, stdin=True) -> 'Container':
"""
Whether this container should allocate a buffer for stdin in the container
runtime. If this is not set, reads from stdin in the container will always
Expand All @@ -428,7 +448,7 @@ def set_stdin(self, stdin=True):
self.stdin = stdin
return self

def set_stdin_once(self, stdin_once=True):
def set_stdin_once(self, stdin_once=True) -> 'Container':
"""
Whether the container runtime should close the stdin channel after it has
been opened by a single attach. When stdin is true the stdin stream will
Expand All @@ -446,7 +466,7 @@ def set_stdin_once(self, stdin_once=True):
self.stdin_once = stdin_once
return self

def set_termination_message_path(self, termination_message_path):
def set_termination_message_path(self, termination_message_path) -> 'Container':
"""
Path at which the file to which the container's termination message will be
written is mounted into the container's filesystem. Message written is
Expand All @@ -460,7 +480,7 @@ def set_termination_message_path(self, termination_message_path):
self.termination_message_path = termination_message_path
return self

def set_termination_message_policy(self, termination_message_policy):
def set_termination_message_policy(self, termination_message_policy) -> 'Container':
"""
Indicate how the termination message should be populated. File will use the
contents of terminationMessagePath to populate the container status message
Expand All @@ -479,7 +499,7 @@ def set_termination_message_policy(self, termination_message_policy):
self.termination_message_policy = termination_message_policy
return self

def set_tty(self, tty=True):
def set_tty(self, tty: bool = True) -> 'Container':
"""
Whether this container should allocate a TTY for itself, also requires
'stdin' to be true.
Expand All @@ -491,7 +511,7 @@ def set_tty(self, tty=True):
self.tty = tty
return self

def set_readiness_probe(self, readiness_probe):
def set_readiness_probe(self, readiness_probe) -> 'Container':
"""
Set a readiness probe for the container.
Expand All @@ -508,7 +528,7 @@ def set_readiness_probe(self, readiness_probe):
self.readiness_probe = readiness_probe
return self

def set_liveness_probe(self, liveness_probe):
def set_liveness_probe(self, liveness_probe) -> 'Container':
"""
Set a liveness probe for the container.
Expand All @@ -525,7 +545,7 @@ def set_liveness_probe(self, liveness_probe):
self.liveness_probe = liveness_probe
return self

def set_lifecycle(self, lifecycle):
def set_lifecycle(self, lifecycle) -> 'Container':
"""
Setup a lifecycle config for the container.
Expand Down Expand Up @@ -1104,7 +1124,7 @@ def _decorated(*args, **kwargs):
name: _pipeline_param.PipelineParam(name, op_name=self.name)
for name in file_outputs.keys()
}

# Syntactic sugar: Add task.output attribute if the component has a single output.
# TODO: Currently the "MLPipeline UI Metadata" output is removed from outputs to preserve backwards compatibility.
# Maybe stop excluding it from outputs, but rather exclude it from unique_outputs.
Expand Down

0 comments on commit 0653e7c

Please sign in to comment.