-
Notifications
You must be signed in to change notification settings - Fork 883
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
574866f
commit 547e4e9
Showing
1 changed file
with
243 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,243 @@ | ||
package binding | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/api/errors" | ||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/kubernetes/scheme" | ||
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/cache" | ||
"k8s.io/client-go/tools/record" | ||
"k8s.io/client-go/util/workqueue" | ||
"k8s.io/klog/v2" | ||
|
||
"github.com/huawei-cloudnative/karmada/pkg/controllers/util" | ||
clientset "github.com/huawei-cloudnative/karmada/pkg/generated/clientset/versioned" | ||
karmadaScheme "github.com/huawei-cloudnative/karmada/pkg/generated/clientset/versioned/scheme" | ||
informers "github.com/huawei-cloudnative/karmada/pkg/generated/informers/externalversions" | ||
listers "github.com/huawei-cloudnative/karmada/pkg/generated/listers/propagationstrategy/v1alpha1" | ||
) | ||
|
||
const controllerAgentName = "binding-controller" | ||
|
||
// Controller is the controller implementation for binding resources | ||
type Controller struct { | ||
// karmadaClientSet is the clientset for our own API group. | ||
karmadaClientSet clientset.Interface | ||
|
||
// kubeClientSet is a standard kubernetes clientset. | ||
kubeClientSet kubernetes.Interface | ||
|
||
propagationBindingLister listers.PropagationBindingLister | ||
propagationBindingSynced cache.InformerSynced | ||
// workqueue is a rate limited work queue. This is used to queue work to be | ||
// processed instead of performing it as soon as a change happens. This | ||
// means we can ensure we only process a fixed amount of resources at a | ||
// time, and makes it easy to ensure we are never processing the same item | ||
// simultaneously in two different workers. | ||
workqueue workqueue.RateLimitingInterface | ||
|
||
// recorder is an event recorder for recording Event resources to the | ||
// Kubernetes API. | ||
eventRecorder record.EventRecorder | ||
} | ||
|
||
// StartPropagationBindingController starts a new binding controller. | ||
func StartPropagationBindingController(config *util.ControllerConfig, stopChan <-chan struct{}) error { | ||
controller, err := newPropagationBindingController(config) | ||
if err != nil { | ||
return err | ||
} | ||
klog.Infof("Starting PropagationBinding controller") | ||
|
||
go wait.Until(func() { | ||
if err := controller.Run(1, stopChan); err != nil { | ||
klog.Errorf("controller exit unexpected! will restart later, controller: %s, error: %v", controllerAgentName, err) | ||
} | ||
}, 1*time.Second, stopChan) | ||
|
||
return nil | ||
} | ||
|
||
// newPropagationBindingController returns a new controller. | ||
func newPropagationBindingController(config *util.ControllerConfig) (*Controller, error) { | ||
|
||
headClusterConfig := rest.CopyConfig(config.HeadClusterConfig) | ||
kubeClientSet := kubernetes.NewForConfigOrDie(headClusterConfig) | ||
|
||
karmadaClientSet := clientset.NewForConfigOrDie(headClusterConfig) | ||
propagationBindingInformer := informers.NewSharedInformerFactory(karmadaClientSet, 0).Propagationstrategy().V1alpha1().PropagationBindings() | ||
|
||
// Add karmada types to the default Kubernetes Scheme so Events can be logged for karmada types. | ||
utilruntime.Must(karmadaScheme.AddToScheme(scheme.Scheme)) | ||
|
||
// Create event broadcaster | ||
klog.V(1).Infof("Creating event broadcaster for %s", controllerAgentName) | ||
eventBroadcaster := record.NewBroadcaster() | ||
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")}) | ||
|
||
controller := &Controller{ | ||
karmadaClientSet: karmadaClientSet, | ||
kubeClientSet: kubeClientSet, | ||
propagationBindingLister: propagationBindingInformer.Lister(), | ||
propagationBindingSynced: propagationBindingInformer.Informer().HasSynced, | ||
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), | ||
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}), | ||
} | ||
|
||
klog.Info("Setting up event handlers") | ||
propagationBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||
AddFunc: func(obj interface{}) { | ||
klog.Infof("Received add event. just add to queue.") | ||
controller.enqueueEventResource(obj) | ||
}, | ||
UpdateFunc: func(old, new interface{}) { | ||
klog.Infof("Received update event. just add to queue.") | ||
controller.enqueueEventResource(new) | ||
}, | ||
DeleteFunc: func(obj interface{}) { | ||
klog.Infof("Received delete event. Do nothing just log.") | ||
}, | ||
}) | ||
|
||
return controller, nil | ||
} | ||
|
||
// Run will set up the event handlers for types we are interested in, as well | ||
// as syncing informer caches and starting workers. It will block until stopCh | ||
// is closed, at which point it will shutdown the workqueue and wait for | ||
// workers to finish processing their current work items. | ||
func (c *Controller) Run(workerNumber int, stopCh <-chan struct{}) error { | ||
defer utilruntime.HandleCrash() | ||
defer c.workqueue.ShutDown() | ||
|
||
klog.Infof("Starting controller: %s", controllerAgentName) | ||
|
||
// Wait for the caches to be synced before starting workers | ||
klog.Info("Waiting for informer caches to sync") | ||
if ok := cache.WaitForCacheSync(stopCh, c.propagationBindingSynced); !ok { | ||
return fmt.Errorf("failed to wait for caches to sync") | ||
} | ||
|
||
klog.Infof("Starting workers for controller. worker number: %d, controller: %s", workerNumber, controllerAgentName) | ||
for i := 0; i < workerNumber; i++ { | ||
go wait.Until(c.runWorker, time.Second, stopCh) | ||
} | ||
|
||
// Controller will block here until stopCh is closed. | ||
<-stopCh | ||
klog.Info("Shutting down workers") | ||
|
||
return nil | ||
} | ||
|
||
// runWorker is a long-running function that will continually call the | ||
// processNextWorkItem function in order to read and process a message on the | ||
// workqueue. | ||
func (c *Controller) runWorker() { | ||
for c.processNextWorkItem() { | ||
} | ||
} | ||
|
||
// processNextWorkItem will read a single work item off the workqueue and | ||
// attempt to process it, by calling the syncHandler. | ||
func (c *Controller) processNextWorkItem() bool { | ||
obj, shutdown := c.workqueue.Get() | ||
|
||
if shutdown { | ||
return false | ||
} | ||
|
||
// We wrap this block in a func so we can defer c.workqueue.Done. | ||
err := func(obj interface{}) error { | ||
// We call Done here so the workqueue knows we have finished | ||
// processing this item. We also must remember to call Forget if we | ||
// do not want this work item being re-queued. For example, we do | ||
// not call Forget if a transient error occurs, instead the item is | ||
// put back on the workqueue and attempted again after a back-off | ||
// period. | ||
defer c.workqueue.Done(obj) | ||
var key string | ||
var ok bool | ||
// We expect strings to come off the workqueue. These are of the | ||
// form namespace/name. We do this as the delayed nature of the | ||
// workqueue means the items in the informer cache may actually be | ||
// more up to date that when the item was initially put onto the | ||
// workqueue. | ||
if key, ok = obj.(string); !ok { | ||
// As the item in the workqueue is actually invalid, we call | ||
// Forget here else we'd go into a loop of attempting to | ||
// process a work item that is invalid. | ||
c.workqueue.Forget(obj) | ||
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) | ||
return nil | ||
} | ||
// Run the syncHandler, passing it the namespace/name string of the | ||
// PropagateStrategy resource to be synced. | ||
if err := c.syncHandler(key); err != nil { | ||
// Put the item back on the workqueue to handle any transient errors. | ||
c.workqueue.AddRateLimited(key) | ||
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) | ||
} | ||
// Finally, if no error occurs we Forget this item so it does not | ||
// get queued again until another change happens. | ||
c.workqueue.Forget(obj) | ||
klog.Infof("Successfully synced '%s'", key) | ||
return nil | ||
}(obj) | ||
|
||
if err != nil { | ||
utilruntime.HandleError(err) | ||
return true | ||
} | ||
|
||
return true | ||
} | ||
|
||
// syncHandler compares the actual state with the desired, and attempts to | ||
// converge the two. It then updates the Status block of the PropagateStrategy resource | ||
// with the current status of the resource. | ||
func (c *Controller) syncHandler(key string) error { | ||
// Convert the namespace/name string into a distinct namespace and name | ||
namespace, name, err := cache.SplitMetaNamespaceKey(key) | ||
if err != nil { | ||
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) | ||
return nil | ||
} | ||
|
||
// Get the resource with this namespace/name | ||
propagationBinding, err := c.propagationBindingLister.PropagationBindings(namespace).Get(name) | ||
if err != nil { | ||
// The propagationBinding resource may no longer exist, in which case we stop | ||
// processing. | ||
if errors.IsNotFound(err) { | ||
utilruntime.HandleError(fmt.Errorf("propagationBinding '%s' in work queue no longer exists", key)) | ||
return nil | ||
} | ||
|
||
return err | ||
} | ||
|
||
klog.Infof("Sync propagationBinding: %s/%s", propagationBinding.Namespace, propagationBinding.Name) | ||
|
||
return nil | ||
} | ||
|
||
// enqueueFoo takes a resource and converts it into a namespace/name | ||
// string which is then put onto the work queue. This method should *not* be | ||
// passed resources of any type other than propagationBinding. | ||
func (c *Controller) enqueueEventResource(obj interface{}) { | ||
var key string | ||
var err error | ||
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { | ||
utilruntime.HandleError(err) | ||
return | ||
} | ||
c.workqueue.Add(key) | ||
} |