Skip to content

Commit 8f7d977

Browse files
committed
Avoid memory overhead in situations it won't be used (active rep only)
1 parent 064440e commit 8f7d977

File tree

9 files changed

+106
-65
lines changed

9 files changed

+106
-65
lines changed

src/Makefile

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,6 @@ endif
4747

4848
USEASM?=true
4949

50-
ifeq ($(NOMVCC),)
51-
CFLAGS+= -DENABLE_MVCC
52-
CXXFLAGS+= -DENABLE_MVCC
53-
endif
54-
5550
ifneq ($(SANITIZE),)
5651
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
5752
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE

src/aof.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1426,7 +1426,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
14261426
/* Iterate this DB writing every entry */
14271427
while((de = dictNext(di)) != NULL) {
14281428
sds keystr;
1429-
robj key, *o;
1429+
redisObjectStack key;
1430+
robj *o = nullptr;
14301431

14311432
keystr = (sds)dictGetKey(de);
14321433
o = (robj*)dictGetVal(de);

src/db.cpp

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,9 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
9292

9393
updateDbValAccess(de, flags);
9494

95-
#ifdef ENABLE_MVCC
9695
if (flags & LOOKUP_UPDATEMVCC) {
97-
val->mvcc_tstamp = getMvccTstamp();
96+
setMvccTstamp(val, getMvccTstamp());
9897
}
99-
#endif
10098
return val;
10199
} else {
102100
return NULL;
@@ -208,9 +206,9 @@ int dbAddCore(redisDb *db, robj *key, robj *val) {
208206
serverAssert(!val->FExpires());
209207
sds copy = sdsdup(szFromObj(key));
210208
int retval = dictAdd(db->pdict, copy, val);
211-
#ifdef ENABLE_MVCC
212-
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
213-
#endif
209+
uint64_t mvcc = getMvccTstamp();
210+
setMvccTstamp(key, mvcc);
211+
setMvccTstamp(val, mvcc);
214212

215213
if (retval == DICT_OK)
216214
{
@@ -260,9 +258,7 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd
260258
if (fUpdateMvcc) {
261259
if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
262260
val = dupStringObject(val);
263-
#ifdef ENABLE_MVCC
264-
val->mvcc_tstamp = getMvccTstamp();
265-
#endif
261+
setMvccTstamp(val, getMvccTstamp());
266262
}
267263

268264
dictSetVal(db->pdict, de, val);
@@ -296,14 +292,12 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
296292
if (de == nullptr)
297293
return (dbAddCore(db, key, val) == DICT_OK);
298294

299-
#ifdef ENABLE_MVCC
300295
robj *old = (robj*)dictGetVal(de);
301-
if (old->mvcc_tstamp <= val->mvcc_tstamp)
296+
if (mvccFromObj(old) <= mvccFromObj(val))
302297
{
303298
dbOverwriteCore(db, de, key, val, false, true);
304299
return true;
305300
}
306-
#endif
307301

308302
return false;
309303
}
@@ -1494,7 +1488,6 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
14941488
void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey)
14951489
{
14961490
robj *argv[3];
1497-
robj objT;
14981491
redisCommand *cmd = nullptr;
14991492
switch (type)
15001493
{

src/defrag.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
5555
* returns NULL in case the allocatoin wasn't moved.
5656
* when it returns a non-null value, the old pointer was already released
5757
* and should NOT be accessed. */
58-
void* activeDefragAlloc(void *ptr) {
58+
template<typename TPTR>
59+
TPTR* activeDefragAlloc(TPTR *ptr) {
5960
size_t size;
6061
void *newptr;
6162
if(!je_get_defrag_hint(ptr)) {
@@ -70,7 +71,14 @@ void* activeDefragAlloc(void *ptr) {
7071
newptr = zmalloc_no_tcache(size);
7172
memcpy(newptr, ptr, size);
7273
zfree_no_tcache(ptr);
73-
return newptr;
74+
return (TPTR*)newptr;
75+
}
76+
77+
template<>
78+
robj* activeDefragAlloc(robj *o) {
79+
void *pvSrc = allocPtrFromObj(o);
80+
void *pvDst = activeDefragAlloc(pvSrc);
81+
return objFromAllocPtr(pvDst);
7482
}
7583

7684
/*Defrag helper for sds strings

src/expire.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
8080
robj *val = (robj*)dictGetVal(de);
8181
int deleted = 0;
8282

83-
robj objKey;
83+
redisObjectStack objKey;
8484
initStaticStringObject(objKey, (char*)e.key());
8585
bool fTtlChanged = false;
8686

@@ -145,7 +145,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
145145
serverAssert(false);
146146
}
147147

148-
robj objSubkey;
148+
redisObjectStack objSubkey;
149149
initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get());
150150
propagateSubkeyExpire(db, val->type, &objKey, &objSubkey);
151151

src/networking.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ size_t getStringObjectSdsUsedMemory(robj *o) {
5656
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
5757
switch(o->encoding) {
5858
case OBJ_ENCODING_RAW: return sdsZmallocSize((sds)ptrFromObj(o));
59-
case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj);
59+
case OBJ_ENCODING_EMBSTR: return zmalloc_size(allocPtrFromObj(o))-sizeof(robj);
6060
default: return 0; /* Just integer encoding for now. */
6161
}
6262
}

src/object.cpp

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@
4141
/* ===================== Creation and parsing of objects ==================== */
4242

4343
robj *createObject(int type, void *ptr) {
44-
robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED);
44+
size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0;
45+
char *oB = (char*)zcalloc(sizeof(robj)+mvccExtraBytes, MALLOC_SHARED);
46+
robj *o = reinterpret_cast<robj*>(oB + mvccExtraBytes);
47+
4548
o->type = type;
4649
o->encoding = OBJ_ENCODING_RAW;
4750
o->m_ptr = ptr;
4851
o->setrefcount(1);
49-
#ifdef ENABLE_MVCC
50-
o->mvcc_tstamp = OBJ_MVCC_INVALID;
51-
#endif
52+
setMvccTstamp(o, OBJ_MVCC_INVALID);
5253

5354
/* Set the LRU to the current lruclock (minutes resolution), or
5455
* alternatively the LFU counter. */
@@ -97,15 +98,16 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
9798
size_t allocsize = sizeof(struct sdshdr8)+len+1;
9899
if (allocsize < sizeof(void*))
99100
allocsize = sizeof(void*);
100-
robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
101+
102+
size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0;
103+
char *oB = (char*)zcalloc(sizeof(robj)+allocsize-sizeof(redisObject::m_ptr)+mvccExtraBytes, MALLOC_SHARED);
104+
robj *o = reinterpret_cast<robj*>(oB + mvccExtraBytes);
101105
struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr);
102106

103107
o->type = OBJ_STRING;
104108
o->encoding = OBJ_ENCODING_EMBSTR;
105109
o->setrefcount(1);
106-
#ifdef ENABLE_MVCC
107-
o->mvcc_tstamp = OBJ_MVCC_INVALID;
108-
#endif
110+
setMvccTstamp(o, OBJ_MVCC_INVALID);
109111

110112
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
111113
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
@@ -133,11 +135,7 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
133135
*
134136
* The current limit of 52 is chosen so that the biggest string object
135137
* we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
136-
#ifdef ENABLE_MVCC
137-
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48
138-
#else
139-
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 256
140-
#endif
138+
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 52
141139

142140
//static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total");
143141
robj *createStringObject(const char *ptr, size_t len) {
@@ -399,7 +397,11 @@ void decrRefCount(robj_roptr o) {
399397
case OBJ_CRON: freeCronObject(o); break;
400398
default: serverPanic("Unknown object type"); break;
401399
}
402-
zfree(o.unsafe_robjcast());
400+
if (g_pserver->fActiveReplica) {
401+
zfree(reinterpret_cast<redisObjectExtended*>(o.unsafe_robjcast())-1);
402+
} else {
403+
zfree(o.unsafe_robjcast());
404+
}
403405
} else {
404406
if (prev <= 0) serverPanic("decrRefCount against refcount <= 0");
405407
}
@@ -1326,12 +1328,11 @@ NULL
13261328
* because we update the access time only
13271329
* when the key is read or overwritten. */
13281330
addReplyLongLong(c,LFUDecrAndReturn(o));
1329-
#ifdef ENABLE_MVCC
13301331
} else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) {
13311332
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
13321333
== NULL) return;
1333-
addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000);
1334-
#endif
1334+
uint64_t mvcc = mvccFromObj(o);
1335+
addReplyLongLong(c, (g_pserver->mstime - (mvcc >> MVCC_MS_SHIFT)) / 1000);
13351336
} else {
13361337
addReplySubcommandSyntaxError(c);
13371338
}
@@ -1511,3 +1512,39 @@ void redisObject::setrefcount(unsigned ref)
15111512
serverAssert(!FExpires());
15121513
refcount.store(ref, std::memory_order_relaxed);
15131514
}
1515+
1516+
redisObjectStack::redisObjectStack()
1517+
{
1518+
// We need to ensure the Extended Object is first in the class layout
1519+
serverAssert(reinterpret_cast<ptrdiff_t>(static_cast<redisObject*>(this)) != reinterpret_cast<ptrdiff_t>(this));
1520+
}
1521+
1522+
void *allocPtrFromObj(robj_roptr o) {
1523+
if (g_pserver->fActiveReplica)
1524+
return reinterpret_cast<redisObjectExtended*>(o.unsafe_robjcast()) - 1;
1525+
return o.unsafe_robjcast();
1526+
}
1527+
1528+
robj *objFromAllocPtr(void *pv) {
1529+
if (g_pserver->fActiveReplica) {
1530+
return reinterpret_cast<robj*>(reinterpret_cast<redisObjectExtended*>(pv)+1);
1531+
}
1532+
return reinterpret_cast<robj*>(pv);
1533+
}
1534+
1535+
uint64_t mvccFromObj(robj_roptr o)
1536+
{
1537+
if (g_pserver->fActiveReplica) {
1538+
redisObjectExtended *oe = reinterpret_cast<redisObjectExtended*>(o.unsafe_robjcast()) - 1;
1539+
return oe->mvcc_tstamp;
1540+
}
1541+
return OBJ_MVCC_INVALID;
1542+
}
1543+
1544+
void setMvccTstamp(robj *o, uint64_t mvcc)
1545+
{
1546+
if (!g_pserver->fActiveReplica)
1547+
return;
1548+
redisObjectExtended *oe = reinterpret_cast<redisObjectExtended*>(o) - 1;
1549+
oe->mvcc_tstamp = mvcc;
1550+
}

src/rdb.cpp

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,10 +1089,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) {
10891089
}
10901090

10911091
char szT[32];
1092-
#ifdef ENABLE_MVCC
1093-
snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp);
1094-
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1;
1095-
#endif
1092+
if (g_pserver->fActiveReplica) {
1093+
snprintf(szT, 32, "%" PRIu64, mvccFromObj(val));
1094+
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1;
1095+
}
10961096

10971097
/* Save type, key, value */
10981098
if (rdbSaveObjectType(rdb,val) == -1) return -1;
@@ -1146,7 +1146,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
11461146

11471147
int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o)
11481148
{
1149-
robj key;
1149+
redisObjectStack key;
11501150

11511151
initStaticStringObject(key,(char*)keystr);
11521152
expireEntry *pexpire = getExpire(db, &key);
@@ -1999,7 +1999,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
19991999
exit(1);
20002000
}
20012001
RedisModuleIO io;
2002-
robj keyobj;
2002+
redisObjectStack keyobj;
20032003
initStaticStringObject(keyobj,key);
20042004
moduleInitIOContext(io,mt,rdb,&keyobj);
20052005
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
@@ -2048,9 +2048,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
20482048
return NULL;
20492049
}
20502050

2051-
#ifdef ENABLE_MVCC
2052-
o->mvcc_tstamp = mvcc_tstamp;
2053-
#endif
2051+
setMvccTstamp(o, mvcc_tstamp);
20542052
serverAssert(!o->FExpires());
20552053
return o;
20562054
}
@@ -2318,7 +2316,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
23182316
}
23192317
}
23202318
else {
2321-
redisObject keyobj;
2319+
redisObjectStack keyobj;
23222320
initStaticStringObject(keyobj,key);
23232321
setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10));
23242322
decrRefCount(subexpireKey);
@@ -2402,18 +2400,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
24022400
key = nullptr;
24032401
goto eoferr;
24042402
}
2405-
#ifdef ENABLE_MVCC
2406-
bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false;
2407-
#else
2408-
bool fStaleMvccKey = false;
2409-
#endif
2403+
bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false;
24102404

24112405
/* Check if the key already expired. This function is used when loading
24122406
* an RDB file from disk, either at startup, or when an RDB was
24132407
* received from the master. In the latter case, the master is
24142408
* responsible for key expiry. If we would expire keys here, the
24152409
* snapshot taken by the master may not be reflected on the replica. */
2416-
robj keyobj;
2410+
redisObjectStack keyobj;
24172411
initStaticStringObject(keyobj,key);
24182412
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
24192413
if (fStaleMvccKey || fExpiredKey) {

src/server.h

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,16 @@ typedef struct RedisModuleDigest {
799799

800800
#define MVCC_MS_SHIFT 20
801801

802-
typedef struct redisObject {
802+
// This struct will be allocated ahead of the ROBJ when needed
803+
struct redisObjectExtended {
804+
uint64_t mvcc_tstamp;
805+
};
806+
807+
typedef class redisObject {
808+
protected:
809+
redisObject() {}
810+
811+
public:
803812
unsigned type:4;
804813
unsigned encoding:4;
805814
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
@@ -808,9 +817,6 @@ typedef struct redisObject {
808817
private:
809818
mutable std::atomic<unsigned> refcount {0};
810819
public:
811-
#ifdef ENABLE_MVCC
812-
uint64_t mvcc_tstamp;
813-
#endif
814820
void *m_ptr;
815821

816822
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
@@ -821,11 +827,18 @@ typedef struct redisObject {
821827
void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); }
822828
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); }
823829
} robj;
824-
#ifdef ENABLE_MVCC
825-
static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase");
826-
#else
827830
static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase");
828-
#endif
831+
832+
class redisObjectStack : public redisObjectExtended, public redisObject
833+
{
834+
public:
835+
redisObjectStack();
836+
};
837+
838+
uint64_t mvccFromObj(robj_roptr o);
839+
void setMvccTstamp(redisObject *o, uint64_t mvcc);
840+
void *allocPtrFromObj(robj_roptr o);
841+
robj *objFromAllocPtr(void *pv);
829842

830843
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
831844
{

0 commit comments

Comments
 (0)