Skip to content

Commit

Permalink
[Backend]Cache - KFP pod filter logic looking for cache_enabled = tru…
Browse files Browse the repository at this point in the history
…e label selector (#3368)

* Initial execution cache

This commit adds initial execution cache service. Including http service
and execution key generation.

* fix master

* fix go.sum

* Change kfp annotation for pod filtering

* update filter logic

* Remove unused const
  • Loading branch information
rui5i committed Mar 31, 2020
1 parent 4a18f37 commit 62269eb
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 53 deletions.
3 changes: 3 additions & 0 deletions backend/src/cache/deployer/cache-configmap.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ webhooks:
namespace: ${NAMESPACE}
path: "/mutate"
caBundle: ${CA_BUNDLE}
objectSelector:
matchLabels:
pipelines.kubeflow.org/cache_enabled: true
rules:
- operations: [ "CREATE" ]
apiGroups: [""]
Expand Down
43 changes: 7 additions & 36 deletions backend/src/cache/server/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
)

const (
KFPAnnotation string = "pipelines.kubeflow.org"
ArgoWorkflowNodeName string = "workflows.argoproj.io/node-name"
ArgoWorkflowTemplate string = "workflows.argoproj.io/template"
ExecutionKey string = "pipelines.kubeflow.org/execution_cache_key"
Expand Down Expand Up @@ -74,8 +73,8 @@ func MutatePodIfCached(req *v1beta1.AdmissionRequest, clientMgr ClientManagerInt
}

// Pod filtering to only cache KFP argo pods except TFX pods
if !isValidPod(&pod) {
log.Printf("This pod %s is not a valid pod.", pod.ObjectMeta.Name)
if isTFXPod(&pod) {
log.Printf("This pod %s is created by tfx pipelines.", pod.ObjectMeta.Name)
return nil, nil
}

Expand Down Expand Up @@ -193,42 +192,14 @@ func getValueFromSerializedMap(serializedMap string, key string) string {
return value
}

func isValidPod(pod *corev1.Pod) bool {
annotations := pod.ObjectMeta.Annotations
if annotations == nil || len(annotations) == 0 {
log.Printf("The annotation of this pod %s is empty.", pod.ObjectMeta.Name)
return false
}
if !isKFPArgoPod(&annotations, pod.ObjectMeta.Name) {
log.Printf("This pod %s is not created by KFP.", pod.ObjectMeta.Name)
return false
}
func isTFXPod(pod *corev1.Pod) bool {
containers := pod.Spec.Containers
if containers != nil && len(containers) != 0 && isTFXPod(&containers) {
log.Printf("This pod %s is created by TFX pipelines.", pod.ObjectMeta.Name)
return false
}
return true
}

func isKFPArgoPod(annotations *map[string]string, podName string) bool {
// is argo pod or not
if _, exists := (*annotations)[ArgoWorkflowNodeName]; !exists {
log.Printf("This pod %s is not created by Argo.", podName)
return false
if containers == nil || len(containers) == 0 {
log.Printf("This pod container does not exist.")
return true
}
// is KFP pod or not
for k := range *annotations {
if strings.Contains(k, KFPAnnotation) {
return true
}
}
return false
}

func isTFXPod(containers *[]corev1.Container) bool {
var mainContainers []corev1.Container
for _, c := range *containers {
for _, c := range containers {
if c.Name != "" && c.Name == "main" {
mainContainers = append(mainContainers, c)
}
Expand Down
17 changes: 0 additions & 17 deletions backend/src/cache/server/mutation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ var (
Annotations: map[string]string{
ArgoWorkflowNodeName: "test_node",
ArgoWorkflowTemplate: `{"name": "test_template"}`,
KFPAnnotation: "test_kfp",
},
Labels: map[string]string{
ArgoCompleteLabelKey: "true",
Expand Down Expand Up @@ -107,22 +106,6 @@ func TestMutatePodIfCachedWithDecodeError(t *testing.T) {
assert.Contains(t, err.Error(), "could not deserialize pod object")
}

func TestMutatePodIfCachedWithNonKFPPod(t *testing.T) {
nonKFPPod := *fakePod
delete(nonKFPPod.Annotations, KFPAnnotation)
patchOperation, err := MutatePodIfCached(GetFakeRequestFromPod(&nonKFPPod), fakeClientManager)
assert.Nil(t, patchOperation)
assert.Nil(t, err)
}

func TestMutatePodIfCachedWithNonArgoPod(t *testing.T) {
nonArgoPod := *fakePod
delete(nonArgoPod.Annotations, ArgoWorkflowNodeName)
patchOperation, err := MutatePodIfCached(GetFakeRequestFromPod(&nonArgoPod), fakeClientManager)
assert.Nil(t, patchOperation)
assert.Nil(t, err)
}

func TestMutatePodIfCachedWithTFXPod(t *testing.T) {
tfxPod := *fakePod
mainContainerCommand := append(tfxPod.Spec.Containers[0].Command, "/tfx-src/"+TFXPodSuffix)
Expand Down

0 comments on commit 62269eb

Please sign in to comment.