Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1731,15 +1731,19 @@ void replySlotsFlushAndFree(client *c, slotRangeArray *slots) {
slotRangeArrayFree(slots);
}

/* Checks that slot ranges are well-formed and non-overlapping. */
int validateSlotRanges(slotRangeArray *slots, sds *err) {
/* Normalizes the slot ranges (sorts them and merges adjacent ranges), then
 * checks that the resulting ranges are well-formed and non-overlapping. */
int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err) {
unsigned char used_slots[CLUSTER_SLOTS] = {0};

if (slots->num_ranges <= 0 || slots->num_ranges >= CLUSTER_SLOTS) {
*err = sdscatprintf(sdsempty(), "invalid number of slot ranges: %d", slots->num_ranges);
return C_ERR;
}

/* Sort and merge adjacent slot ranges. */
slotRangeArraySortAndMerge(slots);

for (int i = 0; i < slots->num_ranges; i++) {
if (slots->ranges[i].start >= CLUSTER_SLOTS ||
slots->ranges[i].end >= CLUSTER_SLOTS)
Expand Down Expand Up @@ -1789,6 +1793,7 @@ void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end) {
/* Create a slot range string in the format of: "1000-2000 3000-4000 ..." */
sds slotRangeArrayToString(slotRangeArray *slots) {
sds s = sdsempty();
if (slots == NULL || slots->num_ranges == 0) return s;

for (int i = 0; i < slots->num_ranges; i++) {
slotRange *sr = &slots->ranges[i];
Expand Down Expand Up @@ -1826,7 +1831,7 @@ slotRangeArray *slotRangeArrayFromString(sds data) {

/* Validate all ranges */
sds err_msg = NULL;
if (validateSlotRanges(slots, &err_msg) != C_OK) {
if (slotRangeArrayNormalizeAndValidate(slots, &err_msg) != C_OK) {
if (err_msg) sdsfree(err_msg);
goto err;
}
Expand All @@ -1847,13 +1852,32 @@ static int compareSlotRange(const void *a, const void *b) {
return 0;
}

/* Sort the slot ranges by start slot, then collapse runs of adjacent ranges
 * into a single range. The array is modified in place and `num_ranges` is
 * updated to the number of ranges left after merging.
 *
 * Two ranges are adjacent when prev.end + 1 == next.start, e.g.
 *   1000-2000 2001-3000 0-100  =>  0-100 1000-3000
 *
 * A NULL array, or one holding at most one range, is left untouched.
 *
 * Note: only exactly adjacent ranges are merged; overlapping ranges are kept
 * as separate entries. */
void slotRangeArraySortAndMerge(slotRangeArray *slots) {
    if (slots == NULL) return;

    int total = slots->num_ranges;
    if (total <= 1) return;

    qsort(slots->ranges, total, sizeof(slotRange), compareSlotRange);

    /* `last` is the index of the most recently emitted (merged) range. */
    int last = 0;
    for (int next = 1; next < total; next++) {
        slotRange *tail = &slots->ranges[last];
        const slotRange *cand = &slots->ranges[next];

        if (tail->end + 1 == cand->start) {
            /* Candidate starts right after the tail: extend the tail. */
            tail->end = cand->end;
        } else {
            /* Gap (or overlap): keep the candidate as a new entry. */
            last++;
            slots->ranges[last] = *cand;
        }
    }
    slots->num_ranges = last + 1;
}

/* Compare two slot range arrays, return 1 if equal, 0 otherwise */
int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) {
if (slots1->num_ranges != slots2->num_ranges) return 0;

/* Sort slot ranges first */
qsort(slots1->ranges, slots1->num_ranges, sizeof(slotRange), compareSlotRange);
qsort(slots2->ranges, slots2->num_ranges, sizeof(slotRange), compareSlotRange);
slotRangeArraySortAndMerge(slots1);
slotRangeArraySortAndMerge(slots2);

for (int i = 0; i < slots1->num_ranges; i++) {
if (slots1->ranges[i].start != slots2->ranges[i].start ||
Expand Down Expand Up @@ -1959,13 +1983,18 @@ void slotRangeArrayIteratorFree(slotRangeArrayIter *it) {
zfree(it);
}

/* Parse slot ranges from the command arguments. Returns NULL on error. */
/* Parse slot range pairs from argv starting at `pos`.
* `argc` is the argument count, `pos` is the first slot argument index.
* Returns a slotRangeArray or NULL on error. */
slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) {
int start, end, count;
slotRangeArray *slots;

serverAssert(pos <= argc);
serverAssert((argc - pos) % 2 == 0);
/* Ensure there is at least one (start, end) slot range pair. */
if (argc < 0 || pos < 0 || pos >= argc || (argc - pos) < 2 || ((argc - pos) % 2) != 0) {
addReplyErrorArity(c);
return NULL;
}

count = (argc - pos) / 2;
slots = slotRangeArrayCreate(count);
Expand All @@ -1983,8 +2012,8 @@ slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) {
}

sds err = NULL;
if (validateSlotRanges(slots, &err) != C_OK) {
addReplyErrorSds(c, err);
if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) {
sdsfree(err);
slotRangeArrayFree(slots);
return NULL;
}
Expand Down
3 changes: 2 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ slotRangeArray *slotRangeArrayDup(slotRangeArray *slots);
void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end);
sds slotRangeArrayToString(slotRangeArray *slots);
slotRangeArray *slotRangeArrayFromString(sds data);
void slotRangeArraySortAndMerge(slotRangeArray *slots);
int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2);
slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot);
int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot);
Expand All @@ -193,7 +194,7 @@ slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots);
int slotRangeArrayNext(slotRangeArrayIter *it);
int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it);
void slotRangeArrayIteratorFree(slotRangeArrayIter *it);
int validateSlotRanges(slotRangeArray *slots, sds *err);
int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err);
slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos);

unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command);
Expand Down
66 changes: 47 additions & 19 deletions src/cluster_asm.c
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,6 @@ static asmTask *lookupAsmTaskBySlotRange(slotRange *req) {
/* Validates the given slot ranges for a migration task:
* - Ensures the current node is a master.
* - Verifies all slots are in a STABLE state.
* - Checks that slot ranges are well-formed and non-overlapping.
* - Confirms all slots belong to a single source node.
* - Confirms no ongoing import task that overlaps with the slot ranges.
*
Expand Down Expand Up @@ -804,11 +803,11 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er
* initiated for them. */
source = validateImportSlotRanges(slots, err, NULL);
if (!source)
return NULL;
goto err;

if (source == getMyClusterNode()) {
*err = sdsnew("this node is already the owner of the slot range");
return NULL;
goto err;
}

/* Only support a single task at a time now. */
Expand All @@ -820,15 +819,15 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er
asmTaskCancel(current, "new import requested");
} else {
*err = sdsnew("another ASM task is already in progress");
return NULL;
goto err;
}
}
/* There should be no task in progress. */
serverAssert(listLength(asmManager->tasks) == 0);

/* Create a slot migration task */
asmTask *task = asmTaskCreate(task_id);
task->slots = slotRangeArrayDup(slots);
task->slots = slots;
task->state = ASM_NONE;
task->operation = ASM_IMPORT;
task->source_node = source;
Expand All @@ -842,6 +841,10 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er
sdsfree(slots_str);

return task;

err:
slotRangeArrayFree(slots);
return NULL;
}

/* CLUSTER MIGRATION IMPORT <start-slot end-slot [start-slot end-slot ...]>
Expand All @@ -860,7 +863,6 @@ static void clusterMigrationCommandImport(client *c) {

sds err = NULL;
asmTask *task = asmCreateImportTask(NULL, slots, &err);
slotRangeArrayFree(slots);
if (!task) {
addReplyErrorSds(c, err);
return;
Expand Down Expand Up @@ -1006,6 +1008,20 @@ void clusterMigrationCommand(client *c) {
}
}

/* Return the total number of keys stored in the given slot ranges.
 *
 * For every slot of every range (both `start` and `end` are inclusive) the
 * size of the corresponding dict in server.db[0].keys is added to the total.
 * Only db 0 is consulted.
 *
 * Returns 0 when `slots` is NULL. */
unsigned long long asmCountKeysInSlots(slotRangeArray *slots) {
    if (!slots) return 0;

    unsigned long long key_count = 0;
    for (int i = 0; i < slots->num_ranges; i++) {
        for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
            key_count += kvstoreDictSize(server.db[0].keys, j);
        }
    }
    return key_count;
}

/* Log a human-readable message for ASM task lifecycle events. */
void asmLogTaskEvent(asmTask *task, int event) {
sds str = slotRangeArrayToString(task->slots);
Expand All @@ -1021,10 +1037,12 @@ void asmLogTaskEvent(asmTask *task, int event) {
serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s", task->id, str);
break;
case ASM_EVENT_IMPORT_COMPLETED:
serverLog(LL_NOTICE, "Import task %s completed for slots: %s", task->id, str);
serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)",
task->id, str, asmCountKeysInSlots(task->slots));
break;
case ASM_EVENT_MIGRATE_STARTED:
serverLog(LL_NOTICE, "Migrate task %s started for slots: %s", task->id, str);
serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)",
task->id, str, asmCountKeysInSlots(task->slots));
break;
case ASM_EVENT_MIGRATE_FAILED:
serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str);
Expand All @@ -1033,7 +1051,8 @@ void asmLogTaskEvent(asmTask *task, int event) {
serverLog(LL_NOTICE, "Migrate task %s preparing to handoff for slots: %s", task->id, str);
break;
case ASM_EVENT_MIGRATE_COMPLETED:
serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s", task->id, str);
serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)",
task->id, str, asmCountKeysInSlots(task->slots));
break;
default:
break;
Expand Down Expand Up @@ -2847,24 +2866,36 @@ int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) {
if (err) *err = NULL;

switch (event) {
case ASM_EVENT_IMPORT_START:
ret = asmCreateImportTask(task_id, arg, &errsds) ? C_OK : C_ERR;
case ASM_EVENT_IMPORT_START: {
/* Validate the slot ranges. */
slotRangeArray *slots = slotRangeArrayDup(arg);
if (slotRangeArrayNormalizeAndValidate(slots, &errsds) != C_OK) {
slotRangeArrayFree(slots);
ret = C_ERR;
break;
}
ret = asmCreateImportTask(task_id, slots, &errsds) ? C_OK : C_ERR;
break;
case ASM_EVENT_CANCEL:
}
case ASM_EVENT_CANCEL: {
num_cancelled = clusterAsmCancel(task_id, "user request");
if (arg) *((int *)arg) = num_cancelled;
ret = C_OK;
break;
case ASM_EVENT_HANDOFF:
}
case ASM_EVENT_HANDOFF: {
ret = clusterAsmHandoff(task_id, &errsds);
break;
case ASM_EVENT_DONE:
}
case ASM_EVENT_DONE: {
ret = clusterAsmDone(task_id, &errsds);
break;
default:
}
default: {
ret = C_ERR;
errsds = sdscatprintf(sdsempty(), "Unknown operation: %d", event);
break;
}
}

if (ret != C_OK && errsds && err) {
Expand Down Expand Up @@ -3273,10 +3304,7 @@ void asmActiveTrimStart(void) {
asmManager->active_trim_current_job_trimmed = 0;

/* Count the number of keys to trim */
for (int i = 0; i < slots->num_ranges; i++) {
for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++)
asmManager->active_trim_current_job_keys += kvstoreDictSize(server.db[0].keys, slot);
}
asmManager->active_trim_current_job_keys = asmCountKeysInSlots(slots);

RedisModuleClusterSlotMigrationTrimInfoV1 fsi = {
REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,
Expand Down
36 changes: 36 additions & 0 deletions tests/unit/cluster/atomic-slot-migration.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,42 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout
wait_for_asm_done
}

# Checks that CLUSTER MIGRATION IMPORT accepts slot ranges given in any order
# and reports them back sorted, with adjacent ranges merged into one.
test "Test IMPORT with unsorted and adjacent ranges" {
# Redis should sort and merge adjacent ranges
# Adjacent means: prev.end + 1 == next.start
# e.g. 7000-7001 7002-7003 7004-7005 => 7000-7005

# Case 1: two adjacent ranges (7000-7001 and 7002-7100), already sorted.
# The import is requested on node 0; presumably node 1 currently owns
# these slots - verify against the cluster layout set up by start_cluster.
set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 7001 7002 7100]
wait_for_asm_done
# verify migration is successfully completed on both nodes
assert_equal "completed" [migration_status 0 $task_id state]
assert_equal "completed" [migration_status 1 $task_id state]
# verify slot ranges are merged correctly (a single 7000-7100 range)
assert_equal "7000-7100" [migration_status 0 $task_id slots]
assert_equal "7000-7100" [migration_status 1 $task_id slots]

# Case 2: three ranges given in descending order, requested on node 1.
# 7010-7049 and 7050-7051 are adjacent and must be merged; 7000-7005 is
# separated by a gap and must stay a range of its own.
set task_id [R 1 CLUSTER MIGRATION IMPORT 7050 7051 7010 7049 7000 7005]
wait_for_asm_done
# verify migration is successfully completed on both nodes
assert_equal "completed" [migration_status 0 $task_id state]
assert_equal "completed" [migration_status 1 $task_id state]
# verify slot ranges are sorted and merged correctly
assert_equal "7000-7005 7010-7051" [migration_status 0 $task_id slots]
assert_equal "7000-7005 7010-7051" [migration_status 1 $task_id slots]

# Case 3: unsorted input that includes single-slot ranges (7007-7007 and
# 7006-7006); all three ranges are adjacent and collapse into 7006-7009.
set task_id [R 1 CLUSTER MIGRATION IMPORT 7007 7007 7008 7009 7006 7006]
wait_for_asm_done
# verify migration is successfully completed on both nodes
assert_equal "completed" [migration_status 0 $task_id state]
assert_equal "completed" [migration_status 1 $task_id state]
# verify slot ranges are merged correctly
assert_equal "7006-7009" [migration_status 0 $task_id slots]
assert_equal "7006-7009" [migration_status 1 $task_id slots]
}

test "Simple slot migration with write load" {
# Perform slot migration while traffic is on and verify data consistency.
# Trimming is disabled on source nodes, so we can compare the dbs after
Expand Down
Loading