Skip to content

Commit 9388ee3

Browse files
authored
Merge pull request #7549 from abdelrahman882/asynchronous_taint_nodes
Taint nodes for deletion asynchronously
2 parents 1ba6007 + 392e758 commit 9388ee3

File tree

2 files changed

+50
-16
lines changed


cluster-autoscaler/core/scaledown/actuation/actuator.go

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package actuation
1818

1919
import (
20+
default_context "context"
2021
"strings"
2122
"time"
2223

@@ -43,11 +44,14 @@ import (
4344
"k8s.io/autoscaler/cluster-autoscaler/utils/expiring"
4445
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
4546
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
47+
48+
"k8s.io/client-go/util/workqueue"
4649
"k8s.io/klog/v2"
4750
)
4851

4952
const (
50-
pastLatencyExpireDuration = time.Hour
53+
pastLatencyExpireDuration = time.Hour
54+
maxConcurrentNodesTainting = 5
5155
)
5256

5357
// Actuator is responsible for draining and deleting nodes.
@@ -179,33 +183,55 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView, nod
179183
// taintNodesSync synchronously taints all provided nodes with NoSchedule. If tainting fails for any of the nodes, already
180184
// applied taints are cleaned up.
181185
func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) (time.Duration, errors.AutoscalerError) {
182-
var taintedNodes []*apiv1.Node
186+
nodesToTaint := make([]*apiv1.Node, 0)
183187
var updateLatencyTracker *UpdateLatencyTracker
184188
nodeDeleteDelayAfterTaint := a.nodeDeleteDelayAfterTaint
185189
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
186190
updateLatencyTracker = NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister())
187191
go updateLatencyTracker.Start()
188192
}
193+
189194
for _, bucket := range NodeGroupViews {
190195
for _, node := range bucket.Nodes {
191196
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
192197
updateLatencyTracker.StartTimeChan <- nodeTaintStartTime{node.Name, time.Now()}
193198
}
194-
err := a.taintNode(node)
195-
if err != nil {
196-
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
197-
// Clean up already applied taints in case of issues.
198-
for _, taintedNode := range taintedNodes {
199-
_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
200-
}
201-
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
202-
close(updateLatencyTracker.AwaitOrStopChan)
203-
}
204-
return nodeDeleteDelayAfterTaint, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node)
205-
}
206-
taintedNodes = append(taintedNodes, node)
199+
nodesToTaint = append(nodesToTaint, node)
200+
}
201+
}
202+
failedTaintedNodes := make(chan struct {
203+
node *apiv1.Node
204+
err error
205+
}, len(nodesToTaint))
206+
taintedNodes := make(chan *apiv1.Node, len(nodesToTaint))
207+
workqueue.ParallelizeUntil(default_context.Background(), maxConcurrentNodesTainting, len(nodesToTaint), func(piece int) {
208+
node := nodesToTaint[piece]
209+
err := a.taintNode(node)
210+
if err != nil {
211+
failedTaintedNodes <- struct {
212+
node *apiv1.Node
213+
err error
214+
}{node: node, err: err}
215+
} else {
216+
taintedNodes <- node
217+
}
218+
})
219+
close(failedTaintedNodes)
220+
close(taintedNodes)
221+
if len(failedTaintedNodes) > 0 {
222+
for nodeWithError := range failedTaintedNodes {
223+
a.ctx.Recorder.Eventf(nodeWithError.node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", nodeWithError.err)
224+
}
225+
// Clean up already applied taints in case of issues.
226+
for taintedNode := range taintedNodes {
227+
_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
207228
}
229+
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
230+
close(updateLatencyTracker.AwaitOrStopChan)
231+
}
232+
return nodeDeleteDelayAfterTaint, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint %d nodes with ToBeDeleted", len(failedTaintedNodes))
208233
}
234+
209235
if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
210236
updateLatencyTracker.AwaitOrStopChan <- true
211237
latency, ok := <-updateLatencyTracker.ResultChan

cluster-autoscaler/core/scaledown/actuation/actuator_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,10 @@ func getStartDeletionTestCases(ignoreDaemonSetsUtilization bool, suffix string)
446446
{toBeDeletedTaint},
447447
{},
448448
},
449+
"test-node-3": {
450+
{toBeDeletedTaint},
451+
{},
452+
},
449453
},
450454
wantErr: cmpopts.AnyError,
451455
},
@@ -472,6 +476,10 @@ func getStartDeletionTestCases(ignoreDaemonSetsUtilization bool, suffix string)
472476
{toBeDeletedTaint},
473477
{},
474478
},
479+
"atomic-4-node-3": {
480+
{toBeDeletedTaint},
481+
{},
482+
},
475483
},
476484
wantErr: cmpopts.AnyError,
477485
},
@@ -1046,7 +1054,7 @@ func TestStartDeletion(t *testing.T) {
10461054
nodeName string
10471055
taints []apiv1.Taint
10481056
}
1049-
taintUpdates := make(chan nodeTaints, 10)
1057+
taintUpdates := make(chan nodeTaints, 20)
10501058
deletedNodes := make(chan string, 10)
10511059
deletedPods := make(chan string, 10)
10521060

0 commit comments

Comments (0)