Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1731,15 +1731,19 @@ void replySlotsFlushAndFree(client *c, slotRangeArray *slots) {
slotRangeArrayFree(slots);
}

/* Checks that slot ranges are well-formed and non-overlapping. */
int validateSlotRanges(slotRangeArray *slots, sds *err) {
/* Normalizes the slot ranges (sorts them and merges adjacent ones) and checks
* that they are well-formed and non-overlapping. */
int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err) {
unsigned char used_slots[CLUSTER_SLOTS] = {0};

if (slots->num_ranges <= 0 || slots->num_ranges >= CLUSTER_SLOTS) {
*err = sdscatprintf(sdsempty(), "invalid number of slot ranges: %d", slots->num_ranges);
return C_ERR;
}

/* Sort and merge adjacent slot ranges. */
slotRangeArraySortAndMerge(slots);

for (int i = 0; i < slots->num_ranges; i++) {
if (slots->ranges[i].start >= CLUSTER_SLOTS ||
slots->ranges[i].end >= CLUSTER_SLOTS)
Expand Down Expand Up @@ -1789,6 +1793,7 @@ void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end) {
/* Create a slot range string in the format of: "1000-2000 3000-4000 ..." */
sds slotRangeArrayToString(slotRangeArray *slots) {
sds s = sdsempty();
if (slots == NULL || slots->num_ranges == 0) return s;

for (int i = 0; i < slots->num_ranges; i++) {
slotRange *sr = &slots->ranges[i];
Expand Down Expand Up @@ -1826,7 +1831,7 @@ slotRangeArray *slotRangeArrayFromString(sds data) {

/* Validate all ranges */
sds err_msg = NULL;
if (validateSlotRanges(slots, &err_msg) != C_OK) {
if (slotRangeArrayNormalizeAndValidate(slots, &err_msg) != C_OK) {
if (err_msg) sdsfree(err_msg);
goto err;
}
Expand All @@ -1847,13 +1852,32 @@ static int compareSlotRange(const void *a, const void *b) {
return 0;
}

/* Sort slot ranges by start slot, then collapse each run of adjacent ranges
 * into a single range, compacting the array in place.
 * Adjacent means: prev.end + 1 == next.start.
 * e.g. 1000-2000 2001-3000 0-100 => 0-100 1000-3000
 *
 * Note: Overlapping ranges are not merged; only exact adjacency extends a
 * range. A NULL array, or one holding at most one range, is left untouched. */
void slotRangeArraySortAndMerge(slotRangeArray *slots) {
    if (slots == NULL || slots->num_ranges < 2) return;

    qsort(slots->ranges, slots->num_ranges, sizeof(slotRange), compareSlotRange);

    /* `last` is the most recently kept range, `kept` the number of ranges
     * retained so far at the front of the array. */
    slotRange *last = &slots->ranges[0];
    int kept = 1;
    for (int i = 1; i < slots->num_ranges; i++) {
        slotRange *cur = &slots->ranges[i];
        if (last->end + 1 != cur->start) {
            /* Not adjacent: keep `cur` as the next output range. */
            last = &slots->ranges[kept++];
            *last = *cur;
        } else {
            /* Adjacent: grow the kept range to cover `cur` as well. */
            last->end = cur->end;
        }
    }
    slots->num_ranges = kept;
}

/* Compare two slot range arrays, return 1 if equal, 0 otherwise */
int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) {
if (slots1->num_ranges != slots2->num_ranges) return 0;

/* Sort slot ranges first */
qsort(slots1->ranges, slots1->num_ranges, sizeof(slotRange), compareSlotRange);
qsort(slots2->ranges, slots2->num_ranges, sizeof(slotRange), compareSlotRange);
slotRangeArraySortAndMerge(slots1);
slotRangeArraySortAndMerge(slots2);
Comment on lines 1876 to +1880

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: slotRangeArrayIsEqual checks count before merging

The num_ranges comparison at line 1877 happens before slotRangeArraySortAndMerge() is called at lines 1879-1880. This means two slot range arrays that represent the same set of slots but have different pre-merge num_ranges will incorrectly be reported as not equal.

For example:

  • Array A: {7000-7001, 7002-7003} → num_ranges = 2, merges to {7000-7003} (num_ranges = 1)
  • Array B: {7000-7003} → num_ranges = 1

slotRangeArrayIsEqual(A, B) would return 0 at line 1877 before ever merging, even though both arrays represent slots 7000-7003.

The fix is to move the num_ranges comparison after both slotRangeArraySortAndMerge() calls.

Was this helpful? React with 👍 / 👎

Suggested change
int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) {
if (slots1->num_ranges != slots2->num_ranges) return 0;
/* Sort slot ranges first */
qsort(slots1->ranges, slots1->num_ranges, sizeof(slotRange), compareSlotRange);
qsort(slots2->ranges, slots2->num_ranges, sizeof(slotRange), compareSlotRange);
slotRangeArraySortAndMerge(slots1);
slotRangeArraySortAndMerge(slots2);
int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) {
slotRangeArraySortAndMerge(slots1);
slotRangeArraySortAndMerge(slots2);
if (slots1->num_ranges != slots2->num_ranges) return 0;
for (int i = 0; i < slots1->num_ranges; i++) {
  • Apply suggested fix


for (int i = 0; i < slots1->num_ranges; i++) {
if (slots1->ranges[i].start != slots2->ranges[i].start ||
Expand Down Expand Up @@ -1959,13 +1983,18 @@ void slotRangeArrayIteratorFree(slotRangeArrayIter *it) {
zfree(it);
}

/* Parse slot ranges from the command arguments. Returns NULL on error. */
/* Parse slot range pairs from argv starting at `pos`.
* `argc` is the argument count, `pos` is the first slot argument index.
* Returns a slotRangeArray or NULL on error. */
slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) {
int start, end, count;
slotRangeArray *slots;

serverAssert(pos <= argc);
serverAssert((argc - pos) % 2 == 0);
/* Ensure there is at least one (start,end) slot range pairs. */
if (argc < 0 || pos < 0 || pos >= argc || (argc - pos) < 2 || ((argc - pos) % 2) != 0) {
addReplyErrorArity(c);
return NULL;
}

count = (argc - pos) / 2;
slots = slotRangeArrayCreate(count);
Expand All @@ -1983,8 +2012,8 @@ slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) {
}

sds err = NULL;
if (validateSlotRanges(slots, &err) != C_OK) {
addReplyErrorSds(c, err);
if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) {
sdsfree(err);
slotRangeArrayFree(slots);
return NULL;
Comment on lines 2014 to 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: Validation error silently dropped, no reply sent to client

In parseSlotRangesOrReply, the validation error message from slotRangeArrayNormalizeAndValidate() is silently freed at line 2016 without sending any error reply to the client. The old code was:

addReplyErrorSds(c, err);  // sends error AND takes ownership of err

The new code is:

sdsfree(err);  // silently discards error message

Since the function name convention is *OrReply (meaning it replies on error), and callers like clusterMigrationCommandImport simply return when parseSlotRangesOrReply returns NULL (trusting that an error was already sent), the client will receive no response at all. This will cause the client connection to hang or timeout.

The fix should restore the error reply to the client. Note that addReplyErrorSds takes ownership of the sds string, so sdsfree(err) should be removed when using it.

Was this helpful? React with 👍 / 👎

Suggested change
sds err = NULL;
if (validateSlotRanges(slots, &err) != C_OK) {
addReplyErrorSds(c, err);
if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) {
sdsfree(err);
slotRangeArrayFree(slots);
return NULL;
if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) {
addReplyErrorSds(c, err);
slotRangeArrayFree(slots);
return NULL;
}
  • Apply suggested fix

}
Expand Down
3 changes: 2 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ slotRangeArray *slotRangeArrayDup(slotRangeArray *slots);
void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end);
sds slotRangeArrayToString(slotRangeArray *slots);
slotRangeArray *slotRangeArrayFromString(sds data);
void slotRangeArraySortAndMerge(slotRangeArray *slots);
int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2);
slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot);
int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot);
Expand All @@ -193,7 +194,7 @@ slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots);
int slotRangeArrayNext(slotRangeArrayIter *it);
int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it);
void slotRangeArrayIteratorFree(slotRangeArrayIter *it);
int validateSlotRanges(slotRangeArray *slots, sds *err);
int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err);
slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos);

unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command);
Expand Down
66 changes: 47 additions & 19 deletions src/cluster_asm.c
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,6 @@ static asmTask *lookupAsmTaskBySlotRange(slotRange *req) {
/* Validates the given slot ranges for a migration task:
* - Ensures the current node is a master.
* - Verifies all slots are in a STABLE state.
* - Checks that slot ranges are well-formed and non-overlapping.
* - Confirms all slots belong to a single source node.
* - Confirms no ongoing import task that overlaps with the slot ranges.
*
Expand Down Expand Up @@ -804,11 +803,11 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er
* initiated for them. */
source = validateImportSlotRanges(slots, err, NULL);
if (!source)
return NULL;
goto err;

if (source == getMyClusterNode()) {
*err = sdsnew("this node is already the owner of the slot range");
return NULL;
goto err;
}

/* Only support a single task at a time now. */
Expand All @@ -820,15 +819,15 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er
asmTaskCancel(current, "new import requested");
} else {
*err = sdsnew("another ASM task is already in progress");
return NULL;
goto err;
}
}
/* There should be no task in progress. */
serverAssert(listLength(asmManager->tasks) == 0);

/* Create a slot migration task */
asmTask *task = asmTaskCreate(task_id);
task->slots = slotRangeArrayDup(slots);
task->slots = slots;
task->state = ASM_NONE;
task->operation = ASM_IMPORT;
task->source_node = source;
Expand All @@ -842,6 +841,10 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er
sdsfree(slots_str);

return task;

err:
slotRangeArrayFree(slots);
return NULL;
}

/* CLUSTER MIGRATION IMPORT <start-slot end-slot [start-slot end-slot ...]>
Expand All @@ -860,7 +863,6 @@ static void clusterMigrationCommandImport(client *c) {

sds err = NULL;
asmTask *task = asmCreateImportTask(NULL, slots, &err);
slotRangeArrayFree(slots);
if (!task) {
addReplyErrorSds(c, err);
return;
Expand Down Expand Up @@ -1006,6 +1008,20 @@ void clusterMigrationCommand(client *c) {
}
}

/* Return the number of keys in the specified slot ranges.
 *
 * Adds up kvstoreDictSize(server.db[0].keys, slot) for every slot of every
 * range in `slots`, with both range ends inclusive. Returns 0 when `slots`
 * is NULL. */
unsigned long long asmCountKeysInSlots(slotRangeArray *slots) {
    if (!slots) return 0;

    unsigned long long key_count = 0;
    for (int i = 0; i < slots->num_ranges; i++) {
        for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
            key_count += kvstoreDictSize(server.db[0].keys, j);
        }
    }
    return key_count;
}

/* Log a human-readable message for ASM task lifecycle events. */
void asmLogTaskEvent(asmTask *task, int event) {
sds str = slotRangeArrayToString(task->slots);
Expand All @@ -1021,10 +1037,12 @@ void asmLogTaskEvent(asmTask *task, int event) {
serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s", task->id, str);
break;
case ASM_EVENT_IMPORT_COMPLETED:
serverLog(LL_NOTICE, "Import task %s completed for slots: %s", task->id, str);
serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)",
task->id, str, asmCountKeysInSlots(task->slots));
break;
case ASM_EVENT_MIGRATE_STARTED:
serverLog(LL_NOTICE, "Migrate task %s started for slots: %s", task->id, str);
serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)",
task->id, str, asmCountKeysInSlots(task->slots));
break;
case ASM_EVENT_MIGRATE_FAILED:
serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str);
Expand All @@ -1033,7 +1051,8 @@ void asmLogTaskEvent(asmTask *task, int event) {
serverLog(LL_NOTICE, "Migrate task %s preparing to handoff for slots: %s", task->id, str);
break;
case ASM_EVENT_MIGRATE_COMPLETED:
serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s", task->id, str);
serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)",
task->id, str, asmCountKeysInSlots(task->slots));
break;
default:
break;
Expand Down Expand Up @@ -2847,24 +2866,36 @@ int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) {
if (err) *err = NULL;

switch (event) {
case ASM_EVENT_IMPORT_START:
ret = asmCreateImportTask(task_id, arg, &errsds) ? C_OK : C_ERR;
case ASM_EVENT_IMPORT_START: {
/* Validate the slot ranges. */
slotRangeArray *slots = slotRangeArrayDup(arg);
if (slotRangeArrayNormalizeAndValidate(slots, &errsds) != C_OK) {
slotRangeArrayFree(slots);
ret = C_ERR;
break;
}
ret = asmCreateImportTask(task_id, slots, &errsds) ? C_OK : C_ERR;
break;
case ASM_EVENT_CANCEL:
}
case ASM_EVENT_CANCEL: {
num_cancelled = clusterAsmCancel(task_id, "user request");
if (arg) *((int *)arg) = num_cancelled;
ret = C_OK;
break;
case ASM_EVENT_HANDOFF:
}
case ASM_EVENT_HANDOFF: {
ret = clusterAsmHandoff(task_id, &errsds);
break;
case ASM_EVENT_DONE:
}
case ASM_EVENT_DONE: {
ret = clusterAsmDone(task_id, &errsds);
break;
default:
}
default: {
ret = C_ERR;
errsds = sdscatprintf(sdsempty(), "Unknown operation: %d", event);
break;
}
}

if (ret != C_OK && errsds && err) {
Expand Down Expand Up @@ -3273,10 +3304,7 @@ void asmActiveTrimStart(void) {
asmManager->active_trim_current_job_trimmed = 0;

/* Count the number of keys to trim */
for (int i = 0; i < slots->num_ranges; i++) {
for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++)
asmManager->active_trim_current_job_keys += kvstoreDictSize(server.db[0].keys, slot);
}
asmManager->active_trim_current_job_keys = asmCountKeysInSlots(slots);

RedisModuleClusterSlotMigrationTrimInfoV1 fsi = {
REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,
Expand Down
36 changes: 36 additions & 0 deletions tests/unit/cluster/atomic-slot-migration.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,42 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout
wait_for_asm_done
}

test "Test IMPORT with unsorted and adjacent ranges" {
# Redis should sort and merge adjacent ranges
# Adjacent means: prev.end + 1 == next.start
# e.g. 7000-7001 7002-7003 7004-7005 => 7000-7005
#
# NOTE(review): presumably `R <n>` sends the command to node n and
# `migration_status <n> <id> <field>` reads a field of that task on node n;
# confirm against the test helpers.
# The three imports below run in sequence and alternate between R 0 and R 1
# over slots in the 7000-7100 range, so their order matters.

# Test with adjacent ranges
# 7000-7001 and 7002-7100 are adjacent, so one merged range is expected.
set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 7001 7002 7100]
wait_for_asm_done
# verify migration is successfully completed on both nodes
assert_equal "completed" [migration_status 0 $task_id state]
assert_equal "completed" [migration_status 1 $task_id state]
# verify slot ranges are merged correctly
assert_equal "7000-7100" [migration_status 0 $task_id slots]
assert_equal "7000-7100" [migration_status 1 $task_id slots]

# Test with unsorted and adjacent ranges
# 7010-7049 and 7050-7051 are adjacent and merge; 7000-7005 is separated
# from them by a gap (7006-7009), so it must stay a separate range.
set task_id [R 1 CLUSTER MIGRATION IMPORT 7050 7051 7010 7049 7000 7005]
wait_for_asm_done
# verify migration is successfully completed on both nodes
assert_equal "completed" [migration_status 0 $task_id state]
assert_equal "completed" [migration_status 1 $task_id state]
# verify slot ranges are merged correctly
assert_equal "7000-7005 7010-7051" [migration_status 0 $task_id slots]
assert_equal "7000-7005 7010-7051" [migration_status 1 $task_id slots]

# Another test with unsorted and adjacent ranges
# Three adjacent ranges given out of order (7006-7006, 7007-7007, 7008-7009),
# including single-slot ranges; all should collapse into one range.
set task_id [R 1 CLUSTER MIGRATION IMPORT 7007 7007 7008 7009 7006 7006]
wait_for_asm_done
# verify migration is successfully completed on both nodes
assert_equal "completed" [migration_status 0 $task_id state]
assert_equal "completed" [migration_status 1 $task_id state]
# verify slot ranges are merged correctly
assert_equal "7006-7009" [migration_status 0 $task_id slots]
assert_equal "7006-7009" [migration_status 1 $task_id slots]
}

test "Simple slot migration with write load" {
# Perform slot migration while traffic is on and verify data consistency.
# Trimming is disabled on source nodes so, we can compare the dbs after
Expand Down
Loading