Add typed errors, add errors_total metric #74

Merged · 4 commits · May 19, 2017
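This PR threads a typed error, *errors.AutoscalerError, through RunOnce, ScaleUp and the scale-down path so failures can be classified and counted by the new errors_total metric. The utils/errors package itself is not among the files shown below; the following is a minimal sketch of its shape inferred from how the diff uses it — the constructor and AddPrefix signatures match the call sites in the diff, while the field names and the Error()/Type() accessors are assumptions.

// Package errors (cluster-autoscaler/utils/errors) as implied by its usage in this PR.
package errors

import "fmt"

// AutoscalerErrorType classifies a failure so callers (and errors_total) can
// distinguish transient failures from API, cloud-provider and internal ones.
type AutoscalerErrorType string

const (
	ApiCallError       AutoscalerErrorType = "apiCallError"
	InternalError      AutoscalerErrorType = "internalError"
	TransientError     AutoscalerErrorType = "transientError"
	CloudProviderError AutoscalerErrorType = "cloudProviderError"
)

// AutoscalerError carries a message together with its classification.
type AutoscalerError struct {
	errorType AutoscalerErrorType
	msg       string
}

// NewAutoscalerError builds a typed error from a format string, as used at the
// call sites in scale_down.go and scale_up.go.
func NewAutoscalerError(t AutoscalerErrorType, msg string, args ...interface{}) *AutoscalerError {
	return &AutoscalerError{errorType: t, msg: fmt.Sprintf(msg, args...)}
}

// ToAutoscalerError wraps a plain error, keeping its message and attaching a type.
func ToAutoscalerError(t AutoscalerErrorType, err error) *AutoscalerError {
	if err == nil {
		return nil
	}
	return NewAutoscalerError(t, "%v", err)
}

// Error makes AutoscalerError usable wherever a plain error string is expected.
func (e *AutoscalerError) Error() string { return e.msg }

// Type exposes the classification, e.g. for the errors_total metric.
func (e *AutoscalerError) Type() AutoscalerErrorType { return e.errorType }

// AddPrefix returns an error of the same type with a formatted prefix
// prepended to the message.
func (e *AutoscalerError) AddPrefix(msg string, args ...interface{}) *AutoscalerError {
	return &AutoscalerError{errorType: e.errorType, msg: fmt.Sprintf(msg, args...) + e.msg}
}

With this shape, the call sites below can replace fmt.Errorf with NewAutoscalerError and preserve the classification when adding context via AddPrefix.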
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/autoscaler.go
@@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_record "k8s.io/client-go/tools/record"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -37,7 +38,7 @@ type AutoscalerOptions struct {
// The configuration can be injected at the creation of an autoscaler
type Autoscaler interface {
// RunOnce represents an iteration in the control-loop of CA
RunOnce(currentTime time.Time)
RunOnce(currentTime time.Time) *errors.AutoscalerError
// CleanUp represents a clean-up required before the first invocation of RunOnce
CleanUp()
// ExitCleanUp is a clean-up performed just before process termination.
5 changes: 3 additions & 2 deletions cluster-autoscaler/core/dynamic_autoscaler.go
@@ -24,6 +24,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_record "k8s.io/client-go/tools/record"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -56,14 +57,14 @@ func (a *DynamicAutoscaler) ExitCleanUp() {
}

// RunOnce represents a single iteration of a dynamic autoscaler inside the CA's control-loop
func (a *DynamicAutoscaler) RunOnce(currentTime time.Time) {
func (a *DynamicAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError {
reconfigureStart := time.Now()
metrics.UpdateLastTime("reconfigure", reconfigureStart)
if err := a.Reconfigure(); err != nil {
glog.Errorf("Failed to reconfigure : %v", err)
}
metrics.UpdateDuration("reconfigure", reconfigureStart)
a.autoscaler.RunOnce(currentTime)
return a.autoscaler.RunOnce(currentTime)
}

// Reconfigure this dynamic autoscaler if the configmap is updated
4 changes: 3 additions & 1 deletion cluster-autoscaler/core/dynamic_autoscaler_test.go
@@ -19,6 +19,7 @@ package core
import (
"github.com/stretchr/testify/mock"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"testing"
"time"
)
@@ -27,8 +28,9 @@ type AutoscalerMock struct {
mock.Mock
}

func (m *AutoscalerMock) RunOnce(currentTime time.Time) {
func (m *AutoscalerMock) RunOnce(currentTime time.Time) *errors.AutoscalerError {
m.Called(currentTime)
return nil
}

func (m *AutoscalerMock) CleanUp() {
5 changes: 3 additions & 2 deletions cluster-autoscaler/core/polling_autoscaler.go
@@ -21,6 +21,7 @@ import (

"github.com/golang/glog"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

// PollingAutoscaler is a variant of autoscaler which polls the source-of-truth every time RunOnce is invoked
@@ -48,14 +49,14 @@ func (a *PollingAutoscaler) ExitCleanUp() {
}

// RunOnce represents a single iteration of a polling autoscaler inside the CA's control-loop
func (a *PollingAutoscaler) RunOnce(currentTime time.Time) {
func (a *PollingAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError {
reconfigureStart := time.Now()
metrics.UpdateLastTime("poll", reconfigureStart)
if err := a.Poll(); err != nil {
glog.Errorf("Failed to poll : %v", err)
}
metrics.UpdateDuration("poll", reconfigureStart)
a.autoscaler.RunOnce(currentTime)
return a.autoscaler.RunOnce(currentTime)
}

// Poll latest data from cloud provider to recreate this autoscaler
55 changes: 30 additions & 25 deletions cluster-autoscaler/core/scale_down.go
@@ -27,9 +27,10 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

"k8s.io/apimachinery/pkg/api/errors"
kube_errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_record "k8s.io/client-go/tools/record"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
@@ -113,7 +114,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
nodes []*apiv1.Node,
pods []*apiv1.Pod,
timestamp time.Time,
pdbs []*policyv1.PodDisruptionBudget) error {
pdbs []*policyv1.PodDisruptionBudget) *errors.AutoscalerError {

currentlyUnneededNodes := make([]*apiv1.Node, 0)
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, nodes)
@@ -152,18 +153,18 @@ func (sd *ScaleDown) UpdateUnneededNodes(
}

// Phase2 - check which nodes can be probably removed using fast drain.
nodesToRemove, newHints, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
nodesToRemove, newHints, simulatorErr := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
nil, sd.context.PredicateChecker,
len(currentlyUnneededNodes), true, sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
if err != nil {
glog.Errorf("Error while simulating node drains: %v", err)
if simulatorErr != nil {
glog.Errorf("Error while simulating node drains: %v", simulatorErr)

sd.unneededNodesList = make([]*apiv1.Node, 0)
sd.unneededNodes = make(map[string]time.Time)
sd.nodeUtilizationMap = make(map[string]float64)
sd.context.ClusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)

return fmt.Errorf("error while simulating node drains: %v", err)
return simulatorErr.AddPrefix("error while simulating node drains: ")
}

// Update the timestamp map.
@@ -190,7 +191,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(

// TryToScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
// removed and error if such occured.
func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, error) {
func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, *errors.AutoscalerError) {

now := time.Now()
candidates := make([]*apiv1.Node, 0)
@@ -248,7 +249,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
// to recreate on other nodes.
emptyNodes := getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, sd.context.CloudProvider)
if len(emptyNodes) > 0 {
confirmation := make(chan error, len(emptyNodes))
confirmation := make(chan *errors.AutoscalerError, len(emptyNodes))
for _, node := range emptyNodes {
glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %s", node.Name)
@@ -267,14 +268,14 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
confirmation <- deleteErr
}(node)
}
var finalError error
var finalError *errors.AutoscalerError

startTime := time.Now()
for range emptyNodes {
timeElapsed := time.Now().Sub(startTime)
timeLeft := MaxCloudProviderNodeDeletionTime - timeElapsed
if timeLeft < 0 {
finalError = fmt.Errorf("Failed to delete nodes in time")
finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time")
break
}
select {
@@ -284,13 +285,13 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
finalError = err
}
case <-time.After(timeLeft):
finalError = fmt.Errorf("Failed to delete nodes in time")
finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time")
}
}
if finalError == nil {
return ScaleDownNodeDeleted, nil
}
return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError)
return ScaleDownError, finalError.AddPrefix("failed to delete at least one empty node: ")
}

// We look for only 1 node so new hints may be incomplete.
@@ -299,7 +300,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
sd.podLocationHints, sd.usageTracker, time.Now(), pdbs)

if err != nil {
return ScaleDownError, fmt.Errorf("Find node to remove failed: %v", err)
return ScaleDownError, err.AddPrefix("Find node to remove failed: ")
}
if len(nodesToRemove) == 0 {
glog.V(1).Infof("No node to remove")
@@ -320,7 +321,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
simulator.RemoveNodeFromTracker(sd.usageTracker, toRemove.Node.Name, sd.unneededNodes)
err = deleteNode(sd.context, toRemove.Node, toRemove.PodsToReschedule)
if err != nil {
return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
return ScaleDownError, err.AddPrefix("Failed to delete %s: ", toRemove.Node.Name)
}
if readinessMap[toRemove.Node.Name] {
metrics.RegisterScaleDown(1, metrics.Underutilized)
@@ -373,7 +374,7 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
return result[:limit]
}

func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) error {
func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) *errors.AutoscalerError {
if err := drainNode(node, pods, context.ClientSet, context.Recorder, context.MaxGratefulTerminationSec,
MaxPodEvictionTime, EvictionRetryTime); err != nil {
return err
@@ -410,13 +411,13 @@ func evictPod(podToEvict *apiv1.Pod, client kube_client.Interface, recorder kube
// Performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving
// them up to MaxGracefulTerminationTime to finish.
func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder,
maxGratefulTerminationSec int, maxPodEvictionTime time.Duration, waitBetweenRetries time.Duration) error {
maxGratefulTerminationSec int, maxPodEvictionTime time.Duration, waitBetweenRetries time.Duration) *errors.AutoscalerError {

drainSuccessful := false
toEvict := len(pods)
if err := deletetaint.MarkToBeDeleted(node, client); err != nil {
recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
return err
return errors.ToAutoscalerError(errors.ApiCallError, err)
}

// If we fail to evict all the pods from the node we want to remove delete taint
@@ -448,11 +449,13 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
metrics.RegisterEvictions(1)
}
case <-time.After(retryUntil.Sub(time.Now()) + 5*time.Second):
return fmt.Errorf("Failed to drain node %s/%s: timeout when waiting for creating evictions", node.Namespace, node.Name)
return errors.NewAutoscalerError(
errors.ApiCallError, "Failed to drain node %s/%s: timeout when waiting for creating evictions", node.Namespace, node.Name)
}
}
if len(evictionErrs) != 0 {
return fmt.Errorf("Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs)
return errors.NewAutoscalerError(
errors.ApiCallError, "Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs)
}

// Evictions created successfully, wait maxGratefulTerminationSec + PodEvictionHeadroom to see if pods really disappeared.
Expand All @@ -466,7 +469,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
allGone = false
break
}
if !errors.IsNotFound(err) {
if !kube_errors.IsNotFound(err) {
glog.Errorf("Failed to check pod %s/%s: %v", pod.Namespace, pod.Name, err)
allGone = false
}
@@ -478,7 +481,8 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
return nil
}
}
return fmt.Errorf("Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name)
return errors.NewAutoscalerError(
errors.TransientError, "Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name)
}

// cleanToBeDeleted cleans ToBeDeleted taints.
@@ -499,16 +503,17 @@ func cleanToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorde
// Removes the given node from cloud provider. No extra pre-deletion actions are executed on
// the Kubernetes side.
func deleteNodeFromCloudProvider(node *apiv1.Node, cloudProvider cloudprovider.CloudProvider,
recorder kube_record.EventRecorder, registry *clusterstate.ClusterStateRegistry) error {
recorder kube_record.EventRecorder, registry *clusterstate.ClusterStateRegistry) *errors.AutoscalerError {
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
return fmt.Errorf("failed to node group for %s: %v", node.Name, err)
return errors.NewAutoscalerError(
errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return fmt.Errorf("picked node that doesn't belong to a node group: %s", node.Name)
return errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)
}
if err = nodeGroup.DeleteNodes([]*apiv1.Node{node}); err != nil {
return fmt.Errorf("failed to delete %s: %v", node.Name, err)
return errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", node.Name, err)
}
recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler")
registry.RegisterScaleDown(&clusterstate.ScaleDownRequest{
22 changes: 15 additions & 7 deletions cluster-autoscaler/core/scale_up.go
@@ -17,13 +17,13 @@ limitations under the License.
package core

import (
"fmt"
"time"

"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -35,7 +35,7 @@ import (
// false if it didn't and error if an error occured. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node,
daemonSets []*extensionsv1.DaemonSet) (bool, error) {
daemonSets []*extensionsv1.DaemonSet) (bool, *errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
@@ -49,14 +49,17 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ClientSet,
daemonSets, context.PredicateChecker)
if err != nil {
return false, fmt.Errorf("failed to build node infos for node groups: %v", err)
return false, err.AddPrefix("failed to build node infos for node groups: ")
}

upcomingNodes := make([]*schedulercache.NodeInfo, 0)
for nodeGroup, numberOfNodes := range context.ClusterStateRegistry.GetUpcomingNodes() {
nodeTemplate, found := nodeInfos[nodeGroup]
if !found {
return false, fmt.Errorf("failed to find template node for node group %s", nodeGroup)
return false, errors.NewAutoscalerError(
errors.InternalError,
"failed to find template node for node group %s",
nodeGroup)
}
for i := 0; i < numberOfNodes; i++ {
upcomingNodes = append(upcomingNodes, nodeTemplate)
@@ -153,7 +156,9 @@

currentSize, err := bestOption.NodeGroup.TargetSize()
if err != nil {
return false, fmt.Errorf("failed to get node group size: %v", err)
return false, errors.NewAutoscalerError(
errors.CloudProviderError,
"failed to get node group size: %v", err)
}
newSize := currentSize + bestOption.NodeCount
if newSize >= bestOption.NodeGroup.MaxSize() {
@@ -165,14 +170,17 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
newSize = context.MaxNodesTotal - len(nodes) + currentSize
if newSize < currentSize {
return false, fmt.Errorf("max node total count already reached")
return false, errors.NewAutoscalerError(
errors.TransientError,
"max node total count already reached")
}
}

glog.V(0).Infof("Scale-up: setting group %s size to %d", bestOption.NodeGroup.Id(), newSize)
increase := newSize - currentSize
if err := bestOption.NodeGroup.IncreaseSize(increase); err != nil {
return false, fmt.Errorf("failed to increase node group size: %v", err)
return false, errors.NewAutoscalerError(
errors.CloudProviderError, "failed to increase node group size: %v", err)
}
context.ClusterStateRegistry.RegisterScaleUp(
&clusterstate.ScaleUpRequest{
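The errors_total metric named in the PR title is registered in the metrics package, which is not among the files shown above. A hypothetical sketch of that wiring, assuming a Prometheus CounterVec labeled by error type and the Type() accessor sketched earlier — the metric namespace, label name and helper name are assumptions, not taken from this diff:

// Hypothetical sketch of errors_total in the metrics package.
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"

	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

var errorsCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "cluster_autoscaler",
		Name:      "errors_total",
		Help:      "Number of failed CA loop iterations, partitioned by error type.",
	},
	[]string{"type"},
)

func init() {
	prometheus.MustRegister(errorsCount)
}

// RegisterError increments errors_total for the given typed error.
func RegisterError(err *errors.AutoscalerError) {
	if err != nil {
		errorsCount.WithLabelValues(string(err.Type())).Inc()
	}
}

The main loop would then call something like metrics.RegisterError(err) whenever autoscaler.RunOnce(time.Now()) returns a non-nil *errors.AutoscalerError, which is what the typed return values introduced in this diff make possible.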