migrated pkg/controller/endpointslice to contextual logging
Signed-off-by: Naman <namanlakhwani@gmail.com>
Namanl2001 committed Jul 10, 2023
1 parent c95b16b commit 09849b0
Showing 15 changed files with 186 additions and 132 deletions.
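For readers unfamiliar with the pattern, "contextual logging" means the code stops calling the global klog formatting helpers (klog.Infof, klog.Warningf, ...) and instead uses a klog.Logger that callers thread through context.Context, emitting structured key/value pairs. A minimal sketch of the shape this commit applies, with hypothetical function names that are not taken from the diff:

```go
package example

import (
	"context"

	"k8s.io/klog/v2"
)

// syncOne is a hypothetical helper; like the controller's syncService it
// receives the logger explicitly instead of calling global klog functions.
func syncOne(logger klog.Logger, key string) {
	logger.V(4).Info("Finished syncing item", "key", key)
}

// run mirrors the shape of the migrated Run method: pull the logger out of
// the context once, then hand it to the workers.
func run(ctx context.Context) {
	logger := klog.FromContext(ctx)
	logger.Info("Starting sketch controller")
	defer logger.Info("Shutting down sketch controller")

	syncOne(logger, "default/example")
	<-ctx.Done()
}
```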
3 changes: 2 additions & 1 deletion cmd/kube-controller-manager/app/discovery.go
@@ -29,14 +29,15 @@ import (

func startEndpointSliceController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go endpointslicecontroller.NewController(
+ ctx,
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Core().V1().Services(),
controllerContext.InformerFactory.Core().V1().Nodes(),
controllerContext.InformerFactory.Discovery().V1().EndpointSlices(),
controllerContext.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice,
controllerContext.ClientBuilder.ClientOrDie("endpointslice-controller"),
controllerContext.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration,
- ).Run(int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Done())
+ ).Run(ctx, int(controllerContext.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs))
return nil, true, nil
}

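With this change the endpointslice controller simply inherits whatever logger the controller manager stored in ctx. A caller that wants per-controller attribution could attach a named logger to the context before the call; the helper below is a hedged illustration of that option, not something this commit adds:

```go
package main

import (
	"context"

	"k8s.io/klog/v2"
)

// withControllerLogger returns a context whose logger carries a controller
// name, so log lines emitted via klog.FromContext are attributable.
// Illustrative only; the commit relies on the logger already present in the
// controller manager's context.
func withControllerLogger(ctx context.Context, name string) context.Context {
	logger := klog.LoggerWithName(klog.FromContext(ctx), name)
	return klog.NewContext(ctx, logger)
}

func main() {
	ctx := withControllerLogger(context.Background(), "endpointslice-controller")
	klog.FromContext(ctx).Info("Sketch: controller context prepared")
}
```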
2 changes: 1 addition & 1 deletion hack/logcheck.conf
@@ -39,7 +39,7 @@ contextual k8s.io/kubernetes/test/e2e/dra/.*
-contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go
-contextual k8s.io/kubernetes/pkg/controller/controller_utils.go
-contextual k8s.io/kubernetes/pkg/controller/disruption/.*
-contextual k8s.io/kubernetes/pkg/controller/endpointslice/.*
-contextual k8s.io/kubernetes/pkg/controller/endpoint/.*
-contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.*
-contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.*
-contextual k8s.io/kubernetes/pkg/controller/nodeipam/.*
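Dropping a package from the -contextual exclusion list opts it into logcheck's contextual check, which (roughly) reports calls to the global klog API and expects a logger passed in explicitly or obtained from the context. An illustrative contrast, under that assumption:

```go
package example

import (
	"context"

	"k8s.io/klog/v2"
)

// flagged shows the style the contextual check is meant to catch: a direct
// call to the global klog API with a format string.
func flagged(key string, err error) {
	klog.Infof("Error syncing %q: %v", key, err)
}

// accepted shows the contextual style: the logger comes from the context and
// values are logged as key/value pairs.
func accepted(ctx context.Context, key string, err error) {
	logger := klog.FromContext(ctx)
	logger.Info("Error syncing service", "key", key, "err", err)
}
```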
84 changes: 48 additions & 36 deletions pkg/controller/endpointslice/endpointslice_controller.go
@@ -17,6 +17,7 @@ limitations under the License.
package endpointslice

import (
+ "context"
"fmt"
"time"

@@ -76,7 +77,7 @@ const (
)

// NewController creates and initializes a new Controller
- func NewController(podInformer coreinformers.PodInformer,
+ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
serviceInformer coreinformers.ServiceInformer,
nodeInformer coreinformers.NodeInformer,
endpointSliceInformer discoveryinformers.EndpointSliceInformer,
@@ -127,9 +128,12 @@ func NewController(podInformer coreinformers.PodInformer,
c.nodeLister = nodeInformer.Lister()
c.nodesSynced = nodeInformer.Informer().HasSynced

+ logger := klog.FromContext(ctx)
endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: c.onEndpointSliceAdd,
- UpdateFunc: c.onEndpointSliceUpdate,
+ AddFunc: c.onEndpointSliceAdd,
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ c.onEndpointSliceUpdate(logger, oldObj, newObj)
+ },
DeleteFunc: c.onEndpointSliceDelete,
})

@@ -148,9 +152,15 @@ func NewController(podInformer coreinformers.PodInformer,

if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: c.addNode,
- UpdateFunc: c.updateNode,
- DeleteFunc: c.deleteNode,
+ AddFunc: func(obj interface{}) {
+ c.addNode(logger, obj)
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ c.updateNode(logger, oldObj, newObj)
+ },
+ DeleteFunc: func(obj interface{}) {
+ c.deleteNode(logger, obj)
+ },
})

c.topologyCache = topologycache.NewTopologyCache()
@@ -239,7 +249,7 @@ type Controller struct {
}

// Run will not return until stopCh is closed.
- func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
+ func (c *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()

// Start events processing pipeline.
@@ -249,43 +259,45 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {

defer c.queue.ShutDown()

- klog.Infof("Starting endpoint slice controller")
- defer klog.Infof("Shutting down endpoint slice controller")
+ logger := klog.FromContext(ctx)
+ logger.Info("Starting endpoint slice controller")
+ defer logger.Info("Shutting down endpoint slice controller")

- if !cache.WaitForNamedCacheSync("endpoint_slice", stopCh, c.podsSynced, c.servicesSynced, c.endpointSlicesSynced, c.nodesSynced) {
+ if !cache.WaitForNamedCacheSync("endpoint_slice", ctx.Done(), c.podsSynced, c.servicesSynced, c.endpointSlicesSynced, c.nodesSynced) {
return
}

+ logger.V(2).Info("Starting worker threads", "total", workers)
for i := 0; i < workers; i++ {
- go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
+ go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
}

- <-stopCh
+ <-ctx.Done()
}

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time
- func (c *Controller) worker() {
- for c.processNextWorkItem() {
+ func (c *Controller) worker(logger klog.Logger) {
+ for c.processNextWorkItem(logger) {
}
}

- func (c *Controller) processNextWorkItem() bool {
+ func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
cKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(cKey)

- err := c.syncService(cKey.(string))
- c.handleErr(err, cKey)
+ err := c.syncService(logger, cKey.(string))
+ c.handleErr(logger, err, cKey)

return true
}

- func (c *Controller) handleErr(err error, key interface{}) {
+ func (c *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
trackSync(err)

if err == nil {
@@ -294,20 +306,20 @@ func (c *Controller) handleErr(err error, key interface{}) {
}

if c.queue.NumRequeues(key) < maxRetries {
- klog.Warningf("Error syncing endpoint slices for service %q, retrying. Error: %v", key, err)
+ logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
c.queue.AddRateLimited(key)
return
}

- klog.Warningf("Retry budget exceeded, dropping service %q out of the queue: %v", key, err)
+ logger.Info("Retry budget exceeded, dropping service out of the queue", "key", key, "err", err)
c.queue.Forget(key)
utilruntime.HandleError(err)
}

- func (c *Controller) syncService(key string) error {
+ func (c *Controller) syncService(logger klog.Logger, key string) error {
startTime := time.Now()
defer func() {
- klog.V(4).Infof("Finished syncing service %q endpoint slices. (%v)", key, time.Since(startTime))
+ logger.V(4).Info("Finished syncing service endpoint slices", "key", key, "elapsedTime", time.Since(startTime))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -340,7 +352,7 @@ func (c *Controller) syncService(key string) error {
return nil
}

- klog.V(5).Infof("About to update endpoint slices for service %q", key)
+ logger.V(5).Info("About to update endpoint slices for service", "key", key)

podLabelSelector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
pods, err := c.podLister.Pods(service.Namespace).List(podLabelSelector)
@@ -379,7 +391,7 @@ func (c *Controller) syncService(key string) error {
lastChangeTriggerTime := c.triggerTimeTracker.
ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

- err = c.reconciler.reconcile(service, pods, endpointSlices, lastChangeTriggerTime)
+ err = c.reconciler.reconcile(logger, service, pods, endpointSlices, lastChangeTriggerTime)
if err != nil {
c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices",
"Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
@@ -429,7 +441,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) {
// the EndpointSlice resource version does not match the expected version in the
// endpointSliceTracker or the managed-by value of the EndpointSlice has changed
// from or to this controller.
- func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
+ func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj interface{}) {
prevEndpointSlice := prevObj.(*discovery.EndpointSlice)
endpointSlice := obj.(*discovery.EndpointSlice)
if endpointSlice == nil || prevEndpointSlice == nil {
@@ -442,7 +454,7 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
svcName := endpointSlice.Labels[discovery.LabelServiceName]
prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName]
if svcName != prevSvcName {
- klog.Warningf("%s label changed from %s to %s for %s", discovery.LabelServiceName, prevSvcName, svcName, endpointSlice.Name)
+ logger.Info("label changed", "label", discovery.LabelServiceName, "oldService", prevSvcName, "newService", svcName, "endpointslice", klog.KObj(endpointSlice))
c.queueServiceForEndpointSlice(endpointSlice)
c.queueServiceForEndpointSlice(prevEndpointSlice)
return
@@ -512,41 +524,41 @@ func (c *Controller) deletePod(obj interface{}) {
}
}

- func (c *Controller) addNode(obj interface{}) {
- c.checkNodeTopologyDistribution()
+ func (c *Controller) addNode(logger klog.Logger, obj interface{}) {
+ c.checkNodeTopologyDistribution(logger)
}

- func (c *Controller) updateNode(old, cur interface{}) {
+ func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
oldNode := old.(*v1.Node)
curNode := cur.(*v1.Node)

// LabelTopologyZone may be added by cloud provider asynchronously after the Node is created.
// The topology cache should be updated in this case.
if isNodeReady(oldNode) != isNodeReady(curNode) ||
oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone] {
- c.checkNodeTopologyDistribution()
+ c.checkNodeTopologyDistribution(logger)
}
}

- func (c *Controller) deleteNode(obj interface{}) {
- c.checkNodeTopologyDistribution()
+ func (c *Controller) deleteNode(logger klog.Logger, obj interface{}) {
+ c.checkNodeTopologyDistribution(logger)
}

// checkNodeTopologyDistribution updates Nodes in the topology cache and then
// queues any Services that are past the threshold.
- func (c *Controller) checkNodeTopologyDistribution() {
+ func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
if c.topologyCache == nil {
return
}
nodes, err := c.nodeLister.List(labels.Everything())
if err != nil {
- klog.Errorf("Error listing Nodes: %v", err)
+ logger.Error(err, "Error listing Nodes")
return
}
- c.topologyCache.SetNodes(nodes)
+ c.topologyCache.SetNodes(logger, nodes)
serviceKeys := c.topologyCache.GetOverloadedServices()
for _, serviceKey := range serviceKeys {
- klog.V(2).Infof("Queuing %s Service after Node change due to overloading", serviceKey)
+ logger.V(2).Info("Queuing Service after Node change due to overloading", "key", serviceKey)
c.queue.Add(serviceKey)
}
}
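The Run changes above also swap the stop channel for context cancellation: the cache sync and the wait.Until worker loops now key off ctx.Done(). A self-contained sketch of that shutdown pattern (the worker body is a stand-in, not the controller's real logic):

```go
package main

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog/v2"
)

func main() {
	// A timeout stands in for the controller manager cancelling its root
	// context at shutdown.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	logger := klog.FromContext(ctx)
	logger.Info("Starting sketch workers")
	defer logger.Info("Shutting down sketch workers")

	for i := 0; i < 2; i++ {
		// Each loop exits once the context is cancelled, mirroring
		// wait.Until(func() { c.worker(logger) }, period, ctx.Done()).
		go wait.Until(func() {
			logger.V(4).Info("Pretend work item processed")
		}, 500*time.Millisecond, ctx.Done())
	}

	<-ctx.Done()
}
```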
