56 changes: 41 additions & 15 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -17,6 +17,7 @@ limitations under the License.
 package actuation
 
 import (
+	default_context "context"
 	"strings"
 	"time"
 
@@ -40,11 +41,14 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/utils/expiring"
 	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
+
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 )
 
 const (
-	pastLatencyExpireDuration = time.Hour
+	pastLatencyExpireDuration  = time.Hour
+	maxConcurrentNodesTainting = 5
 )
 
 // Actuator is responsible for draining and deleting nodes.
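For reviewers unfamiliar with the helper: `workqueue.ParallelizeUntil` from `k8s.io/client-go/util/workqueue` runs a fixed pool of worker goroutines over an indexed set of pieces, so the new `maxConcurrentNodesTainting = 5` constant caps how many taint API calls are in flight at once. A minimal, self-contained sketch of the primitive (the node names and print statement are illustrative, not from this PR):

```go
package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	nodes := []string{"node-1", "node-2", "node-3", "node-4", "node-5", "node-6"}
	const maxWorkers = 5 // mirrors maxConcurrentNodesTainting

	// ParallelizeUntil starts maxWorkers goroutines that cooperatively
	// consume piece indices 0..len(nodes)-1, so at most maxWorkers
	// pieces are processed concurrently; it returns when all are done.
	workqueue.ParallelizeUntil(context.Background(), maxWorkers, len(nodes), func(piece int) {
		fmt.Printf("tainting %s\n", nodes[piece])
	})
}
```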
@@ -176,33 +180,55 @@ func (a *Actuator) deleteAsyncEmpty(NodeGroupViews []*budgets.NodeGroupView, nod
 // taintNodesSync synchronously taints all provided nodes with NoSchedule. If tainting fails for any of the nodes, already
 // applied taints are cleaned up.
 func (a *Actuator) taintNodesSync(NodeGroupViews []*budgets.NodeGroupView) (time.Duration, errors.AutoscalerError) {
-	var taintedNodes []*apiv1.Node
+	nodesToTaint := make([]*apiv1.Node, 0)
 	var updateLatencyTracker *UpdateLatencyTracker
 	nodeDeleteDelayAfterTaint := a.nodeDeleteDelayAfterTaint
 	if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
 		updateLatencyTracker = NewUpdateLatencyTracker(a.ctx.AutoscalingKubeClients.ListerRegistry.AllNodeLister())
 		go updateLatencyTracker.Start()
 	}
 
 	for _, bucket := range NodeGroupViews {
 		for _, node := range bucket.Nodes {
 			if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
 				updateLatencyTracker.StartTimeChan <- nodeTaintStartTime{node.Name, time.Now()}
 			}
-			err := a.taintNode(node)
-			if err != nil {
-				a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
-				// Clean up already applied taints in case of issues.
-				for _, taintedNode := range taintedNodes {
-					_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
-				}
-				if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
-					close(updateLatencyTracker.AwaitOrStopChan)
-				}
-				return nodeDeleteDelayAfterTaint, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", node)
-			}
-			taintedNodes = append(taintedNodes, node)
+			nodesToTaint = append(nodesToTaint, node)
 		}
 	}
+	failedTaintedNodes := make(chan struct {
+		node *apiv1.Node
+		err  error
+	}, len(nodesToTaint))
+	taintedNodes := make(chan *apiv1.Node, len(nodesToTaint))
+	workqueue.ParallelizeUntil(default_context.Background(), maxConcurrentNodesTainting, len(nodesToTaint), func(piece int) {
+		node := nodesToTaint[piece]
+		err := a.taintNode(node)
+		if err != nil {
+			failedTaintedNodes <- struct {
+				node *apiv1.Node
+				err  error
+			}{node: node, err: err}
+		} else {
+			taintedNodes <- node
+		}
+	})
+	close(failedTaintedNodes)
+	close(taintedNodes)
+	if len(failedTaintedNodes) > 0 {
+		for nodeWithError := range failedTaintedNodes {
+			a.ctx.Recorder.Eventf(nodeWithError.node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", nodeWithError.err)
+		}
+		// Clean up already applied taints in case of issues.
+		for taintedNode := range taintedNodes {
+			_, _ = taints.CleanToBeDeleted(taintedNode, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
+		}
+		if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
+			close(updateLatencyTracker.AwaitOrStopChan)
+		}
+		return nodeDeleteDelayAfterTaint, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint %d nodes with ToBeDeleted", len(failedTaintedNodes))
+	}
 
 	if a.ctx.AutoscalingOptions.DynamicNodeDeleteDelayAfterTaintEnabled {
 		updateLatencyTracker.AwaitOrStopChan <- true
 		latency, ok := <-updateLatencyTracker.ResultChan
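The rewritten `taintNodesSync` collects results through two channels buffered to `len(nodesToTaint)`, so no worker can block on a send and `ParallelizeUntil` is guaranteed to return; both channels are then closed so the main goroutine can drain them with `range`. A standalone sketch of that collect-then-drain pattern, with a hypothetical `taint` function standing in for the real `a.taintNode` call:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// taint is a stand-in for the real taint API call.
func taint(name string) error {
	if name == "node-2" {
		return errors.New("api call failed")
	}
	return nil
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}
	// Buffers sized to len(nodes) mean every send succeeds immediately.
	failed := make(chan struct {
		name string
		err  error
	}, len(nodes))
	succeeded := make(chan string, len(nodes))

	workqueue.ParallelizeUntil(context.Background(), 5, len(nodes), func(piece int) {
		if err := taint(nodes[piece]); err != nil {
			failed <- struct {
				name string
				err  error
			}{nodes[piece], err}
		} else {
			succeeded <- nodes[piece]
		}
	})
	close(failed)
	close(succeeded)

	// Capture the count first: range drains the buffer, after which
	// len(failed) reports zero.
	failedCount := len(failed)
	for f := range failed {
		fmt.Printf("failed to taint %s: %v\n", f.name, f.err)
	}
	if failedCount > 0 {
		// Roll back the nodes that were tainted successfully.
		for s := range succeeded {
			fmt.Printf("cleaning up taint on %s\n", s)
		}
	}
}
```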
10 changes: 9 additions & 1 deletion cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -446,6 +446,10 @@ func getStartDeletionTestCases(ignoreDaemonSetsUtilization bool, suffix string)
 				{toBeDeletedTaint},
 				{},
 			},
+			"test-node-3": {
+				{toBeDeletedTaint},
+				{},
+			},
 		},
 		wantErr: cmpopts.AnyError,
 	},
@@ -472,6 +476,10 @@ func getStartDeletionTestCases(ignoreDaemonSetsUtilization bool, suffix string)
 				{toBeDeletedTaint},
 				{},
 			},
+			"atomic-4-node-3": {
+				{toBeDeletedTaint},
+				{},
+			},
 		},
 		wantErr: cmpopts.AnyError,
 	},
@@ -1048,7 +1056,7 @@ func TestStartDeletion(t *testing.T) {
 		nodeName string
 		taints   []apiv1.Taint
 	}
-	taintUpdates := make(chan nodeTaints, 10)
+	taintUpdates := make(chan nodeTaints, 20)
 	deletedNodes := make(chan string, 10)
 	deletedPods := make(chan string, 10)
 
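On the test side, the `taintUpdates` buffer doubling from 10 to 20 presumably accounts for the parallel path: in the failure cases each node now produces two updates (the taint being applied, then removed during cleanup), and every send must fit in the buffer before the test starts draining it. A minimal sketch of that sizing rule, with illustrative counts:

```go
package main

import "sync"

func main() {
	const nodes, updatesPerNode = 10, 2
	// Size the buffer to the worst case so no concurrent sender blocks
	// before the receiver starts draining.
	updates := make(chan string, nodes*updatesPerNode) // 20, as in the test

	var wg sync.WaitGroup
	for i := 0; i < nodes; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			updates <- "taint added"
			updates <- "taint removed" // cleanup after a failed scale-down
		}()
	}
	wg.Wait() // completes only because the buffer absorbs all 20 sends
	close(updates)
}
```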