Skip to content
This repository has been archived by the owner on Feb 19, 2021. It is now read-only.

Commit

Permalink
Make Kube and Calico node update more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmxs committed May 25, 2020
1 parent 60573cc commit 39a6ac8
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 55 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ all: manager

# Run tests
test: generate fmt vet manifests
go test ./... -coverprofile cover.out
go test ./... -race -coverprofile cover.out

# Build manager binary
manager: generate fmt vet
go build -o bin/manager main.go
go build -race -o bin/manager main.go

# Run against the configured Kubernetes cluster in ~/.kube/config
run: generate fmt vet manifests
Expand Down
5 changes: 5 additions & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ spec:
- /manager
args:
- --enable-leader-election
env:
- name: DATASTORE_TYPE
value: "kubernetes"
- name: KUBECONFIG
value: /root/.kube/config
image: controller:latest
imagePullPolicy: Always
name: manager
Expand Down
145 changes: 95 additions & 50 deletions controllers/routereflectorconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"math"

"github.com/go-logr/logr"
Expand All @@ -27,25 +28,34 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
types "k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"

calicoApi "github.com/projectcalico/libcalico-go/lib/apis/v3"
calicoClient "github.com/projectcalico/libcalico-go/lib/clientv3"
"github.com/projectcalico/libcalico-go/lib/options"
)

var (
nodeNotFound = ctrl.Result{}
nodeCleaned = ctrl.Result{Requeue: true}
nodeReverted = ctrl.Result{Requeue: true}
finished = ctrl.Result{}

nodeGetError = ctrl.Result{}
nodeCleanupError = ctrl.Result{}
labelSelectorError = ctrl.Result{}
nodeListError = ctrl.Result{}
nodeUpdateError = ctrl.Result{}
nodeGetError = ctrl.Result{}
nodeCleanupError = ctrl.Result{}
labelSelectorError = ctrl.Result{}
nodeListError = ctrl.Result{}
nodeRevertError = ctrl.Result{}
calicoNodeGetError = ctrl.Result{}
calicoNodeUpdateError = ctrl.Result{}
nodeUpdateError = ctrl.Result{}
)

var routeReflectorsUnderOperation = map[types.UID]bool{}

type RouteReflectorConfig struct {
ClusterID string
Min int
Expand All @@ -59,9 +69,10 @@ type RouteReflectorConfig struct {
// RouteReflectorConfigReconciler reconciles a RouteReflectorConfig object
type RouteReflectorConfigReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
config RouteReflectorConfig
CalicoClient calicoClient.Interface
Log logr.Logger
Scheme *runtime.Scheme
config RouteReflectorConfig
}

type reconcileImplClient interface {
Expand All @@ -78,22 +89,22 @@ func (r *RouteReflectorConfigReconciler) Reconcile(req ctrl.Request) (ctrl.Resul
_ = r.Log.WithValues("routereflectorconfig", req.NamespacedName)

node := corev1.Node{}
err := r.Client.Get(context.Background(), req.NamespacedName, &node)
if err != nil && !errors.IsNotFound(err) {
if err := r.Client.Get(context.Background(), req.NamespacedName, &node); err != nil && !errors.IsNotFound(err) {
log.Errorf("Unable to fetch node %s because of %s", req.NamespacedName, err.Error())
return nodeGetError, err
} else if errors.IsNotFound(err) {
log.Debugf("Node not found %s", req.NamespacedName)
return nodeNotFound, nil
} else if err == nil && node.GetDeletionTimestamp() != nil || !isNodeReady(&node) || !isNodeSchedulable(&node) {
} else if err == nil && isLabeled(node.GetLabels(), r.config.NodeLabelKey, r.config.NodeLabelValue) && node.GetDeletionTimestamp() != nil ||
!isNodeReady(&node) || !isNodeSchedulable(&node) {
// Node is deleted right now or has some issues, better to remove from RRs
if updated, err := r.cleanupBGPStatus(req, &node); err != nil {
if err := r.cleanupBGPStatus(req, &node); err != nil {
log.Errorf("Unable to cleanup label on %s because of %s", req.NamespacedName, err.Error())
return nodeCleanupError, err
} else if updated {
log.Infof("Label was removed from node %s time to re-reconcile", req.NamespacedName)
return nodeCleaned, nil
}

log.Infof("Label was removed from node %s time to re-reconcile", req.NamespacedName)
return nodeCleaned, nil
}

listOptions := client.ListOptions{}
Expand Down Expand Up @@ -127,7 +138,23 @@ func (r *RouteReflectorConfigReconciler) Reconcile(req ctrl.Request) (ctrl.Resul
log.Infof("Expected number of route reflector nodes are %d", expectedNumber)

for n, isReady := range nodes {
if !isReady {
if status, ok := routeReflectorsUnderOperation[n.GetUID()]; ok {
if status {
delete(n.Labels, r.config.NodeLabelKey)
} else {
n.Labels[r.config.NodeLabelKey] = r.config.NodeLabelValue
}

log.Infof("Revert route reflector label on %s to %t", req.NamespacedName, !status)
if err := r.Client.Update(context.Background(), n); err != nil && !errors.IsNotFound(err) {
log.Errorf("Failed to revert node %s because of %s", req.NamespacedName, err.Error())
return nodeRevertError, err
}

delete(routeReflectorsUnderOperation, n.GetUID())

return nodeReverted, nil
} else if !isReady {
continue
} else if expectedNumber == actualReadyNumber {
break
Expand Down Expand Up @@ -175,63 +202,80 @@ func (r *RouteReflectorConfigReconciler) collectNodeInfo(allNodes []corev1.Node)
return
}

func (r *RouteReflectorConfigReconciler) cleanupBGPStatus(req ctrl.Request, node *corev1.Node) (bool, error) {
if isLabeled(node.GetLabels(), r.config.NodeLabelKey, r.config.NodeLabelValue) {
calicoNode, err := r.fetchCalicoNode(req, node)
if err != nil {
log.Errorf("Failed to fetch Calico node %s because of %s", req.NamespacedName, err.Error())
return false, err
}

delete(calicoNode.Labels, r.config.NodeLabelKey)
calicoNode.Spec.BGP.RouteReflectorClusterID = ""
func (r *RouteReflectorConfigReconciler) cleanupBGPStatus(req ctrl.Request, node *corev1.Node) error {
delete(node.Labels, r.config.NodeLabelKey)

log.Infof("Removing route reflector label from %s", req.NamespacedName)
if err := r.Client.Update(context.Background(), calicoNode); err != nil {
log.Errorf("Unable to cleanup node %s because of %s", req.NamespacedName, err.Error())
return false, err
}
log.Infof("Removing route reflector label from %s", req.NamespacedName)
if err := r.Client.Update(context.Background(), node); err != nil {
log.Errorf("Unable to cleanup node %s because of %s", req.NamespacedName, err.Error())
return err
}

return true, nil
if err := r.updateRouteReflectorClusterID(req, node, ""); err != nil {
log.Errorf("Unable to cleanup Calico node %s because of %s", req.NamespacedName, err.Error())
return err
}

return false, nil
return nil
}

func (r *RouteReflectorConfigReconciler) updateBGPStatus(req ctrl.Request, node *corev1.Node, diff int) (bool, error) {
labeled := isLabeled(node.GetLabels(), r.config.NodeLabelKey, r.config.NodeLabelValue)
if labeled && diff < 0 {
return r.cleanupBGPStatus(req, node)
if labeled := isLabeled(node.GetLabels(), r.config.NodeLabelKey, r.config.NodeLabelValue); labeled && diff < 0 {
return true, r.cleanupBGPStatus(req, node)
} else if labeled || diff <= 0 {
return false, nil
}

calicoNode, err := r.fetchCalicoNode(req, node)
if err != nil {
node.Labels[r.config.NodeLabelKey] = r.config.NodeLabelValue

log.Infof("Adding route reflector label to %s", req.NamespacedName)
if err := r.Client.Update(context.Background(), node); err != nil {
log.Errorf("Unable to update node %s because of %s", req.NamespacedName, err.Error())
return false, err
}

log.Infof("Label node %s as route reflector", node.GetName())
calicoNode.Labels[r.config.NodeLabelKey] = r.config.NodeLabelValue
calicoNode.Spec.BGP.RouteReflectorClusterID = r.config.ClusterID

log.Infof("Updating labels on node %s to %v", req.NamespacedName, node.Labels)
if err := r.Client.Update(context.Background(), calicoNode); err != nil {
log.Errorf("Failed to fetch Calico node %s because of %s", req.NamespacedName, err.Error())
if err := r.updateRouteReflectorClusterID(req, node, r.config.ClusterID); err != nil {
log.Errorf("Unable to update Calico node %s because of %s", req.NamespacedName, err.Error())
return false, err
}

return true, nil
}

func (r *RouteReflectorConfigReconciler) fetchCalicoNode(req ctrl.Request, node *corev1.Node) (*calicoApi.Node, error) {
func (r *RouteReflectorConfigReconciler) updateRouteReflectorClusterID(req ctrl.Request, node *corev1.Node, clusterID string) error {
routeReflectorsUnderOperation[node.GetUID()] = clusterID != ""

log.Debugf("Fetching Calico node object of %s", req.NamespacedName)
calicoNode := calicoApi.Node{}
if err := r.Client.Get(context.Background(), req.NamespacedName, &calicoNode); err != nil {
return nil, err
calicoNodes, err := r.CalicoClient.Nodes().List(context.Background(), options.ListOptions{})
if err != nil {
log.Errorf("Unable to fetch Calico nodes %s because of %s", req.NamespacedName, err.Error())
return err
}

var calicoNode *calicoApi.Node
for _, cn := range calicoNodes.Items {
if hostname, ok := cn.GetLabels()["kubernetes.io/hostname"]; ok && hostname == node.GetLabels()["kubernetes.io/hostname"] {
calicoNode = &cn
break
}
}
if calicoNode == nil {
err := fmt.Errorf("Unable to find Calico node for %s", req.NamespacedName)
log.Error(err.Error())
return err
}

calicoNode.Spec.BGP.RouteReflectorClusterID = clusterID

calicoNode, err = r.CalicoClient.Nodes().Update(context.Background(), calicoNode, options.SetOptions{})
if err != nil {
log.Errorf("Unable to update Calico node %s because of %s", req.NamespacedName, err.Error())
return err
}

delete(routeReflectorsUnderOperation, node.GetUID())

return &calicoNode, nil
return nil
}

func isNodeReady(node *corev1.Node) bool {
Expand Down Expand Up @@ -275,6 +319,7 @@ func (ef eventFilter) Generic(event.GenericEvent) bool {
}

func (r *RouteReflectorConfigReconciler) SetupWithManager(mgr ctrl.Manager, config RouteReflectorConfig) error {
log.Infof("Given configuration is: %v", config)
r.config = config
return ctrl.NewControllerManagedBy(mgr).
WithEventFilter(eventFilter{}).
Expand Down
Loading

0 comments on commit 39a6ac8

Please sign in to comment.