
Commit

Merge branch 'master' of https://github.com/atlassian-labs/cyclops into vportella/add-max-failed-cnrs-threshold

vincentportella committed Aug 26, 2024
2 parents 8fc6b65 + 44ce978 commit bffa3ce
Showing 13 changed files with 711 additions and 130 deletions.
2 changes: 1 addition & 1 deletion docs/cycling/README.md
@@ -28,7 +28,7 @@ The CycleNodeRequest CRD handles a request to cycle nodes belonging to a specifi

2. Validate the CycleNodeRequest object's parameters, and if valid, transition the object to **Pending**.

- 3. In the **Pending** phase, store the nodes that will need to be cycled so we can keep track of them. Describe the node group in the cloud provider and check it to ensure it matches the nodes in Kubernetes. It will wait for a brief period for the nodes to match, in case the cluster has just scaled up or down. Transition the object to **Initialised**.
+ 3. In the **Pending** phase, store the nodes that will need to be cycled so we can keep track of them. Describe the node group in the cloud provider and check it to ensure it matches the nodes in Kubernetes. It will wait for a brief period and proactively clean up any orphaned node objects, re-attach any instances that have been detached from the cloud provider node group, and then wait for the nodes to match in case the cluster has just scaled up or down. Transition the object to **Initialised**.

4. In the **Initialised** phase, detach a number of nodes (governed by the concurrency of the CycleNodeRequest) from the node group. This will trigger the cloud provider to add replacement nodes for each. Transition the object to **ScalingUp**. If there are no more nodes to cycle then transition to **Successful**.

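For readers following the cycling docs, the new step 3 folds several recovery actions into the **Pending** phase. A rough, self-contained sketch of that order of operations (the names here are illustrative stand-ins, not the actual functions added in this commit):

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the work described in step 3; in Cyclops this
// logic lives on the CycleNodeRequest transitioner and talks to the cloud provider.
func deleteOrphanedNodeObjects()      {}
func reattachDetachedInstances()      {}
func nodeGroupMatchesKubeNodes() bool { return true }

var errRequeue = errors.New("requeue: node group and kube nodes not yet in sync")

// pendingPhase sketches the order of operations in the Pending phase.
func pendingPhase() error {
	deleteOrphanedNodeObjects()       // clean up kube node objects with no backing instance
	reattachDetachedInstances()       // re-attach instances detached from the node group
	if !nodeGroupMatchesKubeNodes() { // give a freshly scaled cluster time to settle
		return errRequeue
	}
	return nil // safe to transition to Initialised
}

func main() { fmt.Println(pendingPhase()) }
```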
17 changes: 11 additions & 6 deletions pkg/cloudprovider/aws/aws.go
@@ -114,9 +114,10 @@ func (p *provider) GetNodeGroups(names []string) (cloudprovider.NodeGroups, erro
}

// InstancesExist returns a list of the instances that exist
- func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []string, err error) {
- instanceIDSet := map[string]string{}
- instanceIDs := []string{}
+ func (p *provider) InstancesExist(providerIDs []string) (map[string]interface{}, error) {
+ validProviderIDs := make(map[string]interface{})
+ instanceIDSet := make(map[string]string)
+ instanceIDs := make([]string, 0)

for _, providerID := range providerIDs {
instanceID, err := providerIDToInstanceID(providerID)
@@ -140,8 +141,12 @@ func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []stri

for _, reservation := range output.Reservations {
for _, instance := range reservation.Instances {
+ if *instance.State.Name == ec2.InstanceStateNameTerminated {
+ continue
+ }
+
if providerID, ok := instanceIDSet[aws.StringValue(instance.InstanceId)]; ok {
- validProviderIDs = append(validProviderIDs, providerID)
+ validProviderIDs[providerID] = nil
}
}
}
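Returning a set-like map rather than a slice lets callers do O(1) membership checks instead of nested scans. A minimal caller-side sketch against the interface shown in this commit (the helper itself is not part of the repo):

```go
package example

import "github.com/atlassian-labs/cyclops/pkg/cloudprovider"

// keepExisting returns only the provider IDs the cloud provider still knows
// about, treating the map returned by InstancesExist purely as a set.
func keepExisting(provider cloudprovider.CloudProvider, providerIDs []string) ([]string, error) {
	existing, err := provider.InstancesExist(providerIDs)
	if err != nil {
		return nil, err
	}
	kept := make([]string, 0, len(providerIDs))
	for _, id := range providerIDs {
		if _, ok := existing[id]; ok { // membership only; the stored value (nil) is ignored
			kept = append(kept, id)
		}
	}
	return kept, nil
}
```

A `map[string]struct{}` would state the set intent even more directly; the commit opts for `map[string]interface{}` with nil values, which behaves the same way for lookups.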
@@ -190,7 +195,7 @@ func (a *autoscalingGroups) ReadyInstances() map[string]cloudprovider.Instance {
instances := make(map[string]cloudprovider.Instance)
for _, group := range a.groups {
for _, i := range group.Instances {
- if aws.StringValue(i.LifecycleState) != "InService" {
+ if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService {
continue
}
providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), *i.AvailabilityZone)
@@ -214,7 +219,7 @@ func (a *autoscalingGroups) NotReadyInstances() map[string]cloudprovider.Instanc
instances := make(map[string]cloudprovider.Instance)
for _, group := range a.groups {
for _, i := range group.Instances {
- if aws.StringValue(i.LifecycleState) != "InService" {
+ if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService {
providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), aws.StringValue(i.AvailabilityZone))
if err != nil {
a.logger.Info("[NotReadyInstances] skip instance which failed instanceID to providerID conversion: %v", aws.StringValue(i.InstanceId))
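Both hunks above replace the raw `"InService"` literal with the SDK's own constant, which avoids typos and keeps the comparison tied to the aws-sdk-go definition. The same idea factored into a tiny helper (illustrative, not part of the commit):

```go
package example

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/autoscaling"
)

// isInService reports whether an ASG instance is currently in service, using
// the SDK constant rather than a hard-coded "InService" string.
func isInService(i *autoscaling.Instance) bool {
	return aws.StringValue(i.LifecycleState) == autoscaling.LifecycleStateInService
}
```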
29 changes: 28 additions & 1 deletion pkg/cloudprovider/aws/fake/fake.go
@@ -55,6 +55,7 @@ func generateAutoscalingInstance(instance *Instance) *autoscaling.Instance {
autoscalingInstance := &autoscaling.Instance{
InstanceId: aws.String(instance.InstanceID),
AvailabilityZone: aws.String(defaultAvailabilityZone),
+ LifecycleState: aws.String(autoscaling.LifecycleStateInService),
}

return autoscalingInstance
@@ -76,7 +77,13 @@ func (m *Autoscaling) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoS
continue
}

- if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists {
+ if instance.State != ec2.InstanceStateNameRunning {
+ continue
+ }
+
+ // Skip the instance unless its ASG name matches one of the names from the
+ // input. If the input is empty then match all ASGs.
+ if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists && len(asgNameLookup) > 0 {
continue
}
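The new condition above encodes a common convention: an empty set of requested ASG names means "match everything", otherwise only the listed names match. Pulled out on its own (a generic sketch, not the fake package's API):

```go
package example

// nameMatches implements the "empty filter matches all" rule used by the fake:
// with no names requested every group matches, otherwise only listed names do.
func nameMatches(name string, requested map[string]struct{}) bool {
	if len(requested) == 0 {
		return true
	}
	_, ok := requested[name]
	return ok
}
```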

@@ -121,6 +128,16 @@ func (m *Autoscaling) AttachInstances(input *autoscaling.AttachInstancesInput) (
return &autoscaling.AttachInstancesOutput{}, nil
}

+ func (m *Autoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) {
+ for _, instanceId := range input.InstanceIds {
+ if instance, exists := m.Instances[*instanceId]; exists {
+ instance.AutoscalingGroupName = ""
+ }
+ }
+
+ return &autoscaling.DetachInstancesOutput{}, nil
+ }

// *************** EC2 *************** //

func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
@@ -150,3 +167,13 @@ func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.Describ
},
}, nil
}

+ func (m *Ec2) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) {
+ for _, instanceId := range input.InstanceIds {
+ if instance, exists := m.Instances[*instanceId]; exists {
+ instance.State = ec2.InstanceStateNameTerminated
+ }
+ }
+
+ return &ec2.TerminateInstancesOutput{}, nil
+ }
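The new fake `DetachInstances` and `TerminateInstances` handlers mutate in-memory state so tests can drive an instance's lifecycle without touching AWS. The general pattern, reduced to a self-contained example (this is not the fake package's actual API):

```go
package example

import "testing"

// fakeEC2 is a stripped-down stand-in for the kind of in-memory fake this
// commit extends: calls mutate a map of instances instead of calling AWS.
type fakeEC2 struct {
	states map[string]string // instance ID -> state
}

func (f *fakeEC2) TerminateInstances(ids ...string) {
	for _, id := range ids {
		if _, ok := f.states[id]; ok {
			f.states[id] = "terminated"
		}
	}
}

func TestTerminateMarksInstance(t *testing.T) {
	ec2 := &fakeEC2{states: map[string]string{"i-0abc123": "running"}}
	ec2.TerminateInstances("i-0abc123")
	if got := ec2.states["i-0abc123"]; got != "terminated" {
		t.Fatalf("expected instance to be terminated, got %q", got)
	}
}
```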
2 changes: 1 addition & 1 deletion pkg/cloudprovider/interface.go
@@ -3,7 +3,7 @@ package cloudprovider
// CloudProvider provides an interface to interact with a cloud provider, e.g. AWS, GCP etc.
type CloudProvider interface {
Name() string
- InstancesExist([]string) ([]string, error)
+ InstancesExist([]string) (map[string]interface{}, error)
GetNodeGroups([]string) (NodeGroups, error)
TerminateInstance(string) error
}
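Every implementation of `CloudProvider` now has to return the map form. A minimal stub satisfying the updated interface could look like this (a sketch that assumes `cloudprovider.NodeGroups` is an interface, as the AWS implementation suggests, and simply pretends every instance exists):

```go
package example

import "github.com/atlassian-labs/cyclops/pkg/cloudprovider"

// stubProvider is an illustrative no-op implementation of the updated interface.
type stubProvider struct{}

func (stubProvider) Name() string { return "stub" }

func (stubProvider) InstancesExist(providerIDs []string) (map[string]interface{}, error) {
	existing := make(map[string]interface{}, len(providerIDs))
	for _, id := range providerIDs {
		existing[id] = nil // pretend every instance still exists
	}
	return existing, nil
}

func (stubProvider) GetNodeGroups(names []string) (cloudprovider.NodeGroups, error) {
	return nil, nil // no node groups in this stub
}

func (stubProvider) TerminateInstance(providerID string) error { return nil }

// Compile-time check that the stub still satisfies the interface.
var _ cloudprovider.CloudProvider = stubProvider{}
```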
62 changes: 27 additions & 35 deletions pkg/controller/cyclenoderequest/transitioner/node.go
@@ -3,17 +3,16 @@ package transitioner
import (
"fmt"

"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"

v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1"
"github.com/atlassian-labs/cyclops/pkg/cloudprovider"
)

- // listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
- // A label is used to determine whether nodes have been touched by this CycleNodeRequest.
- func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
+ // listNodes lists nodes matching the node selector. By default lists nodes that have also
+ // not been touched by Cyclops. A label is used to determine whether nodes have been touched
+ // by this CycleNodeRequest.
+ func (t *CycleNodeRequestTransitioner) listNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes := make(map[string]corev1.Node)

// Get the nodes
@@ -36,13 +35,32 @@ func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (m
}
}

// Only add "Ready" nodes
nodes[node.Spec.ProviderID] = node
}

return nodes, nil
}

// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
// A label is used to determine whether nodes have been touched by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes, err := t.listNodes(includeInProgress)
if err != nil {
return nil, err
}

for providerID, node := range nodes {
nodeReady := false

for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
nodes[node.Spec.ProviderID] = node
break
nodeReady = true
}
}

if !nodeReady {
delete(nodes, providerID)
}
}

return nodes, nil
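Splitting `listNodes` from `listReadyNodes` keeps the selector logic in one place and turns readiness into a pure filter over the result. The readiness check itself only needs the node's conditions, as in this standalone sketch:

```go
package example

import corev1 "k8s.io/api/core/v1"

// isNodeReady reports whether a node has the Ready condition set to True,
// mirroring the filter listReadyNodes now applies on top of listNodes.
func isNodeReady(node corev1.Node) bool {
	for _, cond := range node.Status.Conditions {
		if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}
```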
@@ -146,7 +164,7 @@ func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[st
// specified in the CNR. These are two separate sets and the contents of one does not affect the
// contents of the other.
func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[string]corev1.Node, cloudProviderInstances map[string]cloudprovider.Instance, err error) {
- kubeNodes, err = t.listReadyNodes(true)
+ kubeNodes, err = t.listNodes(true)
if err != nil {
return kubeNodes, cloudProviderInstances, err
}
@@ -155,32 +173,6 @@ func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[str
return kubeNodes, cloudProviderInstances, fmt.Errorf("no nodes matched selector")
}

- // Only retain nodes which still exist inside cloud provider
- var nodeProviderIDs []string
-
- for _, node := range kubeNodes {
- nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID)
- }
-
- existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs)
- if err != nil {
- return kubeNodes, cloudProviderInstances, errors.Wrap(err, "failed to check instances that exist from cloud provider")
- }
-
- existingKubeNodes := make(map[string]corev1.Node)
-
- for _, validProviderID := range existingProviderIDs {
- if node, found := kubeNodes[validProviderID]; found {
- existingKubeNodes[node.Spec.ProviderID] = node
- }
- }
-
- kubeNodes = existingKubeNodes
-
- if len(kubeNodes) == 0 {
- return kubeNodes, cloudProviderInstances, fmt.Errorf("no existing nodes in cloud provider matched selector")
- }

nodeGroupNames := t.cycleNodeRequest.GetNodeGroupNames()

// Describe the node group for the request
104 changes: 46 additions & 58 deletions pkg/controller/cyclenoderequest/transitioner/transitions.go
@@ -61,21 +61,13 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result,
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
- // Start the equilibrium wait timer, if this times out as the set of nodes in kube and
- // the cloud provider is not considered valid, then transition to the Healing phase as
- // cycling should not proceed.
- timedOut, err := t.equilibriumWaitTimedOut()
- if err != nil {
+ // Start the equilibrium wait timer, if this times out then the set of nodes in kube and
+ // the cloud provider is not considered valid. Transition to the Healing phase as cycling
+ // should not proceed.
+ if err := t.errorIfEquilibriumTimeoutReached(); err != nil {
return t.transitionToHealing(err)
}

- if timedOut {
- return t.transitionToHealing(fmt.Errorf(
- "node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
- nodeEquilibriumWaitLimit,
- ))
- }

// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")

@@ -95,50 +87,47 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
// The nodes in the cloud provider can either not exist or be detached from one of the nodegroups
// and this will be determined when dealt with. This is an XOR condition on the two initial sets
// of nodes.
- nodesNotInCloudProvider, nodesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances)

- // Do some sanity checking before we start filtering things
- // Check the instance count of the node group matches the number of nodes found in Kubernetes
- if len(nodesNotInCloudProvider) > 0 || len(nodesNotInKube) > 0 {
- var offendingNodesInfo string
-
- if len(nodesNotInCloudProvider) > 0 {
- providerIDs := make([]string, 0)
-
- for providerID := range nodesNotInCloudProvider {
- providerIDs = append(providerIDs,
- fmt.Sprintf("id %q", providerID),
- )
- }
-
- offendingNodesInfo += "nodes not in node group: "
- offendingNodesInfo += strings.Join(providerIDs, ",")
+ nodesNotInCloudProviderNodegroup, instancesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances)

+ // If the node state isn't correct then go through and attempt to fix it. The steps in this block
+ // attempt to fix the node state and then requeues the Pending phase to re-check. It is very
+ // possible that the node state changes during the steps and it cannot be fixed. Hopefully after
+ // a few runs the state can be fixed.
+ if len(nodesNotInCloudProviderNodegroup) > 0 || len(instancesNotInKube) > 0 {
+ t.logProblemNodes(nodesNotInCloudProviderNodegroup, instancesNotInKube)
+
+ // Try to fix the case where there are 1 or more instances matching the node selector for the
+ // nodegroup in kube but are not attached to the nodegroup in the cloud provider by
+ // re-attaching them.
+ if err := t.reattachAnyDetachedInstances(nodesNotInCloudProviderNodegroup); err != nil {
+ return t.transitionToHealing(err)
+ }

- if len(nodesNotInKube) > 0 {
- if offendingNodesInfo != "" {
- offendingNodesInfo += ";"
- }
-
- providerIDs := make([]string, 0)
-
- for providerID, node := range nodesNotInKube {
- providerIDs = append(providerIDs,
- fmt.Sprintf("id %q in %q", providerID, node.NodeGroupName()),
- )
- }
-
- offendingNodesInfo += "nodes not inside cluster: "
- offendingNodesInfo += strings.Join(providerIDs, ",")
+ // Try to fix the case where there are 1 or more kube node objects without any matching
+ // running instances in the cloud provider. This could be because of the finalizer that
+ // was added during a previous failed cycle.
+ if err := t.deleteAnyOrphanedKubeNodes(nodesNotInCloudProviderNodegroup); err != nil {
+ return t.transitionToHealing(err)
+ }

- t.rm.LogEvent(t.cycleNodeRequest, "NodeCountMismatch",
- "node group: %v, kube: %v. %v",
- len(validNodeGroupInstances), len(validKubeNodes), offendingNodesInfo)
+ // After working through these attempts, requeue to run through the Pending phase from the
+ // beginning to check the full state of nodes again. If there are any problem nodes we should
+ // not proceed and keep requeuing until the state is fixed or the timeout has been reached.
return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
}

+ valid, err := t.validateInstanceState(validNodeGroupInstances)
+ if err != nil {
+ return t.transitionToHealing(err)
+ }

+ if !valid {
+ t.rm.Logger.Info("instance state not valid, requeuing")
+ return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
+ }

+ t.rm.Logger.Info("instance state valid, proceeding")

// make a list of the nodes to terminate
if len(t.cycleNodeRequest.Spec.NodeNames) > 0 {
// If specific node names are provided, check they actually exist in the node group
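Rather than failing fast on a node-count mismatch, the new flow repairs what it can and requeues until the state converges or the equilibrium timeout trips. That controller-runtime idiom in isolation (illustrative; `checkAndRepair` and the period are stand-ins for the package's own helpers and `requeueDuration`):

```go
package example

import (
	"time"

	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const requeuePeriod = 30 * time.Second // stand-in for the package's requeueDuration

// reconcileUntilConverged repairs what it can, then asks the controller to call
// back later instead of failing the cycle on a transient mismatch.
func reconcileUntilConverged(checkAndRepair func() (converged bool, err error)) (reconcile.Result, error) {
	converged, err := checkAndRepair()
	if err != nil {
		// Unrecoverable: let the caller transition the CNR to Healing.
		return reconcile.Result{}, err
	}
	if !converged {
		return reconcile.Result{Requeue: true, RequeueAfter: requeuePeriod}, nil
	}
	return reconcile.Result{}, nil
}
```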
@@ -281,6 +270,14 @@ func (t *CycleNodeRequestTransitioner) transitionInitialised() (reconcile.Result
return t.transitionToHealing(err)
}

t.rm.Logger.Info("Adding annotation to node", "node", node.Name)

// Add the nodegroup annotation to the node before detaching it
if err := t.rm.AddNodegroupAnnotationToNode(node.Name, node.NodeGroupName); err != nil {
t.rm.LogEvent(t.cycleNodeRequest, "AddAnnotationToNodeError", err.Error())
return t.transitionToHealing(err)
}

alreadyDetaching, err := nodeGroups.DetachInstance(node.ProviderID)

if alreadyDetaching {
@@ -345,16 +342,7 @@ func (t *CycleNodeRequestTransitioner) transitionScalingUp() (reconcile.Result,

// Increase the kubeNode count requirement by the number of nodes which are observed to have been removed prematurely
for _, node := range t.cycleNodeRequest.Status.CurrentNodes {
- var instanceFound bool = false
-
- for _, kubeNode := range kubeNodes {
- if node.Name == kubeNode.Name {
- instanceFound = true
- break
- }
- }
-
- if !instanceFound {
+ if _, instanceFound := kubeNodes[node.ProviderID]; !instanceFound {
nodesToRemove = append(nodesToRemove, node)
numKubeNodesReady++
}
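Keying `kubeNodes` by provider ID turns the old O(n×m) scan over node names into a single map lookup per tracked node. The two shapes side by side (generic sketch):

```go
package example

// Before: scan every kube node name for each tracked node (O(n*m) overall).
func foundByName(names []string, target string) bool {
	for _, n := range names {
		if n == target {
			return true
		}
	}
	return false
}

// After: index the nodes once by provider ID, then each check is an O(1) lookup.
func foundByProviderID(byProviderID map[string]struct{}, target string) bool {
	_, ok := byProviderID[target]
	return ok
}
```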
[Diffs for the remaining 7 changed files in this commit were not loaded.]
