Skip to content

Commit 302b91b

Browse files
committed
Simplified RIFL is implemented with AOF recovery support.
1 parent a8bf3b4 commit 302b91b

File tree

9 files changed

+152
-8
lines changed

9 files changed

+152
-8
lines changed

redis.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,8 +620,8 @@ appendfilename "appendonly.aof"
620620
#
621621
# If unsure, use "everysec".
622622

623-
appendfsync always
624-
# appendfsync everysec
623+
# appendfsync always
624+
appendfsync everysec
625625
# appendfsync no
626626

627627
# When the AOF fsync policy is set to always or everysec, and a background

src/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ endif
127127

128128
REDIS_SERVER_NAME=redis-server
129129
REDIS_SENTINEL_NAME=redis-sentinel
130-
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o
130+
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rifl.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o
131131
REDIS_GEOHASH_OBJ=../deps/geohash-int/geohash.o ../deps/geohash-int/geohash_helper.o
132132
REDIS_CLI_NAME=redis-cli
133133
REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o

src/Makefile.dep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ sentinel.o: sentinel.c server.h fmacros.h config.h solarisfixes.h \
132132
server.o: server.c server.h fmacros.h config.h solarisfixes.h \
133133
../deps/lua/src/lua.h ../deps/lua/src/luaconf.h ae.h sds.h dict.h \
134134
adlist.h zmalloc.h anet.h ziplist.h intset.h version.h util.h latency.h \
135-
sparkline.h quicklist.h zipmap.h sha1.h endianconv.h crc64.h rdb.h rio.h \
135+
sparkline.h quicklist.h zipmap.h sha1.h endianconv.h crc64.h rdb.h rifl.h rio.h \
136136
cluster.h slowlog.h bio.h asciilogo.h
137137
setproctitle.o: setproctitle.c
138138
sha1.o: sha1.c solarisfixes.h sha1.h config.h

src/aof.c

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "server.h"
3131
#include "bio.h"
32+
#include "rifl.h"
3233
#include "rio.h"
3334

3435
#include <signal.h>
@@ -695,6 +696,14 @@ int loadAppendOnlyFile(char *filename) {
695696
exit(1);
696697
}
697698

699+
// TODO(seojin): RIFL here?
700+
if (cmd->flags & CMD_AT_MOST_ONCE) {
701+
long long clientId, requestId;
702+
getLongLongFromObject(argv[argc-2], &clientId);
703+
getLongLongFromObject(argv[argc-1], &requestId);
704+
riflCheckDuplicate(clientId, requestId);
705+
}
706+
698707
/* Run the command in the context of a fake client */
699708
cmd->proc(fakeClient);
700709

@@ -1069,11 +1078,13 @@ int rewriteAppendOnlyFile(char *filename) {
10691078
/* Save the key and associated value */
10701079
if (o->type == OBJ_STRING) {
10711080
/* Emit a SET command */
1072-
char cmd[]="*3\r\n$3\r\nSET\r\n";
1081+
char cmd[]="*5\r\n$3\r\nSET\r\n";
10731082
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
10741083
/* Key and value */
10751084
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
10761085
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
1086+
char rifl[]="$1\r\n0\r\n$1\r\n0\r\n"; // fill dummy now.
1087+
if (rioWrite(&aof,rifl,sizeof(rifl)-1) == 0) goto werr;
10771088
} else if (o->type == OBJ_LIST) {
10781089
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
10791090
} else if (o->type == OBJ_SET) {
@@ -1102,6 +1113,20 @@ int rewriteAppendOnlyFile(char *filename) {
11021113
di = NULL;
11031114
}
11041115

1116+
/* Now, sweep RIFL table and write SET log accordingly. */
1117+
// TODO(seojin): Make new command. RIFLCR clientId rpcId
1118+
long long clientId;
1119+
for (clientId = riflGetNext(-1); clientId != -1; clientId = riflGetNext(clientId)) {
1120+
long long requestId = riflGetprocessedRpcId(clientId);
1121+
/* Emit a SET command */
1122+
char cmd[]="*5\r\n$3\r\nSET\r\n$5\r\n_rifl\r\n$1\r\n0\r\n";
1123+
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
1124+
/* Key and value */
1125+
if (rioWriteBulkLongLong(&aof, clientId) == 0) goto werr;
1126+
if (rioWriteBulkLongLong(&aof, requestId) == 0) goto werr;
1127+
}
1128+
1129+
11051130
/* Do an initial slow fsync here while the parent is still sending
11061131
* data, in order to make the next final fsync faster. */
11071132
if (fflush(fp) == EOF) goto werr;

src/rifl.c

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (c) 2017 Stanford University.
3+
* All rights reserved.
4+
*/
5+
6+
#include "server.h"
7+
#include "redisassert.h"
8+
9+
/*================================= Globals ================================= */
10+
11+
/* Global vars */
12+
#define RIFL_TABLE_SIZE 1048576 /* 1024*1024. This must be power of two. */
13+
14+
/* 0th index is not used. */
15+
long long processedRpcIds[RIFL_TABLE_SIZE] = {0, };
16+
long long clientIds[RIFL_TABLE_SIZE] = {0, };
17+
int bitmask = RIFL_TABLE_SIZE - 1;
18+
bool witnessRecoveryMode = false; /* Don't bump processedRpcId while recovery */
19+
20+
/*================================= Functions =============================== */
21+
bool riflCheckClientIdOk(client *c) {
22+
if (clientIds[c->clientId & bitmask]) {
23+
return clientIds[c->clientId & bitmask] == c->clientId;
24+
}
25+
clientIds[c->clientId & bitmask] = c->clientId;
26+
return true;
27+
}
28+
29+
bool riflCheckDuplicate(long long clientId, long long requestId) {
30+
if (clientId == 0) {
31+
assert(requestId == 0);
32+
return false;
33+
}
34+
35+
int index = clientId & bitmask;
36+
if (processedRpcIds[index] >= requestId) {
37+
return true;
38+
}
39+
if (!witnessRecoveryMode) {
40+
processedRpcIds[index] = requestId;
41+
}
42+
return false;
43+
}
44+
45+
void riflStartRecoveryByWitness() {
46+
witnessRecoveryMode = true;
47+
}
48+
49+
void riflEndRecoveryByWitness() {
50+
witnessRecoveryMode = false;
51+
}
52+
53+
54+
long long riflGetNext(long long clientId) {
55+
int index = (clientId + 1) & bitmask;
56+
while (index < RIFL_TABLE_SIZE) {
57+
if (clientIds[index]) {
58+
return clientIds[index];
59+
}
60+
++index;
61+
}
62+
return -1; // End of table.
63+
}
64+
65+
long long riflGetprocessedRpcId(long long clientId) {
66+
int index = clientId & bitmask;
67+
assert(clientIds[index] == clientId);
68+
return processedRpcIds[index];
69+
}

src/rifl.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2017 Stanford University.
3+
* All rights reserved.
4+
*/
5+
6+
#ifndef __RIFL_H
7+
#define __RIFL_H
8+
9+
#include <stdio.h>
10+
11+
/* TBD: include only necessary headers. */
12+
#include "server.h"
13+
14+
/*
15+
* Assumption: client only sends RPCs in order.
16+
* We can assume this since Redis uses TCP socket.
17+
*/
18+
bool riflCheckClientIdOk(client *c);
19+
bool riflCheckDuplicate(long long clientId, long long requestId);
20+
void riflStartRecoveryByWitness();
21+
void riflEndRecoveryByWitness();
22+
23+
/* Used for AOF rewrite. */
24+
long long riflGetNext(long long clientId);
25+
long long riflGetprocessedRpcId(long long clientId);
26+
27+
#endif

src/server.c

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "slowlog.h"
3333
#include "bio.h"
3434
#include "latency.h"
35+
#include "rifl.h"
3536

3637
#include <time.h>
3738
#include <signal.h>
@@ -124,7 +125,7 @@ struct redisServer server; /* server global state */
124125
*/
125126
struct redisCommand redisCommandTable[] = {
126127
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
127-
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
128+
{"set",setCommand,-5,"wmO",0,NULL,1,1,1,0,0},
128129
{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
129130
{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
130131
{"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
@@ -1446,6 +1447,8 @@ void createSharedObjects(void) {
14461447
* string in string comparisons for the ZRANGEBYLEX command. */
14471448
shared.minstring = createStringObject("minstring",9);
14481449
shared.maxstring = createStringObject("maxstring",9);
1450+
shared.riflDuplicate = createObject(OBJ_STRING,sdsnew("+OK (RIFL duplicate)\r\n"));
1451+
shared.riflClientIdCollision = createObject(OBJ_STRING,sdsnew("-ERR (RIFL clientId collision)\r\n"));
14491452
}
14501453

14511454
void initServerConfig(void) {
@@ -2027,6 +2030,7 @@ void populateCommandTable(void) {
20272030
case 'M': c->flags |= CMD_SKIP_MONITOR; break;
20282031
case 'k': c->flags |= CMD_ASKING; break;
20292032
case 'F': c->flags |= CMD_FAST; break;
2033+
case 'O': c->flags |= CMD_AT_MOST_ONCE; break;
20302034
default: serverPanic("Unsupported command flag"); break;
20312035
}
20322036
f++;
@@ -2249,6 +2253,19 @@ void call(client *c, int flags) {
22492253
/* Call the command. */
22502254
dirty = server.dirty;
22512255
start = ustime();
2256+
// TODO(seojin): Add RIFL check.
2257+
if (c->cmd->flags & CMD_AT_MOST_ONCE) {
2258+
getLongLongFromObject(c->argv[c->argc-2], &c->clientId);
2259+
getLongLongFromObject(c->argv[c->argc-1], &c->requestId);
2260+
if (!riflCheckClientIdOk(c)) {
2261+
addReply(c, shared.riflClientIdCollision);
2262+
return;
2263+
}
2264+
if (riflCheckDuplicate(c->clientId, c->requestId)){
2265+
addReply(c, shared.riflDuplicate);
2266+
return;
2267+
}
2268+
}
22522269
c->cmd->proc(c);
22532270
duration = ustime()-start;
22542271
dirty = server.dirty-dirty;
@@ -2752,6 +2769,7 @@ void addReplyCommand(client *c, struct redisCommand *cmd) {
27522769
flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
27532770
flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
27542771
flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
2772+
flagcount += addReplyCommandFlag(c,cmd,CMD_AT_MOST_ONCE, "execute_at_most_once");
27552773
if (cmd->getkeys_proc) {
27562774
addReplyStatus(c, "movablekeys");
27572775
flagcount += 1;

src/server.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ typedef long long mstime_t; /* millisecond time type. */
187187
#define CMD_SKIP_MONITOR 2048 /* "M" flag */
188188
#define CMD_ASKING 4096 /* "k" flag */
189189
#define CMD_FAST 8192 /* "F" flag */
190+
#define CMD_AT_MOST_ONCE 16384 /* "O" flag (safe to retry) */
190191

191192
/* Object types */
192193
#define OBJ_STRING 0
@@ -609,6 +610,8 @@ typedef struct client {
609610
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
610611
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
611612
sds peerid; /* Cached peer ID. */
613+
long long clientId; /* RIFL client id. */
614+
long long requestId; /* RIFL request sequence number of current request.*/
612615

613616
/* Response buffer */
614617
int bufpos;
@@ -629,6 +632,7 @@ struct sharedObjectsStruct {
629632
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
630633
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
631634
*lpush, *emptyscan, *minstring, *maxstring,
635+
*riflDuplicate, *riflClientIdCollision,
632636
*select[PROTO_SHARED_SELECT_CMDS],
633637
*integers[OBJ_SHARED_INTEGERS],
634638
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */

src/t_string.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,10 @@ void setCommand(client *c) {
9999
int unit = UNIT_SECONDS;
100100
int flags = OBJ_SET_NO_FLAGS;
101101

102-
for (j = 3; j < c->argc; j++) {
102+
// exclude last 3 RIFL arguments from original Redis argument parser.
103+
for (j = 3; j < c->argc - 3; j++) {
103104
char *a = c->argv[j]->ptr;
104-
robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
105+
robj *next = (j == c->argc-1 - 3) ? NULL : c->argv[j+1];
105106

106107
if ((a[0] == 'n' || a[0] == 'N') &&
107108
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&

0 commit comments

Comments
 (0)