Skip to content

Commit

Permalink
add init container for container op (#1650)
Browse files Browse the repository at this point in the history
* add init container

* update test

* update tests

* address comments
  • Loading branch information
IronPan authored and k8s-ci-robot committed Jul 23, 2019
1 parent 88244a0 commit 8bc4644
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 47 deletions.
4 changes: 4 additions & 0 deletions sdk/python/kfp/compiler/_op_to_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ def _op_to_template(op: BaseOp):
if processed_op.timeout:
template['activeDeadlineSeconds'] = processed_op.timeout

# initContainers
if processed_op.init_containers:
template['initContainers'] = processed_op.init_containers

# sidecars
if processed_op.sidecars:
template['sidecars'] = processed_op.sidecars
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from ._pipeline_param import PipelineParam, match_serialized_pipelineparam
from ._pipeline import Pipeline, pipeline, get_pipeline_conf
from ._container_op import ContainerOp, Sidecar
from ._container_op import ContainerOp, UserContainer, Sidecar
from ._resource_op import ResourceOp
from ._volume_op import (
VolumeOp, VOLUME_MODE_RWO, VOLUME_MODE_RWM, VOLUME_MODE_ROM
Expand Down
124 changes: 88 additions & 36 deletions sdk/python/kfp/dsl/_container_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,25 +541,25 @@ def set_lifecycle(self, lifecycle):
return self


class Sidecar(Container):
class UserContainer(Container):
"""
Represents an argo workflow sidecar (io.argoproj.workflow.v1alpha1.Sidecar)
to be used in `sidecars` property in argo's workflow template
(io.argoproj.workflow.v1alpha1.Template).
Represents an argo workflow UserContainer (io.argoproj.workflow.v1alpha1.UserContainer)
to be used in the `initContainers` and `sidecars` properties in argo's workflow template
(io.argoproj.workflow.v1alpha1.Template).
`Sidecar` inherits from `Container` class with an addition of `mirror_volume_mounts`
`UserContainer` inherits from `Container` class with an addition of `mirror_volume_mounts`
attribute (`mirrorVolumeMounts` property).
See https://github.com/argoproj/argo/blob/master/api/openapi-spec/swagger.json
Example
from kfp.dsl import ContainerOp, Sidecar
from kfp.dsl import ContainerOp, UserContainer
# creates a `ContainerOp` and adds a redis `Sidecar`
# creates a `ContainerOp` and adds a redis init container
op = (ContainerOp(name='foo-op', image='busybox:latest')
.add_sidecar(
Sidecar(name='redis', image='redis:alpine')))
.add_init_container(
UserContainer(name='redis', image='redis:alpine')))
"""
"""
Expand All @@ -569,7 +569,7 @@ class Sidecar(Container):
attribute_map (dict): The key is attribute name
and the value is json key in definition.
"""
# adds `mirror_volume_mounts` to `Sidecar` swagger definition
# adds `mirror_volume_mounts` to `UserContainer` swagger definition
# NOTE inherits definition from `V1Container` rather than `Container`
# because `Container` has no `name` property.
swagger_types = dict(
Expand All @@ -579,25 +579,25 @@ class Sidecar(Container):
**V1Container.attribute_map, mirror_volume_mounts='mirrorVolumeMounts')

def __init__(self,
name: str,
image: str,
command: StringOrStringList = None,
args: StringOrStringList = None,
mirror_volume_mounts: bool = None,
**kwargs):
"""Creates a new instance of `Sidecar`.
name: str,
image: str,
command: StringOrStringList = None,
args: StringOrStringList = None,
mirror_volume_mounts: bool = None,
**kwargs):
"""Creates a new instance of `UserContainer`.
Args:
name {str}: unique name for the sidecar container
image {str}: image to use for the sidecar container, e.g. redis:alpine
name {str}: unique name for the user container
image {str}: image to use for the user container, e.g. redis:alpine
command {StringOrStringList}: entrypoint array. Not executed within a shell.
args {StringOrStringList}: arguments to the entrypoint.
mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same
volumes specified in the main container to the sidecar (including artifacts),
at the same mountPaths. This enables dind daemon to partially see the same
filesystem as the main container in order to use features such as docker
args {StringOrStringList}: arguments to the entrypoint.
mirror_volume_mounts {bool}: MirrorVolumeMounts will mount the same
volumes specified in the main container to the container (including artifacts),
at the same mountPaths. This enables dind daemon to partially see the same
filesystem as the main container in order to use features such as docker
volume binding
**kwargs: keyword arguments available for `Container`
**kwargs: keyword arguments available for `Container`
"""
super().__init__(
Expand All @@ -611,12 +611,12 @@ def __init__(self,

def set_mirror_volume_mounts(self, mirror_volume_mounts=True):
"""
Setting mirrorVolumeMounts to true will mount the same volumes specified
in the main container to the sidecar (including artifacts), at the same
mountPaths. This enables dind daemon to partially see the same filesystem
as the main container in order to use features such as docker volume
Setting mirrorVolumeMounts to true will mount the same volumes specified
in the main container to the container (including artifacts), at the same
mountPaths. This enables dind daemon to partially see the same filesystem
as the main container in order to use features such as docker volume
binding.
Args:
mirror_volume_mounts: boolean flag
"""
Expand All @@ -626,10 +626,43 @@ def set_mirror_volume_mounts(self, mirror_volume_mounts=True):

@property
def inputs(self):
"""A list of PipelineParam found in the Sidecar object."""
"""A list of PipelineParam found in the UserContainer object."""
return _pipeline_param.extract_pipelineparams_from_any(self)


class Sidecar(UserContainer):
    """A `UserContainer` meant for the `sidecars` list of an op.

    The class adds no attributes or methods of its own: its constructor
    accepts the same arguments as `UserContainer` and passes them straight
    through.
    """

    def __init__(self,
                 name: str,
                 image: str,
                 command: StringOrStringList = None,
                 args: StringOrStringList = None,
                 mirror_volume_mounts: bool = None,
                 **kwargs):
        """Creates a new instance of `Sidecar`.

        Args:
            name {str}: unique name for the sidecar container
            image {str}: image to use for the sidecar container,
                e.g. redis:alpine
            command {StringOrStringList}: entrypoint array. Not executed
                within a shell.
            args {StringOrStringList}: arguments to the entrypoint.
            mirror_volume_mounts {bool}: when set, the volumes specified in
                the main container (including artifacts) are mounted in the
                sidecar at the same mountPaths. This lets a dind daemon
                partially see the same filesystem as the main container, so
                features such as docker volume binding can be used.
            **kwargs: keyword arguments available for `Container`
        """
        # Everything is forwarded by keyword so the call does not depend on
        # the parameter order of the parent constructor.
        super().__init__(name=name, image=image, command=command, args=args,
                         mirror_volume_mounts=mirror_volume_mounts, **kwargs)


def _make_hash_based_id_for_op(op):
# Generating a unique ID for Op. For class instances, the hash is the object's memory address which is unique.
return op.human_name + ' ' + hex(2**63 + hash(op))[2:]
Expand All @@ -647,19 +680,22 @@ class BaseOp(object):
# in the compilation process to generate the DAGs and task io parameters.
attrs_with_pipelineparams = [
'node_selector', 'volumes', 'pod_annotations', 'pod_labels',
'num_retries', 'sidecars', 'tolerations'
'num_retries', 'init_containers', 'sidecars', 'tolerations'
]

def __init__(self,
name: str,
init_containers: List[UserContainer] = None,
sidecars: List[Sidecar] = None,
is_exit_handler: bool = False):
"""Create a new instance of BaseOp
Args:
name: the name of the op. It does not have to be unique within a pipeline
because the pipeline will generate a unique new name in case of conflicts.
sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
init_containers: the list of `UserContainer` objects describing the init containers
to deploy before the `main` container.
sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
together with the `main` container.
is_exit_handler: Whether it is used as an exit handler.
"""
Expand Down Expand Up @@ -689,6 +725,7 @@ def __init__(self,
self.pod_labels = {}
self.num_retries = 0
self.timeout = 0
self.init_containers = init_containers or []
self.sidecars = sidecars or []

# attributes specific to `BaseOp`
Expand Down Expand Up @@ -818,6 +855,16 @@ def set_timeout(self, seconds: int):
self.timeout = seconds
return self

def add_init_container(self, init_container: UserContainer):
    """Add an init container to the Op.

    The container is appended to `self.init_containers`, after any that are
    already in the list.

    Args:
        init_container: the `UserContainer` object to add.

    Returns:
        This op (`self`), so that calls can be chained.
    """
    self.init_containers.append(init_container)
    return self

def add_sidecar(self, sidecar: Sidecar):
"""Add a sidecar to the Op.
Expand Down Expand Up @@ -864,6 +911,8 @@ def foo_pipeline(tag: str, pull_image_policy: str):
# any attributes can be parameterized (both serialized string or actual PipelineParam)
op = dsl.ContainerOp(name='foo',
image='busybox:%s' % tag,
# pass in init_containers list
init_containers=[dsl.UserContainer('print', 'busybox:latest', command='echo "hello"')],
# pass in sidecars list
sidecars=[dsl.Sidecar('print', 'busybox:latest', command='echo "hello"')],
# pass in k8s container kwargs
Expand All @@ -890,6 +939,7 @@ def __init__(self,
image: str,
command: StringOrStringList = None,
arguments: StringOrStringList = None,
init_containers: List[UserContainer] = None,
sidecars: List[Sidecar] = None,
container_kwargs: Dict = None,
file_outputs: Dict[str, str] = None,
Expand All @@ -909,7 +959,9 @@ def __init__(self,
arguments: the arguments of the command. The command can include "%s" and supply
a PipelineParam as the string replacement. For example, ('echo %s' % input_param).
At container run time the argument will be 'echo param_value'.
sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
init_containers: the list of `UserContainer` objects describing the init containers
to deploy before the `main` container.
sidecars: the list of `Sidecar` objects describing the sidecar containers to deploy
together with the `main` container.
container_kwargs: the dict of additional keyword arguments to pass to the
op's `Container` definition.
Expand All @@ -929,7 +981,7 @@ def __init__(self,
E.g {"/my/path": vol, "/mnt": other_op.pvolumes["/output"]}.
"""

super().__init__(name=name, sidecars=sidecars, is_exit_handler=is_exit_handler)
super().__init__(name=name, init_containers=init_containers, sidecars=sidecars, is_exit_handler=is_exit_handler)
self.attrs_with_pipelineparams = BaseOp.attrs_with_pipelineparams + ['_container', 'artifact_location'] #Copying the BaseOp class variable!

# convert to list if not a list
Expand Down
42 changes: 33 additions & 9 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,21 +527,22 @@ def some_pipeline():
template = workflow_dict['spec']['templates'][0]
self.assertEqual(template['metadata']['annotations']['pipelines.kubeflow.org/task_display_name'], 'Custom name')


def test_op_transformers(self):
def some_op():
return dsl.ContainerOp(
name='sleep',
image='busybox',
command=['sleep 1'],
)
return dsl.ContainerOp(
name='sleep',
image='busybox',
command=['sleep 1'],
)

@dsl.pipeline(name='some_pipeline', description='')
def some_pipeline():
task1 = some_op()
task2 = some_op()
task3 = some_op()
task1 = some_op()
task2 = some_op()
task3 = some_op()

dsl.get_pipeline_conf().op_transformers.append(lambda op: op.set_retry(5))
dsl.get_pipeline_conf().op_transformers.append(lambda op: op.set_retry(5))

workflow_dict = compiler.Compiler()._compile(some_pipeline)
for template in workflow_dict['spec']['templates']:
Expand All @@ -551,3 +552,26 @@ def some_pipeline():

def test_add_pod_env(self):
    """Run the shared `_test_py_compile_yaml` check for the 'add_pod_env' case."""
    # NOTE(review): presumably the helper compiles the `add_pod_env` sample and
    # compares the result with a stored YAML file - confirm in the helper.
    self._test_py_compile_yaml('add_pod_env')

def test_init_container(self):
    """Check that an op's init container reaches the compiled workflow.

    Builds a one-op pipeline whose `ContainerOp` carries a single
    `UserContainer` init container, compiles it, and verifies the
    `initContainers` entry of the generated template.
    """
    echo = dsl.UserContainer(
        name='echo',
        image='alpine:latest',
        command=['echo', 'bye'])

    @dsl.pipeline(name='InitContainer', description='A pipeline with init container.')
    def init_container_pipeline():
        dsl.ContainerOp(
            name='hello',
            image='alpine:latest',
            command=['echo', 'hello'],
            init_containers=[echo])

    workflow_dict = compiler.Compiler()._compile(init_container_pipeline)

    # Keep only the templates that actually declare init containers; other
    # templates in the workflow are not expected to have the key.
    found = [
        template['initContainers']
        for template in workflow_dict['spec']['templates']
        if template.get('initContainers')
    ]

    # Without this check the test would pass vacuously if the compiler
    # dropped `initContainers` from every template.
    self.assertTrue(found, 'no compiled template contains initContainers')

    expected = {'image': 'alpine:latest', 'command': ['echo', 'bye'], 'name': 'echo'}
    for init_containers in found:
        self.assertEqual(len(init_containers), 1)
        self.assertEqual(init_containers[0], expected)

7 changes: 6 additions & 1 deletion sdk/python/tests/dsl/container_op_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import unittest
from kubernetes.client.models import V1EnvVar, V1VolumeMount

from kfp.dsl import Pipeline, PipelineParam, ContainerOp, Sidecar
from kfp.dsl import Pipeline, PipelineParam, ContainerOp, UserContainer, Sidecar


class TestContainerOp(unittest.TestCase):
Expand All @@ -27,16 +27,21 @@ def test_basic(self):
param2 = PipelineParam('param2')
op1 = (ContainerOp(name='op1', image='image',
arguments=['%s hello %s %s' % (param1, param2, param1)],
init_containers=[UserContainer(name='initcontainer0', image='initimage0')],
sidecars=[Sidecar(name='sidecar0', image='image0')],
container_kwargs={'env': [V1EnvVar(name='env1', value='value1')]},
file_outputs={'out1': '/tmp/b'})
.add_init_container(UserContainer(name='initcontainer1', image='initimage1'))
.add_init_container(UserContainer(name='initcontainer2', image='initimage2'))
.add_sidecar(Sidecar(name='sidecar1', image='image1'))
.add_sidecar(Sidecar(name='sidecar2', image='image2')))

self.assertCountEqual([x.name for x in op1.inputs], ['param1', 'param2'])
self.assertCountEqual(list(op1.outputs.keys()), ['out1'])
self.assertCountEqual([x.op_name for x in op1.outputs.values()], [op1.name])
self.assertEqual(op1.output.name, 'out1')
self.assertCountEqual([init_container.name for init_container in op1.init_containers], ['initcontainer0', 'initcontainer1', 'initcontainer2'])
self.assertCountEqual([init_container.image for init_container in op1.init_containers], ['initimage0', 'initimage1', 'initimage2'])
self.assertCountEqual([sidecar.name for sidecar in op1.sidecars], ['sidecar0', 'sidecar1', 'sidecar2'])
self.assertCountEqual([sidecar.image for sidecar in op1.sidecars], ['image0', 'image1', 'image2'])
self.assertCountEqual([env.name for env in op1.container.env], ['env1'])
Expand Down

0 comments on commit 8bc4644

Please sign in to comment.