Merge pull request kubernetes#4970 from MaciekPytel/estimation_limiter
Binpacking can exit without packing all the pods
MaciekPytel authored and lrouquette committed Dec 15, 2022
1 parent a28edd0 commit 2bfc51b
Showing 10 changed files with 442 additions and 82 deletions.
21 changes: 21 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -165,4 +165,25 @@ type AutoscalingOptions struct {
DaemonSetEvictionForOccupiedNodes bool
// User agent to use for HTTP calls.
UserAgent string
// InitialNodeGroupBackoffDuration is the duration of the first backoff after a new node failed to start.
InitialNodeGroupBackoffDuration time.Duration
// MaxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start.
MaxNodeGroupBackoffDuration time.Duration
// NodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset.
NodeGroupBackoffResetTimeout time.Duration
// MaxScaleDownParallelism is the maximum number of nodes (both empty and needing drain) that can be deleted in parallel.
MaxScaleDownParallelism int
// MaxDrainParallelism is the maximum number of nodes needing drain that can be drained and deleted in parallel.
MaxDrainParallelism int
// GceExpanderEphemeralStorageSupport is whether scale-up takes ephemeral storage resources into account.
GceExpanderEphemeralStorageSupport bool
// RecordDuplicatedEvents controls whether events should be duplicated within a 5 minute window.
RecordDuplicatedEvents bool
// MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up.
// Note that this is strictly a performance optimization aimed at limiting binpacking time, not a tool to rate-limit
// scale-up. There is nothing stopping CA from adding MaxNodesPerScaleUp every loop.
MaxNodesPerScaleUp int
// MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold
// is exceeded binpacking will be cut short and a partial scale-up will be performed.
MaxNodeGroupBinpackingDuration time.Duration
}
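
The two new estimator fields are set like any other AutoscalingOptions field. Below is a minimal sketch of configuring them, assuming the config package path used by cluster-autoscaler; the numeric values are illustrative only, not defaults established by this commit.

package main

import (
	"fmt"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
	// Illustrative values only; this commit does not set defaults here.
	opts := config.AutoscalingOptions{
		MaxNodesPerScaleUp:             1000,             // cap on nodes added by a single binpacking pass
		MaxNodeGroupBinpackingDuration: 10 * time.Second, // cap on time spent binpacking one NodeGroup
	}
	fmt.Println(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration)
}
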
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/autoscaler.go
@@ -109,7 +109,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.ExpanderStrategy = expanderStrategy
}
if opts.EstimatorBuilder == nil {
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName)
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration))
if err != nil {
return err
}
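
NewEstimatorBuilder now takes an EstimationLimiter as its second argument. The limiter's definition lives in one of the changed files not shown in this view; judging from the constructor call above and the calls the binpacking estimator makes below (StartEstimation, EndEstimation, PermissionToAddNode), the interface presumably looks roughly like this sketch.

package estimator

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// EstimationLimiter is a sketch of the contract inferred from its call sites
// in this diff; the actual upstream definition may differ in detail.
type EstimationLimiter interface {
	// StartEstimation is called once before binpacking a set of pending pods
	// onto nodes from the given NodeGroup.
	StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup)
	// EndEstimation is called once binpacking for that NodeGroup finishes.
	EndEstimation()
	// PermissionToAddNode is consulted before each new simulated node is added;
	// returning false cuts binpacking short with a partial result.
	PermissionToAddNode() bool
}
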
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scale_test_common.go
@@ -161,7 +161,7 @@ func NewScaleTestAutoscalingContext(
}
// Ignoring error here is safe - if a test doesn't specify valid estimatorName,
// it either doesn't need one, or should fail when it turns out to be nil.
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName)
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(0, 0))
predicateChecker, err := simulator.NewTestPredicateChecker()
if err != nil {
return context.AutoscalingContext{}, err
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scale_up.go
@@ -312,7 +312,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG

if len(option.Pods) > 0 {
estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot)
option.NodeCount = estimator.Estimate(option.Pods, nodeInfo)
option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup)
}

return option, nil
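
Estimate now returns both a node count and the subset of pods that fit onto the simulated nodes, so a caller can distinguish a partial scale-up from a full one. The helper below is a hypothetical illustration of consuming the new return value; it is not part of this commit.

package example

import (
	apiv1 "k8s.io/api/core/v1"
	klog "k8s.io/klog/v2"
)

// reportEstimate is a hypothetical helper showing how the (count, pods) pair
// returned by Estimate could be used; it does not appear in this diff.
func reportEstimate(pending []*apiv1.Pod, nodeCount int, scheduled []*apiv1.Pod) {
	if len(scheduled) < len(pending) {
		klog.Infof("partial scale-up: %d nodes cover %d of %d pending pods",
			nodeCount, len(scheduled), len(pending))
		return
	}
	klog.Infof("scale-up of %d nodes covers all %d pending pods", nodeCount, len(pending))
}
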
75 changes: 56 additions & 19 deletions cluster-autoscaler/estimator/binpacking_estimator.go
@@ -22,6 +22,7 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
klog "k8s.io/klog/v2"
@@ -38,15 +38,18 @@ type podInfo struct {
type BinpackingNodeEstimator struct {
predicateChecker simulator.PredicateChecker
clusterSnapshot simulator.ClusterSnapshot
limiter EstimationLimiter
}

// NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator.
func NewBinpackingNodeEstimator(
predicateChecker simulator.PredicateChecker,
clusterSnapshot simulator.ClusterSnapshot) *BinpackingNodeEstimator {
clusterSnapshot simulator.ClusterSnapshot,
limiter EstimationLimiter) *BinpackingNodeEstimator {
return &BinpackingNodeEstimator{
predicateChecker: predicateChecker,
clusterSnapshot: clusterSnapshot,
limiter: limiter,
}
}

@@ -57,69 +57,102 @@ func NewBinpackingNodeEstimator(
// still be maintained.
// It is assumed that all pods from the given list can fit to nodeTemplate.
// Returns the number of nodes needed to accommodate all pods from the list.
func (estimator *BinpackingNodeEstimator) Estimate(
func (e *BinpackingNodeEstimator) Estimate(
pods []*apiv1.Pod,
nodeTemplate *schedulerframework.NodeInfo) int {
nodeTemplate *schedulerframework.NodeInfo,
nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) {

e.limiter.StartEstimation(pods, nodeGroup)
defer e.limiter.EndEstimation()

podInfos := calculatePodScore(pods, nodeTemplate)
sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score })

newNodeNames := make(map[string]bool)
newNodesWithPods := make(map[string]bool)

if err := estimator.clusterSnapshot.Fork(); err != nil {
if err := e.clusterSnapshot.Fork(); err != nil {
klog.Errorf("Error while calling ClusterSnapshot.Fork; %v", err)
return 0
return 0, nil
}
defer func() {
if err := estimator.clusterSnapshot.Revert(); err != nil {
if err := e.clusterSnapshot.Revert(); err != nil {
klog.Fatalf("Error while calling ClusterSnapshot.Revert; %v", err)
}
}()

newNodeNameIndex := 0
scheduledPods := []*apiv1.Pod{}
lastNodeName := ""

for _, podInfo := range podInfos {
found := false

nodeName, err := estimator.predicateChecker.FitsAnyNodeMatching(estimator.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
return newNodeNames[nodeInfo.Node().Name]
})
if err == nil {
found = true
if err := estimator.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil {
if err := e.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil {
klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, nodeName, err)
return 0
return 0, nil
}
scheduledPods = append(scheduledPods, podInfo.pod)
newNodesWithPods[nodeName] = true
}

if !found {
// Stop binpacking if we reach the limit of nodes we can add.
// We return the result of the binpacking that we already performed.
if !e.limiter.PermissionToAddNode() {
break
}

// If the last node we've added is empty and the pod couldn't schedule on it, it wouldn't be able to schedule
// on a new node either. There is no point adding more nodes to the snapshot in such a case, especially because of
// the performance cost each extra node adds to future FitsAnyNodeMatching calls.
if lastNodeName != "" && !newNodesWithPods[lastNodeName] {
continue
}

// Add new node
newNodeName, err := estimator.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
if err != nil {
klog.Errorf("Error while adding new node for template to ClusterSnapshot; %v", err)
return 0
return 0, nil
}
newNodeNameIndex++
// And schedule pod to it
if err := estimator.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil {
newNodeNames[newNodeName] = true
lastNodeName = newNodeName

// And try to schedule pod to it.
// Note that this may still fail (e.g. if topology spreading with a zonal topologyKey is used);
// in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid
// repeatedly adding a node to and removing it from the snapshot for each such pod.
if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, podInfo.pod, newNodeName); err != nil {
continue
}
if err := e.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil {
klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, newNodeName, err)
return 0
return 0, nil
}
newNodeNames[newNodeName] = true
newNodesWithPods[newNodeName] = true
scheduledPods = append(scheduledPods, podInfo.pod)
}
}
return len(newNodeNames)
return len(newNodesWithPods), scheduledPods
}

func (estimator *BinpackingNodeEstimator) addNewNodeToSnapshot(
func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
template *schedulerframework.NodeInfo,
nameIndex int) (string, error) {

newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("estimator-%d", nameIndex))
newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", nameIndex))
var pods []*apiv1.Pod
for _, podInfo := range newNodeInfo.Pods {
pods = append(pods, podInfo.Pod)
}
if err := estimator.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
return "", err
}
return newNodeInfo.Node().Name, nil
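
The NewThresholdBasedEstimationLimiter implementation itself is in one of the changed files not shown in this view. Based on its constructor arguments (a node-count cap and a duration cap) and the semantics described in the autoscaling_options.go comments above, one plausible implementation is sketched here, building on the EstimationLimiter interface sketched earlier; treat it as an illustration of the idea rather than the exact code added by this commit.

package estimator

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// thresholdBasedEstimationLimiter caps binpacking by node count and by elapsed
// time; a zero threshold disables the corresponding check. Sketch only.
type thresholdBasedEstimationLimiter struct {
	maxNodes    int
	maxDuration time.Duration
	nodes       int
	start       time.Time
}

func (l *thresholdBasedEstimationLimiter) StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) {
	l.start = time.Now()
	l.nodes = 0
}

func (*thresholdBasedEstimationLimiter) EndEstimation() {}

func (l *thresholdBasedEstimationLimiter) PermissionToAddNode() bool {
	if l.maxNodes > 0 && l.nodes >= l.maxNodes {
		return false
	}
	if l.maxDuration > 0 && time.Since(l.start) > l.maxDuration {
		return false
	}
	l.nodes++
	return true
}

// NewThresholdBasedEstimationLimiter mirrors the constructor called in
// core/autoscaler.go above; passing (0, 0), as the test helper does, leaves
// binpacking unlimited under this sketch.
func NewThresholdBasedEstimationLimiter(maxNodes int, maxDuration time.Duration) EstimationLimiter {
	return &thresholdBasedEstimationLimiter{maxNodes: maxNodes, maxDuration: maxDuration}
}
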
(The remaining 5 changed files are not shown in this view.)
