Skip to content

Commit

Permalink
fix(core): add the posting list risteretto cache back (#9181)
Browse files Browse the repository at this point in the history
Reverting the posting list cache for now while we fix the new cache.
  • Loading branch information
harshil-goel authored Oct 4, 2024
1 parent 345c438 commit 5352e1a
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 5 deletions.
4 changes: 2 additions & 2 deletions ee/acl/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ func (asuite *AclTestSuite) TestValQueryWithACLPermissions() {
n as name
a as age
}
q2(func: eq(val(n), "RandomGuy")) {
q2(func: eq(val(n), "RandomGuy"), orderasc: val(n)) {
val(n)
val(a)
}
Expand Down Expand Up @@ -1433,7 +1433,7 @@ func (asuite *AclTestSuite) TestValQueryWithACLPermissions() {
n as name
a as age
}
q2(func: uid(f), orderdesc: val(a)) {
q2(func: uid(f), orderasc: val(n)) {
name
val(n)
val(a)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ require (
golang.org/x/term v0.24.0
golang.org/x/text v0.18.0
golang.org/x/tools v0.25.0
google.golang.org/grpc v1.67.1
google.golang.org/grpc v1.66.2
gopkg.in/yaml.v2 v2.4.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1032,8 +1032,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo=
google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
30 changes: 30 additions & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ package posting

import (
"bytes"
"context"
"fmt"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/dgraph-io/dgraph/v24/protos/pb"
"github.com/dgraph-io/dgraph/v24/tok/index"
"github.com/dgraph-io/dgraph/v24/x"
"github.com/dgraph-io/ristretto"
"github.com/dgraph-io/ristretto/z"
ostats "go.opencensus.io/stats"
)

const (
Expand All @@ -36,6 +40,7 @@ const (
var (
pstore *badger.DB
closer *z.Closer
lCache *ristretto.Cache[[]byte, *List]
)

// Init initializes the posting lists package, the in memory and dirty list hash.
Expand All @@ -45,6 +50,31 @@ func Init(ps *badger.DB, cacheSize int64) {
go x.MonitorMemoryMetrics(closer)

// Initialize cache.
if cacheSize == 0 {
return
}

var err error
lCache, err = ristretto.NewCache[[]byte, *List](&ristretto.Config[[]byte, *List]{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
Cost: func(val *List) int64 {
return 0
},
})
x.Check(err)
go func() {
m := lCache.Metrics
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
// Record the posting list cache hit ratio
ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
}
}()
}

func UpdateMaxCost(maxCost int64) {
Expand Down
44 changes: 44 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
return err
}

RemoveCacheFor(key)

globalCache.Lock()
val, ok := globalCache.items[string(key)]
if ok {
Expand Down Expand Up @@ -344,11 +346,22 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
}

func ResetCache() {
if lCache != nil {
lCache.Clear()
}
globalCache.Lock()
globalCache.items = make(map[string]*CachePL)
globalCache.Unlock()
}

// RemoveCacheFor will delete the list corresponding to the given key.
func RemoveCacheFor(key []byte) {
// TODO: investigate if this can be done by calling Set with a nil value.
if lCache != nil {
lCache.Del(key)
}
}

func NewCachePL() *CachePL {
return &CachePL{
count: 0,
Expand All @@ -364,6 +377,7 @@ func (txn *Txn) UpdateCachedKeys(commitTs uint64) {
}

for key, delta := range txn.cache.deltas {
RemoveCacheFor([]byte(key))
pk, _ := x.Parse([]byte(key))
if !ShouldGoInCache(pk) {
continue
Expand Down Expand Up @@ -537,7 +551,33 @@ func ShouldGoInCache(pk x.ParsedKey) bool {
return (!pk.IsData() && strings.HasSuffix(pk.Attr, "dgraph.type"))
}

func PostingListCacheEnabled() bool {
return lCache != nil
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
if PostingListCacheEnabled() {
l, ok := lCache.Get(key)
if ok && l != nil {
// No need to clone the immutable layer or the key since mutations will not modify it.
lCopy := &List{
minTs: l.minTs,
maxTs: l.maxTs,
key: key,
plist: l.plist,
}
l.RLock()
if l.mutationMap != nil {
lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap))
for ts, pl := range l.mutationMap {
lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList)
}
}
l.RUnlock()
return lCopy, nil
}
}

if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}
Expand Down Expand Up @@ -604,5 +644,9 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
globalCache.Unlock()
}

if PostingListCacheEnabled() {
lCache.Set(key, l, 0)
}

return l, nil
}

0 comments on commit 5352e1a

Please sign in to comment.