Skip to content

Commit 676644f

Browse files
committed
Support read only replicas attaching to active replicas (Bug Snapchat#229)
1 parent 22ac56d commit 676644f

File tree

4 files changed

+61
-16
lines changed

4 files changed

+61
-16
lines changed

src/networking.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2260,7 +2260,7 @@ int processMultibulkBuffer(client *c) {
22602260
* 1. The client is reset unless there are reasons to avoid doing it.
22612261
* 2. In the case of master clients, the replication offset is updated.
22622262
* 3. Propagate commands we got from our master to replicas down the line. */
2263-
void commandProcessed(client *c) {
2263+
void commandProcessed(client *c, int flags) {
22642264
long long prev_offset = c->reploff;
22652265
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
22662266
/* Update the applied replication offset of our master. */
@@ -2288,7 +2288,7 @@ void commandProcessed(client *c) {
22882288
ae.arm(c);
22892289
long long applied = c->reploff - prev_offset;
22902290
if (applied) {
2291-
if (!g_pserver->fActiveReplica)
2291+
if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE))
22922292
{
22932293
replicationFeedSlavesFromMasterStream(g_pserver->slaves,
22942294
c->pending_querybuf, applied);
@@ -2312,7 +2312,7 @@ int processCommandAndResetClient(client *c, int flags) {
23122312
serverAssert(GlobalLocksAcquired());
23132313

23142314
if (processCommand(c, flags) == C_OK) {
2315-
commandProcessed(c);
2315+
commandProcessed(c, flags);
23162316
}
23172317
if (serverTL->current_client == NULL) deadclient = 1;
23182318
serverTL->current_client = NULL;

src/replication.cpp

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,6 +1166,7 @@ void processReplconfUuid(client *c, robj *arg)
11661166
* full resync. */
11671167
void replconfCommand(client *c) {
11681168
int j;
1169+
bool fCapaCommand = false;
11691170

11701171
if ((c->argc % 2) == 0) {
11711172
/* Number of arguments must be odd to make sure that every
@@ -1176,6 +1177,7 @@ void replconfCommand(client *c) {
11761177

11771178
/* Process every option-value pair. */
11781179
for (j = 1; j < c->argc; j+=2) {
1180+
fCapaCommand = false;
11791181
if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"listening-port")) {
11801182
long port;
11811183

@@ -1200,6 +1202,8 @@ void replconfCommand(client *c) {
12001202
c->slave_capa |= SLAVE_CAPA_PSYNC2;
12011203
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
12021204
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
1205+
1206+
fCapaCommand = true;
12031207
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
12041208
/* REPLCONF ACK is used by replica to inform the master the amount
12051209
* of replication stream that it processed so far. It is an
@@ -1242,7 +1246,16 @@ void replconfCommand(client *c) {
12421246
return;
12431247
}
12441248
}
1245-
addReply(c,shared.ok);
1249+
1250+
if (fCapaCommand) {
1251+
sds reply = sdsnew("+OK");
1252+
if (g_pserver->fActiveReplica)
1253+
reply = sdscat(reply, " active-replica");
1254+
reply = sdscat(reply, "\r\n");
1255+
addReplySds(c, reply);
1256+
} else {
1257+
addReply(c,shared.ok);
1258+
}
12461259
}
12471260

12481261
/* This function puts a replica in the online state, and should be called just
@@ -2557,6 +2570,30 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
25572570
return PSYNC_NOT_SUPPORTED;
25582571
}
25592572

2573+
/* Parse the status reply a master sent in answer to "REPLCONF capa ..." and
 * record on `mi` whether the master advertised itself as an active replica.
 *
 * The reply is treated as a status line: a leading '+' followed by
 * space-separated words (e.g. "+OK active-replica"). A reply that is empty or
 * does not start with '+' is ignored and leaves mi->isActive untouched.
 * Otherwise mi->isActive is cleared and then set only when one of the words
 * is exactly "active-replica". */
void parseMasterCapa(redisMaster *mi, sds strcapa)
{
    static const char szActiveReplica[] = "active-replica";
    const size_t cchActiveReplica = sizeof(szActiveReplica) - 1;

    if (sdslen(strcapa) < 1 || strcapa[0] != '+')
        return;

    char *szStart = strcapa + 1;    // skip the +
    char *pchEnd = szStart;

    mi->isActive = false;
    for (;;)
    {
        if (*pchEnd == ' ' || *pchEnd == '\0') {
            /* A word ends here. Require an exact-length match: strncmp()
             * bounded by the word length alone would report equality for an
             * empty word (n == 0, e.g. a bare "+" or a trailing space) and
             * for any prefix of the token such as "active". */
            size_t cchWord = (size_t)(pchEnd - szStart);
            if (cchWord == cchActiveReplica &&
                strncmp(szStart, szActiveReplica, cchWord) == 0) {
                mi->isActive = true;
            }
            szStart = pchEnd + 1;
        }
        if (*pchEnd == '\0')
            break;
        ++pchEnd;
    }
}
2596+
25602597
/* This handler fires when the non blocking connect was able to
25612598
* establish a connection with the master. */
25622599
void syncWithMaster(connection *conn) {
@@ -2750,16 +2787,8 @@ void syncWithMaster(connection *conn) {
27502787
*
27512788
* The master will ignore capabilities it does not understand. */
27522789
if (mi->repl_state == REPL_STATE_SEND_CAPA) {
2753-
if (g_pserver->fActiveReplica)
2754-
{
2755-
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
2756-
"capa","eof","capa","psync2","capa","activeExpire",NULL);
2757-
}
2758-
else
2759-
{
2760-
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
2761-
"capa","eof","capa","psync2",NULL);
2762-
}
2790+
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
2791+
"capa","eof","capa","psync2","capa","activeExpire",NULL);
27632792
if (err) goto write_error;
27642793
sdsfree(err);
27652794
mi->repl_state = REPL_STATE_RECEIVE_CAPA;
@@ -2774,6 +2803,8 @@ void syncWithMaster(connection *conn) {
27742803
if (err[0] == '-') {
27752804
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
27762805
"REPLCONF capa: %s", err);
2806+
} else {
2807+
parseMasterCapa(mi, err);
27772808
}
27782809
sdsfree(err);
27792810
mi->repl_state = REPL_STATE_SEND_PSYNC;

src/server.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1735,14 +1735,26 @@ void clientsCron(int iel) {
17351735
freeClientsInAsyncFreeQueue(iel);
17361736
}
17371737

1738+
bool expireOwnKeys()
1739+
{
1740+
if (iAmMaster()) {
1741+
return true;
1742+
} else if (!g_pserver->fActiveReplica && (listLength(g_pserver->masters) == 1)) {
1743+
redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
1744+
if (mi->isActive)
1745+
return true;
1746+
}
1747+
return false;
1748+
}
1749+
17381750
/* This function handles 'background' operations we are required to do
17391751
* incrementally in Redis databases, such as active key expiring, resizing,
17401752
* rehashing. */
17411753
void databasesCron(void) {
17421754
/* Expire keys by random sampling. Not required for slaves
17431755
* as master will synthesize DELs for us. */
17441756
if (g_pserver->active_expire_enabled) {
1745-
if (iAmMaster()) {
1757+
if (expireOwnKeys()) {
17461758
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
17471759
} else {
17481760
expireSlaveKeys();
@@ -2461,6 +2473,7 @@ void initMasterInfo(redisMaster *master)
24612473
master->cached_master = NULL;
24622474
master->master_initial_offset = -1;
24632475

2476+
master->isActive = false;
24642477

24652478
master->repl_state = REPL_STATE_NONE;
24662479
master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
@@ -3551,7 +3564,7 @@ void call(client *c, int flags) {
35513564
!(flags & CMD_CALL_PROPAGATE_AOF))
35523565
propagate_flags &= ~PROPAGATE_AOF;
35533566

3554-
if (c->cmd->flags & CMD_SKIP_PROPOGATE)
3567+
if ((c->cmd->flags & CMD_SKIP_PROPOGATE) && g_pserver->fActiveReplica)
35553568
propagate_flags &= ~PROPAGATE_REPL;
35563569

35573570
/* Call propagate() only if at least one of AOF / replication

src/server.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,6 +1371,7 @@ struct redisMaster {
13711371
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
13721372
long long master_initial_offset; /* Master PSYNC offset. */
13731373

1374+
bool isActive = false;
13741375
int repl_state; /* Replication status if the instance is a replica */
13751376
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
13761377
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */

0 commit comments

Comments
 (0)