
SDK: In preparation for the new feature ResourceOps (kubeflow#801)
* Rename BaseOp.volumes to k8s_volumes
* Add a cops attribute to Pipeline: a dict holding all the
  ContainerOps of the pipeline
* Make some of the processing in _op_to_template ContainerOp-specific

Signed-off-by: Ilias Katsakioris <elikatsis@arrikto.com>
elikatsis committed Apr 15, 2019
1 parent 0182ab9 commit 349276c
Showing 5 changed files with 40 additions and 32 deletions.
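
The thrust of the change: keep a ContainerOp-only view of the pipeline's operators (cops) alongside the existing ops dict, and rename BaseOp.volumes to k8s_volumes, so later compiler passes can treat container steps and the upcoming resource steps differently. A minimal sketch of the idea with simplified stand-in classes (illustration only, not the actual SDK code):

    # Stand-ins for illustration; the real classes live in kfp.dsl.
    class BaseOp:
        def __init__(self, name):
            self.name = name
            self.k8s_volumes = []  # renamed from `volumes` in this commit

    class ContainerOp(BaseOp):
        pass

    class Pipeline:
        def __init__(self):
            self.ops = {}   # every op in the pipeline
            self.cops = {}  # only the ContainerOps

        def add_op(self, op):
            self.ops[op.name] = op
            if isinstance(op, ContainerOp):
                self.cops[op.name] = op

    p = Pipeline()
    p.add_op(ContainerOp('train'))
    p.add_op(BaseOp('some-future-resource-op'))
    assert set(p.ops) == {'train', 'some-future-resource-op'}
    assert set(p.cops) == {'train'}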
43 changes: 24 additions & 19 deletions sdk/python/kfp/compiler/_op_to_template.py
@@ -124,16 +124,18 @@ def _inputs_to_json(inputs_params: List[dsl.PipelineParam], _artifacts=None):
     return {'parameters': parameters} if parameters else None


-def _outputs_to_json(outputs: Dict[str, dsl.PipelineParam],
-                     file_outputs: Dict[str, str],
+def _outputs_to_json(op: BaseOp,
+                     outputs: Dict[str, dsl.PipelineParam],
+                     param_outputs: Dict[str, str],
                      output_artifacts: List[dict]):
     """Creates an argo `outputs` JSON obj."""
+    value_from_key = "path"
     output_parameters = []
     for param in outputs.values():
         output_parameters.append({
             'name': param.full_name,
             'valueFrom': {
-                'path': file_outputs[param.name]
+                value_from_key: param_outputs[param.name]
             }
         })
     output_parameters.sort(key=lambda x: x['name'])
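
Based on the hunk above, an op with a parameter output mapping 'out' to '/out.txt' contributes an Argo output parameter of roughly this shape ('train-out' is a hypothetical value of param.full_name):

    {
        'name': 'train-out',                # hypothetical param.full_name
        'valueFrom': {'path': '/out.txt'}   # key taken from value_from_key
    }

Introducing value_from_key as a variable hints at op kinds whose outputs come from something other than a file path, presumably for the upcoming ResourceOps.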
@@ -176,29 +178,32 @@ def _op_to_template(op: BaseOp):
     # replace all PipelineParams with template var strings
     processed_op = _process_base_ops(op)

-    # default output artifacts
-    output_artifact_paths = OrderedDict()
-    output_artifact_paths.setdefault('mlpipeline-ui-metadata', '/mlpipeline-ui-metadata.json')
-    output_artifact_paths.setdefault('mlpipeline-metrics', '/mlpipeline-metrics.json')
-
-    output_artifacts = [
-        _build_conventional_artifact(name, path)
-        for name, path in output_artifact_paths.items()
-    ]
-
-    # workflow template
-    template = {
-        'name': op.name,
-        'container': K8sHelper.convert_k8s_obj_to_json(op.container)
-    }
+    if isinstance(op, dsl.ContainerOp):
+        # default output artifacts
+        output_artifact_paths = OrderedDict()
+        output_artifact_paths.setdefault('mlpipeline-ui-metadata', '/mlpipeline-ui-metadata.json')
+        output_artifact_paths.setdefault('mlpipeline-metrics', '/mlpipeline-metrics.json')
+
+        output_artifacts = [
+            _build_conventional_artifact(name, path)
+            for name, path in output_artifact_paths.items()
+        ]
+
+        # workflow template
+        template = {
+            'name': op.name,
+            'container': K8sHelper.convert_k8s_obj_to_json(op.container)
+        }

     # inputs
     inputs = _inputs_to_json(processed_op.inputs)
     if inputs:
         template['inputs'] = inputs

     # outputs
-    template['outputs'] = _outputs_to_json(op.outputs, op.file_outputs,
+    if isinstance(op, dsl.ContainerOp):
+        param_outputs = op.file_outputs
+    template['outputs'] = _outputs_to_json(op, op.outputs, param_outputs,
                                            output_artifacts)

     # node selector
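
One subtlety in this transitional state: template, output_artifacts, and param_outputs are now bound only inside the isinstance(op, dsl.ContainerOp) branches, so an op of any other type reaching the unguarded lines would fail with UnboundLocalError. Python scopes names assigned in an if block to the whole function, so the pattern works exactly when the branch is taken, as this standalone sketch shows:

    def build(is_container):
        if is_container:
            template = {'name': 'x'}  # bound only when the branch runs
        return template               # fails when it did not

    build(True)   # returns {'name': 'x'}
    # build(False) raises UnboundLocalError: local variable 'template'
    # referenced before assignment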
13 changes: 7 additions & 6 deletions sdk/python/kfp/compiler/compiler.py
@@ -489,8 +489,8 @@ def _create_volumes(self, pipeline):
    volumes = []
    volume_name_set = set()
    for op in pipeline.ops.values():
-      if op.volumes:
-        for v in op.volumes:
+      if op.k8s_volumes:
+        for v in op.k8s_volumes:
          # Remove volume duplicates which have the same name
          #TODO: check for duplicity based on the serialized volumes instead of just name.
          if v['name'] not in volume_name_set:
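
The TODO above notes that deduplication keys on the volume name alone, so two different volumes sharing a name would collide. A sketch of content-based deduplication (an assumed helper, not part of this commit; it presumes the volumes are plain dicts, as the v['name'] lookup suggests):

    import json

    def dedupe_volumes(volumes):
        """Keep one copy of each volume, comparing serialized content."""
        seen, unique = set(), []
        for v in volumes:
            key = json.dumps(v, sort_keys=True)
            if key not in seen:
                seen.add(key)
                unique.append(v)
        return unique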
@@ -601,8 +601,8 @@ def _compile(self, pipeline_func):
      arg.value = default.value if isinstance(default, dsl.PipelineParam) else default

    # Sanitize operator names and param names
-    sanitized_ops = {}
-    for op in p.ops.values():
+    sanitized_cops = {}
+    for op in p.cops.values():
      sanitized_name = K8sHelper.sanitize_k8s_name(op.name)
      op.name = sanitized_name
      for param in op.outputs.values():
@@ -619,8 +619,9 @@ def _compile(self, pipeline_func):
      for key in op.file_outputs.keys():
        sanitized_file_outputs[K8sHelper.sanitize_k8s_name(key)] = op.file_outputs[key]
      op.file_outputs = sanitized_file_outputs
-      sanitized_ops[sanitized_name] = op
-    p.ops = sanitized_ops
+      sanitized_cops[sanitized_name] = op
+    p.cops = sanitized_cops
+    p.ops = dict(sanitized_cops)
    workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
    return workflow
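
For context, K8sHelper.sanitize_k8s_name rewrites an arbitrary display name into a valid Kubernetes resource name. A behaviorally similar sketch (the real helper lives in the SDK's Kubernetes helper module; this is an approximation, not its exact code):

    import re

    def sanitize_k8s_name(name):
        """Lowercase; collapse anything outside [-0-9a-z] into single dashes."""
        return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).strip('-')

    assert sanitize_k8s_name('My Op!') == 'my-op'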
7 changes: 3 additions & 4 deletions sdk/python/kfp/dsl/_base_op.py
@@ -614,7 +614,7 @@ class BaseOp(object):
    # Excludes `file_outputs` and `outputs` as they are handled separately
    # in the compilation process to generate the DAGs and task io parameters.
    attrs_with_pipelineparams = [
-        'node_selector', 'volumes', 'pod_annotations', 'pod_labels',
+        'node_selector', 'k8s_volumes', 'pod_annotations', 'pod_labels',
        'num_retries', 'sidecars'
    ]

@@ -646,13 +646,12 @@ def __init__(self,
        # actual name for argo workflow
        self.name = _pipeline.Pipeline.get_default_pipeline().add_op(
            self, is_exit_handler)
-
        self.is_exit_handler = is_exit_handler

        # TODO: proper k8s definitions so that `convert_k8s_obj_to_json` can be
        # used? `io.argoproj.workflow.v1alpha1.Template` properties
        self.node_selector = {}
-        self.volumes = []
+        self.k8s_volumes = []
        self.pod_annotations = {}
        self.pod_labels = {}
        self.num_retries = 0
@@ -718,7 +717,7 @@ def add_volume(self, volume):
        For detailed spec, check volume definition
        https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py
        """
-        self.volumes.append(volume)
+        self.k8s_volumes.append(volume)
        return self

    def add_node_selector_constraint(self, label_name, value):
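
Call sites are unchanged by the rename; only the backing attribute moves. An illustrative use in the style of the SDK's own tests (assuming this era's API, where ops are created inside a Pipeline context):

    from kubernetes import client as k8s_client
    from kfp.dsl import ContainerOp, Pipeline

    with Pipeline('volume-demo'):
        op = ContainerOp(name='train', image='busybox', command=['echo', 'hi'])
        op.add_volume(k8s_client.V1Volume(
            name='data',
            persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(
                claim_name='data-pvc')))

    assert op.k8s_volumes[0].name == 'data'  # stored on the renamed attribute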
8 changes: 6 additions & 2 deletions sdk/python/kfp/dsl/_pipeline.py
@@ -1,4 +1,4 @@
-# Copyright 2018 Google LLC
+# Copyright 2018-2019 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
 # limitations under the License.


+from . import _base_op
 from . import _container_op
 from . import _ops_group
 from ..components._naming import _make_name_unique_by_adding_index
@@ -116,6 +117,7 @@ def __init__(self, name: str):
    """
    self.name = name
    self.ops = {}
+    self.cops = {}
    # Add the root group.
    self.groups = [_ops_group.OpsGroup('pipeline', name=name)]
    self.group_id = 0
@@ -132,7 +134,7 @@ def __enter__(self):
  def __exit__(self, *args):
    Pipeline._default_pipeline = None

-  def add_op(self, op: _container_op.ContainerOp, define_only: bool):
+  def add_op(self, op: _base_op.BaseOp, define_only: bool):
    """Add a new operator.

    Args:
@@ -146,6 +148,8 @@ def add_op(self, op: _container_op.ContainerOp, define_only: bool):
    op_name = _make_name_unique_by_adding_index(op.human_name, list(self.ops.keys()), ' ')

    self.ops[op_name] = op
+    if isinstance(op, _container_op.ContainerOp):
+      self.cops[op_name] = op
    if not define_only:
      self.groups[-1].ops.append(op)

1 change: 0 additions & 1 deletion sdk/python/tests/dsl/container_op_tests.py
@@ -13,7 +13,6 @@
 # limitations under the License.


-import warnings
 import unittest
 from kubernetes.client.models import V1EnvVar, V1VolumeMount

