Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List applying policies in InMemoryChannels status #8011

Merged
merged 13 commits into from
Jun 20, 2024
Merged
Prev Previous commit
Next Next commit
Reconcile IMC only on relevant EventPolicy changes
  • Loading branch information
creydr committed Jun 18, 2024
commit 98f6c795c01bcfec176b0d41f660acb3a74cd588
92 changes: 88 additions & 4 deletions pkg/reconciler/inmemorychannel/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ package controller

import (
"context"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"
"strings"

"github.com/kelseyhightower/envconfig"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/messaging"
v1 "knative.dev/eventing/pkg/client/listers/messaging/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/system"

"knative.dev/pkg/resolver"
Expand Down Expand Up @@ -66,6 +72,7 @@ func NewController(
serviceAccountInformer := serviceaccount.Get(ctx)
roleBindingInformer := rolebinding.Get(ctx)
secretInformer := secretinformer.Get(ctx)
eventPolicyInformer := eventpolicy.Get(ctx)

r := &Reconciler{
kubeClientSet: kubeclient.Get(ctx),
Expand All @@ -76,6 +83,7 @@ func NewController(
serviceAccountLister: serviceAccountInformer.Lister(),
roleBindingLister: roleBindingInformer.Lister(),
secretLister: secretInformer.Lister(),
eventPolicyLister: eventPolicyInformer.Lister(),
}

env := &envConfig{}
Expand Down Expand Up @@ -141,7 +149,24 @@ func NewController(
Handler: controller.HandleAll(globalResync),
})

eventPolicyInformer.Informer().AddEventHandler(controller.HandleAll(globalResync))
// Enqueue the InMemoryChannel, if we have an EventPolicy which was referencing
// or got updated and now is referencing the InMemoryChannel
eventPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Here we need to check if either the old or the new EventPolicy was referencing the InMemoryChannel

alreadyEnqueued := enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), oldObj, impl.EnqueueKey)
if !alreadyEnqueued {
enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), newObj, impl.EnqueueKey)
}
},
DeleteFunc: func(obj interface{}) {
enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey)
},
})
creydr marked this conversation as resolved.
Show resolved Hide resolved

// Setup the watch on the config map of dispatcher config
configStore := config.NewEventDispatcherConfigStore(logging.FromContext(ctx))
Expand All @@ -150,3 +175,62 @@ func NewController(

return impl
}

// enqueueApplyingChannelOfEventPolicy checks if an InMemoryChannel is referenced in the given EventPolicy.
// If so, it enqueues the channel into the enqueueFn() and returns true.
func enqueueApplyingChannelOfEventPolicy(imcLister v1.InMemoryChannelLister, obj interface{}, enqueueFn func(key types.NamespacedName)) bool {
eventPolicy, ok := obj.(*v1alpha1.EventPolicy)
if !ok {
return false
}

for _, to := range eventPolicy.Spec.To {
if to.Ref != nil {
toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion)
if err != nil {
return false
}

if strings.EqualFold(toGV.Group, messaging.GroupName) &&
strings.EqualFold(to.Ref.Kind, "InMemoryChannel") {

enqueueFn(types.NamespacedName{
Namespace: eventPolicy.Namespace,
Name: to.Ref.Name,
})
return true
}
}

if to.Selector != nil {
selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion)
if err != nil {
return false
}

if strings.EqualFold(selectorGV.Group, messaging.GroupName) &&
strings.EqualFold(to.Selector.Kind, "InMemoryChannel") {

selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector)
if err != nil {
return false
}

imcs, err := imcLister.InMemoryChannels(eventPolicy.Namespace).List(selector)
if err != nil {
return false
}

for _, imc := range imcs {
enqueueFn(types.NamespacedName{
Namespace: eventPolicy.Namespace,
Name: imc.Name,
})
}
return true
}
}
}

return false
}
Loading