Use typed errors in scale down
MaciekPytel committed May 18, 2017
commit 12cfc2c (parent 6bab869)
Showing 3 changed files with 42 additions and 37 deletions.
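
For orientation before the diff: the commit replaces plain error values with the typed errors defined in cluster-autoscaler/utils/errors. That package's source is not part of this page, so the following is a minimal sketch inferred from the call sites below (errors.NewAutoscalerError, errors.ToAutoscalerError, AddPrefix, and the CloudProviderError/ApiCallError/InternalError/TransientError constants); the field names, constant values, and receiver shapes are assumptions, not the package's actual source:

package errors

import "fmt"

// AutoscalerErrorType labels the failure domain so callers can react
// differently to, e.g., cloud-provider outages vs. transient conditions.
type AutoscalerErrorType string

const (
    CloudProviderError AutoscalerErrorType = "cloudProviderError"
    ApiCallError       AutoscalerErrorType = "apiCallError"
    InternalError      AutoscalerErrorType = "internalError"
    TransientError     AutoscalerErrorType = "transientError"
)

// AutoscalerError pairs a message with its type; the diff passes it
// around as *AutoscalerError rather than the plain error interface.
type AutoscalerError struct {
    errorType AutoscalerErrorType
    msg       string
}

func (e *AutoscalerError) Error() string { return e.msg }

// NewAutoscalerError behaves like fmt.Errorf plus an explicit type.
func NewAutoscalerError(t AutoscalerErrorType, msg string, args ...interface{}) *AutoscalerError {
    return &AutoscalerError{errorType: t, msg: fmt.Sprintf(msg, args...)}
}

// ToAutoscalerError classifies an ordinary error after the fact.
func ToAutoscalerError(t AutoscalerErrorType, err error) *AutoscalerError {
    return &AutoscalerError{errorType: t, msg: err.Error()}
}

// AddPrefix prepends context to the message, keeping the type intact.
func (e *AutoscalerError) AddPrefix(msg string, args ...interface{}) *AutoscalerError {
    e.msg = fmt.Sprintf(msg, args...) + e.msg
    return e
}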
55 changes: 30 additions & 25 deletions cluster-autoscaler/core/scale_down.go
@@ -27,9 +27,10 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/metrics"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
+    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

-    "k8s.io/apimachinery/pkg/api/errors"
+    kube_errors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     kube_record "k8s.io/client-go/tools/record"
     apiv1 "k8s.io/kubernetes/pkg/api/v1"
@@ -113,7 +114,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
     nodes []*apiv1.Node,
     pods []*apiv1.Pod,
     timestamp time.Time,
-    pdbs []*policyv1.PodDisruptionBudget) error {
+    pdbs []*policyv1.PodDisruptionBudget) *errors.AutoscalerError {

     currentlyUnneededNodes := make([]*apiv1.Node, 0)
     nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, nodes)
@@ -152,18 +153,18 @@ func (sd *ScaleDown) UpdateUnneededNodes(
     }

     // Phase2 - check which nodes can be probably removed using fast drain.
-    nodesToRemove, newHints, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
+    nodesToRemove, newHints, simulatorErr := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
         nil, sd.context.PredicateChecker,
         len(currentlyUnneededNodes), true, sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
-    if err != nil {
-        glog.Errorf("Error while simulating node drains: %v", err)
+    if simulatorErr != nil {
+        glog.Errorf("Error while simulating node drains: %v", simulatorErr)

         sd.unneededNodesList = make([]*apiv1.Node, 0)
         sd.unneededNodes = make(map[string]time.Time)
         sd.nodeUtilizationMap = make(map[string]float64)
         sd.context.ClusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)

-        return fmt.Errorf("error while simulating node drains: %v", err)
+        return simulatorErr.AddPrefix("error while simulating node drains: ")
     }

     // Update the timestamp map.
@@ -190,7 +191,7 @@

 // TryToScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
 // removed and error if such occured.
-func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, error) {
+func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, *errors.AutoscalerError) {

     now := time.Now()
     candidates := make([]*apiv1.Node, 0)
@@ -248,7 +249,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
     // to recreate on other nodes.
     emptyNodes := getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, sd.context.CloudProvider)
     if len(emptyNodes) > 0 {
-        confirmation := make(chan error, len(emptyNodes))
+        confirmation := make(chan *errors.AutoscalerError, len(emptyNodes))
         for _, node := range emptyNodes {
             glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
             sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %s", node.Name)
@@ -267,14 +268,14 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
                 confirmation <- deleteErr
             }(node)
         }
-        var finalError error
+        var finalError *errors.AutoscalerError

         startTime := time.Now()
         for range emptyNodes {
             timeElapsed := time.Now().Sub(startTime)
             timeLeft := MaxCloudProviderNodeDeletionTime - timeElapsed
             if timeLeft < 0 {
-                finalError = fmt.Errorf("Failed to delete nodes in time")
+                finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time")
                 break
             }
             select {
@@ -284,13 +285,13 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
                     finalError = err
                 }
             case <-time.After(timeLeft):
-                finalError = fmt.Errorf("Failed to delete nodes in time")
+                finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time")
             }
         }
         if finalError == nil {
             return ScaleDownNodeDeleted, nil
         }
-        return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError)
+        return ScaleDownError, finalError.AddPrefix("failed to delete at least one empty node: ")
     }

     // We look for only 1 node so new hints may be incomplete.
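
The empty-node branch above fans node deletions out to goroutines and then drains one confirmation per node, charging every wait against a single MaxCloudProviderNodeDeletionTime budget. A self-contained sketch of that collection pattern (the function name and the plain error channel are illustrative, not from the source):

package main

import (
    "fmt"
    "time"
)

// waitForDeletions drains one result per worker from confirmation,
// charging all waits against one overall deadline -- the same shape
// as the empty-node loop in the diff above.
func waitForDeletions(confirmation <-chan error, workers int, deadline time.Duration) error {
    start := time.Now()
    var finalErr error
    for i := 0; i < workers; i++ {
        timeLeft := deadline - time.Since(start)
        if timeLeft < 0 {
            return fmt.Errorf("timed out waiting for deletions")
        }
        select {
        case err := <-confirmation:
            if err != nil {
                finalErr = err // remember the failure, keep draining the rest
            }
        case <-time.After(timeLeft):
            return fmt.Errorf("timed out waiting for deletions")
        }
    }
    return finalErr
}

func main() {
    confirmation := make(chan error, 2)
    for i := 0; i < 2; i++ {
        go func() {
            time.Sleep(10 * time.Millisecond) // stand-in for a cloud API call
            confirmation <- nil
        }()
    }
    fmt.Println(waitForDeletions(confirmation, 2, time.Second))
}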
@@ -299,7 +300,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
         sd.podLocationHints, sd.usageTracker, time.Now(), pdbs)

     if err != nil {
-        return ScaleDownError, fmt.Errorf("Find node to remove failed: %v", err)
+        return ScaleDownError, err.AddPrefix("Find node to remove failed: ")
     }
     if len(nodesToRemove) == 0 {
         glog.V(1).Infof("No node to remove")
@@ -320,7 +321,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
     simulator.RemoveNodeFromTracker(sd.usageTracker, toRemove.Node.Name, sd.unneededNodes)
     err = deleteNode(sd.context, toRemove.Node, toRemove.PodsToReschedule)
     if err != nil {
-        return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
+        return ScaleDownError, err.AddPrefix("Failed to delete %s: ", toRemove.Node.Name)
     }
     if readinessMap[toRemove.Node.Name] {
         metrics.RegisterScaleDown(1, metrics.Underutilized)
@@ -373,7 +374,7 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
     return result[:limit]
 }

-func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) error {
+func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) *errors.AutoscalerError {
     if err := drainNode(node, pods, context.ClientSet, context.Recorder, context.MaxGratefulTerminationSec,
         MaxPodEvictionTime, EvictionRetryTime); err != nil {
         return err
@@ -410,13 +411,13 @@ func evictPod(podToEvict *apiv1.Pod, client kube_client.Interface, recorder kube
 // Performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving
 // them up to MaxGracefulTerminationTime to finish.
 func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder,
-    maxGratefulTerminationSec int, maxPodEvictionTime time.Duration, waitBetweenRetries time.Duration) error {
+    maxGratefulTerminationSec int, maxPodEvictionTime time.Duration, waitBetweenRetries time.Duration) *errors.AutoscalerError {

     drainSuccessful := false
     toEvict := len(pods)
     if err := deletetaint.MarkToBeDeleted(node, client); err != nil {
         recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
-        return err
+        return errors.ToAutoscalerError(errors.ApiCallError, err)
     }

     // If we fail to evict all the pods from the node we want to remove delete taint
@@ -448,11 +449,13 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
                 metrics.RegisterEvictions(1)
             }
         case <-time.After(retryUntil.Sub(time.Now()) + 5*time.Second):
-            return fmt.Errorf("Failed to drain node %s/%s: timeout when waiting for creating evictions", node.Namespace, node.Name)
+            return errors.NewAutoscalerError(
+                errors.ApiCallError, "Failed to drain node %s/%s: timeout when waiting for creating evictions", node.Namespace, node.Name)
         }
     }
     if len(evictionErrs) != 0 {
-        return fmt.Errorf("Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs)
+        return errors.NewAutoscalerError(
+            errors.ApiCallError, "Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs)
     }

     // Evictions created successfully, wait maxGratefulTerminationSec + PodEvictionHeadroom to see if pods really disappeared.
@@ -466,7 +469,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
                 allGone = false
                 break
             }
-            if !errors.IsNotFound(err) {
+            if !kube_errors.IsNotFound(err) {
                 glog.Errorf("Failed to check pod %s/%s: %v", pod.Namespace, pod.Name, err)
                 allGone = false
             }
@@ -478,7 +481,8 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
             return nil
         }
     }
-    return fmt.Errorf("Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name)
+    return errors.NewAutoscalerError(
+        errors.TransientError, "Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name)
 }

 // cleanToBeDeleted cleans ToBeDeleted taints.
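
Worth noting in drainNode: failures to create evictions are tagged ApiCallError, while pods that are merely still terminating at the deadline become TransientError — presumably so a caller inspecting the type can retry soon rather than treat the node as broken.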
@@ -499,16 +503,17 @@ func cleanToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorde
 // Removes the given node from cloud provider. No extra pre-deletion actions are executed on
 // the Kubernetes side.
 func deleteNodeFromCloudProvider(node *apiv1.Node, cloudProvider cloudprovider.CloudProvider,
-    recorder kube_record.EventRecorder, registry *clusterstate.ClusterStateRegistry) error {
+    recorder kube_record.EventRecorder, registry *clusterstate.ClusterStateRegistry) *errors.AutoscalerError {
     nodeGroup, err := cloudProvider.NodeGroupForNode(node)
     if err != nil {
-        return fmt.Errorf("failed to node group for %s: %v", node.Name, err)
+        return errors.NewAutoscalerError(
+            errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)
     }
     if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
-        return fmt.Errorf("picked node that doesn't belong to a node group: %s", node.Name)
+        return errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)
     }
     if err = nodeGroup.DeleteNodes([]*apiv1.Node{node}); err != nil {
-        return fmt.Errorf("failed to delete %s: %v", node.Name, err)
+        return errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", node.Name, err)
     }
     recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler")
     registry.RegisterScaleDown(&clusterstate.ScaleDownRequest{
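
deleteNodeFromCloudProvider draws the same distinction: provider lookup and deletion failures are CloudProviderError, while being handed a node outside any node group — an invariant violation — is InternalError. In passing, the commit also repairs the old message typo, turning "failed to node group for %s" into "failed to find node group for %s".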
21 changes: 10 additions & 11 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -263,11 +263,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerErro
         glog.V(4).Infof("Calculating unneeded nodes")

         scaleDown.CleanUp(time.Now())
-        err = scaleDown.UpdateUnneededNodes(allNodes, allScheduled, time.Now(), pdbs)
-        if err != nil {
-            glog.Warningf("Failed to scale down: %v", err)
-            // TODO(maciekpytel): temporary hack, fix this
-            return nil
+        typedErr := scaleDown.UpdateUnneededNodes(allNodes, allScheduled, time.Now(), pdbs)
+        if typedErr != nil {
+            glog.Errorf("Failed to scale down: %v", typedErr)
+            return typedErr
         }

         metrics.UpdateDuration("findUnneeded", unneededStart)
@@ -283,16 +282,16 @@

         scaleDownStart := time.Now()
         metrics.UpdateLastTime("scaleDown", scaleDownStart)
-        result, err := scaleDown.TryToScaleDown(allNodes, allScheduled, pdbs)
+        result, typedErr := scaleDown.TryToScaleDown(allNodes, allScheduled, pdbs)
         metrics.UpdateDuration("scaleDown", scaleDownStart)

         // TODO: revisit result handling
-        if err != nil {
+        if typedErr != nil {
             glog.Errorf("Failed to scale down: %v", err)
-        } else {
-            if result == ScaleDownError || result == ScaleDownNoNodeDeleted {
-                a.lastScaleDownFailedTrial = time.Now()
-            }
+            return typedErr
         }
+        if result == ScaleDownError || result == ScaleDownNoNodeDeleted {
+            a.lastScaleDownFailedTrial = time.Now()
+        }
     }
 }
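
The net effect in static_autoscaler.go: RunOnce stops swallowing scale-down failures. The removed "temporary hack" that returned nil after an UpdateUnneededNodes error now propagates the typed error, and a TryToScaleDown failure likewise returns instead of only being logged. (The glog call in the second hunk appears to be an unchanged context line, so it still formats the outer err variable rather than typedErr.)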
3 changes: 2 additions & 1 deletion cluster-autoscaler/simulator/cluster.go
@@ -24,6 +24,7 @@ import (
     "time"

     "k8s.io/apimachinery/pkg/api/resource"
+    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
     apiv1 "k8s.io/kubernetes/pkg/api/v1"
     policyv1 "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
     client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -58,7 +59,7 @@ func FindNodesToRemove(candidates []*apiv1.Node, allNodes []*apiv1.Node, pods []
     fastCheck bool, oldHints map[string]string, usageTracker *UsageTracker,
     timestamp time.Time,
     podDisruptionBudgets []*policyv1.PodDisruptionBudget,
-) (nodesToRemove []NodeToBeRemoved, podReschedulingHints map[string]string, finalError error) {
+) (nodesToRemove []NodeToBeRemoved, podReschedulingHints map[string]string, finalError *errors.AutoscalerError) {

     nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, allNodes)
     result := make([]NodeToBeRemoved, 0)
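
One plausible reason the commit retypes every signature in the chain (FindNodesToRemove, UpdateUnneededNodes, TryToScaleDown, deleteNode, drainNode) rather than mixing *errors.AutoscalerError with plain error — a reading of the design, not something the commit states: a concrete nil pointer stored into the error interface no longer compares equal to nil. A minimal illustration with a stand-in type:

package main

import "fmt"

type AutoscalerError struct{ msg string }

func (e *AutoscalerError) Error() string { return e.msg }

// typed returns a nil *AutoscalerError, not a nil error interface.
func typed() *AutoscalerError { return nil }

func main() {
    var err error = typed()     // nil pointer boxed into an interface value
    fmt.Println(err == nil)     // false: the classic typed-nil trap
    fmt.Println(typed() == nil) // true: comparing the concrete type stays safe
}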
