From 0653e7c766beafca02959f660077b2199ba9df1c Mon Sep 17 00:00:00 2001 From: Abhishek Vilas Munagekar Date: Sun, 20 Sep 2020 19:30:29 +0900 Subject: [PATCH] feat(sdk): adds support for ephemeral-storage in container-op (#4504) * add type annotation to container class * add support for ephemeral storage in container op --- sdk/python/kfp/dsl/_container_op.py | 84 ++++++++++++++++++----------- 1 file changed, 52 insertions(+), 32 deletions(-) diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 31d1cd98256..2f242cdecb4 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -50,7 +50,7 @@ def _wrapped(*args, **kwargs): def _create_getter_setter(prop): """Create a tuple of getter and setter methods for a property in `Container`.""" def _getter(self): - return getattr(self._container, prop) + return getattr(self._container, prop) def _setter(self, value): return setattr(self._container, prop, value) return _getter, _setter @@ -166,17 +166,17 @@ def __init__(self, image: str, command: List[str], args: List[str], super(Container, self).__init__( image=image, command=command, args=args, **kwargs) - def _validate_memory_string(self, memory_string): - """Validate a given string is valid for memory request or limit.""" + def _validate_size_string(self, size_string): + """Validate a given string is valid for memory/ephemeral-storage request or limit.""" - if isinstance(memory_string, _pipeline_param.PipelineParam): - if memory_string.value: - memory_string = memory_string.value + if isinstance(size_string, _pipeline_param.PipelineParam): + if size_string.value: + size_string = size_string.value else: return if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$', - memory_string) is None: + size_string) is None: raise ValueError( 'Invalid memory string. Should be an integer, or integer followed ' 'by one of "E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki"') @@ -218,7 +218,7 @@ def _validate_positive_number(self, str_value, param_name): if int_value <= 0: raise ValueError('{} must be positive integer.'.format(param_name)) - def add_resource_limit(self, resource_name, value): + def add_resource_limit(self, resource_name, value) -> 'Container': """Add the resource limit of the container. Args: @@ -231,7 +231,7 @@ def add_resource_limit(self, resource_name, value): self.resources.limits.update({resource_name: value}) return self - def add_resource_request(self, resource_name, value): + def add_resource_request(self, resource_name, value) -> 'Container': """Add the resource request of the container. Args: @@ -244,7 +244,7 @@ def add_resource_request(self, resource_name, value): self.resources.requests.update({resource_name: value}) return self - def set_memory_request(self, memory): + def set_memory_request(self, memory) -> 'Container': """Set memory request (minimum) for this operator. Args: @@ -252,20 +252,40 @@ def set_memory_request(self, memory): "E", "P", "T", "G", "M", "K". """ - self._validate_memory_string(memory) + self._validate_size_string(memory) return self.add_resource_request("memory", memory) - def set_memory_limit(self, memory): + def set_memory_limit(self, memory) -> 'Container': """Set memory limit (maximum) for this operator. Args: memory: a string which can be a number or a number followed by one of "E", "P", "T", "G", "M", "K". """ - self._validate_memory_string(memory) + self._validate_size_string(memory) return self.add_resource_limit("memory", memory) - def set_cpu_request(self, cpu): + def set_ephemeral_storage_request(self, size) -> 'Container': + """Set ephemeral-storage request (minimum) for this operator. + + Args: + size: a string which can be a number or a number followed by one of + "E", "P", "T", "G", "M", "K". + """ + self._validate_size_string(size) + return self.add_resource_request("ephemeral-storage", size) + + def set_ephemeral_storage_limit(self, size) -> 'Container': + """Set ephemeral-storage request (maximum) for this operator. + + Args: + size: a string which can be a number or a number followed by one of + "E", "P", "T", "G", "M", "K". + """ + self._validate_size_string(size) + return self.add_resource_limit("ephemeral-storage", size) + + def set_cpu_request(self, cpu) -> 'Container': """Set cpu request (minimum) for this operator. Args: @@ -275,7 +295,7 @@ def set_cpu_request(self, cpu): self._validate_cpu_string(cpu) return self.add_resource_request("cpu", cpu) - def set_cpu_limit(self, cpu): + def set_cpu_limit(self, cpu) -> 'Container': """Set cpu limit (maximum) for this operator. Args: @@ -285,7 +305,7 @@ def set_cpu_limit(self, cpu): self._validate_cpu_string(cpu) return self.add_resource_limit("cpu", cpu) - def set_gpu_limit(self, gpu, vendor="nvidia"): + def set_gpu_limit(self, gpu, vendor="nvidia") -> 'Container': """Set gpu limit for the operator. This function add '.com/gpu' into resource limit. Note that there is no need to add GPU request. GPUs are only supposed to be specified in the limits section. See https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/. @@ -302,7 +322,7 @@ def set_gpu_limit(self, gpu, vendor="nvidia"): return self.add_resource_limit("%s.com/gpu" % vendor, gpu) - def add_volume_mount(self, volume_mount): + def add_volume_mount(self, volume_mount) -> 'Container': """Add volume to the container Args: @@ -319,7 +339,7 @@ def add_volume_mount(self, volume_mount): volume_mount) return self - def add_volume_devices(self, volume_device): + def add_volume_devices(self, volume_device) -> 'Container': """ Add a block device to be used by the container. @@ -337,7 +357,7 @@ def add_volume_devices(self, volume_device): volume_device) return self - def add_env_variable(self, env_variable): + def add_env_variable(self, env_variable) -> 'Container': """Add environment variable to the container. Args: @@ -353,7 +373,7 @@ def add_env_variable(self, env_variable): self.env = create_and_append(self.env, env_variable) return self - def add_env_from(self, env_from): + def add_env_from(self, env_from) -> 'Container': """Add a source to populate environment variables int the container. Args: @@ -369,7 +389,7 @@ def add_env_from(self, env_from): self.env_from = create_and_append(self.env_from, env_from) return self - def set_image_pull_policy(self, image_pull_policy): + def set_image_pull_policy(self, image_pull_policy) -> 'Container': """Set image pull policy for the container. Args: @@ -383,7 +403,7 @@ def set_image_pull_policy(self, image_pull_policy): self.image_pull_policy = image_pull_policy return self - def add_port(self, container_port): + def add_port(self, container_port) -> 'Container': """Add a container port to the container. Args: @@ -399,7 +419,7 @@ def add_port(self, container_port): self.ports = create_and_append(self.ports, container_port) return self - def set_security_context(self, security_context): + def set_security_context(self, security_context) -> 'Container': """Set security configuration to be applied on the container. Args: @@ -415,7 +435,7 @@ def set_security_context(self, security_context): self.security_context = security_context return self - def set_stdin(self, stdin=True): + def set_stdin(self, stdin=True) -> 'Container': """ Whether this container should allocate a buffer for stdin in the container runtime. If this is not set, reads from stdin in the container will always @@ -428,7 +448,7 @@ def set_stdin(self, stdin=True): self.stdin = stdin return self - def set_stdin_once(self, stdin_once=True): + def set_stdin_once(self, stdin_once=True) -> 'Container': """ Whether the container runtime should close the stdin channel after it has been opened by a single attach. When stdin is true the stdin stream will @@ -446,7 +466,7 @@ def set_stdin_once(self, stdin_once=True): self.stdin_once = stdin_once return self - def set_termination_message_path(self, termination_message_path): + def set_termination_message_path(self, termination_message_path) -> 'Container': """ Path at which the file to which the container's termination message will be written is mounted into the container's filesystem. Message written is @@ -460,7 +480,7 @@ def set_termination_message_path(self, termination_message_path): self.termination_message_path = termination_message_path return self - def set_termination_message_policy(self, termination_message_policy): + def set_termination_message_policy(self, termination_message_policy) -> 'Container': """ Indicate how the termination message should be populated. File will use the contents of terminationMessagePath to populate the container status message @@ -479,7 +499,7 @@ def set_termination_message_policy(self, termination_message_policy): self.termination_message_policy = termination_message_policy return self - def set_tty(self, tty=True): + def set_tty(self, tty: bool = True) -> 'Container': """ Whether this container should allocate a TTY for itself, also requires 'stdin' to be true. @@ -491,7 +511,7 @@ def set_tty(self, tty=True): self.tty = tty return self - def set_readiness_probe(self, readiness_probe): + def set_readiness_probe(self, readiness_probe) -> 'Container': """ Set a readiness probe for the container. @@ -508,7 +528,7 @@ def set_readiness_probe(self, readiness_probe): self.readiness_probe = readiness_probe return self - def set_liveness_probe(self, liveness_probe): + def set_liveness_probe(self, liveness_probe) -> 'Container': """ Set a liveness probe for the container. @@ -525,7 +545,7 @@ def set_liveness_probe(self, liveness_probe): self.liveness_probe = liveness_probe return self - def set_lifecycle(self, lifecycle): + def set_lifecycle(self, lifecycle) -> 'Container': """ Setup a lifecycle config for the container. @@ -1104,7 +1124,7 @@ def _decorated(*args, **kwargs): name: _pipeline_param.PipelineParam(name, op_name=self.name) for name in file_outputs.keys() } - + # Syntactic sugar: Add task.output attribute if the component has a single output. # TODO: Currently the "MLPipeline UI Metadata" output is removed from outputs to preserve backwards compatibility. # Maybe stop excluding it from outputs, but rather exclude it from unique_outputs.