Skip to content

Commit 17d1b06

Browse files
committed
Improved throughput, added CGAR to lpush, switched to base-64 encoding, and added a temporary hack for GC of stale values.
- Improved throughput by delaying and batching fsync: just mark that an fsync is necessary before responding to the client, then fsync once per event-loop iteration. - Output throughput info to the log. - Implemented CGAR on lpush (!!! not tested yet). - Switched to base-64 encoding for serialization of clientId, rpcId, opNum, and syncNum. - Temporary (timeout-based) hack for cleaning stale witness entries.
1 parent 44615d7 commit 17d1b06

File tree

12 files changed

+252
-26
lines changed

12 files changed

+252
-26
lines changed

src/aof.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ void flushAppendOnlyFile(int force) {
432432
return;
433433

434434
/* Perform the fsync if needed. */
435-
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
435+
if (server.aof_fsync == AOF_FSYNC_ALWAYS || server.must_aof_fsync) {
436436
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
437437
* flushing metadata. */
438438
latencyStartMonitor(latency);
@@ -446,6 +446,7 @@ void flushAppendOnlyFile(int force) {
446446
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
447447
server.aof_last_fsync = server.unixtime;
448448
}
449+
server.must_aof_fsync = false;
449450
}
450451

451452
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
@@ -700,8 +701,9 @@ int loadAppendOnlyFile(char *filename) {
700701
// TODO(seojin): RIFL here?
701702
if (cmd->flags & CMD_AT_MOST_ONCE) {
702703
long long clientId, requestId;
703-
getLongLongFromObject(argv[argc-2], &clientId);
704-
getLongLongFromObject(argv[argc-1], &requestId);
704+
getLongLongFromObjectInBase64(argv[argc-2], &clientId);
705+
getLongLongFromObjectInBase64(argv[argc-1], &requestId);
706+
705707
fakeClient->clientId = clientId;
706708
riflCheckClientIdOk(fakeClient);
707709
riflCheckDuplicate(clientId, requestId);

src/db.c

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "server.h"
3131
#include "cluster.h"
32+
#include "witnessTracker.h"
3233

3334
#include <signal.h>
3435
#include <ctype.h>
@@ -60,12 +61,16 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
6061
}
6162
// TODO(seojin): verify every path needs this...
6263
if (!(flags & LOOKUP_NOTOUCH) && de->lastModOpNum > server.aof_last_fsync_opNum) {
64+
server.must_aof_fsync = true;
6365
// serverLog(LL_NOTICE, "Redis read is reading non-durable data. "
6466
// "It is blocked by fsync. ModifiedBy: %lld, Synced: %lld, CurrentOp: %lld",
6567
// de->lastModOpNum, server.aof_last_fsync_opNum, server.currentOpNum);
66-
flushAppendOnlyFile(1);
67-
aof_fsync(server.aof_fd);
68-
server.aof_last_fsync_opNum = server.currentOpNum - 1;
68+
// flushAppendOnlyFile(1);
69+
// aof_fsync(server.aof_fd);
70+
// server.aof_last_fsync_opNum = server.currentOpNum - 1;
71+
//scheduleFsyncAndWitnessGc();
72+
// TODO(seojin): move these into after handling all clients.
73+
// so that we only fsync once per loop.
6974
}
7075
return val;
7176
} else {

src/networking.c

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "server.h"
3131
#include <sys/uio.h>
32+
#include <sys/time.h>
3233
#include <math.h>
3334

3435
static void setProtocolError(client *c, int pos);
@@ -382,6 +383,31 @@ void addReplyString(client *c, const char *s, size_t len) {
382383
_addReplyStringToList(c,s,len);
383384
}
384385

386+
/* Queue a CGAR "unsynced OK" reply for client 'c'.
 *
 * Wire format: the first 4 bytes of shared.unsyncedOk (per the original
 * author's note this is "@OK " -- TODO confirm against its definition),
 * server.currentOpNum, a single space, server.aof_last_fsync_opNum and a
 * trailing "\r\n". Both numbers are encoded with ulltoa64().
 *
 * If either number cannot be encoded, a warning is logged and the process
 * exits with status 1. */
void addReplyOkCgar(client *c) {
    /* 4-byte prefix + two encoded 64-bit integers (at most 11 six-bit digits
     * each) + one space + "\r\n" + NUL fits in 31 bytes. */
    char reply[31] = {0};
    const size_t prefixlen = 4;
    /* The encoders may use everything except the last 3 bytes, which stay
     * reserved for "\r\n" and a terminating NUL. */
    const size_t encodelimit = sizeof(reply) - 3;
    size_t offset = prefixlen;
    int appended;

    memcpy(reply, shared.unsyncedOk->ptr, prefixlen);

    appended = ulltoa64(reply + offset, encodelimit - offset,
                        server.currentOpNum);
    if (appended == 0) {
        serverLog(LL_WARNING,"Error encoding currentOpNum (%lld) to ASCII."
                             " Exiting.",server.currentOpNum);
        exit(1);
    }
    offset += (size_t)appended;
    reply[offset++] = ' ';

    appended = ulltoa64(reply + offset, encodelimit - offset,
                        server.aof_last_fsync_opNum);
    if (appended == 0) {
        /* Report the value that actually failed to encode. */
        serverLog(LL_WARNING,"Error encoding aof_last_fsync_opNum (%lld) to ASCII."
                             " Exiting.",(long long)server.aof_last_fsync_opNum);
        exit(1);
    }
    offset += (size_t)appended;

    memcpy(reply + offset, "\r\n", 2);
    addReplyString(c, reply, offset + 2);
}
410+
385411
void addReplyErrorLength(client *c, const char *s, size_t len) {
386412
addReplyString(c,"-ERR ",5);
387413
addReplyString(c,s,len);
@@ -705,6 +731,19 @@ static void acceptCommonHandler(int fd, int flags, char *ip, bool isRecovery) {
705731

706732
server.stat_numconnections++;
707733
c->flags |= flags;
734+
735+
// For throughput benchmark, print the throughput from last new connection.
736+
struct timeval tv;
737+
gettimeofday(&tv,NULL);
738+
unsigned long long currentTime = 1000000 * tv.tv_sec + tv.tv_usec;
739+
serverLog(LL_NOTICE, "PerfStat (clientNum, throughput (kops/sec), timestamp): "
740+
"%4lu %7.2f %12llu",
741+
listLength(server.clients) - 1,
742+
(double)(server.currentOpNum - server.last_client_connected_opNum)
743+
* 1e3 / (currentTime - server.last_client_connected_usec),
744+
currentTime);
745+
server.last_client_connected_usec = currentTime;
746+
server.last_client_connected_opNum = server.currentOpNum;
708747
}
709748

710749
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

src/object.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,25 @@ int getLongLongFromObject(robj *o, long long *target) {
631631
return C_OK;
632632
}
633633

634+
/* Read a long long out of string object 'o' whose text is a base-64 encoded
 * integer (decoded with base64int2ll()).
 *
 * - A NULL object yields the value 0.
 * - An sds-encoded string is decoded as base-64; C_ERR is returned when the
 *   decoder rejects it.
 * - An OBJ_ENCODING_INT object yields the integer stored in o->ptr as-is
 *   (no base-64 decoding is applied in that case).
 *
 * On success the value is stored in *target (when 'target' is non-NULL) and
 * C_OK is returned. */
int getLongLongFromObjectInBase64(robj *o, long long *target) {
    long long decoded = 0;

    if (o != NULL) {
        serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
        if (sdsEncodedObject(o)) {
            if (!base64int2ll(o->ptr,sdslen(o->ptr),&decoded)) {
                return C_ERR;
            }
        } else if (o->encoding == OBJ_ENCODING_INT) {
            decoded = (long)o->ptr;
        } else {
            serverPanic("Unknown string encoding");
        }
    }

    if (target != NULL) {
        *target = decoded;
    }
    return C_OK;
}
652+
634653
int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) {
635654
long long value;
636655
if (getLongLongFromObject(o, &value) != C_OK) {

src/server.c

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ struct redisCommand redisCommandTable[] = {
144144
{"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
145145
{"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
146146
{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
147-
{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
147+
{"lpush",lpushCommand,-5,"wmFO",0,NULL,1,1,1,0,0},
148148
{"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
149149
{"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
150150
{"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
@@ -1296,6 +1296,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
12961296
run_with_period(1000) {
12971297
/* If the server hasn't received a client's replay request for a while,
12981298
* assume completion of recovery and start normal processing. */
1299+
// TODO(seojin): check recovered by witness. And don't wait if
1300+
// recovered by witness.
12991301
if (server.serverState == SERVER_STATE_ACCEPTING_REPLAY &&
13001302
server.unixtime - server.last_client_replay > SECONDS_WAITING_REPLAY) {
13011303
server.serverState = SERVER_STATE_NORMAL;
@@ -1482,6 +1484,8 @@ void initServerConfig(void) {
14821484
server.currentOpNum = 0;
14831485
server.port = CONFIG_DEFAULT_SERVER_PORT;
14841486
server.portForRecovery = CONFIG_DEFAULT_RECOVERY_PORT;
1487+
server.last_client_connected_usec = 0;
1488+
server.last_client_connected_opNum = 0;
14851489
server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
14861490
server.bindaddr_count = 0;
14871491
server.unixsocket = NULL;
@@ -1506,6 +1510,7 @@ void initServerConfig(void) {
15061510
server.supervised_mode = SUPERVISED_NONE;
15071511
server.aof_state = AOF_OFF;
15081512
server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC;
1513+
server.must_aof_fsync = false;
15091514
server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
15101515
server.aof_rewrite_perc = AOF_REWRITE_PERC;
15111516
server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
@@ -2296,8 +2301,13 @@ void call(client *c, int flags) {
22962301

22972302
// RIFL check.
22982303
if (c->cmd->flags & CMD_AT_MOST_ONCE) {
2299-
getLongLongFromObject(c->argv[c->argc-2], &c->clientId);
2300-
getLongLongFromObject(c->argv[c->argc-1], &c->requestId);
2304+
// getLongLongFromObject(c->argv[c->argc-2], &c->clientId);
2305+
// getLongLongFromObject(c->argv[c->argc-1], &c->requestId);
2306+
getLongLongFromObjectInBase64(c->argv[c->argc-2], &c->clientId);
2307+
getLongLongFromObjectInBase64(c->argv[c->argc-1], &c->requestId);
2308+
2309+
// serverLog(LL_NOTICE,"clientId: %lld, requestId: %lld", c->clientId, c->requestId);
2310+
23012311
if (!riflCheckClientIdOk(c)) {
23022312
addReply(c, shared.riflClientIdCollision);
23032313
return;
@@ -2307,13 +2317,13 @@ void call(client *c, int flags) {
23072317
return;
23082318
}
23092319
}
2310-
if (c->cmd->proc != selectCommand) {
2320+
if (c->cmd->proc != selectCommand && c->cmd->flags & CMD_WRITE) {
23112321
++server.currentOpNum;
23122322
}
23132323
c->cmd->proc(c);
23142324

23152325
// Track unsynced change.
2316-
if (c->cmd->flags & CMD_AT_MOST_ONCE) {
2326+
if (c->cmd->flags & CMD_AT_MOST_ONCE && server.numWitness > 0) {
23172327
trackUnsyncedRpc(c);
23182328
}
23192329

src/server.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ typedef long long mstime_t; /* millisecond time type. */
446446
#define SERVER_STATE_ACCEPTING_REPLAY 1 /* Server only takes client's replay requests. */
447447
#define SERVER_STATE_NORMAL 2 /* Server takes normal request now. */
448448

449-
#define SECONDS_WAITING_REPLAY 10 /* server waits extra seconds after last
449+
#define SECONDS_WAITING_REPLAY 2 /* server waits extra seconds after last
450450
client replay before switching from
451451
SERVER_STATE_ACCEPTING_REPLAY to SERVER_STATE_NORMAL. */
452452

@@ -770,6 +770,9 @@ struct redisServer {
770770
char *addrToWitness[CONFIG_WITNESS_MAX];
771771
int fdToWitness[CONFIG_WITNESS_MAX];
772772
int numWitness;
773+
/* For throughput benchmark */
774+
unsigned long long last_client_connected_usec;
775+
long long last_client_connected_opNum;
773776
/* RDB / AOF loading information */
774777
int loading; /* We are loading data from disk if true */
775778
off_t loading_total_bytes;
@@ -823,6 +826,7 @@ struct redisServer {
823826
/* AOF persistence */
824827
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
825828
int aof_fsync; /* Kind of fsync() policy */
829+
bool must_aof_fsync; /* indicate this ae cycle need to fsync before responding to client. */
826830
char *aof_filename; /* Name of the AOF file */
827831
int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
828832
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */

src/t_hash.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -530,9 +530,10 @@ void hmsetCommand(client *c) {
530530
hashTypeSet(o,c->argv[i],c->argv[i+1]);
531531
}
532532
// addReply(c, shared.ok);
533-
sds s = sdsempty();
534-
addReplySds(c, sdscatfmt(s, "%S %U %U\r\n", shared.unsyncedOk->ptr,
535-
server.currentOpNum, server.aof_last_fsync_opNum));
533+
addReplyOkCgar(c);
534+
// sds s = sdsempty();
535+
// addReplySds(c, sdscatfmt(s, "%S %U %U\r\n", shared.unsyncedOk->ptr,
536+
// server.currentOpNum, server.aof_last_fsync_opNum));
536537

537538
signalModifiedKey(c->db,c->argv[1]);
538539
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);

src/t_list.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ void pushGenericCommand(client *c, int where) {
203203
return;
204204
}
205205

206-
for (j = 2; j < c->argc; j++) {
206+
for (j = 2; j < c->argc - 2; j++) {
207207
c->argv[j] = tryObjectEncoding(c->argv[j]);
208208
if (!lobj) {
209209
lobj = createQuicklistObject();

src/t_string.c

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,18 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
9494
if (ok_reply) {
9595
addReply(c, ok_reply);
9696
} else {
97-
sds s = sdsempty();
98-
addReplySds(c, sdscatfmt(s, "%S %U %U\r\n", shared.unsyncedOk->ptr, server.currentOpNum, server.aof_last_fsync_opNum));
97+
addReplyOkCgar(c);
98+
// char reply[28]; // "@OK " + two 64-bit ints encoded in 64-base.
99+
// bzero(reply, 28);
100+
// memcpy(reply, shared.unsyncedOk->ptr, 4);
101+
// int offset = 4;
102+
// offset += ulltoa64(reply + offset, 28 - offset, server.currentOpNum);
103+
// reply[offset++] = ' ';
104+
// ulltoa64(reply + offset, 28 - offset, server.aof_last_fsync_opNum);
105+
// addReplyString(c, reply, offset);
106+
107+
// sds s = sdsempty();
108+
// addReplySds(c, sdscatfmt(s, "%S %U %U\r\n", shared.unsyncedOk->ptr, server.currentOpNum, server.aof_last_fsync_opNum));
99109
}
100110
}
101111

@@ -382,8 +392,9 @@ void incrDecrCommand(client *c, long long incr) {
382392
addReply(c,shared.colon);
383393
addReply(c,new);
384394

385-
sds s = sdsempty();
386-
addReplySds(c, sdscatfmt(s, " %U %U\r\n", server.currentOpNum, server.aof_last_fsync_opNum));
395+
addReplyOkCgar(c);
396+
// sds s = sdsempty();
397+
// addReplySds(c, sdscatfmt(s, " %U %U\r\n", server.currentOpNum, server.aof_last_fsync_opNum));
387398
// addReply(c,shared.crlf);
388399
}
389400

src/util.c

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,78 @@ int ll2string(char* dst, size_t dstlen, long long svalue) {
328328
return length;
329329
}
330330

331+
/* Encode a 64-bit integer as NUL-terminated text in base 64, most
 * significant digit first, using the digit alphabet A-Z, a-z, 0-9, '+', '/'
 * (digit values 0..63).
 *
 * 'svalue' is reinterpreted as an unsigned 64-bit quantity before encoding,
 * so a negative input encodes its two's-complement bit pattern (11 digits)
 * instead of never terminating the digit loop.
 *
 * 'dst' must have room for the digits plus the terminating NUL.
 *
 * Returns the number of digits written (1..11, not counting the NUL), or 0
 * when 'dstlen' is too small; in that case 'dst' is left untouched. */
int ulltoa64(char* dst, size_t dstlen, long long svalue) {
    static const char alphabet[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    char reversed[11]; /* ceil(64 / 6) digits, least significant first. */
    uint64_t remaining = (uint64_t)svalue;
    size_t ndigits = 0;

    do {
        reversed[ndigits++] = alphabet[remaining & 63];
        remaining >>= 6;
    } while (remaining != 0);

    /* Refuse to write unless digits AND the NUL terminator both fit. */
    if (ndigits + 1 > dstlen) {
        return 0;
    }

    for (size_t i = 0; i < ndigits; i++) {
        dst[i] = reversed[ndigits - 1 - i];
    }
    dst[ndigits] = '\0';
    return (int)ndigits;
}
354+
355+
/* Map one character of the base-64 digit alphabet (A-Z, a-z, 0-9, '+', '/')
 * to its value 0..63. Returns -1 for any other byte, including '=' and
 * bytes outside the 7-bit ASCII range. */
static int base64DigitValue(unsigned char c) {
    if (c >= 'A' && c <= 'Z') return c - 'A';
    if (c >= 'a' && c <= 'z') return c - 'a' + 26;
    if (c >= '0' && c <= '9') return c - '0' + 52;
    if (c == '+') return 62;
    if (c == '/') return 63;
    return -1;
}

/* Convert a string holding a base-64 encoded integer (most significant
 * digit first, as produced by ulltoa64()) into a long long.
 *
 * At most 'slen' bytes of 's' are consumed; parsing also stops at the first
 * NUL byte. An empty input decodes to 0.
 *
 * Returns 1 and stores the result in '*value' if every consumed byte is a
 * valid digit and the result fits in 64 bits. Returns 0, leaving '*value'
 * untouched, on an invalid byte or on overflow.
 *
 * The 64-bit pattern is accumulated unsigned and then converted to long
 * long, so 11-digit encodings of negative numbers round-trip on
 * two's-complement targets. */
int base64int2ll(const char *s, size_t slen, long long *value) {
    uint64_t acc = 0;

    for (size_t i = 0; i < slen && s[i] != '\0'; i++) {
        int digit = base64DigitValue((unsigned char)s[i]);
        if (digit < 0) {
            return 0; /* Not a base-64 digit. */
        }
        if ((acc >> 58) != 0) {
            return 0; /* Six more bits would not fit in 64 bits. */
        }
        acc = (acc << 6) | (uint64_t)digit;
    }

    *value = (long long)acc;
    return 1;
}
402+
331403
/* Convert a string into a long long. Returns 1 if the string could be parsed
332404
* into a (non-overflowing) long long, 0 otherwise. The value will be set to
333405
* the parsed value when appropriate. */

0 commit comments

Comments
 (0)