Skip to content

Commit

Permalink
[Backend] Cache - Add cache_enabled label for cache filtering (kubefl…
Browse files Browse the repository at this point in the history
…ow#3352)

* Initial execution cache

This commit adds initial execution cache service. Including http service
and execution key generation.

* fix master

* Add cache enabled annotation to pod annotaion for cache filtering

* fix go.sum

* Add cache disable annotation value for future use

* Rename annotation key to cache qualified

* revert cache_qualified to cache_enabled

* Fix code comment

* Change cache_enabled annotation to label

* Add value check

* Read cache_enabled flag from config

* Add comments on set template labels
  • Loading branch information
rui5i authored and Jeffwan committed Dec 9, 2020
1 parent f9874e4 commit 0346aff
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 6 deletions.
5 changes: 5 additions & 0 deletions backend/src/apiserver/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
const (
MultiUserMode string = "MULTIUSER"
PodNamespace string = "POD_NAMESPACE"
CacheEnabled string = "CacheEnabled"
)

func GetStringConfig(configName string) string {
Expand Down Expand Up @@ -82,3 +83,7 @@ func GetBoolFromStringWithDefault(value string, defaultValue bool) bool {
}
return boolVal
}

func IsCacheEnabled() string {
return GetStringConfigWithDefault(CacheEnabled, "true")
}
3 changes: 2 additions & 1 deletion backend/src/apiserver/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
"PipelinePath": "pipelines"
},
"InitConnectionTimeout": "6m",
"DefaultPipelineRunnerServiceAccount": "pipeline-runner"
"DefaultPipelineRunnerServiceAccount": "pipeline-runner",
"CacheEnabled": "true"
}
5 changes: 5 additions & 0 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) {

// Disable istio sidecar injection
workflow.SetAnnotationsToAllTemplates(util.AnnotationKeyIstioSidecarInject, util.AnnotationValueIstioSidecarInjectDisabled)
// Add a KFP specific label for cache service filtering. The cache_enabled flag here is a global control for whether cache server will
// receive targeting pods. Since cache server only receives pods in step level, the resource manager here will set this global label flag
// on every single step/pod so the cache server can understand.
// TODO: Add run_level flag with similar logic by reading flag value from create_run api.
workflow.SetLabelsToAllTemplates(util.LabelKeyCacheEnabled, common.IsCacheEnabled())
// Append provided parameter
workflow.OverrideParameters(parameters)

Expand Down
12 changes: 8 additions & 4 deletions backend/src/common/util/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@ const (
// It captures whether the name of the owning ScheduledWorkflow.
LabelKeyWorkflowScheduledWorkflowName = constants.FullName + "/scheduledWorkflowName"


LabelKeyWorkflowRunId = "pipeline/runid"
LabelKeyWorkflowRunId = "pipeline/runid"
LabelKeyWorkflowPersistedFinalState = "pipeline/persistedFinalState"

// LabelKeyWorkflowEpoch is a Workflow annotation key.
// It captures the the name of the Run.
AnnotationKeyRunName = "pipelines.kubeflow.org/run_name"

AnnotationKeyIstioSidecarInject = "sidecar.istio.io/inject"
AnnotationValueIstioSidecarInjectEnabled = "true"
AnnotationKeyIstioSidecarInject = "sidecar.istio.io/inject"
AnnotationValueIstioSidecarInjectEnabled = "true"
AnnotationValueIstioSidecarInjectDisabled = "false"

// LabelKeyCacheEnabled is a workflow label key.
// It captures whether this step will be selected by cache service.
// To disable/enable cache for a single run, this label needs to be added in every step under a run.
LabelKeyCacheEnabled = "pipelines.kubeflow.org/cache_enabled"
)
15 changes: 15 additions & 0 deletions backend/src/common/util/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,21 @@ func (w *Workflow) SetAnnotationsToAllTemplates(key string, value string) {
}
}

// SetLabels sets labels on all templates in a Workflow
func (w *Workflow) SetLabelsToAllTemplates(key string, value string) {
if len(w.Spec.Templates) == 0 {
return
}
for index, _ := range w.Spec.Templates {
if w.Spec.Templates[index].Metadata.Labels == nil {
w.Spec.Templates[index].Metadata.Labels = make(map[string]string)
}
if w.Spec.Templates[index].Metadata.Labels[key] != value {
w.Spec.Templates[index].Metadata.Labels[key] = value
}
}
}

// SetOwnerReferences sets owner references on a Workflow.
func (w *Workflow) SetOwnerReferences(schedule *swfapi.ScheduledWorkflow) {
w.OwnerReferences = []metav1.OwnerReference{
Expand Down
37 changes: 36 additions & 1 deletion backend/src/common/util/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package util

import (
"github.com/ghodss/yaml"
"testing"

workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/ghodss/yaml"
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -252,6 +253,40 @@ func TestWorkflow_SetOwnerReferences(t *testing.T) {
assert.Equal(t, expected, workflow.Get())
}

func TestWorkflow_SetLabelsToAllTemplates(t *testing.T) {
workflow := NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "WORKFLOW_NAME",
},
Spec: workflowapi.WorkflowSpec{
Templates: []workflowapi.Template{
workflowapi.Template{
Metadata: workflowapi.Metadata{},
},
},
},
})
workflow.SetLabelsToAllTemplates("key", "value")
expected := &workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "WORKFLOW_NAME",
},
Spec: workflowapi.WorkflowSpec{
Templates: []workflowapi.Template{
workflowapi.Template{
Metadata: workflowapi.Metadata{
Labels: map[string]string{
"key": "value",
},
},
},
},
},
}

assert.Equal(t, expected, workflow.Get())
}

func TestSetLabels(t *testing.T) {
workflow := NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit 0346aff

Please sign in to comment.