Skip to content

Commit

Permalink
[Fix]Cache - Revert objectSelector in mutatingwebhookconfiguration (k…
Browse files Browse the repository at this point in the history
…ubeflow#3433)

* Initial execution cache

This commit adds initial execution cache service. Including http service
and execution key generation.

* fix master

* fix go.sum

* Change kfp annotation for pod filtering

* update filter logic

* Remove unused const

* revert objectSelector in mutatingwebhookconfig

* remove objectSelector

* remove recursive pipeline in e2e test to prevent infinite loop with cache
  • Loading branch information
rui5i authored and Jeffwan committed Dec 9, 2020
1 parent 6593b7c commit 8983ee1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 21 deletions.
11 changes: 4 additions & 7 deletions backend/src/cache/deployer/cache-configmap.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ webhooks:
namespace: ${NAMESPACE}
path: "/mutate"
caBundle: ${CA_BUNDLE}
objectSelector:
matchLabels:
pipelines.kubeflow.org/cache_enabled: true
rules:
- operations: [ "CREATE" ]
apiGroups: [""]
apiVersions: ["v1"]
resources: ["pods"]
- operations: [ "CREATE" ]
apiGroups: [""]
apiVersions: ["v1"]
resources: ["pods"]
43 changes: 31 additions & 12 deletions backend/src/cache/server/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,20 @@ import (
)

const (
ArgoWorkflowNodeName string = "workflows.argoproj.io/node-name"
ArgoWorkflowTemplate string = "workflows.argoproj.io/template"
ExecutionKey string = "pipelines.kubeflow.org/execution_cache_key"
CacheIDLabelKey string = "pipelines.kubeflow.org/cache_id"
ArgoWorkflowOutputs string = "workflows.argoproj.io/outputs"
MetadataWrittenKey string = "pipelines.kubeflow.org/metadata_written"
AnnotationPath string = "/metadata/annotations"
LabelPath string = "/metadata/labels"
SpecContainersPath string = "/spec/containers"
SpecInitContainersPath string = "/spec/initContainers"
TFXPodSuffix string = "tfx/orchestration/kubeflow/container_entrypoint.py"
ArchiveLocationKey string = "archiveLocation"
KFPCacheEnabledLabelKey string = "pipelines.kubeflow.org/cache_enabled"
KFPCacheEnabledLabelValue string = "true"
ArgoWorkflowNodeName string = "workflows.argoproj.io/node-name"
ArgoWorkflowTemplate string = "workflows.argoproj.io/template"
ExecutionKey string = "pipelines.kubeflow.org/execution_cache_key"
CacheIDLabelKey string = "pipelines.kubeflow.org/cache_id"
ArgoWorkflowOutputs string = "workflows.argoproj.io/outputs"
MetadataWrittenKey string = "pipelines.kubeflow.org/metadata_written"
AnnotationPath string = "/metadata/annotations"
LabelPath string = "/metadata/labels"
SpecContainersPath string = "/spec/containers"
SpecInitContainersPath string = "/spec/initContainers"
TFXPodSuffix string = "tfx/orchestration/kubeflow/container_entrypoint.py"
ArchiveLocationKey string = "archiveLocation"
)

var (
Expand Down Expand Up @@ -73,6 +75,14 @@ func MutatePodIfCached(req *v1beta1.AdmissionRequest, clientMgr ClientManagerInt
}

// Pod filtering to only cache KFP argo pods except TFX pods
// TODO: Switch to objectSelector once Kubernetes 1.15 hits the GKE stable channel. See
// https://github.com/kubernetes/kubernetes/pull/78505
// https://cloud.google.com/kubernetes-engine/docs/release-notes-stable
if !isKFPCacheEnabled(&pod) {
log.Printf("This pod %s does not enable cache.", pod.ObjectMeta.Name)
return nil, nil
}

if isTFXPod(&pod) {
log.Printf("This pod %s is created by tfx pipelines.", pod.ObjectMeta.Name)
return nil, nil
Expand Down Expand Up @@ -192,6 +202,15 @@ func getValueFromSerializedMap(serializedMap string, key string) string {
return value
}

func isKFPCacheEnabled(pod *corev1.Pod) bool {
cacheEnabled, exists := pod.ObjectMeta.Labels[KFPCacheEnabledLabelKey]
if !exists {
log.Printf("This pod %s is not created by KFP.", pod.ObjectMeta.Name)
return false
}
return cacheEnabled == KFPCacheEnabledLabelValue
}

func isTFXPod(pod *corev1.Pod) bool {
containers := pod.Spec.Containers
if containers == nil || len(containers) == 0 {
Expand Down
11 changes: 10 additions & 1 deletion backend/src/cache/server/mutation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ var (
ArgoWorkflowTemplate: `{"name": "test_template"}`,
},
Labels: map[string]string{
ArgoCompleteLabelKey: "true",
ArgoCompleteLabelKey: "true",
KFPCacheEnabledLabelKey: KFPCacheEnabledLabelValue,
},
},
Spec: corev1.PodSpec{
Expand Down Expand Up @@ -106,6 +107,14 @@ func TestMutatePodIfCachedWithDecodeError(t *testing.T) {
assert.Contains(t, err.Error(), "could not deserialize pod object")
}

func TestMutatePodIfCachedWithCacheDisabledPod(t *testing.T) {
cacheDisabledPod := *fakePod
cacheDisabledPod.ObjectMeta.Labels[KFPCacheEnabledLabelKey] = "false"
patchOperation, err := MutatePodIfCached(GetFakeRequestFromPod(&cacheDisabledPod), fakeClientManager)
assert.Nil(t, patchOperation)
assert.Nil(t, err)
}

func TestMutatePodIfCachedWithTFXPod(t *testing.T) {
tfxPod := *fakePod
mainContainerCommand := append(tfxPod.Spec.Containers[0].Command, "/tfx-src/"+TFXPodSuffix)
Expand Down
1 change: 0 additions & 1 deletion test/e2e_test_gke_v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ spec:
- exit_handler
- sequential
- parallel_join
- recursion
- volume_ops

- name: upgrade-test-preparation
Expand Down

0 comments on commit 8983ee1

Please sign in to comment.