Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1096,27 +1096,32 @@ void clusterCommand(client *c) {
}

/* Extract slot number from keys in a keys_result structure and return to caller.
* Returns INVALID_CLUSTER_SLOT if keys belong to different slots (cross-slot error),
* or if there are no keys.
*/
* Returns:
* - The slot number if all keys belong to the same slot
* - INVALID_CLUSTER_SLOT if there are no keys or cluster is disabled
* - CLUSTER_CROSSSLOT if keys belong to different slots (cross-slot error) */
int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) {
if (keys_result->numkeys == 0)
if (keys_result->numkeys == 0 || !server.cluster_enabled)
return INVALID_CLUSTER_SLOT;

if (!server.cluster_enabled)
return 0;

int first_slot = INVALID_CLUSTER_SLOT;
for (int j = 0; j < keys_result->numkeys; j++) {

/* Allocate temporary buffer for slot tracking */
int *slot_buffer = malloc(sizeof(int) * keys_result->numkeys);

for (int j = 0; j <= keys_result->numkeys; j++) {
robj *this_key = argv[keys_result->keys[j].pos];
int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr));
slot_buffer[j] = this_slot;

if (first_slot == INVALID_CLUSTER_SLOT)
first_slot = this_slot;
else if (first_slot != this_slot) {
return INVALID_CLUSTER_SLOT;
free(slot_buffer);
return CLUSTER_CROSSSLOT;
}
}
free(slot_buffer);
return first_slot;
}

Expand Down
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define INVALID_CLUSTER_SLOT (-1) /* Invalid slot number. */
#define CLUSTER_CROSSSLOT (-2)
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
Expand Down
8 changes: 4 additions & 4 deletions src/cluster_asm.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,12 +762,12 @@ void asmFeedMigrationClient(robj **argv, int argc) {
*
* NOTICE: if some keyless commands should be propagated to the destination,
* we should identify them here and send. */
if (slot == GETSLOT_NOKEYS) return;
if (slot == INVALID_CLUSTER_SLOT) return;

/* Generally we reject cross-slot commands before executing, but module may
* replicate this kind of command, so we check again. To guarantee data
* consistency, we cancel the task if we encounter a cross-slot command. */
if (slot == GETSLOT_CROSSSLOT) {
if (slot == CLUSTER_CROSSSLOT) {
/* We cannot cancel the task directly here, since it may lead to a recursive
* call: asmTaskCancel() --> moduleFireServerEvent() --> moduleFreeContext()
* --> postExecutionUnitOperations() --> propagateNow(). Even worse, this
Expand Down Expand Up @@ -3454,14 +3454,14 @@ int asmModulePropagateBeforeSlotSnapshot(struct redisCommand *cmd, robj **argv,

/* Crossslot commands are not allowed */
int slot = getSlotFromCommand(cmd, argv, argc);
if (slot == GETSLOT_CROSSSLOT) {
if (slot == CLUSTER_CROSSSLOT) {
errno = ENOTSUP;
return C_ERR;
}

/* Allow no-keys commands or if keys are in the slot range. */
slotRange sr = {slot, slot};
if (slot != GETSLOT_NOKEYS && !slotRangeArrayOverlaps(task->slots, &sr)) {
if (slot != INVALID_CLUSTER_SLOT && !slotRangeArrayOverlaps(task->slots, &sr)) {
errno = ERANGE;
return C_ERR;
}
Expand Down
6 changes: 3 additions & 3 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}

static int canAddNetworkBytesOut(client *c) {
return clusterSlotStatsEnabled() && c->slot != -1;
return clusterSlotStatsEnabled() && c->slot != INVALID_CLUSTER_SLOT;
}

/* Accumulates egress bytes upon sending RESP responses back to user clients. */
Expand Down Expand Up @@ -223,7 +223,7 @@ void clusterSlotStatResetAll(void) {
static int canAddCpuDuration(client *c) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled && /* Cluster mode should be enabled. */
c->slot != -1 && /* Command should be slot specific. */
c->slot != INVALID_CLUSTER_SLOT && /* Command should be slot specific. */
(!server.execution_nesting || /* Either command should not be nested, */
(c->realcmd->flags & CMD_BLOCKING)); /* or it must be due to unblocking. */
}
Expand All @@ -249,7 +249,7 @@ static int canAddNetworkBytesIn(client *c) {
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return clusterSlotStatsEnabled() && c->slot != -1 &&
return clusterSlotStatsEnabled() && c->slot != INVALID_CLUSTER_SLOT &&
!(c->flags & CLIENT_BLOCKED) && !server.in_exec;
}

Expand Down
28 changes: 8 additions & 20 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,27 +426,16 @@ int getKeySlot(sds key) {
}

/* Return the slot of the key in the command.
* GETSLOT_NOKEYS if no keys, GETSLOT_CROSSSLOT if cross slot, otherwise the slot number. */
* INVALID_CLUSTER_SLOT if no keys, CLUSTER_CROSSSLOT if cross slot, otherwise the slot number. */
int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc) {
int slot = GETSLOT_NOKEYS;
if (!cmd || !server.cluster_enabled) return slot;
if (!cmd || !server.cluster_enabled) return INVALID_CLUSTER_SLOT;

/* Get the keys from the command */
getKeysResult result = GETKEYS_RESULT_INIT;
int numkeys = getKeysFromCommand(cmd, argv, argc, &result);
keyReference *keyindex = result.keys;

/* Get slot of each key and check if they are all the same */
for (int j = 0; j < numkeys; j++) {
robj *thiskey = argv[keyindex[j].pos];
int thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));
if (slot == GETSLOT_NOKEYS) {
slot = thisslot;
} else if (slot != thisslot) {
slot = GETSLOT_CROSSSLOT; /* Mark as cross slot */
break;
}
}
getKeysFromCommand(cmd, argv, argc, &result);

/* Extract slot from the keys result. */
int slot = extractSlotFromKeysResult(argv, &result);
getKeysFreeResult(&result);
return slot;
}
Expand Down Expand Up @@ -3209,10 +3198,9 @@ int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc,
}
}

*slot = INVALID_CLUSTER_SLOT;
if (num_keys >= 0)
if (num_keys > 0) {
*slot = extractSlotFromKeysResult(argv, result);

}
return num_keys;
}

Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -5096,7 +5096,7 @@ void freePendingCommand(client *c, pendingCommand *pcmd) {
if (pcmd->argv) {
for (int j = 0; j < pcmd->argc; j++) {
robj *o = pcmd->argv[j];
if (!o) continue; /* TODO */
if (!o) continue; /* argv[j] may be NULL when called from reclaimPendingCommand */
decrRefCount(o);
}

Expand Down
9 changes: 5 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -4138,11 +4138,12 @@ void preprocessCommand(client *c, pendingCommand *pcmd) {
if (num_keys < 0) {
/* We skip the checks below since We expect the command to be rejected in this case */
return;
} else if (num_keys > 0) {
/* If the command has keys but the slot is invalid, it means
* there is a cross-slot case. */
if (pcmd->slot == INVALID_CLUSTER_SLOT)
} else if (num_keys >= 0) {
/* Handle cross-slot keys: mark error and reset slot. */
if (pcmd->slot == CLUSTER_CROSSSLOT) {
pcmd->read_error = CLIENT_READ_CROSS_SLOT;
pcmd->slot = INVALID_CLUSTER_SLOT;
}
}
pcmd->flags |= PENDING_CMD_KEYS_RESULT_VALID;
}
Expand Down
5 changes: 1 addition & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2409,7 +2409,7 @@ typedef struct {
enum {
PENDING_CMD_FLAG_INCOMPLETE = 1 << 0, /* Command parsing is incomplete, still waiting for more data */
PENDING_CMD_FLAG_PREPROCESSED = 1 << 1, /* This command has passed pre-processing */
PENDING_CMD_KEYS_RESULT_VALID = 1 << 2, /* Command's keys_result is valid and cached */
PENDING_CMD_KEYS_RESULT_VALID = 1 << 2, /* Command's keys_result is valid and cached */
};

/* Parser state and parse result of a command from a client's input buffer. */
Expand Down Expand Up @@ -3871,9 +3871,6 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result);
keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys);
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);

#define GETSLOT_NOKEYS (-1)
#define GETSLOT_CROSSSLOT (-2)
int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc);
int doesCommandHaveKeys(struct redisCommand *cmd);
int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
Expand Down
Loading