Skip to content

Commit ab374e6

Browse files
committed
Supports client-driven retry recovery for SET.
1 parent 6f2b02c commit ab374e6

File tree

9 files changed

+155
-18
lines changed

9 files changed

+155
-18
lines changed

deps/hiredis/hiredis.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,9 @@ static int processItem(redisReader *r) {
518518
case '+':
519519
cur->type = REDIS_REPLY_STATUS;
520520
break;
521+
case '@':
522+
cur->type = REDIS_REPLY_STATUS;
523+
break;
521524
case ':':
522525
cur->type = REDIS_REPLY_INTEGER;
523526
break;

redis.conf

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,9 @@ databases 16
199199
# save ""
200200

201201
save ""
202-
save 900 1
203-
save 300 10
204-
save 60 10000
202+
#save 900 1
203+
#save 300 10
204+
#save 60 10000
205205

206206
# By default Redis will stop accepting writes if RDB snapshots are enabled
207207
# (at least one save point) and the latest background save failed.
@@ -621,8 +621,8 @@ appendfilename "appendonly.aof"
621621
# If unsure, use "everysec".
622622

623623
# appendfsync always
624-
# appendfsync everysec
625-
appendfsync no
624+
appendfsync everysec
625+
# appendfsync no
626626

627627
# When the AOF fsync policy is set to always or everysec, and a background
628628
# saving process (a background save or AOF log background rewriting) is

src/aof.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,8 @@ int loadAppendOnlyFile(char *filename) {
702702
long long clientId, requestId;
703703
getLongLongFromObject(argv[argc-2], &clientId);
704704
getLongLongFromObject(argv[argc-1], &requestId);
705+
fakeClient->clientId = clientId;
706+
riflCheckClientIdOk(fakeClient);
705707
riflCheckDuplicate(clientId, requestId);
706708
}
707709

src/db.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
6060
}
6161
// TODO(seojin): verify every path needs this...
6262
if (!(flags & LOOKUP_NOTOUCH) && de->lastModOpNum > server.aof_last_fsync_opNum) {
63-
serverLog(LL_NOTICE, "Redis read is reading non-durable data. "
64-
"It is blocked by fsync. ModifiedBy: %lld, Synced: %lld, CurrentOp: %lld",
65-
de->lastModOpNum, server.aof_last_fsync_opNum, server.currentOpNum);
63+
// serverLog(LL_NOTICE, "Redis read is reading non-durable data. "
64+
// "It is blocked by fsync. ModifiedBy: %lld, Synced: %lld, CurrentOp: %lld",
65+
// de->lastModOpNum, server.aof_last_fsync_opNum, server.currentOpNum);
6666
flushAppendOnlyFile(1);
6767
aof_fsync(server.aof_fd);
6868
server.aof_last_fsync_opNum = server.currentOpNum - 1;

src/networking.c

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ int clientHasPendingReplies(client *c) {
613613
}
614614

615615
#define MAX_ACCEPTS_PER_CALL 1000
616-
static void acceptCommonHandler(int fd, int flags, char *ip) {
616+
static void acceptCommonHandler(int fd, int flags, char *ip, bool isRecovery) {
617617
client *c;
618618
if ((c = createClient(fd)) == NULL) {
619619
serverLog(LL_WARNING,
@@ -622,6 +622,30 @@ static void acceptCommonHandler(int fd, int flags, char *ip) {
622622
close(fd); /* May be already closed, just ignore errors */
623623
return;
624624
}
625+
c->isRecovery = isRecovery;
626+
627+
if (isRecovery && server.serverState >= SERVER_STATE_NORMAL) {
628+
// The recovery attempt came too late: the server has already served normal requests.
629+
char *err = "-ERR server is already in normal mode. Too late for replay.\r\n";
630+
if (write(c->fd,err,strlen(err)) == -1) {
631+
}
632+
server.stat_rejected_conn++;
633+
freeClient(c);
634+
return;
635+
}
636+
637+
if (!isRecovery && server.serverState <= SERVER_STATE_ACCEPTING_REPLAY) {
638+
// char *err = "-RETRY server is not ready.\r\n";
639+
// if (write(c->fd,err,strlen(err)) == -1) {
640+
// }
641+
// Don't send any message. Sending arbitrary text causes a protocol error
642+
// instead of a connection error; only the latter can be handled
643+
// automatically and retried later.
644+
server.stat_rejected_conn++;
645+
freeClient(c);
646+
return;
647+
}
648+
625649
/* If maxclient directive is set and this is one client more... close the
626650
* connection. Note that we create the client instead to check before
627651
* for this condition, since now the socket is already set in non-blocking
@@ -699,7 +723,27 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
699723
return;
700724
}
701725
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
702-
acceptCommonHandler(cfd,0,cip);
726+
acceptCommonHandler(cfd,0,cip,false);
727+
}
728+
}
729+
730+
void acceptTcpHandler4replay(aeEventLoop *el, int fd, void *privdata, int mask) {
731+
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
732+
char cip[NET_IP_STR_LEN];
733+
UNUSED(el);
734+
UNUSED(mask);
735+
UNUSED(privdata);
736+
737+
while(max--) {
738+
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
739+
if (cfd == ANET_ERR) {
740+
if (errno != EWOULDBLOCK)
741+
serverLog(LL_WARNING,
742+
"Accepting client connection for recovery: %s", server.neterr);
743+
return;
744+
}
745+
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
746+
acceptCommonHandler(cfd,0,cip,true);
703747
}
704748
}
705749

@@ -718,7 +762,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
718762
return;
719763
}
720764
serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
721-
acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL);
765+
acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL,false);
722766
}
723767
}
724768

@@ -1360,6 +1404,28 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
13601404
freeClient(c);
13611405
return;
13621406
}
1407+
1408+
if (c->isRecovery && server.serverState >= SERVER_STATE_NORMAL) {
1409+
// The recovery attempt came too late: the server has already served normal requests.
1410+
char *err = "-ERR server is already in normal mode. Too late for replay.\r\n";
1411+
if (write(c->fd,err,strlen(err)) == -1) {
1412+
}
1413+
freeClient(c);
1414+
return;
1415+
}
1416+
1417+
if (!c->isRecovery && server.serverState <= SERVER_STATE_ACCEPTING_REPLAY) {
1418+
serverLog(LL_WARNING,"Non-recovery connection was accepted while recovery. Dying..");
1419+
char *err = "-RETRY server is not ready.\r\n";
1420+
if (write(c->fd,err,strlen(err)) == -1) {
1421+
}
1422+
resetClient(c);
1423+
return;
1424+
// server.stat_rejected_conn++;
1425+
// freeClient(c);
1426+
// exit(1);
1427+
}
1428+
13631429
processInputBuffer(c);
13641430
}
13651431

src/rifl.c

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ bool riflCheckDuplicate(long long clientId, long long requestId) {
3434

3535
int index = clientId & bitmask;
3636
if (processedRpcIds[index] >= requestId) {
37+
// serverLog(LL_NOTICE,"RIFL found duplicate. ClientId: %lld, requestId: %lld, lastRpcId: %lld", clientIds[index], requestId, processedRpcIds[index]);
3738
return true;
3839
}
3940
if (!witnessRecoveryMode) {
@@ -42,15 +43,25 @@ bool riflCheckDuplicate(long long clientId, long long requestId) {
4243
return false;
4344
}
4445

46+
void riflPrintData() {
47+
serverLog(LL_NOTICE,"RIFL Table dump after recovery.");
48+
for (int i = 0; i < RIFL_TABLE_SIZE; ++i) {
49+
if (clientIds[i] != 0) {
50+
serverLog(LL_NOTICE,"ClientId: %lld, lastRpcId: %lld", clientIds[i], processedRpcIds[i]);
51+
}
52+
}
53+
}
54+
4555
void riflStartRecoveryByWitness() {
4656
witnessRecoveryMode = true;
57+
riflPrintData();
4758
}
4859

4960
void riflEndRecoveryByWitness() {
5061
witnessRecoveryMode = false;
62+
riflPrintData();
5163
}
5264

53-
5465
long long riflGetNext(long long clientId) {
5566
int index = (clientId + 1) & bitmask;
5667
while (index < RIFL_TABLE_SIZE) {

src/server.c

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,18 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
12891289
migrateCloseTimedoutSockets();
12901290
}
12911291

1292+
run_with_period(1000) {
1293+
/* If the server hasn't received a client's replay request for a while,
1294+
* assume recovery is complete and start normal processing. */
1295+
if (server.serverState == SERVER_STATE_ACCEPTING_REPLAY &&
1296+
server.unixtime - server.last_client_replay > SECONDS_WAITING_REPLAY) {
1297+
server.serverState = SERVER_STATE_NORMAL;
1298+
riflEndRecoveryByWitness();
1299+
serverLog(LL_NOTICE,"Recovery finished (recovered %lld operations). "
1300+
"Started to take normal requests.", server.currentOpNum);
1301+
}
1302+
}
1303+
12921304
/* Start a scheduled BGSAVE if the corresponding flag is set. This is
12931305
* useful when we are forced to postpone a BGSAVE because an AOF
12941306
* rewrite is in progress.
@@ -1449,6 +1461,7 @@ void createSharedObjects(void) {
14491461
shared.maxstring = createStringObject("maxstring",9);
14501462
shared.riflDuplicate = createObject(OBJ_STRING,sdsnew("+OK (RIFL duplicate)\r\n"));
14511463
shared.riflClientIdCollision = createObject(OBJ_STRING,sdsnew("-ERR (RIFL clientId collision)\r\n"));
1464+
shared.unsyncedOk = createObject(OBJ_STRING,sdsnew("@OK "));
14521465
}
14531466

14541467
void initServerConfig(void) {
@@ -1462,6 +1475,7 @@ void initServerConfig(void) {
14621475
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
14631476
server.currentOpNum = 0;
14641477
server.port = CONFIG_DEFAULT_SERVER_PORT;
1478+
server.portForRecovery = CONFIG_DEFAULT_RECOVERY_PORT;
14651479
server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
14661480
server.bindaddr_count = 0;
14671481
server.unixsocket = NULL;
@@ -1901,6 +1915,10 @@ void initServer(void) {
19011915
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
19021916
exit(1);
19031917

1918+
if (server.portForRecovery != 0 &&
1919+
listenToPort(server.portForRecovery,server.ipfd4replay,&server.ipfd4replay_count) == C_ERR)
1920+
exit(1);
1921+
19041922
/* Open the listening Unix domain socket. */
19051923
if (server.unixsocket != NULL) {
19061924
unlink(server.unixsocket); /* don't care if this fails */
@@ -1935,6 +1953,7 @@ void initServer(void) {
19351953
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
19361954
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
19371955
server.cronloops = 0;
1956+
server.serverState = SERVER_STATE_ACCEPTING_REPLAY;
19381957
server.rdb_child_pid = -1;
19391958
server.aof_child_pid = -1;
19401959
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
@@ -1956,6 +1975,7 @@ void initServer(void) {
19561975
server.aof_last_write_errno = 0;
19571976
server.repl_good_slaves_count = 0;
19581977
updateCachedTime();
1978+
server.last_client_replay = server.unixtime + 5; // give extra 5 secs for bootstrapping.
19591979

19601980
/* Create the serverCron() time event, that's our main way to process
19611981
* background operations. */
@@ -1974,6 +1994,14 @@ void initServer(void) {
19741994
"Unrecoverable error creating server.ipfd file event.");
19751995
}
19761996
}
1997+
for (j = 0; j < server.ipfd4replay_count; j++) {
1998+
if (aeCreateFileEvent(server.el, server.ipfd4replay[j], AE_READABLE,
1999+
acceptTcpHandler4replay,NULL) == AE_ERR)
2000+
{
2001+
serverPanic(
2002+
"Unrecoverable error creating server.ipfd4replay file event.");
2003+
}
2004+
}
19772005
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
19782006
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
19792007

@@ -2268,7 +2296,9 @@ void call(client *c, int flags) {
22682296
return;
22692297
}
22702298
}
2271-
++server.currentOpNum;
2299+
if (c->cmd->proc != selectCommand) {
2300+
++server.currentOpNum;
2301+
}
22722302
c->cmd->proc(c);
22732303
duration = ustime()-start;
22742304
dirty = server.dirty-dirty;
@@ -4126,6 +4156,9 @@ int main(int argc, char **argv) {
41264156
sentinelIsRunning();
41274157
}
41284158

4159+
// Call this only after AOF recovery.
4160+
riflStartRecoveryByWitness();
4161+
41294162
/* Warning the user about suspicious maxmemory setting. */
41304163
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
41314164
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);

src/server.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ typedef long long mstime_t; /* millisecond time type. */
8080
#define CONFIG_MIN_HZ 1
8181
#define CONFIG_MAX_HZ 500
8282
#define CONFIG_DEFAULT_SERVER_PORT 6379 /* TCP port */
83+
#define CONFIG_DEFAULT_RECOVERY_PORT 6380 /* TCP port for recovery*/
8384
#define CONFIG_DEFAULT_TCP_BACKLOG 511 /* TCP listen backlog */
8485
#define CONFIG_DEFAULT_CLIENT_TIMEOUT 0 /* default client timeout: infinite */
8586
#define CONFIG_DEFAULT_DBNUM 16
@@ -439,6 +440,14 @@ typedef long long mstime_t; /* millisecond time type. */
439440
#define NOTIFY_EVICTED (1<<9) /* e */
440441
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED) /* A */
441442

443+
/* Server's operation state. */
444+
#define SERVER_STATE_ACCEPTING_REPLAY 1 /* Server only takes client's replay requests. */
445+
#define SERVER_STATE_NORMAL 2 /* Server takes normal request now. */
446+
447+
#define SECONDS_WAITING_REPLAY 10 /* server waits extra seconds after last
448+
client replay before swithing from
449+
SERVER_STATE_ACCEPTING_REPLAY to SERVER_STATE_NORMAL. */
450+
442451
/* Get the first bind addr or NULL */
443452
#define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL)
444453

@@ -613,7 +622,7 @@ typedef struct client {
613622
sds peerid; /* Cached peer ID. */
614623
long long clientId; /* RIFL client id. */
615624
long long requestId; /* RIFL request sequence number of current request.*/
616-
625+
bool isRecovery; /* Indicates this connection is for recovery. */
617626
/* Response buffer */
618627
int bufpos;
619628
char buf[PROTO_REPLY_CHUNK_BYTES];
@@ -633,7 +642,7 @@ struct sharedObjectsStruct {
633642
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
634643
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
635644
*lpush, *emptyscan, *minstring, *maxstring,
636-
*riflDuplicate, *riflClientIdCollision,
645+
*riflDuplicate, *riflClientIdCollision, *unsyncedOk,
637646
*select[PROTO_SHARED_SELECT_CMDS],
638647
*integers[OBJ_SHARED_INTEGERS],
639648
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@@ -727,15 +736,20 @@ struct redisServer {
727736
char runid[CONFIG_RUN_ID_SIZE+1]; /* ID always different at every exec. */
728737
int sentinel_mode; /* True if this instance is a Sentinel. */
729738
long long currentOpNum; /* Operation number that we are working on. */
739+
int serverState; /* State of server (accepting recovery, normal).*/
740+
time_t last_client_replay; /* Unix time at which clients stopped sending replay */
730741
/* Networking */
731742
int port; /* TCP listening port */
743+
int portForRecovery; /* TCP listening port for client replay */
732744
int tcp_backlog; /* TCP listen() backlog */
733745
char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
734746
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
735747
char *unixsocket; /* UNIX socket path */
736748
mode_t unixsocketperm; /* UNIX socket permission */
737749
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
738750
int ipfd_count; /* Used slots in ipfd[] */
751+
int ipfd4replay[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
752+
int ipfd4replay_count; /* Used slots in ipfd4replay[] */
739753
int sofd; /* Unix socket file descriptor */
740754
int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */
741755
int cfd_count; /* Used slots in cfd[] */
@@ -1107,6 +1121,7 @@ void setDeferredMultiBulkLength(client *c, void *node, long length);
11071121
void processInputBuffer(client *c);
11081122
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
11091123
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
1124+
void acceptTcpHandler4replay(aeEventLoop *el, int fd, void *privdata, int mask);
11101125
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
11111126
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
11121127
void addReplyBulk(client *c, robj *obj);

src/t_string.c

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,14 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
8989
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
9090
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
9191
"expire",key,c->db->id);
92-
addReply(c, ok_reply ? ok_reply : shared.ok);
92+
93+
// addReply(c, ok_reply ? ok_reply : shared.ok);
94+
if (ok_reply) {
95+
addReply(c, ok_reply);
96+
} else {
97+
sds s = sdsempty();
98+
addReplySds(c, sdscatfmt(s, "%S %U %U\r\n", shared.unsyncedOk->ptr, server.currentOpNum, server.aof_last_fsync_opNum));
99+
}
93100
}
94101

95102
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
@@ -99,8 +106,8 @@ void setCommand(client *c) {
99106
int unit = UNIT_SECONDS;
100107
int flags = OBJ_SET_NO_FLAGS;
101108

102-
// exclude last 3 RIFL arguments from original Redis argument parser.
103-
for (j = 3; j < c->argc - 3; j++) {
109+
// exclude last 2 RIFL arguments from original Redis argument parser.
110+
for (j = 3; j < c->argc - 2; j++) {
104111
char *a = c->argv[j]->ptr;
105112
robj *next = (j == c->argc-1 - 3) ? NULL : c->argv[j+1];
106113

0 commit comments

Comments
 (0)