Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 101 additions & 1 deletion src/valkey-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -3714,7 +3714,8 @@ static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array, int len)
static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array);
static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array, clusterManagerNode **nodeptr);
static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array, clusterManagerNode *node);

static int clusterManagerIsReplicaSynced(clusterManagerNode *node, char **err);
static int clusterManagerWaitReplicasFullSync(long long timeout_ms);
/* Cluster Manager commands. */

static int clusterManagerCommandCreate(int argc, char **argv);
Expand Down Expand Up @@ -6786,6 +6787,95 @@ static void clusterManagerMode(clusterManagerCommandProc *proc) {
exit(success ? 0 : 1);
}

/* Ask a replica node whether its replication link is established and no
 * synchronization is currently running.
 *
 * Sends "INFO replication" to 'node' and looks at the 'master_link_status'
 * and 'master_sync_in_progress' fields of the reply text.
 *
 * Returns 1 when the link status starts with "up" and the sync-in-progress
 * value parses to 0. Returns 0 in every other case: 'node' is NULL or has no
 * 'replicate' value, the command got no reply or an error reply, the reply
 * is not a string, or the fields say the replica is not ready.
 *
 * If 'err' is not NULL, '*err' is set to NULL on success and to a newly
 * created sds string on every failure; the caller releases it with
 * sdsfree(). */
static int clusterManagerIsReplicaSynced(clusterManagerNode *node, char **err) {
    if (err) *err = NULL;

    if (node == NULL || node->replicate == NULL) {
        if (err) *err = sdsnew("Node is not a replica");
        return 0;
    }

    valkeyReply *r = CLUSTER_MANAGER_COMMAND(node, "INFO replication");

    /* The reply is validated inline instead of through
     * clusterManagerCheckValkeyReply() so that every error handed back via
     * '*err' is an sds string.
     * NOTE(review): that helper is defined outside this block; in upstream
     * valkey-cli it appears to fill '*err' with a zmalloc'ed C string, which
     * must not be passed to sdsfree() -- confirm against its definition. */
    if (r == NULL) {
        if (err) *err = sdsnew("No reply received for INFO replication");
        return 0;
    }
    if (r->type == VALKEY_REPLY_ERROR) {
        if (err) *err = sdsnew(r->str);
        freeReplyObject(r);
        return 0;
    }

    if (r->type != VALKEY_REPLY_STRING) {
        if (err) *err = sdsnew("Unexpected INFO replication reply type");
        freeReplyObject(r);
        return 0;
    }

    /* Both flags default to "not ready" so a reply that lacks the link
     * status field is treated as not synced. */
    int link_up = 0;
    int sync_in_progress = 0;

    char *str = r->str;
    char *p = strstr(str, "master_link_status:");
    if (p) {
        p += strlen("master_link_status:");
        if (!strncmp(p, "up", 2)) link_up = 1;
    }

    p = strstr(str, "master_sync_in_progress:");
    if (p) {
        p += strlen("master_sync_in_progress:");
        sync_in_progress = (int)strtol(p, NULL, 10);
    }

    /* 'str' and 'p' point into the reply, so it is released only after
     * parsing is complete. */
    freeReplyObject(r);

    if (link_up && sync_in_progress == 0)
        return 1;

    if (err) *err = sdsnew("Replica still syncing");
    return 0;
}

/* Poll every replica in cluster_manager.nodes until all of them pass
 * clusterManagerIsReplicaSynced() or 'timeout_ms' milliseconds have elapsed.
 *
 * Nodes whose 'replicate' field is NULL are skipped. Between polls the
 * function sleeps with a backoff that starts at 100 ms, doubles after every
 * round and is capped at 2000 ms. The sleep is never longer than the time
 * left, and the nodes are always polled at least once and once more when the
 * timeout is reached, so replicas that became ready during the last sleep
 * are still detected.
 *
 * Returns 1 as soon as one full pass finds every replica synced, 0 (after
 * logging an error) when the timeout expires first. */
static int clusterManagerWaitReplicasFullSync(long long timeout_ms) {
    const long long max_backoff = 2000; /* Milliseconds. */
    long long backoff = 100;            /* Milliseconds. */
    long long start = mstime();

    for (;;) {
        int all_good = 1;
        listIter li;
        listNode *ln;

        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *node = ln->value;

            if (node->replicate == NULL) continue;

            char *err = NULL;
            if (!clusterManagerIsReplicaSynced(node, &err)) {
                /* Keep scanning the remaining nodes so that, in verbose
                 * mode, every lagging replica is reported in this round. */
                all_good = 0;

                if (config.verbose) {
                    if (err)
                        clusterManagerLogInfo("[replica-sync] %s:%d not synced: %s\n",
                                              node->ip, node->port, err);
                    else
                        clusterManagerLogInfo("[replica-sync] %s:%d not synced\n",
                                              node->ip, node->port);
                }
                if (err) sdsfree(err);
            }
        }

        if (all_good) return 1;

        /* Measured as elapsed time (not an absolute deadline) so a very
         * large timeout cannot overflow the addition. */
        long long elapsed = mstime() - start;
        if (elapsed >= timeout_ms) break;

        /* Never sleep past the timeout: the last nap ends at the deadline
         * and is followed by one final poll. */
        long long remaining = timeout_ms - elapsed;
        long long nap = backoff < remaining ? backoff : remaining;
        usleep(nap * 1000);

        backoff = backoff * 2;
        if (backoff > max_backoff) backoff = max_backoff;
    }

    clusterManagerLogErr("[ERR] replica full-sync wait timed-out.\n");
    return 0;
}

/* Cluster Manager Commands */

static int clusterManagerCommandCreate(int argc, char **argv) {
Expand Down Expand Up @@ -7567,6 +7657,16 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
result = 0;
goto cleanup;
}

clusterManagerLogInfo(">>> Checking replica sync state before rebalancing...\n");

/* Wait up to 30 seconds for the replicas to finish syncing. On failure,
 * leave through the function's 'cleanup' label with 'result' cleared, the
 * same way the error path just above does, so that whatever the cleanup
 * section releases is not skipped by a direct return. */
if (!clusterManagerWaitReplicasFullSync(30000)) { /* 30 seconds timeout */
    clusterManagerLogErr("[ERR] Replica sync incomplete. Aborting rebalance.\n");
    result = 0;
    goto cleanup;
}

clusterManagerLogInfo(">>> All replicas are online. Safe to proceed with rebalancing.\n");

/* Calculate the slots balance for each node. It's the number of
* slots the node should lose (if positive) or gain (if negative)
* in order to be balanced. */
Expand Down
Loading