Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// SPDX-FileCopyrightText: 2025 INDUSTRIA DE DISEÑO TEXTIL, S.A. (INDITEX, S.A.)
//
// SPDX-License-Identifier: Apache-2.0

{
"version": "0.2.0",
"configurations": [
{
"name": "Connect to robin",
"type": "go",
"request": "attach",
"mode": "remote",
"remotePath": "${workspaceFolder}",
"port": 40002,
"host": "127.0.0.1",
"trace": "verbose",
"env": {
"WATCH_NAMESPACE": "default",
"KUBERNETES_CONFIG": "${env:HOME}/.kube/config"
}
}
]
}
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ debug: ## Build a new robin binary, copy the file to the pod and run it in debu
kubectl cp ./bin/robin $(REDKEY_ROBIN):/robin -n ${NAMESPACE}
kubectl exec -it po/$(REDKEY_ROBIN) -n ${NAMESPACE} -- dlv --listen=:40000 --headless=true --api-version=2 --accept-multiclient exec /robin --continue

port-forward: ## Port forwarding of port 40000 for debugging robin with Delve.
kubectl port-forward pod/$(REDKEY_ROBIN) 40000:40000 -n ${NAMESPACE}
port-forward: ## Port forwarding of port 40000 to port 40002 for debugging robin with Delve.
kubectl port-forward pod/$(REDKEY_ROBIN) 40002:40000 -n ${NAMESPACE}

port-forward-metrics: ## Port forwarding of port 8080 for debugging the manager with Delve.
kubectl port-forward pod/$(REDKEY_ROBIN) 8080:8080 -n ${NAMESPACE}
Expand Down
7 changes: 5 additions & 2 deletions debug.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
#
# SPDX-License-Identifier: Apache-2.0

FROM golang:1.23
FROM golang:1.24.6

RUN go install github.com/go-delve/delve/cmd/dlv@v1.23.1
RUN go install github.com/go-delve/delve/cmd/dlv@v1.25

# Install redis-cli by adding the redis package
RUN apt update -y && apt install -y redis-tools curl procps

WORKDIR /
EXPOSE 40000
34 changes: 30 additions & 4 deletions internal/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type clusterGetter interface {
GetStatus() string
// GetReconcilerInterval returns the interval of the cluster reconciler.
GetReconcilerInterval() int
// GetReconcilerStabilizeSlotsReconciliationThreshold returns the threshold for stabilizing slots.
GetReconcilerStabilizeSlotsReconciliationThreshold() int
// GetName returns the name of the cluster.
GetName() string
// GetAddress returns the address of the cluster.
Expand All @@ -76,6 +78,8 @@ type clusterGetter interface {
GetMetricsRedisInfoKeys() []string
// GetMetricsInterval returns the interval for collecting Redis metrics
GetMetricsInterval() int
// GetOpenSlots returns the open slots of the RedKey Cluster
GetOpenSlots() map[int]int
}

type clusterSetter interface {
Expand All @@ -85,6 +89,8 @@ type clusterSetter interface {
SetReplicas(replicas int, replicasPerMaster *int) error
// SetStatus sets the status of the cluster.
SetStatus(status string) error
// SetOpenSlots sets the open slots of the cluster.
SetOpenSlots(openSlots map[int]int)
}

type clusterAsker interface {
Expand Down Expand Up @@ -127,10 +133,14 @@ type clusterPrivate interface {
balanceClusterIfNeeded(weights map[string]int) error
// forgetNode forgets a node from the cluster.
forgetNode(ctx context.Context, node redis.RedisNode) error
// removeNode removes a node from the cluster.
removeNode(ctx context.Context, node redis.RedisNode) error
// removeNodesIfNeeded removes nodes if needed.
removeNodesIfNeeded(ctx context.Context) error
// addNewNodesIfNeeded adds new nodes if needed.
addNewNodesIfNeeded() error
// stabilizeOpenSlots stabilizes open slots if needed.
stabilizeOpenSlots(ctx context.Context, counter map[int]int, threshold int) (map[int]int, error)
}

// Cluster represents a cluster, either standalone or Redis.
Expand Down Expand Up @@ -174,10 +184,11 @@ func NewCluster(ctx context.Context, conf *config.Configuration, channel chan st

// clusterBase represents the base of a cluster.
type clusterBase struct {
ctx context.Context
logger *slog.Logger
conf *config.Configuration
status string
ctx context.Context
logger *slog.Logger
conf *config.Configuration
status string
openSlots map[int]int
}

// ----------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -234,6 +245,11 @@ func (rc *clusterBase) GetReconcilerOperationCleanupInterval() int {
return rc.conf.Redis.Reconciler.OperationCleanupIntervalSeconds
}

// GetReconcilerStabilizeSlotsReconciliationThreshold reports the configured
// reconciler threshold used when stabilizing slots.
func (rc *clusterBase) GetReconcilerStabilizeSlotsReconciliationThreshold() int {
	cfg := rc.conf
	return cfg.Redis.Reconciler.StabilizeSlotsReconciliationThreshold
}

// GetClusterMaxRetries returns the maximum number of retries for a RedKey Cluster check connection operation
func (rc *clusterBase) GetClusterMaxRetries() int {
return rc.conf.Redis.Cluster.MaxRetries
Expand Down Expand Up @@ -279,6 +295,11 @@ func (rc *clusterBase) GetMetadata() map[string]string {
return rc.conf.Metadata
}

// GetOpenSlots returns the open slots of the RedKey Cluster.
//
// The stored map is returned as-is (it is not copied), so the caller shares it
// with the cluster. The result is nil until SetOpenSlots has been called with a
// non-nil map.
// NOTE(review): presumably keyed by slot number with a per-slot check count as
// value — confirm against the caller that populates it.
func (rc *clusterBase) GetOpenSlots() map[int]int {
return rc.openSlots
}

// ----------------------------------------------------------------------------------------------------
// ---------------------------------------------- SETTERS ---------------------------------------------
// ----------------------------------------------------------------------------------------------------
Expand All @@ -289,6 +310,11 @@ func (rc *clusterBase) SetStatus(status string) error {
return nil
}

// SetOpenSlots sets the open slots of the RedKey Cluster.
//
// The given map is stored directly (it is not copied) and replaces whatever was
// stored before; passing nil clears it.
func (rc *clusterBase) SetOpenSlots(openSlots map[int]int) {
rc.openSlots = openSlots
}

// ----------------------------------------------------------------------------------------------------
// ---------------------------------------------- ASKERS ----------------------------------------------
// ----------------------------------------------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions internal/cluster/integrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func (ro *RedisOperationCheckIntegrity) doCheckIntegrity(ctx context.Context) er
ro.logger.Error("Error removing outdated nodes", "error", err)
}

// Remove nodes if needed
if err := ro.cluster.removeNodesIfNeeded(ctx); err != nil {
return err
}

// Meet nodes if needed
if err := ro.cluster.meetNodesIfNeeded(ctx); err != nil {
return err
Expand Down
123 changes: 100 additions & 23 deletions internal/cluster/redkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ type RedKeyCluster struct {
operationFactory *OperationFactory
}

// MigratingSlot describes one hash slot that is in the middle of a migration
// between two cluster nodes.
type MigratingSlot struct {
	// Slot is the number of the slot being migrated.
	Slot int
	// From identifies the node the slot is leaving, To the node it is moving to.
	From, To string
}

// Default client factory for RedKeyCluster (global, no closure)
func defaultRedKeyClusterClientFactory(ctx context.Context, addr string, maxRetries int, backoff time.Duration) (redis.RedisClientInterface, error) {
redisClient := redis.NewRedisClient(ctx, addr, os.Getenv("REDISAUTH"), 0)
Expand Down Expand Up @@ -118,6 +125,16 @@ func (rc *RedKeyCluster) GetNode(name string) *redis.RedisNode {
return rc.nodes[name]
}

// GetNodeById looks up a tracked Redis node by its ID and returns it, or nil
// when no tracked node has that ID.
//
// NOTE(review): this reads rc.nodes without taking rc.mux, whereas
// GetNodeFromID acquires the read lock — confirm callers serialize access.
func (rc *RedKeyCluster) GetNodeById(id string) *redis.RedisNode {
	for name := range rc.nodes {
		if candidate := rc.nodes[name]; candidate.ID == id {
			return candidate
		}
	}
	return nil
}

// GetNodeFromID returns the Redis node with the specified ID or nil if it doesn't exist
func (rc *RedKeyCluster) GetNodeFromID(nodeID string) *redis.RedisNode {
rc.mux.RLock()
Expand Down Expand Up @@ -619,16 +636,16 @@ func (rc *RedKeyCluster) addNode(name, addr string) *redis.RedisNode {
}

// removeNode removes a Redis node from the cluster
func (rc *RedKeyCluster) removeNode(name string) error {
func (rc *RedKeyCluster) removeNode(ctx context.Context, nodeToForget redis.RedisNode) error {
rc.mux.RLock()
defer rc.mux.RUnlock()

_, ok := rc.nodes[name]
_, ok := rc.nodes[nodeToForget.Name]
if !ok {
return fmt.Errorf("node %s not found", name)
return fmt.Errorf("node %s not found", nodeToForget.Name)
}

delete(rc.nodes, name)
delete(rc.nodes, nodeToForget.Name)
return nil
}

Expand Down Expand Up @@ -672,38 +689,50 @@ func (rc *RedKeyCluster) refreshNodes() error {

// Update nodes info
rc.updateNodesInfo(nodesInfo)

return nil
}

// checkNodes checks the nodes of the RedKey cluster
func (rc *RedKeyCluster) checkNodes() error {
refresedNodes := make(map[string]*redis.RedisNode)
for i := range rc.GetDesiredReplicas() {
nodeName := fmt.Sprintf("%s-%d", rc.GetName(), i)

// Check if the node exists
node := rc.GetNode(nodeName)
if node == nil {
rc.logger.Error("Node not found", "node", nodeName)
continue
}
rc.logger.Info("Node not found (probably been forgotten), creating...", "node", nodeName)
nodeName := fmt.Sprintf("%s-%d", rc.GetName(), i)
nodeAddr := fmt.Sprintf("%s.%s", nodeName, rc.GetAddress())
freshNode := redis.NewRedisNode(nodeName, nodeAddr, rc.GetClusterMaxRetries(), rc.GetClusterBackOff())
if err := freshNode.Init(rc.ctx); err != nil {
rc.logger.Info("Error initializing node", "error", err, "node", nodeName)
continue
}
rc.nodes[nodeName] = freshNode
} else {

// Init a fresh node to check if IP or ID have changed. This can happen if the node has been restarted
freshNode := redis.NewRedisNode(nodeName, node.Addr, rc.GetClusterMaxRetries(), rc.GetClusterBackOff())
if err := freshNode.Init(rc.ctx); err != nil {
rc.logger.Info("Error initializing node", "error", err, "node", nodeName)
continue
}
// Init a fresh node to check if IP or ID have changed. This can happen if the node has been restarted
freshNode := redis.NewRedisNode(nodeName, node.Addr, rc.GetClusterMaxRetries(), rc.GetClusterBackOff())
if err := freshNode.Init(rc.ctx); err != nil {
rc.logger.Info("Error initializing node", "error", err, "node", nodeName)
continue
}

// Update ID and IP
if node.ID != freshNode.ID {
rc.logger.Info("Node ID has changed", "node", nodeName, "oldID", node.ID, "newID", freshNode.ID)
node.SetID(freshNode.ID)
}
if node.IP != freshNode.IP {
rc.logger.Info("Node IP has changed", "node", nodeName, "oldIP", node.IP, "newIP", freshNode.IP)
node.SetIP(freshNode.IP)
// Update ID and IP
if node.ID != freshNode.ID {
rc.logger.Info("Node ID has changed", "node", nodeName, "oldID", node.ID, "newID", freshNode.ID)
node.SetID(freshNode.ID)
}
if node.IP != freshNode.IP {
rc.logger.Info("Node IP has changed", "node", nodeName, "oldIP", node.IP, "newIP", freshNode.IP)
node.SetIP(freshNode.IP)
}
}
refresedNodes[nodeName] = rc.nodes[nodeName]
}
rc.nodes = refresedNodes

// Update nodes info
if err := rc.refreshNodes(); err != nil {
Expand Down Expand Up @@ -957,7 +986,7 @@ func (rc *RedKeyCluster) forgetAndRemoveNodes(ctx context.Context, nodes []*redi
}

// Remove the node
if err := rc.removeNode(node.Name); err != nil {
if err := rc.removeNode(ctx, *node); err != nil {
return err
}
}
Expand Down Expand Up @@ -1047,7 +1076,7 @@ func (rc *RedKeyCluster) removeOutdatedNodes(ctx context.Context) error {
return fmt.Errorf("error forgetting node %s from node %s: %v", clusterNode.ID, node.Name, err)
}

rc.logger.Info("Node forgotten successfully", "node", clusterNode.ID, "from", node.Name)
rc.logger.Info("Outdated node forgotten successfully", "node", clusterNode.ID, "from", node.Name)
}
}

Expand Down Expand Up @@ -1524,3 +1553,51 @@ func (rc *RedKeyCluster) doRemoveOutdatedNodes() {
}
}
}

// stabilizeOpenSlots tracks the slots that nodes report as migrating and
// stabilizes those that have stayed open for more than threshold checks.
//
// counter maps a slot number to the number of previous checks in which the slot
// was seen migrating. The returned map is the counter to feed into the next
// check: slots seen for the first time start at 1, slots already tracked and
// still within the threshold are incremented, and slots that are no longer
// migrating (or that were just sent for stabilization) are left out.
//
// An error, with a nil map, is returned only when a node's cluster view cannot
// be read. Failures to stabilize an individual slot are logged and do not abort
// the run.
func (rc *RedKeyCluster) stabilizeOpenSlots(ctx context.Context, counter map[int]int, threshold int) (map[int]int, error) {
	var slotsToStabilize []MigratingSlot
	updatedCounter := make(map[int]int)

	for _, node := range rc.nodes {
		clusterNodes, err := node.GetClusterNodes(ctx)
		if err != nil {
			return nil, fmt.Errorf("error getting cluster nodes from node %s: %w", node.Name, err)
		}

		for _, clusterNode := range clusterNodes {
			// Only consider what each node reports about itself.
			if clusterNode.ID != node.ID {
				continue
			}

			for slot, target := range clusterNode.Migrating {
				count, seen := counter[slot]
				switch {
				case !seen:
					// First time this slot is observed open.
					updatedCounter[slot] = 1
				case count+1 > threshold:
					slotsToStabilize = append(slotsToStabilize, MigratingSlot{Slot: slot, From: clusterNode.ID, To: target})
				default:
					updatedCounter[slot] = count + 1
				}
			}
		}
	}

	for _, slot := range slotsToStabilize {
		rc.logger.Info("Slot needs to be stabilized", "slot", slot.Slot, "from", slot.From, "to", slot.To, "checks", threshold)

		// Both ends of the migration are stabilized independently; a node that
		// is not tracked locally is skipped.
		if fromNode := rc.GetNodeById(slot.From); fromNode != nil {
			if err := fromNode.StabilizeSlot(ctx, fromNode.IP, slot.Slot); err != nil {
				rc.logger.Error("Error stabilizing slot", "slot", slot.Slot, "from", slot.From, "error", err)
			} else {
				// Report success only when the call actually succeeded.
				rc.logger.Info("Slot stabilized on from node", "slot", slot.Slot, "from", slot.From)
			}
		}
		if toNode := rc.GetNodeById(slot.To); toNode != nil {
			if err := toNode.StabilizeSlot(ctx, toNode.IP, slot.Slot); err != nil {
				rc.logger.Error("Error stabilizing slot", "slot", slot.Slot, "to", slot.To, "error", err)
			} else {
				rc.logger.Info("Slot stabilized on to node", "slot", slot.Slot, "to", slot.To)
			}
		}
	}

	return updatedCounter, nil
}
8 changes: 8 additions & 0 deletions internal/cluster/redkey_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type MockRedKeyCluster struct {
RemoveNodesIfNeededError error
AddNewNodesIfNeededError error
ForgetNodeError error
StabilizeOpenSlotsError error

// Behavior control fields
IsEphemeralValue bool
Expand Down Expand Up @@ -122,6 +123,11 @@ func (m *MockRedKeyCluster) balanceClusterIfNeeded(weights map[string]int) error
return m.BalanceClusterIfNeededError
}

// stabilizeOpenSlots is the mock counterpart of the cluster's stabilizeOpenSlots.
// It ignores its arguments and always yields a nil counter map together with
// whatever error was configured in StabilizeOpenSlotsError.
func (m *MockRedKeyCluster) stabilizeOpenSlots(ctx context.Context, counter map[int]int, threshold int) (map[int]int, error) {
	var noCounter map[int]int
	return noCounter, m.StabilizeOpenSlotsError
}

// SetRedisClientError configures an error for a specific Redis client method
func (m *MockRedKeyCluster) SetRedisClientError(method string, err error) {
switch method {
Expand Down Expand Up @@ -159,6 +165,8 @@ func (m *MockRedKeyCluster) SetRedisClientError(method string, err error) {
m.MockRedisClient.ClusterRebalanceError = err
case "ReshardNode":
m.MockRedisClient.ReshardNodeError = err
case "StabilizeSlot":
m.MockRedisClient.StabilizeSlotError = err
}
}

Expand Down
Loading
Loading