Skip to content

Commit

Permalink
Merge pull request #87 from atlassian-labs/vportella/fix-node-state-p…
Browse files Browse the repository at this point in the history
…ending-phase

Fix node state in the Pending phase before cycling
  • Loading branch information
vincentportella authored Aug 26, 2024
2 parents 65b577a + 2971e21 commit 44ce978
Show file tree
Hide file tree
Showing 14 changed files with 675 additions and 172 deletions.
2 changes: 1 addition & 1 deletion docs/cycling/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ The CycleNodeRequest CRD handles a request to cycle nodes belonging to a specifi

2. Validate the CycleNodeRequest object's parameters, and if valid, transition the object to **Pending**.

3. In the **Pending** phase, store the nodes that will need to be cycled so we can keep track of them. Describe the node group in the cloud provider and check it to ensure it matches the nodes in Kubernetes. It will wait for a brief period for the nodes to match, in case the cluster has just scaled up or down. Transition the object to **Initialised**.
3. In the **Pending** phase, store the nodes that will need to be cycled so we can keep track of them. Describe the node group in the cloud provider and check it to ensure it matches the nodes in Kubernetes. It will wait for a brief period and proactively clean up any orphaned node objects, re-attach any instances that have been detached from the cloud provider node group, and then wait for the nodes to match in case the cluster has just scaled up or down. Transition the object to **Initialised**.

4. In the **Initialised** phase, detach a number of nodes (governed by the concurrency of the CycleNodeRequest) from the node group. This will trigger the cloud provider to add replacement nodes for each. Transition the object to **ScalingUp**. If there are no more nodes to cycle then transition to **Successful**.

Expand Down
17 changes: 11 additions & 6 deletions pkg/cloudprovider/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ func (p *provider) GetNodeGroups(names []string) (cloudprovider.NodeGroups, erro
}

// InstancesExist returns the set of provider IDs whose instances exist and are not terminated
func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []string, err error) {
instanceIDSet := map[string]string{}
instanceIDs := []string{}
func (p *provider) InstancesExist(providerIDs []string) (map[string]interface{}, error) {
validProviderIDs := make(map[string]interface{})
instanceIDSet := make(map[string]string)
instanceIDs := make([]string, 0)

for _, providerID := range providerIDs {
instanceID, err := providerIDToInstanceID(providerID)
Expand All @@ -140,8 +141,12 @@ func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []stri

for _, reservation := range output.Reservations {
for _, instance := range reservation.Instances {
if *instance.State.Name == ec2.InstanceStateNameTerminated {
continue
}

if providerID, ok := instanceIDSet[aws.StringValue(instance.InstanceId)]; ok {
validProviderIDs = append(validProviderIDs, providerID)
validProviderIDs[providerID] = nil
}
}
}
Expand Down Expand Up @@ -190,7 +195,7 @@ func (a *autoscalingGroups) ReadyInstances() map[string]cloudprovider.Instance {
instances := make(map[string]cloudprovider.Instance)
for _, group := range a.groups {
for _, i := range group.Instances {
if aws.StringValue(i.LifecycleState) != "InService" {
if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService {
continue
}
providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), *i.AvailabilityZone)
Expand All @@ -214,7 +219,7 @@ func (a *autoscalingGroups) NotReadyInstances() map[string]cloudprovider.Instanc
instances := make(map[string]cloudprovider.Instance)
for _, group := range a.groups {
for _, i := range group.Instances {
if aws.StringValue(i.LifecycleState) != "InService" {
if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService {
providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), aws.StringValue(i.AvailabilityZone))
if err != nil {
a.logger.Info("[NotReadyInstances] skip instance which failed instanceID to providerID conversion: %v", aws.StringValue(i.InstanceId))
Expand Down
29 changes: 28 additions & 1 deletion pkg/cloudprovider/aws/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func generateAutoscalingInstance(instance *Instance) *autoscaling.Instance {
autoscalingInstance := &autoscaling.Instance{
InstanceId: aws.String(instance.InstanceID),
AvailabilityZone: aws.String(defaultAvailabilityZone),
LifecycleState: aws.String(autoscaling.LifecycleStateInService),
}

return autoscalingInstance
Expand All @@ -76,7 +77,13 @@ func (m *Autoscaling) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoS
continue
}

if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists {
if instance.State != ec2.InstanceStateNameRunning {
continue
}

// Skip the instance if its ASG name does not match any of the names from
// the input. If the input is empty then match all ASGs
if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists && len(asgNameLookup) > 0 {
continue
}

Expand Down Expand Up @@ -121,6 +128,16 @@ func (m *Autoscaling) AttachInstances(input *autoscaling.AttachInstancesInput) (
return &autoscaling.AttachInstancesOutput{}, nil
}

// DetachInstances is the fake counterpart of the autoscaling DetachInstances
// call. For every instance ID in the input that is present in m.Instances it
// resets the instance's AutoscalingGroupName to the empty string. IDs that are
// not present are ignored. It always returns an empty output and a nil error.
func (m *Autoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) {
	for _, id := range input.InstanceIds {
		instance, found := m.Instances[*id]
		if !found {
			// Unknown instance IDs are silently skipped.
			continue
		}

		// An empty group name marks the instance as belonging to no ASG.
		instance.AutoscalingGroupName = ""
	}

	output := &autoscaling.DetachInstancesOutput{}
	return output, nil
}

// *************** EC2 *************** //

func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
Expand Down Expand Up @@ -150,3 +167,13 @@ func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.Describ
},
}, nil
}

// TerminateInstances is the fake counterpart of the EC2 TerminateInstances
// call. For every instance ID in the input that is present in m.Instances it
// sets the instance's State to ec2.InstanceStateNameTerminated; the entry is
// kept in m.Instances. IDs that are not present are ignored. It always returns
// an empty output and a nil error.
func (m *Ec2) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) {
	for _, id := range input.InstanceIds {
		instance, found := m.Instances[*id]
		if !found {
			// Unknown instance IDs are silently skipped.
			continue
		}

		instance.State = ec2.InstanceStateNameTerminated
	}

	output := &ec2.TerminateInstancesOutput{}
	return output, nil
}
2 changes: 1 addition & 1 deletion pkg/cloudprovider/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cloudprovider
// CloudProvider provides an interface to interact with a cloud provider, e.g. AWS, GCP etc.
type CloudProvider interface {
Name() string
InstancesExist([]string) ([]string, error)
InstancesExist([]string) (map[string]interface{}, error)
GetNodeGroups([]string) (NodeGroups, error)
TerminateInstance(string) error
}
Expand Down
62 changes: 27 additions & 35 deletions pkg/controller/cyclenoderequest/transitioner/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ package transitioner
import (
"fmt"

"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"

v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1"
"github.com/atlassian-labs/cyclops/pkg/cloudprovider"
)

// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
// A label is used to determine whether nodes have been touched by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
// listNodes lists nodes matching the node selector. By default lists nodes that have also
// not been touched by Cyclops. A label is used to determine whether nodes have been touched
// by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes := make(map[string]corev1.Node)

// Get the nodes
Expand All @@ -36,13 +35,32 @@ func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (m
}
}

// Only add "Ready" nodes
nodes[node.Spec.ProviderID] = node
}

return nodes, nil
}

// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
// A label is used to determine whether nodes have been touched by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes, err := t.listNodes(includeInProgress)
if err != nil {
return nil, err
}

for providerID, node := range nodes {
nodeReady := false

for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
nodes[node.Spec.ProviderID] = node
break
nodeReady = true
}
}

if !nodeReady {
delete(nodes, providerID)
}
}

return nodes, nil
Expand Down Expand Up @@ -146,7 +164,7 @@ func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[st
// specified in the CNR. These are two separate sets and the contents of one does not affect the
// contents of the other.
func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[string]corev1.Node, cloudProviderInstances map[string]cloudprovider.Instance, err error) {
kubeNodes, err = t.listReadyNodes(true)
kubeNodes, err = t.listNodes(true)
if err != nil {
return kubeNodes, cloudProviderInstances, err
}
Expand All @@ -155,32 +173,6 @@ func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[str
return kubeNodes, cloudProviderInstances, fmt.Errorf("no nodes matched selector")
}

// Only retain nodes which still exist inside cloud provider
var nodeProviderIDs []string

for _, node := range kubeNodes {
nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID)
}

existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs)
if err != nil {
return kubeNodes, cloudProviderInstances, errors.Wrap(err, "failed to check instances that exist from cloud provider")
}

existingKubeNodes := make(map[string]corev1.Node)

for _, validProviderID := range existingProviderIDs {
if node, found := kubeNodes[validProviderID]; found {
existingKubeNodes[node.Spec.ProviderID] = node
}
}

kubeNodes = existingKubeNodes

if len(kubeNodes) == 0 {
return kubeNodes, cloudProviderInstances, fmt.Errorf("no existing nodes in cloud provider matched selector")
}

nodeGroupNames := t.cycleNodeRequest.GetNodeGroupNames()

// Describe the node group for the request
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/cyclenoderequest/transitioner/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ type Option func(t *Transitioner)

func WithCloudProviderInstances(nodes []*mock.Node) Option {
return func(t *Transitioner) {
t.cloudProviderInstances = append(t.cloudProviderInstances, nodes...)
t.CloudProviderInstances = append(t.CloudProviderInstances, nodes...)
}
}

func WithKubeNodes(nodes []*mock.Node) Option {
return func(t *Transitioner) {
t.kubeNodes = append(t.kubeNodes, nodes...)
t.KubeNodes = append(t.KubeNodes, nodes...)
}
}

Expand All @@ -28,23 +28,23 @@ type Transitioner struct {
*CycleNodeRequestTransitioner
*mock.Client

cloudProviderInstances []*mock.Node
kubeNodes []*mock.Node
CloudProviderInstances []*mock.Node
KubeNodes []*mock.Node
}

func NewFakeTransitioner(cnr *v1.CycleNodeRequest, opts ...Option) *Transitioner {
t := &Transitioner{
// By default there are no nodes and each test will
// override these as needed
cloudProviderInstances: make([]*mock.Node, 0),
kubeNodes: make([]*mock.Node, 0),
CloudProviderInstances: make([]*mock.Node, 0),
KubeNodes: make([]*mock.Node, 0),
}

for _, opt := range opts {
opt(t)
}

t.Client = mock.NewClient(t.kubeNodes, t.cloudProviderInstances, cnr)
t.Client = mock.NewClient(t.KubeNodes, t.CloudProviderInstances, cnr)

rm := &controller.ResourceManager{
Client: t.K8sClient,
Expand Down
Loading

0 comments on commit 44ce978

Please sign in to comment.