Skip to content

Commit 44615d7

Browse files
committed
4-way associative witness table.
1 parent 3e31bc1 commit 44615d7

File tree

3 files changed

+66
-41
lines changed

3 files changed

+66
-41
lines changed

src/server.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ struct redisCommand redisCommandTable[] = {
299299
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
300300
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
301301
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0},
302-
{"wrecord",wrecordCommand,6,"wm",0,NULL,0,0,0,0,0},
302+
{"wrecord",wrecordCommand,7,"wm",0,NULL,0,0,0,0,0},
303303
{"wgc",witnessGcCommand,-5,"wm",0,NULL,0,0,0,0,0},
304304
{"wgetrecoverydata",witnessGetRecoveryDataCommand,2,"wm",0,NULL,0,0,0,0,0}
305305
};

src/witness.c

Lines changed: 63 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@
1818

1919
#define MAX_WITNESS_REQUEST_SIZE 2048
2020
// static const int NUM_ENTRIES_PER_TABLE = 512; // Must be power of 2.
21-
#define WITNESS_NUM_ENTRIES_PER_TABLE 4096 // Must be power of 2.
22-
#define HASH_BITMASK 4095
21+
//#define WITNESS_NUM_ENTRIES_PER_TABLE 4096 // Must be power of 2.
22+
//#define HASH_BITMASK 4095
23+
#define WITNESS_NUM_ENTRIES_PER_TABLE 1024 // Must be power of 2.
24+
#define HASH_BITMASK 1023
25+
#define WITNESS_ASSOCIATIVITY 4
2326

2427
/**
2528
* Holds information to recover an RPC request in case of the master's crash
2629
*/
2730
struct Entry {
28-
bool occupied; // TODO(seojin): check padding to 64-bit improves perf?
29-
int16_t requestSize;
30-
int64_t clientId;
31-
int64_t requestId;
32-
char request[MAX_WITNESS_REQUEST_SIZE];
31+
bool occupied[WITNESS_ASSOCIATIVITY]; // TODO(seojin): check padding to 64-bit improves perf?
32+
int16_t requestSize[WITNESS_ASSOCIATIVITY];
33+
uint32_t keyHash[WITNESS_ASSOCIATIVITY];
34+
int64_t clientId[WITNESS_ASSOCIATIVITY];
35+
int64_t requestId[WITNESS_ASSOCIATIVITY];
36+
char request[WITNESS_ASSOCIATIVITY][MAX_WITNESS_REQUEST_SIZE];
3337
};
3438

3539
/**
@@ -55,13 +59,14 @@ void witnessInit() {
5559

5660
void wrecordCommand(client *c) {
5761
long masterIdx, hashIndex;
58-
long long clientId, requestId;
62+
long long keyHash, clientId, requestId;
5963
if (getLongFromObjectOrReply(c, c->argv[1], &masterIdx, NULL) != C_OK) return;
6064
if (getLongFromObjectOrReply(c, c->argv[2], &hashIndex, NULL) != C_OK) return;
61-
if (getLongLongFromObjectOrReply(c, c->argv[3], &clientId, NULL) != C_OK) return;
62-
if (getLongLongFromObjectOrReply(c, c->argv[4], &requestId, NULL) != C_OK) return;
63-
size_t requestSize = sdslen(c->argv[5]->ptr);
64-
void* data = c->argv[5]->ptr;
65+
if (getLongLongFromObjectOrReply(c, c->argv[3], &keyHash, NULL) != C_OK) return;
66+
if (getLongLongFromObjectOrReply(c, c->argv[4], &clientId, NULL) != C_OK) return;
67+
if (getLongLongFromObjectOrReply(c, c->argv[5], &requestId, NULL) != C_OK) return;
68+
size_t requestSize = sdslen(c->argv[6]->ptr);
69+
void* data = c->argv[6]->ptr;
6570

6671
struct Master* buffer = &masters[masterIdx];
6772
assert(requestSize <= MAX_WITNESS_REQUEST_SIZE);
@@ -73,12 +78,25 @@ void wrecordCommand(client *c) {
7378
return;
7479
}
7580

76-
if (!buffer->table[hashIndex].occupied) {
77-
buffer->table[hashIndex].occupied = true;
78-
buffer->table[hashIndex].requestSize = requestSize;
79-
buffer->table[hashIndex].clientId = clientId;
80-
buffer->table[hashIndex].requestId = requestId;
81-
memcpy(buffer->table[hashIndex].request, data, requestSize);
81+
int slot = WITNESS_ASSOCIATIVITY; // This means not available.
82+
for (int i = 0; i < WITNESS_ASSOCIATIVITY; ++i) {
83+
if (buffer->table[hashIndex].occupied[i]) {
84+
if (buffer->table[hashIndex].keyHash[i] == (uint32_t)keyHash) {
85+
// KeyHash collision with existing request.
86+
slot = WITNESS_ASSOCIATIVITY;
87+
break;
88+
}
89+
} else {
90+
slot = i;
91+
}
92+
}
93+
if (slot < WITNESS_ASSOCIATIVITY) {
94+
buffer->table[hashIndex].occupied[slot] = true;
95+
buffer->table[hashIndex].keyHash[slot] = (uint32_t)keyHash;
96+
buffer->table[hashIndex].requestSize[slot] = requestSize;
97+
buffer->table[hashIndex].clientId[slot] = clientId;
98+
buffer->table[hashIndex].requestId[slot] = requestId;
99+
memcpy(buffer->table[hashIndex].request[slot], data, requestSize);
82100
addReply(c, shared.witnessAccept);
83101
++buffer->occupiedCount;
84102
} else {
@@ -111,20 +129,23 @@ witnessGcCommand(client *c) {
111129
if (getLongLongFromObjectOrReply(c, c->argv[i+1], &clientId, NULL) != C_OK) return;
112130
if (getLongLongFromObjectOrReply(c, c->argv[i+2], &requestId, NULL) != C_OK) return;
113131

114-
if (buffer->table[hashIndex].occupied &&
115-
buffer->table[hashIndex].clientId == clientId &&
116-
buffer->table[hashIndex].requestId == requestId) {
117-
buffer->table[hashIndex].occupied = false;
118-
--buffer->occupiedCount;
119-
// succeeded++;
120-
} else {
121-
// serverLog(LL_NOTICE,"Witness GC failed. hashIndex: %ld, occupied: %d"
122-
// " clientId: %"PRId64" (given %lld) requestId: %"PRId64" (given %lld)",
123-
// hashIndex,
124-
// buffer->table[hashIndex].occupied,
125-
// buffer->table[hashIndex].clientId, clientId,
126-
// buffer->table[hashIndex].requestId, requestId);
127-
// failed++;
132+
for (int slot = 0; slot < WITNESS_ASSOCIATIVITY; ++slot) {
133+
if (buffer->table[hashIndex].occupied[slot] &&
134+
buffer->table[hashIndex].clientId[slot] == clientId &&
135+
buffer->table[hashIndex].requestId[slot] == requestId) {
136+
buffer->table[hashIndex].occupied[slot] = false;
137+
--buffer->occupiedCount;
138+
// succeeded++;
139+
break;
140+
} else {
141+
// serverLog(LL_NOTICE,"Witness GC failed. hashIndex: %ld, occupied: %d"
142+
// " clientId: %"PRId64" (given %lld) requestId: %"PRId64" (given %lld)",
143+
// hashIndex,
144+
// buffer->table[hashIndex].occupied,
145+
// buffer->table[hashIndex].clientId, clientId,
146+
// buffer->table[hashIndex].requestId, requestId);
147+
// failed++;
148+
}
128149
}
129150
}
130151
addReply(c, shared.ok);
@@ -145,17 +166,21 @@ void witnessGetRecoveryDataCommand(client *c) {
145166
int count = 0;
146167
// int totalSize = 0;
147168
for (int i = 0; i < WITNESS_NUM_ENTRIES_PER_TABLE; ++i) {
148-
if (buffer->table[i].occupied) {
149-
// totalSize += buffer->table[i].requestSize;
150-
count++;
169+
for (int slot = 0; slot < WITNESS_ASSOCIATIVITY; ++slot) {
170+
if (buffer->table[i].occupied[slot]) {
171+
// totalSize += buffer->table[i].requestSize;
172+
count++;
173+
}
151174
}
152175
}
153176
addReplyMultiBulkLen(c, count);
154177
// addReplyMultiBulkLen(c, totalSize);
155178
for (int i = 0; i < WITNESS_NUM_ENTRIES_PER_TABLE; ++i) {
156-
if (buffer->table[i].occupied) {
157-
addReplySds(c, sdsnewlen(buffer->table[i].request,
158-
buffer->table[i].requestSize));
179+
for (int slot = 0; slot < WITNESS_ASSOCIATIVITY; ++slot) {
180+
if (buffer->table[i].occupied[slot]) {
181+
addReplySds(c, sdsnewlen(buffer->table[i].request[slot],
182+
buffer->table[i].requestSize[slot]));
183+
}
159184
}
160185
}
161186
}

src/witnessTracker.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ void scheduleFsyncAndWitnessGc() {
7373
void trackUnsyncedRpc(client *c) {
7474
unsyncedRpcs[unsyncedRpcsSize].clientId = c->clientId;
7575
unsyncedRpcs[unsyncedRpcsSize].requestId = c->requestId;
76-
int keyHash;
76+
uint32_t keyHash;
7777
MurmurHash3_x86_32(c->argv[1]->ptr, sdslen(c->argv[1]->ptr), c->db->id, &keyHash);
7878
// serverLog(LL_NOTICE, "dictid: %d, key: %s keyLen: %d", c->db->id, (sds)c->argv[1]->ptr, sdslen(c->argv[1]->ptr));
79-
unsyncedRpcs[unsyncedRpcsSize].hashIndex = keyHash & 4095;
79+
unsyncedRpcs[unsyncedRpcsSize].hashIndex = keyHash & 1023;
8080
++unsyncedRpcsSize;
8181

8282
if (unsyncedRpcsSize == WITNESS_BATCH_SIZE) {

0 commit comments

Comments
 (0)