From 8bc464409b344aa036ab5877d078a63d470cf3fd Mon Sep 17 00:00:00 2001 From: IronPan Date: Mon, 22 Jul 2019 20:40:54 -0700 Subject: [PATCH] add init container for container op (#1650) * add init container * update test * update tests * address comments --- sdk/python/kfp/compiler/_op_to_template.py | 4 + sdk/python/kfp/dsl/__init__.py | 2 +- sdk/python/kfp/dsl/_container_op.py | 124 ++++++++++++++------ sdk/python/tests/compiler/compiler_tests.py | 42 +++++-- sdk/python/tests/dsl/container_op_tests.py | 7 +- 5 files changed, 132 insertions(+), 47 deletions(-) diff --git a/sdk/python/kfp/compiler/_op_to_template.py b/sdk/python/kfp/compiler/_op_to_template.py index 564529774cd..ae7bf7a5c9e 100644 --- a/sdk/python/kfp/compiler/_op_to_template.py +++ b/sdk/python/kfp/compiler/_op_to_template.py @@ -249,6 +249,10 @@ def _op_to_template(op: BaseOp): if processed_op.timeout: template['activeDeadlineSeconds'] = processed_op.timeout + # initContainers + if processed_op.init_containers: + template['initContainers'] = processed_op.init_containers + # sidecars if processed_op.sidecars: template['sidecars'] = processed_op.sidecars diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index ae995e29873..de281992bc5 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -15,7 +15,7 @@ from ._pipeline_param import PipelineParam, match_serialized_pipelineparam from ._pipeline import Pipeline, pipeline, get_pipeline_conf -from ._container_op import ContainerOp, Sidecar +from ._container_op import ContainerOp, UserContainer, Sidecar from ._resource_op import ResourceOp from ._volume_op import ( VolumeOp, VOLUME_MODE_RWO, VOLUME_MODE_RWM, VOLUME_MODE_ROM diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 73ee443bf4d..9f9c43e4e86 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -541,25 +541,25 @@ def set_lifecycle(self, lifecycle): return self -class Sidecar(Container): +class UserContainer(Container): """ - Represents an argo workflow sidecar (io.argoproj.workflow.v1alpha1.Sidecar) - to be used in `sidecars` property in argo's workflow template - (io.argoproj.workflow.v1alpha1.Template). + Represents an argo workflow UserContainer (io.argoproj.workflow.v1alpha1.UserContainer) + to be used in `UserContainer` property in argo's workflow template + (io.argoproj.workflow.v1alpha1.Template). - `Sidecar` inherits from `Container` class with an addition of `mirror_volume_mounts` + `UserContainer` inherits from `Container` class with an addition of `mirror_volume_mounts` attribute (`mirrorVolumeMounts` property). See https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json Example - from kfp.dsl import ContainerOp, Sidecar + from kfp.dsl import ContainerOp, UserContainer - # creates a `ContainerOp` and adds a redis `Sidecar` + # creates a `ContainerOp` and adds a redis init container op = (ContainerOp(name='foo-op', image='busybox:latest') - .add_sidecar( - Sidecar(name='redis', image='redis:alpine'))) + .add_initContainer( + UserContainer(name='redis', image='redis:alpine'))) """ """ @@ -569,7 +569,7 @@ class Sidecar(Container): attribute_map (dict): The key is attribute name and the value is json key in definition. """ - # adds `mirror_volume_mounts` to `Sidecar` swagger definition + # adds `mirror_volume_mounts` to `UserContainer` swagger definition # NOTE inherits definition from `V1Container` rather than `Container` # because `Container` has no `name` property. swagger_types = dict( @@ -579,25 +579,25 @@ class Sidecar(Container): **V1Container.attribute_map, mirror_volume_mounts='mirrorVolumeMounts') def __init__(self, - name: str, - image: str, - command: StringOrStringList = None, - args: StringOrStringList = None, - mirror_volume_mounts: bool = None, - **kwargs): - """Creates a new instance of `Sidecar`. - + name: str, + image: str, + command: StringOrStringList = None, + args: StringOrStringList = None, + mirror_volume_mounts: bool = None, + **kwargs): + """Creates a new instance of `UserContainer`. + Args: - name {str}: unique name for the sidecar container - image {str}: image to use for the sidecar container, e.g. redis:alpine + name {str}: unique name for the user container + image {str}: image to use for the user container, e.g. redis:alpine command {StringOrStringList}: entrypoint array. Not executed within a shell. - args {StringOrStringList}: arguments to the entrypoint. - mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same - volumes specified in the main container to the sidecar (including artifacts), - at the same mountPaths. This enables dind daemon to partially see the same - filesystem as the main container in order to use features such as docker + args {StringOrStringList}: arguments to the entrypoint. + mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same + volumes specified in the main container to the container (including artifacts), + at the same mountPaths. This enables dind daemon to partially see the same + filesystem as the main container in order to use features such as docker volume binding - **kwargs: keyword arguments available for `Container` + **kwargs: keyword arguments available for `Container` """ super().__init__( @@ -611,12 +611,12 @@ def __init__(self, def set_mirror_volume_mounts(self, mirror_volume_mounts=True): """ - Setting mirrorVolumeMounts to true will mount the same volumes specified - in the main container to the sidecar (including artifacts), at the same - mountPaths. This enables dind daemon to partially see the same filesystem - as the main container in order to use features such as docker volume + Setting mirrorVolumeMounts to true will mount the same volumes specified + in the main container to the container (including artifacts), at the same + mountPaths. This enables dind daemon to partially see the same filesystem + as the main container in order to use features such as docker volume binding. - + Args: mirror_volume_mounts: boolean flag """ @@ -626,10 +626,43 @@ def set_mirror_volume_mounts(self, mirror_volume_mounts=True): @property def inputs(self): - """A list of PipelineParam found in the Sidecar object.""" + """A list of PipelineParam found in the UserContainer object.""" return _pipeline_param.extract_pipelineparams_from_any(self) +class Sidecar(UserContainer): + + def __init__(self, + name: str, + image: str, + command: StringOrStringList = None, + args: StringOrStringList = None, + mirror_volume_mounts: bool = None, + **kwargs): + """Creates a new instance of `Sidecar`. + + Args: + name {str}: unique name for the sidecar container + image {str}: image to use for the sidecar container, e.g. redis:alpine + command {StringOrStringList}: entrypoint array. Not executed within a shell. + args {StringOrStringList}: arguments to the entrypoint. + mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same + volumes specified in the main container to the sidecar (including artifacts), + at the same mountPaths. This enables dind daemon to partially see the same + filesystem as the main container in order to use features such as docker + volume binding + **kwargs: keyword arguments available for `Container` + + """ + super().__init__( + name=name, + image=image, + command=command, + args=args, + mirror_volume_mounts=mirror_volume_mounts, + **kwargs) + + def _make_hash_based_id_for_op(op): # Generating a unique ID for Op. For class instances, the hash is the object's memory address which is unique. return op.human_name + ' ' + hex(2**63 + hash(op))[2:] @@ -647,11 +680,12 @@ class BaseOp(object): # in the compilation process to generate the DAGs and task io parameters. attrs_with_pipelineparams = [ 'node_selector', 'volumes', 'pod_annotations', 'pod_labels', - 'num_retries', 'sidecars', 'tolerations' + 'num_retries', 'init_containers', 'sidecars', 'tolerations' ] def __init__(self, name: str, + init_containers: List[UserContainer] = None, sidecars: List[Sidecar] = None, is_exit_handler: bool = False): """Create a new instance of BaseOp @@ -659,7 +693,9 @@ def __init__(self, Args: name: the name of the op. It does not have to be unique within a pipeline because the pipeline will generates a unique new name in case of conflicts. - sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy + init_containers: the list of `InitContainer` objects describing the InitContainer + to deploy before the `main` container. + sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy together with the `main` container. is_exit_handler: Whether it is used as an exit handler. """ @@ -689,6 +725,7 @@ def __init__(self, self.pod_labels = {} self.num_retries = 0 self.timeout = 0 + self.init_containers = init_containers or [] self.sidecars = sidecars or [] # attributes specific to `BaseOp` @@ -818,6 +855,16 @@ def set_timeout(self, seconds: int): self.timeout = seconds return self + def add_init_container(self, init_container: UserContainer): + """Add a init container to the Op. + + Args: + init_container: InitContainer object. + """ + + self.init_containers.append(init_container) + return self + def add_sidecar(self, sidecar: Sidecar): """Add a sidecar to the Op. @@ -864,6 +911,8 @@ def foo_pipeline(tag: str, pull_image_policy: str): # any attributes can be parameterized (both serialized string or actual PipelineParam) op = dsl.ContainerOp(name='foo', image='busybox:%s' % tag, + # pass in init_container list + init_containers=[dsl.InitContainer('print', 'busybox:latest', command='echo "hello"')], # pass in sidecars list sidecars=[dsl.Sidecar('print', 'busybox:latest', command='echo "hello"')], # pass in k8s container kwargs @@ -890,6 +939,7 @@ def __init__(self, image: str, command: StringOrStringList = None, arguments: StringOrStringList = None, + init_containers: List[UserContainer] = None, sidecars: List[Sidecar] = None, container_kwargs: Dict = None, file_outputs: Dict[str, str] = None, @@ -909,7 +959,9 @@ def __init__(self, arguments: the arguments of the command. The command can include "%s" and supply a PipelineParam as the string replacement. For example, ('echo %s' % input_param). At container run time the argument will be 'echo param_value'. - sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy + init_containers: the list of `InitContainer` objects describing the InitContainer + to deploy before the `main` container. + sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy together with the `main` container. container_kwargs: the dict of additional keyword arguments to pass to the op's `Container` definition. @@ -929,7 +981,7 @@ def __init__(self, E.g {"/my/path": vol, "/mnt": other_op.pvolumes["/output"]}. """ - super().__init__(name=name, sidecars=sidecars, is_exit_handler=is_exit_handler) + super().__init__(name=name, init_containers=init_containers, sidecars=sidecars, is_exit_handler=is_exit_handler) self.attrs_with_pipelineparams = BaseOp.attrs_with_pipelineparams + ['_container', 'artifact_location'] #Copying the BaseOp class variable! # convert to list if not a list diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index b8c9da359cc..6c05a572334 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -527,21 +527,22 @@ def some_pipeline(): template = workflow_dict['spec']['templates'][0] self.assertEqual(template['metadata']['annotations']['pipelines.kubeflow.org/task_display_name'], 'Custom name') + def test_op_transformers(self): def some_op(): - return dsl.ContainerOp( - name='sleep', - image='busybox', - command=['sleep 1'], - ) + return dsl.ContainerOp( + name='sleep', + image='busybox', + command=['sleep 1'], + ) @dsl.pipeline(name='some_pipeline', description='') def some_pipeline(): - task1 = some_op() - task2 = some_op() - task3 = some_op() + task1 = some_op() + task2 = some_op() + task3 = some_op() - dsl.get_pipeline_conf().op_transformers.append(lambda op: op.set_retry(5)) + dsl.get_pipeline_conf().op_transformers.append(lambda op: op.set_retry(5)) workflow_dict = compiler.Compiler()._compile(some_pipeline) for template in workflow_dict['spec']['templates']: @@ -551,3 +552,26 @@ def some_pipeline(): def test_add_pod_env(self): self._test_py_compile_yaml('add_pod_env') + + def test_init_container(self): + echo = dsl.UserContainer( + name='echo', + image='alpine:latest', + command=['echo', 'bye']) + + @dsl.pipeline(name='InitContainer', description='A pipeline with init container.') + def init_container_pipeline(): + dsl.ContainerOp( + name='hello', + image='alpine:latest', + command=['echo', 'hello'], + init_containers=[echo]) + + workflow_dict = compiler.Compiler()._compile(init_container_pipeline) + for template in workflow_dict['spec']['templates']: + init_containers = template.get('initContainers', None) + if init_containers: + self.assertEqual(len(init_containers),1) + init_container = init_containers[0] + self.assertEqual(init_container, {'image':'alpine:latest', 'command': ['echo', 'bye'], 'name': 'echo'}) + diff --git a/sdk/python/tests/dsl/container_op_tests.py b/sdk/python/tests/dsl/container_op_tests.py index ce6bb813a9c..88a951e0424 100644 --- a/sdk/python/tests/dsl/container_op_tests.py +++ b/sdk/python/tests/dsl/container_op_tests.py @@ -16,7 +16,7 @@ import unittest from kubernetes.client.models import V1EnvVar, V1VolumeMount -from kfp.dsl import Pipeline, PipelineParam, ContainerOp, Sidecar +from kfp.dsl import Pipeline, PipelineParam, ContainerOp, UserContainer, Sidecar class TestContainerOp(unittest.TestCase): @@ -27,9 +27,12 @@ def test_basic(self): param2 = PipelineParam('param2') op1 = (ContainerOp(name='op1', image='image', arguments=['%s hello %s %s' % (param1, param2, param1)], + init_containers=[UserContainer(name='initcontainer0', image='initimage0')], sidecars=[Sidecar(name='sidecar0', image='image0')], container_kwargs={'env': [V1EnvVar(name='env1', value='value1')]}, file_outputs={'out1': '/tmp/b'}) + .add_init_container(UserContainer(name='initcontainer1', image='initimage1')) + .add_init_container(UserContainer(name='initcontainer2', image='initimage2')) .add_sidecar(Sidecar(name='sidecar1', image='image1')) .add_sidecar(Sidecar(name='sidecar2', image='image2'))) @@ -37,6 +40,8 @@ def test_basic(self): self.assertCountEqual(list(op1.outputs.keys()), ['out1']) self.assertCountEqual([x.op_name for x in op1.outputs.values()], [op1.name]) self.assertEqual(op1.output.name, 'out1') + self.assertCountEqual([init_container.name for init_container in op1.init_containers], ['initcontainer0', 'initcontainer1', 'initcontainer2']) + self.assertCountEqual([init_container.image for init_container in op1.init_containers], ['initimage0', 'initimage1', 'initimage2']) self.assertCountEqual([sidecar.name for sidecar in op1.sidecars], ['sidecar0', 'sidecar1', 'sidecar2']) self.assertCountEqual([sidecar.image for sidecar in op1.sidecars], ['image0', 'image1', 'image2']) self.assertCountEqual([env.name for env in op1.container.env], ['env1'])