perf(cache): Add a sharded map for global cache #9180

Open · wants to merge 26 commits into main
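Before the per-file changes, here is a minimal, self-contained sketch of the pattern this PR introduces: a map sharded by key hash, with one RWMutex per shard, so that unrelated keys no longer contend on a single global lock. The names shardedMap, lockedMap, and numShards mirror the diff below; the string payload, the FNV hash, and main are simplified placeholders for illustration (the real code stores *CachePL values keyed by z.MemHash).

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const numShards = 256

// lockedMap is one shard: a plain map guarded by its own RWMutex.
type lockedMap struct {
	sync.RWMutex
	data map[uint64]string // the real code stores *CachePL values here
}

// shardedMap spreads keys across shards so writers on different keys
// rarely block each other.
type shardedMap struct {
	shards []*lockedMap
}

func newShardedMap() *shardedMap {
	sm := &shardedMap{shards: make([]*lockedMap, numShards)}
	for i := range sm.shards {
		sm.shards[i] = &lockedMap{data: make(map[uint64]string)}
	}
	return sm
}

// shard picks the shard that owns a hashed key.
func (sm *shardedMap) shard(key uint64) *lockedMap {
	return sm.shards[key%numShards]
}

func (sm *shardedMap) Set(key uint64, v string) {
	s := sm.shard(key)
	s.Lock()
	defer s.Unlock()
	s.data[key] = v
}

func (sm *shardedMap) Get(key uint64) (string, bool) {
	s := sm.shard(key)
	s.RLock()
	defer s.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// hashKey stands in for z.MemHash in the real code.
func hashKey(key []byte) uint64 {
	h := fnv.New64a()
	h.Write(key)
	return h.Sum64()
}

func main() {
	sm := newShardedMap()
	sm.Set(hashKey([]byte("0x01-name")), "cached posting list")
	fmt.Println(sm.Get(hashKey([]byte("0x01-name"))))
}
```

Declaring numShards as an untyped constant in this sketch also sidesteps the int to uint64 conversion that gosec flags (G115) in the diff below, where numShards is declared as an int variable.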
1 change: 1 addition & 0 deletions edgraph/server.go
@@ -1411,6 +1411,7 @@ func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response,
EncodingNs: uint64(l.Json.Nanoseconds()),
TotalNs: uint64((time.Since(l.Start)).Nanoseconds()),
}
//fmt.Println("====Query Resp", qc.req.Query, qc.req.StartTs, qc.req, string(resp.Json))
return resp, gqlErrs
}

1 change: 1 addition & 0 deletions posting/index_test.go
@@ -157,6 +157,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32,
}

txn.Update()
txn.UpdateCachedKeys(commitTs)
writer := NewTxnWriter(pstore)
require.NoError(t, txn.CommitToDisk(writer, commitTs))
require.NoError(t, writer.Flush())
2 changes: 2 additions & 0 deletions posting/list_test.go
@@ -545,6 +545,8 @@ func TestReadSingleValue(t *testing.T) {
kvs, err := ol.Rollup(nil, txn.StartTs-3)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
// Delete the item from the global cache before reading, as the test does not update the cache
globalCache.Del(z.MemHash(key))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
4 changes: 3 additions & 1 deletion posting/lists.go
@@ -143,7 +143,7 @@ func (vc *viLocalCache) GetWithLockHeld(key []byte) (rval index.Value, rerr erro
func (vc *viLocalCache) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) {
value := pl.findStaticValue(vc.delegate.startTs)

if value == nil {
if value == nil || len(value.Postings) == 0 {
return nil, ErrNoValue
}

@@ -395,6 +395,8 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
}
}
pl.Postings = pl.Postings[:idx]
//pk, _ := x.Parse([]byte(key))
//fmt.Println("====Getting single posting", lc.startTs, pk, pl.Postings)
return pl, nil
}

218 changes: 191 additions & 27 deletions posting/mvcc.go
@@ -19,8 +19,8 @@
import (
"bytes"
"encoding/hex"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -85,7 +85,8 @@
priorityKeys: make([]*pooledKeys, 2),
}

globalCache = &GlobalCache{items: make(map[string]*CachePL, 100)}
globalCache = newShardedMap()
numShards = 256
)

func init() {
@@ -134,13 +135,9 @@
}

RemoveCacheFor(key)

globalCache.Lock()
val, ok := globalCache.items[string(key)]
if ok {
val.list = nil
}
globalCache.Unlock()
//pk, _ := x.Parse(key)
//fmt.Println("====Setting cache delete rollup", ts, pk)
globalCache.Del(z.MemHash(key))
// TODO Update cache with rolled up results
// If we do a rollup, we typically won't need to update the key in cache.
// The only caveat is that the key written by rollup would be written at +1
@@ -192,6 +189,8 @@
defer cleanupTick.Stop()
forceRollupTick := time.NewTicker(500 * time.Millisecond)
defer forceRollupTick.Stop()
deleteCacheTick := time.NewTicker(10 * time.Second)
defer deleteCacheTick.Stop()

doRollup := func(batch *[][]byte, priority int) {
currTs := time.Now().Unix()
@@ -222,6 +221,8 @@
delete(m, hash)
}
}
case <-deleteCacheTick.C:
globalCache.deleteOldItems(ir.getNewTs(false))
case <-forceRollupTick.C:
batch := ir.priorityKeys[0].keysPool.Get().(*[][]byte)
if len(*batch) > 0 {
@@ -349,9 +350,7 @@
if lCache != nil {
lCache.Clear()
}
globalCache.Lock()
globalCache.items = make(map[string]*CachePL)
globalCache.Unlock()
globalCache.Clear()
}

// RemoveCacheFor will delete the list corresponding to the given key.
@@ -362,6 +361,149 @@
}
}

type shardedMap struct {
shards []*lockedMap
}

func newShardedMap() *shardedMap {
sm := &shardedMap{
shards: make([]*lockedMap, numShards),
}
for i := range sm.shards {
sm.shards[i] = newLockedMap()
}
return sm
}

func (sm *shardedMap) Get(key uint64) (*CachePL, bool) {
return sm.shards[key%uint64(numShards)].Get(key)

Check failure (GitHub Actions / lint) on line 379 in posting/mvcc.go: G115: integer overflow conversion int -> uint64 (gosec)
}

func (sm *shardedMap) get(key uint64) (*CachePL, bool) {
return sm.shards[key%uint64(numShards)].get(key)

Check failure (GitHub Actions / lint) on line 383 in posting/mvcc.go: G115: integer overflow conversion int -> uint64 (gosec)
}

func (sm *shardedMap) set(key uint64, i *CachePL) {
if i == nil {
// If the item is nil, make this Set a no-op.
return
}

sm.shards[key%uint64(numShards)].set(key, i)

Check failure (GitHub Actions / lint) on line 392 in posting/mvcc.go: G115: integer overflow conversion int -> uint64 (gosec)
}

func (sm *shardedMap) Set(key uint64, i *CachePL) {
if i == nil {
// If the item is nil, make this Set a no-op.
return
}

sm.shards[key%uint64(numShards)].Set(key, i)
}

func (sm *shardedMap) del(key uint64) {

Check failure (GitHub Actions / lint) on line 404 in posting/mvcc.go: func `(*shardedMap).del` is unused (unused)
sm.shards[key%uint64(numShards)].del(key)
}

func (sm *shardedMap) Del(key uint64) {
sm.shards[key%uint64(numShards)].Del(key)
}

func (sm *shardedMap) UnlockKey(key uint64) {
sm.shards[key%uint64(numShards)].Unlock()
}

func (sm *shardedMap) LockKey(key uint64) {
sm.shards[key%uint64(numShards)].Lock()
}

func (sm *shardedMap) RLockKey(key uint64) {
sm.shards[key%uint64(numShards)].RLock()
}

func (sm *shardedMap) RUnlockKey(key uint64) {
sm.shards[key%uint64(numShards)].RUnlock()
}

func (sm *shardedMap) Clear() {
for i := 0; i < numShards; i++ {
sm.shards[i].Clear()
}
}

func (sm *shardedMap) deleteOldItems(ts uint64) {
fmt.Println("Deleting old items")
defer func() {
fmt.Println("Done deleting old items")
}()
for i := 0; i < numShards; i++ {
sm.shards[i].Lock()
defer sm.shards[i].Unlock()

for keyHash, pl := range sm.shards[i].data {
if pl.lastUpdate < ts-100 {
delete(sm.shards[i].data, keyHash)
}
}
}
}

type lockedMap struct {
sync.RWMutex
data map[uint64]*CachePL
}

func newLockedMap() *lockedMap {
return &lockedMap{
data: make(map[uint64]*CachePL),
}
}

func (m *lockedMap) get(key uint64) (*CachePL, bool) {
item, ok := m.data[key]
return item, ok
}

func (m *lockedMap) Get(key uint64) (*CachePL, bool) {
m.RLock()
defer m.RUnlock()
item, ok := m.data[key]
return item, ok
}

func (m *lockedMap) set(key uint64, i *CachePL) {
if i == nil {
// If the item is nil, make this Set a no-op.
return
}

m.data[key] = i
}

func (m *lockedMap) Set(key uint64, i *CachePL) {
m.Lock()
defer m.Unlock()
m.set(key, i)
}

func (m *lockedMap) del(key uint64) {

Check failure (GitHub Actions / lint) on line 489 in posting/mvcc.go: func `(*lockedMap).del` is unused (unused)
delete(m.data, key)
}

func (m *lockedMap) Del(key uint64) {
m.Lock()
if l, ok := m.data[key]; ok && l != nil {
l.list = nil
}
m.Unlock()
}

func (m *lockedMap) Clear() {
m.Lock()
m.data = make(map[uint64]*CachePL)
m.Unlock()
}

func NewCachePL() *CachePL {
return &CachePL{
count: 0,
@@ -382,19 +524,28 @@
if !ShouldGoInCache(pk) {
continue
}
globalCache.Lock()
val, ok := globalCache.items[key]
keyHash := z.MemHash([]byte(key))
// TODO under the same lock
globalCache.LockKey(keyHash)
val, ok := globalCache.get(keyHash)
if !ok {
val = NewCachePL()
val.lastUpdate = commitTs
globalCache.items[key] = val
globalCache.set(keyHash, val)
}
if commitTs != 0 {
// TODO Delete this if the values are too old in an async thread
val.lastUpdate = commitTs
}

if commitTs != 0 {
p := new(pb.PostingList)
x.Check(p.Unmarshal(delta))
//fmt.Println("====Committing ", txn.StartTs, commitTs, pk, p)
}

if !ok {
globalCache.Unlock()
globalCache.UnlockKey(keyHash)
continue
}

@@ -404,8 +555,10 @@
p := new(pb.PostingList)
x.Check(p.Unmarshal(delta))
val.list.setMutationAfterCommit(txn.StartTs, commitTs, delta)
//fmt.Println("====Setting cache list", commitTs, pk, p, val.list.mutationMap)
}
globalCache.Unlock()

globalCache.UnlockKey(keyHash)
}
}

@@ -548,7 +701,7 @@
}

func ShouldGoInCache(pk x.ParsedKey) bool {
return (!pk.IsData() && strings.HasSuffix(pk.Attr, "dgraph.type"))
return true
}

func PostingListCacheEnabled() bool {
@@ -583,13 +736,16 @@
}

pk, _ := x.Parse(key)
keyHash := z.MemHash(key)

if ShouldGoInCache(pk) {
globalCache.Lock()
cacheItem, ok := globalCache.items[string(key)]
globalCache.LockKey(keyHash)
cacheItem, ok := globalCache.get(keyHash)
if !ok {
cacheItem = NewCachePL()
globalCache.items[string(key)] = cacheItem
// TODO: see if this is required
//fmt.Println("====Setting empty cache", readTs, pk)
globalCache.set(keyHash, cacheItem)
}
cacheItem.count += 1

@@ -601,11 +757,14 @@
cacheItem.list.RLock()
lCopy := copyList(cacheItem.list)
cacheItem.list.RUnlock()
globalCache.Unlock()
globalCache.UnlockKey(keyHash)
//allV, _ := lCopy.AllValues(readTs)
//uids, _ := lCopy.Uids(ListOptions{ReadTs: readTs})
//fmt.Println("====Getting cache", readTs, pk, lCopy.mutationMap, allV, uids)
return lCopy, nil
}
}
globalCache.Unlock()
globalCache.UnlockKey(keyHash)
}

txn := pstore.NewTransactionAt(readTs, false)
Expand All @@ -628,20 +787,25 @@
// the latest version of the PL. We also check that we're reading a version
// from Badger, which is higher than the write registered by the cache.
if ShouldGoInCache(pk) {
globalCache.Lock()
globalCache.LockKey(keyHash)
l.RLock()
cacheItem, ok := globalCache.items[string(key)]
// TODO fix Get and Set to be under one lock
cacheItem, ok := globalCache.get(keyHash)
if !ok {
cacheItemNew := NewCachePL()
cacheItemNew.count = 1
cacheItemNew.list = copyList(l)
cacheItemNew.lastUpdate = l.maxTs
globalCache.items[string(key)] = cacheItemNew
globalCache.set(keyHash, cacheItemNew)
} else {
//fmt.Println("====Setting cache", readTs, pk, l.mutationMap)
cacheItem.Set(copyList(l), readTs)
}
l.RUnlock()
globalCache.Unlock()
//allV, _ := l.AllValues(readTs)
//uids, _ := l.Uids(ListOptions{ReadTs: readTs})
//fmt.Println("====Getting from disk", readTs, pk, l.mutationMap, allV, uids)
globalCache.UnlockKey(keyHash)
}

if PostingListCacheEnabled() {
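The deleteCacheTick / deleteOldItems pair above adds time-based eviction: every 10 seconds, entries whose lastUpdate timestamp lags the current transaction timestamp by more than 100 are dropped. Below is a simplified, self-contained sketch of that rule with a single map instead of shards; the entry type and the timestamp values are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// entry stands in for CachePL; only the lastUpdate timestamp matters here.
type entry struct {
	lastUpdate uint64
}

type cache struct {
	sync.Mutex
	data map[uint64]*entry
}

// deleteOldItems drops entries whose lastUpdate is more than 100 timestamps
// behind ts, mirroring shardedMap.deleteOldItems in the diff (one shard only).
func (c *cache) deleteOldItems(ts uint64) {
	c.Lock()
	defer c.Unlock()
	for k, e := range c.data {
		if e.lastUpdate < ts-100 {
			delete(c.data, k)
		}
	}
}

func main() {
	c := &cache{data: map[uint64]*entry{
		1: {lastUpdate: 50},  // stale: far behind the current timestamp
		2: {lastUpdate: 990}, // fresh: recently updated
	}}
	c.deleteOldItems(1000)   // in the PR this runs from a 10s ticker in the rollup loop
	fmt.Println(len(c.data)) // 1: only the fresh entry survives
}
```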
3 changes: 3 additions & 0 deletions posting/mvcc_test.go
@@ -153,6 +153,8 @@ func TestPostingListRead(t *testing.T) {
writer := NewTxnWriter(pstore)
require.NoError(t, writer.SetAt(key, []byte{}, BitEmptyPosting, 6))
require.NoError(t, writer.Flush())
// Delete the key from the cache, as we have just updated it directly on disk
globalCache.Del(z.MemHash(key))
assertLength(7, 0)

addEdgeToUID(t, attr, 1, 4, 7, 8)
@@ -165,6 +167,7 @@
writer = NewTxnWriter(pstore)
require.NoError(t, writer.SetAt(key, data, BitCompletePosting, 10))
require.NoError(t, writer.Flush())
globalCache.Del(z.MemHash(key))
assertLength(10, 0)

addEdgeToUID(t, attr, 1, 5, 11, 12)
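The test changes in this PR share one rule: whenever a test writes a posting list straight to Badger via TxnWriter, bypassing the transaction path that keeps the global cache in sync, it must call globalCache.Del(z.MemHash(key)) so the next read repopulates from disk. A toy, self-contained sketch of that write-around invalidation pattern, with plain maps standing in for Badger and the sharded cache:

```go
package main

import "fmt"

type store map[string]string // stands in for Badger
type cache map[string]string // stands in for the sharded global cache

// read returns the cached value if present, otherwise reads the store and
// fills the cache, mimicking the getNew path in posting/mvcc.go.
func read(c cache, s store, key string) string {
	if v, ok := c[key]; ok {
		return v // cache hit; stale if the store was written directly
	}
	v := s[key]
	c[key] = v // repopulate the cache from disk
	return v
}

func main() {
	s := store{"k": "v1"}
	c := cache{}

	fmt.Println(read(c, s, "k")) // "v1", now cached

	s["k"] = "v2"                // direct write to disk; cache not updated
	delete(c, "k")               // the invalidation the tests add: Del(MemHash(key))
	fmt.Println(read(c, s, "k")) // "v2", re-read from disk
}
```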