Merge pull request kubernetes#126780 from hbostan/automated-cherry-pick-of-#125294-upstream-release-1.29

Automated cherry pick of kubernetes#125294: Add a new workqueue to endpointslice controller for updating topology cache
k8s-ci-robot authored Aug 28, 2024
2 parents 80c1fd3 + a9d15f3 commit bb3827d
Showing 2 changed files with 75 additions and 44 deletions.
pkg/controller/endpointslice/endpointslice_controller.go (95 changes: 60 additions & 35 deletions)
@@ -75,6 +75,9 @@ const (
	// controllerName is a unique value used with LabelManagedBy to indicate
	// the component managing an EndpointSlice.
	controllerName = "endpointslice-controller.k8s.io"
+
+	// topologyQueueItemKey is the key for all items in the topologyQueue.
+	topologyQueueItemKey = "topologyQueueItemKey"
)
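A note on the sentinel key: because every node event enqueues the same constant, the workqueue's deduplication collapses any burst of node changes into at most one pending topology recheck. A minimal standalone sketch of that behavior (the queue here is illustrative, not the controller's):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.New()
	defer q.ShutDown()

	// Simulate a burst of node events, all enqueuing the same sentinel key.
	q.Add("topologyQueueItemKey")
	q.Add("topologyQueueItemKey")
	q.Add("topologyQueueItemKey")

	// Identical keys are deduplicated while queued, so a single worker
	// pass handles the whole burst.
	fmt.Println(q.Len()) // 1
}
```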

// NewController creates and initializes a new Controller
@@ -99,12 +102,13 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
		// such as an update to a Service or Deployment. A more significant
		// rate limit back off here helps ensure that the Controller does not
		// overwhelm the API Server.
-		queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
+		serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
			workqueue.NewItemExponentialFailureRateLimiter(defaultSyncBackOff, maxSyncBackOff),
			// 10 qps, 100 bucket size. This is only for retry speed and it's
			// only the overall factor (not per item).
			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
		), "endpoint_slice"),
+		topologyQueue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint_slice_topology"),
		workerLoopPeriod: time.Second,
	}
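The serviceQueue limiter takes the maximum of a per-item exponential backoff and an overall 10 qps token bucket, so one flapping Service backs off on its own while total retry throughput stays bounded. A rough standalone sketch of the per-item behavior (the backoff bounds below are illustrative stand-ins for defaultSyncBackOff and maxSyncBackOff):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	limiter := workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)

	// Each When call counts as another failure for the item, so the
	// suggested delay doubles until it reaches the configured cap.
	for i := 0; i < 5; i++ {
		fmt.Println(limiter.When("default/my-service")) // 5ms, 10ms, 20ms, 40ms, 80ms
	}

	// A successful sync resets the per-item backoff.
	limiter.Forget("default/my-service")
}
```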

@@ -153,14 +157,14 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,

	if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				c.addNode(logger, obj)
+			AddFunc: func(_ interface{}) {
+				c.addNode()
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
-				c.updateNode(logger, oldObj, newObj)
+				c.updateNode(oldObj, newObj)
			},
-			DeleteFunc: func(obj interface{}) {
-				c.deleteNode(logger, obj)
+			DeleteFunc: func(_ interface{}) {
+				c.deleteNode()
			},
		})

@@ -230,7 +234,11 @@ type Controller struct {
	// more often than services with few pods; it also would cause a
	// service that's inserted multiple times to be processed more than
	// necessary.
-	queue workqueue.RateLimitingInterface
+	serviceQueue workqueue.RateLimitingInterface
+
+	// topologyQueue is used to trigger a topology cache update and to check
+	// node topology distribution.
+	topologyQueue workqueue.RateLimitingInterface

	// maxEndpointsPerSlice references the maximum number of endpoints that
	// should be added to an EndpointSlice
@@ -258,7 +266,8 @@ func (c *Controller) Run(ctx context.Context, workers int) {
	c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
	defer c.eventBroadcaster.Shutdown()

-	defer c.queue.ShutDown()
+	defer c.serviceQueue.ShutDown()
+	defer c.topologyQueue.ShutDown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting endpoint slice controller")
@@ -268,52 +277,68 @@ func (c *Controller) Run(ctx context.Context, workers int) {
		return
	}

-	logger.V(2).Info("Starting worker threads", "total", workers)
+	logger.V(2).Info("Starting service queue worker threads", "total", workers)
	for i := 0; i < workers; i++ {
-		go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
+		go wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
	}
+	logger.V(2).Info("Starting topology queue worker threads", "total", 1)
+	go wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())

	<-ctx.Done()
}
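Note that exactly one topology worker is started: checkNodeTopologyDistribution mutates the shared topology cache, and a single worker serializes those updates without extra locking. For reference, wait.Until simply reruns its function every period until the stop channel closes; a tiny standalone sketch (period and message are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})

	// wait.Until re-invokes the function every period until stop closes,
	// so a worker loop that returns (e.g. after queue shutdown) is
	// restarted rather than silently disappearing.
	go wait.Until(func() { fmt.Println("worker pass") }, 250*time.Millisecond, stop)

	time.Sleep(time.Second)
	close(stop)
}
```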

-// worker runs a worker thread that just dequeues items, processes them, and
-// marks them done. You may run as many of these in parallel as you wish; the
-// workqueue guarantees that they will not end up processing the same service
-// at the same time
-func (c *Controller) worker(logger klog.Logger) {
-	for c.processNextWorkItem(logger) {
+// serviceQueueWorker runs a worker thread that just dequeues items, processes
+// them, and marks them done. You may run as many of these in parallel as you
+// wish; the workqueue guarantees that they will not end up processing the same
+// service at the same time
+func (c *Controller) serviceQueueWorker(logger klog.Logger) {
+	for c.processNextServiceWorkItem(logger) {
	}
}

-func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
-	cKey, quit := c.queue.Get()
+func (c *Controller) processNextServiceWorkItem(logger klog.Logger) bool {
+	cKey, quit := c.serviceQueue.Get()
	if quit {
		return false
	}
-	defer c.queue.Done(cKey)
+	defer c.serviceQueue.Done(cKey)

	err := c.syncService(logger, cKey.(string))
	c.handleErr(logger, err, cKey)

	return true
}
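The "will not end up processing the same service at the same time" guarantee mentioned above falls out of the Get/Done protocol: a key re-added while in flight is parked until Done, then requeued once. A minimal standalone sketch (the key is illustrative):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.New()
	defer q.ShutDown()

	q.Add("default/my-service")
	key, _ := q.Get() // a worker picks the key up; the queue is now empty

	// Events arriving mid-sync re-add the key, but it stays parked, so a
	// second worker can never process the same service concurrently.
	q.Add("default/my-service")
	fmt.Println(q.Len()) // 0

	// Done releases the key; because it was re-added while in flight, it
	// is requeued exactly once for a follow-up sync.
	q.Done(key)
	fmt.Println(q.Len()) // 1
}
```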

+func (c *Controller) topologyQueueWorker(logger klog.Logger) {
+	for c.processNextTopologyWorkItem(logger) {
+	}
+}
+
+func (c *Controller) processNextTopologyWorkItem(logger klog.Logger) bool {
+	key, quit := c.topologyQueue.Get()
+	if quit {
+		return false
+	}
+	defer c.topologyQueue.Done(key)
+	c.checkNodeTopologyDistribution(logger)
+	return true
+}
+
func (c *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
	trackSync(err)

	if err == nil {
-		c.queue.Forget(key)
+		c.serviceQueue.Forget(key)
		return
	}

-	if c.queue.NumRequeues(key) < maxRetries {
+	if c.serviceQueue.NumRequeues(key) < maxRetries {
		logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
-		c.queue.AddRateLimited(key)
+		c.serviceQueue.AddRateLimited(key)
		return
	}

	logger.Info("Retry budget exceeded, dropping service out of the queue", "key", key, "err", err)
-	c.queue.Forget(key)
+	c.serviceQueue.Forget(key)
	utilruntime.HandleError(err)
}
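handleErr relies on the limiter's per-key failure count: AddRateLimited increments it, Forget resets it, and NumRequeues is what gets compared against maxRetries. A small standalone sketch (the key is illustrative):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer q.ShutDown()

	// Each rate-limited retry bumps the per-key failure count.
	q.AddRateLimited("default/my-service")
	q.AddRateLimited("default/my-service")
	fmt.Println(q.NumRequeues("default/my-service")) // 2

	// A successful sync must Forget the key, or its backoff (and the
	// retry budget checked against maxRetries) keeps growing.
	q.Forget("default/my-service")
	fmt.Println(q.NumRequeues("default/my-service")) // 0
}
```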

@@ -410,7 +435,7 @@ func (c *Controller) onServiceUpdate(obj interface{}) {
		return
	}

-	c.queue.Add(key)
+	c.serviceQueue.Add(key)
}

// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
@@ -421,7 +446,7 @@ func (c *Controller) onServiceDelete(obj interface{}) {
		return
	}

-	c.queue.Add(key)
+	c.serviceQueue.Add(key)
}

// onEndpointSliceAdd queues a sync for the relevant Service for a sync if the
@@ -494,7 +519,7 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
	if c.endpointUpdatesBatchPeriod > delay {
		delay = c.endpointUpdatesBatchPeriod
	}
-	c.queue.AddAfter(key, delay)
+	c.serviceQueue.AddAfter(key, delay)
}
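The AddAfter above is what batches endpoint churn: the delaying queue keeps the key invisible until the delay elapses, and a duplicate scheduled while one is already waiting is coalesced rather than queued twice. A rough standalone sketch (durations are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()

	// A burst of endpoint updates for one service within the batch
	// period collapses into a single deferred sync.
	q.AddAfter("default/my-service", 50*time.Millisecond)
	q.AddAfter("default/my-service", 50*time.Millisecond)

	fmt.Println(q.Len()) // 0: nothing is visible until the delay elapses

	time.Sleep(100 * time.Millisecond)
	fmt.Println(q.Len()) // 1: the duplicates were coalesced
}
```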

func (c *Controller) addPod(obj interface{}) {
@@ -505,14 +530,14 @@ func (c *Controller) addPod(obj interface{}) {
		return
	}
	for key := range services {
-		c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
+		c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
	}
}

func (c *Controller) updatePod(old, cur interface{}) {
	services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
	for key := range services {
-		c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
+		c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
	}
}

@@ -525,24 +550,24 @@ func (c *Controller) deletePod(obj interface{}) {
	}
}

-func (c *Controller) addNode(logger klog.Logger, obj interface{}) {
-	c.checkNodeTopologyDistribution(logger)
+func (c *Controller) addNode() {
+	c.topologyQueue.Add(topologyQueueItemKey)
}

-func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
+func (c *Controller) updateNode(old, cur interface{}) {
	oldNode := old.(*v1.Node)
	curNode := cur.(*v1.Node)

	// LabelTopologyZone may be added by the cloud provider asynchronously after the Node is created.
	// The topology cache should be updated in this case.
	if isNodeReady(oldNode) != isNodeReady(curNode) ||
		oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone] {
-		c.checkNodeTopologyDistribution(logger)
+		c.topologyQueue.Add(topologyQueueItemKey)
	}
}

-func (c *Controller) deleteNode(logger klog.Logger, obj interface{}) {
-	c.checkNodeTopologyDistribution(logger)
+func (c *Controller) deleteNode() {
+	c.topologyQueue.Add(topologyQueueItemKey)
}
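As the comment in updateNode notes, only readiness flips and zone-label changes reach the topology queue; other node updates (heartbeats, unrelated labels) are filtered out. Below is a standalone sketch of the same predicate; isNodeReady is re-implemented from the Node's Ready condition as an assumption about the controller's helper:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// isNodeReady mirrors the controller's helper: a node is ready when its
// Ready condition is True. (Assumed shape, for illustration only.)
func isNodeReady(node *v1.Node) bool {
	for _, c := range node.Status.Conditions {
		if c.Type == v1.NodeReady {
			return c.Status == v1.ConditionTrue
		}
	}
	return false
}

// needsTopologyRecheck is the update filter from updateNode above: only
// readiness flips or zone-label changes warrant a topology recheck.
func needsTopologyRecheck(oldNode, curNode *v1.Node) bool {
	return isNodeReady(oldNode) != isNodeReady(curNode) ||
		oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone]
}

func main() {
	old := &v1.Node{}
	cur := old.DeepCopy()
	// The zone label is set late by the cloud provider.
	cur.Labels = map[string]string{v1.LabelTopologyZone: "zone-a"}
	fmt.Println(needsTopologyRecheck(old, cur)) // true
}
```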

// checkNodeTopologyDistribution updates Nodes in the topology cache and then
@@ -560,7 +585,7 @@ func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
	serviceKeys := c.topologyCache.GetOverloadedServices()
	for _, serviceKey := range serviceKeys {
		logger.V(2).Info("Queuing Service after Node change due to overloading", "key", serviceKey)
-		c.queue.Add(serviceKey)
+		c.serviceQueue.Add(serviceKey)
	}
}

pkg/controller/endpointslice/endpointslice_controller_test.go (24 changes: 15 additions & 9 deletions)
@@ -539,18 +539,18 @@ func TestOnEndpointSliceUpdate(t *testing.T) {
	epSlice2 := epSlice1.DeepCopy()
	epSlice2.Labels[discovery.LabelManagedBy] = "something else"

-	assert.Equal(t, 0, esController.queue.Len())
+	assert.Equal(t, 0, esController.serviceQueue.Len())
	esController.onEndpointSliceUpdate(logger, epSlice1, epSlice2)
	err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) {
-		if esController.queue.Len() > 0 {
+		if esController.serviceQueue.Len() > 0 {
			return true, nil
		}
		return false, nil
	})
	if err != nil {
		t.Fatalf("unexpected error waiting for add to queue")
	}
-	assert.Equal(t, 1, esController.queue.Len())
+	assert.Equal(t, 1, esController.serviceQueue.Len())
}

func TestSyncService(t *testing.T) {
@@ -1947,8 +1947,8 @@ func Test_checkNodeTopologyDistribution(t *testing.T) {
			logger, _ := ktesting.NewTestContext(t)
			esController.checkNodeTopologyDistribution(logger)

-			if esController.queue.Len() != tc.expectedQueueLen {
-				t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len())
+			if esController.serviceQueue.Len() != tc.expectedQueueLen {
+				t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.serviceQueue.Len())
			}
		})
	}
@@ -2007,8 +2007,11 @@ func TestUpdateNode(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	esController.nodeStore.Add(node1)
	esController.nodeStore.Add(node2)
-	esController.addNode(logger, node1)
-	esController.addNode(logger, node2)
+	esController.addNode()
+	esController.addNode()
+	assert.Equal(t, 1, esController.topologyQueue.Len())
+	esController.processNextTopologyWorkItem(logger)
+	assert.Equal(t, 0, esController.topologyQueue.Len())
	// The Nodes don't have the zone label, AddHints should fail.
	_, _, eventsBuilders := esController.topologyCache.AddHints(logger, sliceInfo)
	require.Len(t, eventsBuilders, 1)
@@ -2022,8 +2025,11 @@
	// After adding the zone label to the Nodes and calling the event handler updateNode, AddHints should succeed.
	esController.nodeStore.Update(updateNode1)
	esController.nodeStore.Update(updateNode2)
-	esController.updateNode(logger, node1, updateNode1)
-	esController.updateNode(logger, node2, updateNode2)
+	esController.updateNode(node1, updateNode1)
+	esController.updateNode(node2, updateNode2)
+	assert.Equal(t, 1, esController.topologyQueue.Len())
+	esController.processNextTopologyWorkItem(logger)
+	assert.Equal(t, 0, esController.topologyQueue.Len())
	_, _, eventsBuilders = esController.topologyCache.AddHints(logger, sliceInfo)
	require.Len(t, eventsBuilders, 1)
	assert.Contains(t, eventsBuilders[0].Message, topologycache.TopologyAwareHintsEnabled)
