Skip named nodes in the CNR that don't exist #83

Merged · 8 commits · Aug 19, 2024
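In short, this PR switches the transitioner's node bookkeeping from slices of corev1.Node to maps keyed by provider ID and, most visibly, makes addNamedNodesToTerminate skip named nodes that no longer exist instead of failing the CycleNodeRequest (CNR). Below is a condensed, self-contained before/after sketch of that behavioural change; it uses plain strings in place of the real corev1.Node and CNR status types, so treat it as an illustration of the diff further down, not the actual code.

```go
package main

import "fmt"

// Old behaviour (simplified): fail the whole CNR if any named node is missing.
func selectNamedNodesStrict(existing map[string]bool, named []string) ([]string, error) {
	var selected []string
	for _, name := range named {
		if !existing[name] {
			return nil, fmt.Errorf("could not find node by name: %v", name)
		}
		selected = append(selected, name)
	}
	return selected, nil
}

// New behaviour (simplified): log and skip named nodes that no longer exist.
func selectNamedNodesSkipMissing(existing map[string]bool, named []string) []string {
	var selected []string
	for _, name := range named {
		if !existing[name] {
			fmt.Printf("could not find node by name, skipping: %s\n", name)
			continue
		}
		selected = append(selected, name)
	}
	return selected
}

func main() {
	existing := map[string]bool{"node-a": true, "node-c": true}
	named := []string{"node-a", "node-b", "node-c"}

	if _, err := selectNamedNodesStrict(existing, named); err != nil {
		fmt.Println("old:", err) // old: could not find node by name: node-b
	}
	fmt.Println("new:", selectNamedNodesSkipMissing(existing, named)) // new: [node-a node-c]
}
```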
4 changes: 2 additions & 2 deletions pkg/controller/cyclenoderequest/transitioner/checks.go
@@ -189,7 +189,7 @@ func (t *CycleNodeRequestTransitioner) performHealthCheck(node v1.CycleNodeReque

// performInitialHealthChecks on the nodes selected to be terminated before cycling begin. If any health
// check fails return an error to prevent cycling from starting
func (t *CycleNodeRequestTransitioner) performInitialHealthChecks(kubeNodes []corev1.Node) error {
func (t *CycleNodeRequestTransitioner) performInitialHealthChecks(kubeNodes map[string]corev1.Node) error {
// Build a set of ready nodes from which to check below
readyNodesSet := make(map[string]v1.CycleNodeRequestNode)

@@ -241,7 +241,7 @@ func (t *CycleNodeRequestTransitioner) performInitialHealthChecks(kubeNodes []co

// performCyclingHealthChecks before terminating an instance selected for termination. Cycling pauses
// until all health checks pass for the new instance before terminating the old one
func (t *CycleNodeRequestTransitioner) performCyclingHealthChecks(kubeNodes []corev1.Node) (bool, error) {
func (t *CycleNodeRequestTransitioner) performCyclingHealthChecks(kubeNodes map[string]corev1.Node) (bool, error) {
var allHealthChecksPassed bool = true

// Find new instances attached to the nodegroup and perform health checks on them
87 changes: 48 additions & 39 deletions pkg/controller/cyclenoderequest/transitioner/node.go
@@ -11,12 +11,15 @@ import (

// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
// A label is used to determine whether nodes have been touched by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (nodes []corev1.Node, err error) {
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes := make(map[string]corev1.Node)

// Get the nodes
selector, err := t.cycleNodeRequest.NodeLabelSelector()
if err != nil {
return nodes, err
}

nodeList, err := t.rm.ListNodes(selector)
if err != nil {
return nodes, err
@@ -30,14 +30,16 @@ func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (n
continue
}
}

// Only add "Ready" nodes
for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
nodes = append(nodes, node)
nodes[node.Spec.ProviderID] = node
break
}
}
}

return nodes, nil
}

@@ -56,29 +61,34 @@ func (t *CycleNodeRequestTransitioner) getNodesToTerminate(numNodes int64) (node
}

for _, kubeNode := range kubeNodes {
// Skip nodes that are already being worked on so we don't duplicate our work
if value, ok := kubeNode.Labels[cycleNodeLabel]; ok && value == t.cycleNodeRequest.Name {
numNodesInProgress++
}
}

for _, nodeToTerminate := range t.cycleNodeRequest.Status.NodesToTerminate {
kubeNode, found := kubeNodes[nodeToTerminate.ProviderID]

if !found {
continue
}

for _, nodeToTerminate := range t.cycleNodeRequest.Status.NodesToTerminate {
// Add nodes that need to be terminated but have not yet been actioned
if kubeNode.Name == nodeToTerminate.Name && kubeNode.Spec.ProviderID == nodeToTerminate.ProviderID {
nodes = append(nodes, &kubeNode)
// Skip nodes that are already being worked on so we don't duplicate our work
if value, ok := kubeNode.Labels[cycleNodeLabel]; ok && value == t.cycleNodeRequest.Name {
continue
}

for i := 0; i < len(t.cycleNodeRequest.Status.NodesAvailable); i++ {
if kubeNode.Name == t.cycleNodeRequest.Status.NodesAvailable[i].Name {
// Remove nodes from available if they are also scheduled for termination
// Slice syntax removes this node at `i` from the array
t.cycleNodeRequest.Status.NodesAvailable = append(
t.cycleNodeRequest.Status.NodesAvailable[:i],
t.cycleNodeRequest.Status.NodesAvailable[i+1:]...,
)
// Add nodes that need to be terminated but have not yet been actioned
nodes = append(nodes, &kubeNode)

break
}
}
for i := 0; i < len(t.cycleNodeRequest.Status.NodesAvailable); i++ {
if kubeNode.Name == t.cycleNodeRequest.Status.NodesAvailable[i].Name {
// Remove nodes from available if they are also scheduled for termination
// Slice syntax removes this node at `i` from the array
t.cycleNodeRequest.Status.NodesAvailable = append(
t.cycleNodeRequest.Status.NodesAvailable[:i],
t.cycleNodeRequest.Status.NodesAvailable[i+1:]...,
)

break
}
@@ -94,33 +104,32 @@ func (t *CycleNodeRequestTransitioner) getNodesToTerminate(numNodes int64) (node
}

// addNamedNodesToTerminate adds the named nodes for this CycleNodeRequest to the list of nodes to terminate.
// Returns an error if any named node does not exist in the node group for this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes []corev1.Node, nodeGroupInstances map[string]cloudprovider.Instance) error {
for _, namedNode := range t.cycleNodeRequest.Spec.NodeNames {
foundNode := false
for _, kubeNode := range kubeNodes {
if kubeNode.Name == namedNode {
foundNode = true
// Skips any named node that does not exist in the node group for this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[string]corev1.Node, nodeGroupInstances map[string]cloudprovider.Instance) {
kubeNodesMap := make(map[string]corev1.Node)
Review comment (Collaborator):

nit: name this after the function of the variable. Something like nodeLookupByName. I kept trying to find it being used for something more than a lookup.

(See the sketch after this file's diff for an illustration of this suggestion.)

Reply (@vincentportella, Member Author, Jul 22, 2024):

> Additionally, there may be other people running Cyclops who are relying on the existing behaviour.

Good point, I initially intended for this to be a breaking change, but now that I think about it, making it configurable may be the better approach to cater to different use-cases.

👍 I'll add some tests to validate the intended behaviour.

t.cycleNodeRequest.Status.NodesAvailable = append(
t.cycleNodeRequest.Status.NodesAvailable,
newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()),
)
for _, node := range kubeNodes {
kubeNodesMap[node.Name] = node
}

t.cycleNodeRequest.Status.NodesToTerminate = append(
t.cycleNodeRequest.Status.NodesToTerminate,
newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()),
)
for _, namedNode := range t.cycleNodeRequest.Spec.NodeNames {
kubeNode, found := kubeNodesMap[namedNode]

break
}
if !found {
t.rm.Logger.Info("could not find node by name, skipping", "nodeName", namedNode)
continue
}

if !foundNode {
return fmt.Errorf("could not find node by name: %v", namedNode)
}
t.cycleNodeRequest.Status.NodesAvailable = append(
t.cycleNodeRequest.Status.NodesAvailable,
newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()),
)

t.cycleNodeRequest.Status.NodesToTerminate = append(
t.cycleNodeRequest.Status.NodesToTerminate,
newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()),
)
}
return nil
}

// newCycleNodeRequestNode converts a corev1.Node to a v1.CycleNodeRequestNode. This is done multiple
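As an aside to the naming discussion in the review thread above, here is a minimal, runnable sketch of the lookup that the merged code builds as kubeNodesMap and that the reviewer suggested calling nodeLookupByName: a secondary index keyed by node name over the provider-ID-keyed map that listReadyNodes now returns. The Node type and the provider IDs below are simplified stand-ins, not the real corev1 types.

```go
package main

import "fmt"

// Minimal stand-in for corev1.Node, just enough to make the lookup pattern
// runnable on its own.
type Node struct {
	Name       string
	ProviderID string
}

func main() {
	// kubeNodes is keyed by provider ID, as listReadyNodes now returns.
	// The IDs here are made up for illustration.
	kubeNodes := map[string]Node{
		"aws:///us-east-1a/i-111": {Name: "node-a", ProviderID: "aws:///us-east-1a/i-111"},
		"aws:///us-east-1b/i-222": {Name: "node-b", ProviderID: "aws:///us-east-1b/i-222"},
	}
	nodeNames := []string{"node-a", "node-x"}

	// nodeLookupByName is the reviewer's suggested name for what the merged
	// code calls kubeNodesMap: a secondary index keyed by node name.
	nodeLookupByName := make(map[string]Node, len(kubeNodes))
	for _, node := range kubeNodes {
		nodeLookupByName[node.Name] = node
	}

	for _, namedNode := range nodeNames {
		node, found := nodeLookupByName[namedNode]
		if !found {
			fmt.Println("could not find node by name, skipping:", namedNode)
			continue
		}
		fmt.Println("selected:", node.Name, node.ProviderID)
	}
}
```

In the merged change, this by-name index is only used to resolve Spec.NodeNames; everywhere else nodes are looked up by provider ID.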
45 changes: 24 additions & 21 deletions pkg/controller/cyclenoderequest/transitioner/transitions.go
@@ -64,13 +64,11 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result,
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")

kubeNodes, err := t.listReadyNodes(true)
if err != nil {
return t.transitionToHealing(err)
}
if len(kubeNodes) == 0 {
Review comment (Collaborator):

We're losing the ability to detect user error here. Is there a way we can keep user error detection, while also not failing at nonexistent nodes?

One strategy could be: if our selector did match some nodes, but none were still in the cluster, we can call it a success. If it matches no nodes, we don't call it a success. The only problem with this approach is nodegroups scaled to zero. Thoughts?

Reply (Member Author):

I don't really see how that's possible unless we have a way of knowing whether a node name in the CNR matches a node that existed prior to the CNR being created.

A flag here would help to toggle strict node validation and enable both use cases, but I'm wary of adding more settings to a CNR.

(A hypothetical sketch of such a toggle appears after this file's diff, below.)

return t.transitionToHealing(fmt.Errorf("no nodes matched selector"))
}

// Only retain nodes which still exist inside cloud provider
var nodeProviderIDs []string
@@ -83,48 +81,49 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
if err != nil {
return t.transitionToHealing(errors.Wrap(err, "failed to check instances that exist from cloud provider"))
}
var existingKubeNodes []corev1.Node

for _, node := range kubeNodes {
for _, validProviderID := range existingProviderIDs {
if node.Spec.ProviderID == validProviderID {
existingKubeNodes = append(existingKubeNodes, node)
break
}
existingKubeNodes := make(map[string]corev1.Node)

for _, validProviderID := range existingProviderIDs {
if node, found := kubeNodes[validProviderID]; found {
existingKubeNodes[node.Spec.ProviderID] = node
}
}

kubeNodes = existingKubeNodes

if len(kubeNodes) == 0 {
return t.transitionToHealing(fmt.Errorf("no existing nodes in cloud provider matched selector"))
}

// Describe the node group for the request
t.rm.LogEvent(t.cycleNodeRequest, "FetchingNodeGroup", "Fetching node group: %v", t.cycleNodeRequest.GetNodeGroupNames())

nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames())
if err != nil {
return t.transitionToHealing(err)
}

// get instances inside cloud provider node groups
nodeGroupInstances := nodeGroups.Instances()

// Do some sanity checking before we start filtering things
// Check the instance count of the node group matches the number of nodes found in Kubernetes
if len(kubeNodes) != len(nodeGroupInstances) {
nodesNotInCPNodeGroup, nodesNotInKube := findOffendingNodes(kubeNodes, nodeGroupInstances)
var offendingNodesInfo string

nodesNotInCPNodeGroup, nodesNotInKube := findOffendingNodes(kubeNodes, nodeGroupInstances)

if len(nodesNotInCPNodeGroup) > 0 {
offendingNodesInfo += "nodes not in node group: "
offendingNodesInfo += strings.Join(nodesNotInCPNodeGroup, ",")
}

if len(nodesNotInKube) > 0 {
if offendingNodesInfo != "" {
offendingNodesInfo += ";"
}

offendingNodesInfo += "nodes not inside cluster: "
offendingNodesInfo += strings.Join(nodesNotInKube, ",")
}

t.rm.LogEvent(t.cycleNodeRequest, "NodeCountMismatch",
"node group: %v, kube: %v. %v",
len(nodeGroupInstances), len(kubeNodes), offendingNodesInfo)
@@ -134,26 +133,28 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
if err != nil {
return t.transitionToHealing(err)
}

if timedOut {
err := fmt.Errorf(
"node count mismatch, number of kubernetes of nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit)
"node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit,
)

return t.transitionToHealing(err)
}

return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
}

// make a list of the nodes to terminate
if len(t.cycleNodeRequest.Spec.NodeNames) > 0 {
// If specific node names are provided, check they actually exist in the node group
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding named nodes to NodesToTerminate")
err := t.addNamedNodesToTerminate(kubeNodes, nodeGroupInstances)
if err != nil {
return t.transitionToHealing(err)
}
t.addNamedNodesToTerminate(kubeNodes, nodeGroupInstances)
} else {
// Otherwise just add all the nodes in the node group
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding all node group nodes to NodesToTerminate")

for _, kubeNode := range kubeNodes {
// Check to ensure the kubeNode object maps to an existing node in the ASG
// If this isn't the case, this is a phantom node. Fail the cnr to be safe.
@@ -205,7 +206,9 @@ func (t *CycleNodeRequestTransitioner) transitionInitialised() (reconcile.Result
// The maximum nodes we can select are bounded by our concurrency. We take into account the number
// of nodes we are already working on, and only introduce up to our concurrency cap more nodes in this step.
maxNodesToSelect := t.cycleNodeRequest.Spec.CycleSettings.Concurrency - t.cycleNodeRequest.Status.ActiveChildren

t.rm.Logger.Info("Selecting nodes to terminate", "numNodes", maxNodesToSelect)

nodes, numNodesInProgress, err := t.getNodesToTerminate(maxNodesToSelect)
if err != nil {
return t.transitionToHealing(err)
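Picking up the review thread above about keeping user-error detection: the following is a purely hypothetical sketch of the "strict node validation" toggle mentioned there. No such flag or CNR setting exists in this PR; it only illustrates, using the same simplified string-based shape as the sketch near the top of the page, how the old strict behaviour and the new skip-missing behaviour could coexist behind an opt-in.

```go
package main

import "fmt"

// Hypothetical sketch only: no such flag or spec field exists in this PR. It
// illustrates the "strict node validation" toggle discussed in the review
// thread above, layered on top of the skip-missing behaviour that was merged.
func selectNamedNodes(existing map[string]bool, named []string, strictValidation bool) ([]string, error) {
	var selected []string
	for _, name := range named {
		if !existing[name] {
			if strictValidation {
				// Opt-in strict mode: keep the old behaviour and fail the CNR.
				return nil, fmt.Errorf("could not find node by name: %v", name)
			}
			// Default: skip named nodes that no longer exist, as merged here.
			continue
		}
		selected = append(selected, name)
	}
	return selected, nil
}

func main() {
	existing := map[string]bool{"node-a": true}
	fmt.Println(selectNamedNodes(existing, []string{"node-a", "node-b"}, false)) // [node-a] <nil>
	fmt.Println(selectNamedNodes(existing, []string{"node-a", "node-b"}, true))  // [] could not find node by name: node-b
}
```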
22 changes: 14 additions & 8 deletions pkg/controller/cyclenoderequest/transitioner/util.go
@@ -242,6 +242,7 @@ func (t *CycleNodeRequestTransitioner) checkIfTransitioning(numNodesToCycle, num
transition, err := t.transitionObject(v1.CycleNodeRequestWaitingTermination)
return true, transition, err
}

// otherwise, we have finished everything, so transition to Successful
transition, err := t.transitionToSuccessful()
return true, transition, err
@@ -252,21 +253,26 @@ func (t *CycleNodeRequestTransitioner) checkIfTransitioning(numNodesToCycle, num

// findOffendingNodes finds the offending nodes information which cause number of nodes mismatch between
// cloud provider node group and nodes inside kubernetes cluster using label selector
func findOffendingNodes(kubeNodes []corev1.Node, cloudProviderNodes map[string]cloudprovider.Instance) ([]string, []string) {
kubeNodesMap := make(map[string]corev1.Node)
func findOffendingNodes(kubeNodes map[string]corev1.Node, cloudProviderNodes map[string]cloudprovider.Instance) ([]string, []string) {
var nodesNotInCPNodeGroup []string
var nodesNotInKube []string

for _, kubeNode := range kubeNodes {
kubeNodesMap[kubeNode.Spec.ProviderID] = kubeNode
if _, ok := cloudProviderNodes[kubeNode.Spec.ProviderID]; !ok {
nodesNotInCPNodeGroup = append(nodesNotInCPNodeGroup, fmt.Sprintf("id %q", kubeNode.Spec.ProviderID))
nodesNotInCPNodeGroup = append(nodesNotInCPNodeGroup,
fmt.Sprintf("id %q", kubeNode.Spec.ProviderID),
)
}
}

for cpNode := range cloudProviderNodes {
if _, ok := kubeNodesMap[cpNode]; !ok {
nodesNotInKube = append(nodesNotInKube, fmt.Sprintf("id %q in %q",
cpNode,
cloudProviderNodes[cpNode].NodeGroupName()))
if _, ok := kubeNodes[cpNode]; !ok {
nodesNotInKube = append(nodesNotInKube,
fmt.Sprintf("id %q in %q",
cpNode,
cloudProviderNodes[cpNode].NodeGroupName(),
),
)
}
}

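For reference, a small, self-contained analogue of the updated findOffendingNodes contract: both inputs are maps keyed by provider ID, and the function reports kube nodes that the cloud-provider node group doesn't know about as well as instances that have no matching kube node. The node and instance types and the IDs below are stand-ins for corev1.Node and cloudprovider.Instance, not the real types.

```go
package main

import "fmt"

// Stand-ins for corev1.Node and cloudprovider.Instance, keyed by provider ID
// as in the updated signature.
type node struct{ providerID string }
type instance struct{ nodeGroup string }

// Same shape as the updated findOffendingNodes: report nodes the cloud
// provider node group doesn't know about, and instances with no kube node.
func findOffendingNodes(kubeNodes map[string]node, cpNodes map[string]instance) (notInCP, notInKube []string) {
	for id := range kubeNodes {
		if _, ok := cpNodes[id]; !ok {
			notInCP = append(notInCP, fmt.Sprintf("id %q", id))
		}
	}
	for id, inst := range cpNodes {
		if _, ok := kubeNodes[id]; !ok {
			notInKube = append(notInKube, fmt.Sprintf("id %q in %q", id, inst.nodeGroup))
		}
	}
	return notInCP, notInKube
}

func main() {
	kube := map[string]node{"i-1": {"i-1"}, "i-2": {"i-2"}}
	cloud := map[string]instance{"i-2": {"ng-a"}, "i-3": {"ng-a"}}

	notInCP, notInKube := findOffendingNodes(kube, cloud)
	fmt.Println(notInCP)   // [id "i-1"]
	fmt.Println(notInKube) // [id "i-3" in "ng-a"]
}
```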