Skip to content

Commit 7c7a6ff

Browse files
committed
Adding GC commands from witnesses
1 parent d3a2783 commit 7c7a6ff

File tree

9 files changed

+51
-58
lines changed

9 files changed

+51
-58
lines changed

src/Makefile

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ endif
5353
# Override default settings if possible
5454
-include .make-settings
5555

56-
FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS) -I../deps/geohash-int
57-
FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG)
58-
FINAL_LIBS=-lm
56+
FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS) -I../deps/geohash-int -I/home/p4/Desktop/witnesscmd -L/home/p4/Desktop/witnesscmd
57+
FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG) -L/home/p4/Desktop/witnesscmd
58+
FINAL_LIBS=-ldl -lwitnesscmd -lm
5959
DEBUG=-g -ggdb
6060

6161
ifeq ($(uname_S),SunOS)
@@ -105,6 +105,7 @@ endif
105105
ifeq ($(MALLOC),jemalloc)
106106
DEPENDENCY_TARGETS+= jemalloc
107107
FINAL_CFLAGS+= -DUSE_JEMALLOC -I../deps/jemalloc/include
108+
FINAL_LDFLAGS+= -L../deps/jemalloc/lib/
108109
FINAL_LIBS+= ../deps/jemalloc/lib/libjemalloc.a
109110
endif
110111

src/aof.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,8 @@ ssize_t aofRewriteBufferWrite(int fd) {
200200
/* Starts a background task that performs fsync() against the specified
201201
* file descriptor (the one of the AOF file) in another thread. */
202202
void aof_background_fsync(int fd) {
203-
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,server.currentOpNum);
203+
bioCreateBackgroundJob(BIO_AOF_FSYNC,
204+
(void*)(long)fd,NULL,server.currentOpNum, 0);
204205
}
205206

206207
/* Called when the user switches from "appendonly yes" to "appendonly no"
@@ -1504,7 +1505,9 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
15041505
server.aof_state = AOF_ON;
15051506

15061507
/* Asynchronously close the overwritten AOF. */
1507-
if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,0);
1508+
if (oldfd != -1) {
1509+
bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,0,0);
1510+
}
15081511

15091512
serverLog(LL_VERBOSE,
15101513
"Background AOF rewrite signal handler took %lldus", ustime()-now);

src/bio.c

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@
6060

6161
#include "server.h"
6262
#include "bio.h"
63+
#include "udp.h"
64+
#include "witnesscmd.h"
65+
66+
#define SRC_ADDR "10.10.10.104"
67+
#define WITNESS_PORT 1111
6368

6469
static pthread_t bio_threads[BIO_NUM_OPS];
6570
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
@@ -81,6 +86,7 @@ struct bio_job {
8186
* arguments we can just pass a pointer to a structure or alike. */
8287
void *arg1, *arg2;
8388
long long arg3;
89+
uint32_t arg4;
8490
};
8591

8692
void *bioProcessBackgroundJobs(void *arg);
@@ -124,13 +130,15 @@ void bioInit(void) {
124130
}
125131
}
126132

127-
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, long long arg3) {
133+
void bioCreateBackgroundJob(int type,
134+
void *arg1, void *arg2, long long arg3, uint32_t arg4) {
128135
struct bio_job *job = zmalloc(sizeof(*job));
129136

130137
job->time = time(NULL);
131138
job->arg1 = arg1;
132139
job->arg2 = arg2;
133140
job->arg3 = arg3;
141+
job->arg4 = arg4;
134142
pthread_mutex_lock(&bio_mutex[type]);
135143
listAddNodeTail(bio_jobs[type],job);
136144
bio_pending[type]++;
@@ -186,17 +194,26 @@ void *bioProcessBackgroundJobs(void *arg) {
186194
aof_fsync((long)job->arg1);
187195
server.aof_last_fsync_opNum = job->arg3;
188196
} else if (type == BIO_FSYNC_AND_GC_WITNESS) {
189-
sds gcCmd = (sds)job->arg1;
197+
/* Create a UDP socket */
198+
int s = createSocket();
199+
/* Get the GC commands and the number of commands */
200+
witnesscmd_t* cmds = (witnesscmd_t*) job->arg1;
201+
uint32_t num_cmds = job->arg4;
202+
/* lastOpNum used for FSYNC */
190203
long long lastOpNum = job->arg3;
191204
if (lastOpNum > server.aof_last_fsync_opNum) {
192205
aof_fsync((long)job->arg2);
193206
server.aof_last_fsync_opNum = lastOpNum;
194207
}
208+
/* Iterate through the witnesses and send GC cmds */
195209
for (int i = 0; i < server.numWitness; ++i) {
196-
if (anetWrite(server.fdToWitness[i], gcCmd, sdslen(gcCmd)) == -1) {
197-
fprintf(stderr, "Error while sending witness GC. %s", strerror(errno));
210+
for (uint32_t j = 0; j < num_cmds; j++) {
211+
udpWrite(s, SRC_ADDR, server.addrToWitness[i], WITNESS_PORT, WITNESS_PORT, witness_data(&cmds[j]), false);
198212
}
199213
}
214+
/* Free cmds and close the socket */
215+
zfree(cmds);
216+
close(s);
200217
} else {
201218
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
202219
}

src/bio.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@
2727
* POSSIBILITY OF SUCH DAMAGE.
2828
*/
2929

30+
#include <stdlib.h>
31+
3032
/* Exported API */
3133
void bioInit(void);
32-
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, long long arg3);
34+
void bioCreateBackgroundJob(int type,
35+
void *arg1, void *arg2, long long arg3, uint32_t arg4);
3336
unsigned long long bioPendingJobsOfType(int type);
3437
void bioWaitPendingJobsLE(int type, unsigned long long num);
3538
time_t bioOlderJobOfType(int type);

src/debug.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
#include "crc64.h"
3333

3434
#include <arpa/inet.h>
35-
#include <signal.h>
3635

3736
#ifdef HAVE_BACKTRACE
3837
#include <execinfo.h>

src/networking.c

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "server.h"
3131
#include "timeTrace.h"
32+
#include "udp.h"
3233
#include <sys/uio.h>
3334
#include <sys/time.h>
3435
#include <math.h>
@@ -2083,17 +2084,3 @@ int processEventsWhileBlocked(void) {
20832084
}
20842085
return count;
20852086
}
2086-
2087-
void connectToWitness() {
2088-
for (int i = 0; i < server.numWitness; ++i) {
2089-
if (server.fdToWitness[i] > 0) {
2090-
continue;
2091-
}
2092-
char err[ANET_ERR_LEN];
2093-
server.fdToWitness[i] = anetTcpConnect(err, server.addrToWitness[i], server.port);
2094-
if (server.fdToWitness[i] == ANET_ERR) {
2095-
serverLog(LL_WARNING, "Error connecting to witness:%s", server.addrToWitness[i]);
2096-
continue;
2097-
}
2098-
}
2099-
}

src/server.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2041,9 +2041,6 @@ void initServer(void) {
20412041
slowlogInit();
20422042
latencyMonitorInit();
20432043
bioInit();
2044-
2045-
/* Connect to witness servers */
2046-
connectToWitness();
20472044
}
20482045

20492046
/* Populates the Redis Command Table starting from the hard coded list

src/server.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1175,7 +1175,6 @@ int handleClientsWithPendingWrites(void);
11751175
int clientHasPendingReplies(client *c);
11761176
void unlinkClient(client *c);
11771177
int writeToClient(int fd, client *c, int handler_installed);
1178-
void connectToWitness();
11791178

11801179
#ifdef __GNUC__
11811180
void addReplyErrorFormat(client *c, const char *fmt, ...)

src/witnessTracker.c

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "rifl.h"
2323
#include "timeTrace.h"
2424
#include "udp.h"
25+
#include "witnesscmd.h"
2526

2627
/* functions from aof.c */
2728
struct client *createFakeClient();
@@ -41,7 +42,7 @@ struct WitnessGcInfo {
4142

4243
/* 0th index is not used. */
4344
struct WitnessGcInfo unsyncedRpcs[WITNESS_BATCH_SIZE] = {{0,0,0}, };
44-
int unsyncedRpcsSize = 0;
45+
uint32_t unsyncedRpcsSize = 0;
4546

4647
struct WitnessGcBioContext {
4748
long long maxOpNum;
@@ -51,37 +52,23 @@ struct WitnessGcBioContext {
5152
/*================================= Functions =============================== */
5253
void scheduleFsyncAndWitnessGc() {
5354
record("start constructing gc RPC.", 0, 0, 0, 0);
54-
// TODO: Construct GC packet
55-
char* masterIdxStr = "1";
56-
int s = createSocket();
57-
sds cmdstr = sdscatprintf(sdsempty(), "*%d\r\n$3\r\nWGC\r\n$%d\r\n%s\r\n",
58-
2 + 3 * unsyncedRpcsSize, (int)strlen(masterIdxStr), masterIdxStr);
59-
for (int i = 0; i < unsyncedRpcsSize; ++i) {
60-
int hashIndex_len, clientId_len, requestId_len;
61-
char hashIndex_str[LONG_STR_SIZE];
62-
char clientId_str[LONG_STR_SIZE];
63-
char requestId_str[LONG_STR_SIZE];
64-
65-
hashIndex_len = ulltoa64(hashIndex_str, sizeof(hashIndex_str),
66-
unsyncedRpcs[i].hashIndex);
67-
clientId_len = ulltoa64(clientId_str, sizeof(clientId_str),
68-
unsyncedRpcs[i].clientId);
69-
requestId_len = ulltoa64(requestId_str, sizeof(requestId_str),
70-
unsyncedRpcs[i].requestId);
71-
// hashIndex_len = ll2string(hashIndex_str, sizeof(hashIndex_str),
72-
// unsyncedRpcs[i].hashIndex);
73-
// clientId_len = ll2string(clientId_str, sizeof(clientId_str),
74-
// unsyncedRpcs[i].clientId);
75-
// requestId_len = ll2string(requestId_str, sizeof(requestId_str),
76-
// unsyncedRpcs[i].requestId);
77-
cmdstr = sdscatprintf(cmdstr, "$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n",
78-
hashIndex_len, hashIndex_str, clientId_len, clientId_str, requestId_len, requestId_str);
55+
/* Create a set of GC Cmds */
56+
witnesscmd_t* cmds =
57+
(witnesscmd_t *) zmalloc(unsyncedRpcsSize * sizeof(witnesscmd_t));
58+
for (uint32_t i = 0; i < unsyncedRpcsSize; ++i) {
59+
init_witnesscmd(cmds, "D", unsyncedRpcs[i].clientId,
60+
unsyncedRpcs[i].requestId,
61+
unsyncedRpcs[i].hashIndex, "", 0);
62+
cmds++;
7963
}
8064
unsyncedRpcsSize = 0;
81-
close(s);
8265
record("constructed gc RPC.", 0, 0, 0, 0);
83-
bioCreateBackgroundJob(BIO_FSYNC_AND_GC_WITNESS, (void*)cmdstr,
84-
(void*)(long)server.aof_fd, server.currentOpNum);
66+
67+
/* Submit the GC job to the background thread */
68+
bioCreateBackgroundJob(BIO_FSYNC_AND_GC_WITNESS,
69+
cmds,
70+
(void*)(long)server.aof_fd, server.currentOpNum,
71+
unsyncedRpcsSize);
8572
record("bioBackgroundJob Created.", 0, 0, 0, 0);
8673
}
8774

0 commit comments

Comments
 (0)