Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions api/v1/redkeycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ const (
StatusInitializing = "Initializing"
StatusError = "Error"

RobinStatusNoReconciling = "NoReconciling"
RobinStatusUpgrading = "Upgrading"
RobinStatusScalingDown = "ScalingDown"
RobinStatusScalingUp = "ScalingUp"
RobinStatusReady = "Ready"
RobinStatusConfiguring = "Configuring"
RobinStatusInitializing = "Initializing"
RobinStatusError = "Error"

SubstatusFastUpgrading = "FastUpgrading"
SubstatusEndingFastUpgrading = "EndingFastUpgrading"
SubstatusSlowUpgrading = "SlowUpgrading"
Expand Down Expand Up @@ -311,3 +320,24 @@ func IsFastOperationStatus(status RedKeyClusterSubstatus) bool {
return status.Status == SubstatusFastScaling || status.Status == SubstatusEndingFastScaling ||
status.Status == SubstatusFastUpgrading || status.Status == SubstatusEndingFastUpgrading
}

func GetRobinStatusCodeEquivalence(redkeyClusterStatus string) string {
switch redkeyClusterStatus {
case StatusUpgrading:
return RobinStatusUpgrading
case StatusScalingDown:
return RobinStatusScalingDown
case StatusScalingUp:
return RobinStatusScalingUp
case StatusReady:
return RobinStatusReady
case StatusConfiguring:
return RobinStatusConfiguring
case StatusInitializing:
return RobinStatusInitializing
case StatusError:
return RobinStatusError
default:
return RobinStatusError
}
}
72 changes: 36 additions & 36 deletions controllers/redis_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,14 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re
r.logInfo(redkeyCluster.NamespacedName(), "Redis node pods are ready", "pods", existingStatefulSet.Spec.Replicas)

logger := r.getHelperLogger(redkeyCluster.NamespacedName())
robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness")
return err
}

// Set the number of replicas to Robin to have the new node met to the existing nodes.
replicas, replicasPerMaster, err := robinRedis.GetReplicas()
replicas, replicasPerMaster, err := redkeyRobin.GetReplicas()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting replicas from Robin")
return err
Expand All @@ -274,15 +274,15 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingUp(ctx context.Context, re
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin replicas")
}
err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin replicas")
return err
}
}

// Check all cluster nodes are ready from Robin.
clusterNodes, err := robinRedis.GetClusterNodes()
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return err
Expand Down Expand Up @@ -318,14 +318,14 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re

// Get Robin.
logger := r.getHelperLogger(redkeyCluster.NamespacedName())
robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness")
return err
}

// Check cluster health.
check, errors, warnings, err := robinRedis.ClusterCheck()
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin")
return err
Expand Down Expand Up @@ -363,7 +363,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re
}
} else {
// Move slots from partition before rolling update.
completed, err := robinRedis.MoveSlots(currentPartition, currentPartition+1, 0)
completed, err := redkeyRobin.MoveSlots(currentPartition, currentPartition+1, 0)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error moving slots", "From node", currentPartition, "To node", currentPartition+1)
return err
Expand All @@ -374,7 +374,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeUpgrading(ctx context.Context, re
}

// Forget node
err = robinRedis.ClusterResetNode(currentPartition)
err = redkeyRobin.ClusterResetNode(currentPartition)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error from Robin resetting the node", "node index", currentPartition)
return err
Expand Down Expand Up @@ -430,14 +430,14 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl

// Get Robin.
logger := r.getHelperLogger(redkeyCluster.NamespacedName())
robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness")
return err
}

// Check cluster health.
check, errors, warnings, err := robinRedis.ClusterCheck()
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin")
return err
Expand All @@ -449,7 +449,7 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeEnd(ctx context.Context, redkeyCl

// Move slots from extra node to node 0.
extraNodeIndex := int(*(existingStatefulSet.Spec.Replicas)) - 1
completed, err := robinRedis.MoveSlots(extraNodeIndex, 0, 0)
completed, err := redkeyRobin.MoveSlots(extraNodeIndex, 0, 0)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error moving slots", "From node", extraNodeIndex, "To node", 0)
return err
Expand Down Expand Up @@ -495,14 +495,14 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingDown(ctx context.Context,
r.logInfo(redkeyCluster.NamespacedName(), "Redis node pods are ready", "pods", existingStatefulSet.Spec.Replicas)

logger := r.getHelperLogger(redkeyCluster.NamespacedName())
robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin")
return err
}

// Set the number of replicas to Robin to have the new node met to the existing nodes.
replicas, replicasPerMaster, err := robinRedis.GetReplicas()
replicas, replicasPerMaster, err := redkeyRobin.GetReplicas()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting replicas from Robin")
return err
Expand All @@ -512,15 +512,15 @@ func (r *RedKeyClusterReconciler) doSlowUpgradeScalingDown(ctx context.Context,
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin replicas")
}
err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin replicas")
return err
}
}

// Check all cluster nodes are ready from Robin.
check, errors, warnings, err := robinRedis.ClusterCheck()
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin")
return err
Expand Down Expand Up @@ -678,15 +678,15 @@ func (r *RedKeyClusterReconciler) doFastScaling(ctx context.Context, redkeyClust
return true, nil
}

// Rebuild the cluster
// Reset the cluster
robin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness")
return true, err
}
err = robin.ClusterFix()
err = robin.ClusterRecreate()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error performing a cluster fix through Robin")
r.logError(redkeyCluster.NamespacedName(), err, "Error performing a cluster recreate through Robin")
return true, err
}

Expand Down Expand Up @@ -731,28 +731,28 @@ func (r *RedKeyClusterReconciler) doFastScaling(ctx context.Context, redkeyClust
return true, err
}

// At this point Robin status has been updated from Ready status. Ge go back to Ready status.
robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
// Set Robin status to NotReconciling.
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to check its readiness")
return true, err
}
err = robinRedis.SetStatus(redkeyv1.StatusReady)
err = redkeyRobin.SetStatus(redkeyv1.RobinStatusNoReconciling)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin status", "status", redkeyv1.StatusReady)
r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin status", "status", redkeyv1.RobinStatusNoReconciling)
return true, err
}
err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyv1.StatusReady)
err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyv1.RobinStatusNoReconciling)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin status", "status", redkeyv1.StatusReady)
r.logError(redkeyCluster.NamespacedName(), err, "Error persisting Robin status", "status", redkeyv1.RobinStatusNoReconciling)
return true, err
}

// ** FAST SCALING start **
r.Client.Delete(ctx, existingStatefulSet)

// Update Robin replicas.
err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error setting Robin replicas", "replicas", redkeyCluster.Spec.Replicas,
"replicas per master", redkeyCluster.Spec.ReplicasPerMaster)
Expand Down Expand Up @@ -843,20 +843,20 @@ func (r *RedKeyClusterReconciler) doSlowScaling(ctx context.Context, redkeyClust
func (r *RedKeyClusterReconciler) scaleDownCluster(ctx context.Context, redkeyCluster *redkeyv1.RedKeyCluster) (bool, error) {

logger := r.getHelperLogger((redkeyCluster.NamespacedName()))
robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to get cluster nodes")
return true, err
}

// Update replicas
replicas, replicasPerMaster, err := robinRedis.GetReplicas()
replicas, replicasPerMaster, err := redkeyRobin.GetReplicas()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin replicas")
return true, err
}
if replicas != int(redkeyCluster.Spec.Replicas) || replicasPerMaster != int(redkeyCluster.Spec.ReplicasPerMaster) {
err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating Robin replicas", "replicas", redkeyCluster.Spec.Replicas,
"replicas per master", redkeyCluster.Spec.ReplicasPerMaster)
Expand All @@ -872,7 +872,7 @@ func (r *RedKeyClusterReconciler) scaleDownCluster(ctx context.Context, redkeyCl
}

// Check cluster replicas meet the requirements
clusterNodes, err := robinRedis.GetClusterNodes()
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin nodes info")
return true, err
Expand Down Expand Up @@ -979,12 +979,12 @@ func (r *RedKeyClusterReconciler) completeClusterScaleDown(ctx context.Context,

func (r *RedKeyClusterReconciler) getCulledNodes(ctx context.Context, redkeyCluster *redkeyv1.RedKeyCluster) ([]robin.Node, error) {
logger := r.getHelperLogger((redkeyCluster.NamespacedName()))
redisRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to get cluster nodes")
return nil, err
}
clusterNodes, err := redisRobin.GetClusterNodes()
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes info from Robin")
return nil, err
Expand Down Expand Up @@ -1035,7 +1035,7 @@ func (r *RedKeyClusterReconciler) scaleUpCluster(ctx context.Context, redkeyClus

func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, redkeyCluster *redkeyv1.RedKeyCluster) (bool, error) {
logger := r.getHelperLogger((redkeyCluster.NamespacedName()))
robinRedis, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
redkeyRobin, err := robin.NewRobin(ctx, r.Client, redkeyCluster, logger)
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting Robin to get cluster nodes")
return true, err
Expand All @@ -1058,7 +1058,7 @@ func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, re
}

// Update Robin with new replicas/replicasPerMaster
err = robinRedis.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
err = redkeyRobin.SetReplicas(int(redkeyCluster.Spec.Replicas), int(redkeyCluster.Spec.ReplicasPerMaster))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating replicas in Robin", "replicas", redkeyCluster.Spec.Replicas, "replicasPerMaster", redkeyCluster.Spec.ReplicasPerMaster)
return true, err
Expand All @@ -1083,7 +1083,7 @@ func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, re
// We will ensure that all cluster nodes are initialized before asking Robin to meet all new nodes,
// forget outdated nodes, ensure slots coverage and rebalance.

clusterNodes, err := robinRedis.GetClusterNodes()
clusterNodes, err := redkeyRobin.GetClusterNodes()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error getting cluster nodes from Robin")
return true, err
Expand All @@ -1093,7 +1093,7 @@ func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, re
r.logInfo(redkeyCluster.NamespacedName(), "ScaleCluster - Inconsistency. Statefulset replicas equals to RedKeyCluster replicas but we have a different number of cluster nodes. Trying to fix it...",
"RedKeyCluster replicas", redkeyCluster.Spec.Replicas, "Cluster nodes", masterNodes)
}
err = robinRedis.ClusterFix()
err = redkeyRobin.ClusterFix()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error asking to Robin to ensure the cluster is ok")
return true, err
Expand All @@ -1109,7 +1109,7 @@ func (r *RedKeyClusterReconciler) completeClusterScaleUp(ctx context.Context, re
case redkeyv1.SubstatusEndingScaling:
// Final step: ensure the cluster is Ok.

check, errors, warnings, err := robinRedis.ClusterCheck()
check, errors, warnings, err := redkeyRobin.ClusterCheck()
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error checking the cluster readiness over Robin")
return true, err
Expand Down
4 changes: 2 additions & 2 deletions controllers/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (r *RedKeyClusterReconciler) updateClusterStatus(ctx context.Context, redke
return err
}

err = robin.SetStatus(redkeyCluster.Status.Status)
err = robin.SetStatus(redkeyv1.GetRobinStatusCodeEquivalence(redkeyCluster.Status.Status))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error setting the new status to Robin", "status", redkeyCluster.Status.Status)
return err
Expand All @@ -75,7 +75,7 @@ func (r *RedKeyClusterReconciler) updateClusterStatus(ctx context.Context, redke
}

// Update Robin ConfigMap status
err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyCluster.Status.Status)
err = robin.PersistRobinStatut(ctx, r.Client, redkeyCluster, redkeyv1.GetRobinStatusCodeEquivalence(redkeyCluster.Status.Status))
if err != nil {
r.logError(redkeyCluster.NamespacedName(), err, "Error updating the new status in Robin ConfigMap", "status", redkeyCluster.Status.Status)
return err
Expand Down
Loading