Skip to content

perf(query): Batch multiple get calls to badger while querying in handleValuePostings #8999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/IBM/sarama v1.45.1
github.com/Masterminds/semver/v3 v3.3.1
github.com/blevesearch/bleve/v2 v2.5.1
github.com/dgraph-io/badger/v4 v4.7.0
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e
github.com/dgraph-io/dgo/v250 v250.0.0-preview4
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v4 v4.7.0 h1:Q+J8HApYAY7UMpL8d9owqiB+odzEc0zn/aqOD9jhc6Y=
github.com/dgraph-io/badger/v4 v4.7.0/go.mod h1:He7TzG3YBy3j4f5baj5B7Zl2XyfNe5bl4Udl0aPemVA=
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e h1:sZmnvDqloFjehWjr6f/G5O8ANbhenwSYdkGxkTR2Bww=
github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e/go.mod h1:aSwx/bXKT3/WRl9rn2BrTU+tfRQlFPKlOsqRTdcpHB8=
github.com/dgraph-io/dgo/v250 v250.0.0-preview4 h1:DkS6iFI6RwStXRzQxT5v8b6NLqqHQi0xKSK6FvcEwYo=
github.com/dgraph-io/dgo/v250 v250.0.0-preview4/go.mod h1:6nnKW4tYiai9xI6NSCrxaBgUGG1YI/+KlY+Tc7smqXY=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
Expand Down
44 changes: 44 additions & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ type MutableLayer struct {

// We also cache some things required for us to update currentEntries faster
currentUids map[uint64]int // Stores the uid to index mapping in the currentEntries posting list

// Cache for calculated UIDS
isUidsCalculated bool
calculatedUids []uint64
}

func newMutableLayer() *MutableLayer {
Expand All @@ -110,6 +114,8 @@ func newMutableLayer() *MutableLayer {
length: math.MaxInt,
committedUids: make(map[uint64]*pb.Posting),
committedUidsTime: math.MaxUint64,
isUidsCalculated: false,
calculatedUids: []uint64{},
}
}

Expand All @@ -136,6 +142,8 @@ func (mm *MutableLayer) clone() *MutableLayer {
length: mm.length,
lastEntry: mm.lastEntry,
committedUidsTime: mm.committedUidsTime,
isUidsCalculated: mm.isUidsCalculated,
calculatedUids: mm.calculatedUids,
}
}

Expand Down Expand Up @@ -1691,6 +1699,36 @@ func (l *List) ApproxLen() int {
return l.mutationMap.len() + codec.ApproxLen(l.plist.Pack)
}

// calculateUids materializes the list's REF uids into the mutable layer's
// cache (calculatedUids) so later Uids() calls can serve them without
// re-iterating the posting list. It is a no-op when the mutable layer is
// absent or the cache is already populated.
//
// The write lock is held for the entire computation. The previous
// read-then-upgrade pattern (compute under RLock, publish under a freshly
// acquired Lock) left a window in which a concurrent mutation could make
// the published snapshot stale, and in which mutationMap itself could
// change between the check and the write.
func (l *List) calculateUids() error {
	l.Lock()
	defer l.Unlock()

	if l.mutationMap == nil || l.mutationMap.isUidsCalculated {
		return nil
	}

	// Pre-size with the approximate length to avoid repeated growth.
	// (Inlined from ApproxLen so no lock is re-acquired.)
	res := make([]uint64, 0, l.mutationMap.len()+codec.ApproxLen(l.plist.Pack))

	err := l.iterate(l.mutationMap.committedUidsTime, 0, func(p *pb.Posting) error {
		if p.PostingType == pb.Posting_REF {
			res = append(res, p.Uid)
		}
		return nil
	})
	if err != nil {
		return err
	}

	l.mutationMap.calculatedUids = res
	l.mutationMap.isUidsCalculated = true
	return nil
}

// Uids returns the UIDs given some query params.
// We have to apply the filtering before applying (offset, count).
// WARNING: Calling this function just to get UIDs is expensive
Expand All @@ -1700,6 +1738,12 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {
}

getUidList := func() (*pb.List, error, bool) {
if l.mutationMap != nil && l.mutationMap.isUidsCalculated {
l.RLock()
defer l.RUnlock()
out := &pb.List{Uids: l.mutationMap.calculatedUids}
return out, nil, opt.Intersect != nil
}
// Pre-assign length to make it faster.
l.RLock()
defer l.RUnlock()
Expand Down
91 changes: 72 additions & 19 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,32 +330,32 @@ func (lc *LocalCache) readPostingListAt(key []byte) (*pb.PostingList, error) {
return pl, err
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
given key. This is used for retrieving the value of a scalar predicate.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
// This would return an error if there is some data in the local cache, but we couldn't read it.
getListFromLocalCache := func() (*pb.PostingList, error) {
lc.RLock()

pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := proto.Unmarshal(delta, pl)
lc.RUnlock()
return pl, err
}
// GetSinglePostingFromLocalCache looks up key in the transaction-local
// cache only. It first consults the pending deltas, then the cached plists;
// a (nil, nil) result means the local cache has no data for this key and
// the caller should fall back to badger.
//
// NOTE: RUnlock is called explicitly (not deferred) so that StaticValue
// runs outside the cache's read lock, exactly as before.
func (lc *LocalCache) GetSinglePostingFromLocalCache(key []byte) (*pb.PostingList, error) {
	k := string(key)

	lc.RLock()
	if delta, ok := lc.deltas[k]; ok && len(delta) > 0 {
		out := &pb.PostingList{}
		unmarshalErr := proto.Unmarshal(delta, out)
		lc.RUnlock()
		return out, unmarshalErr
	}
	cached := lc.plists[k]
	lc.RUnlock()

	if cached == nil {
		return nil, nil
	}
	return cached.StaticValue(lc.startTs)
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
// given key. This is used for retrieving the value of a scalar predicate.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
// This would return an error if there is some data in the local cache, but we couldn't read it.
getPostings := func() (*pb.PostingList, error) {
pl, err := getListFromLocalCache()
pl, err := lc.GetSinglePostingFromLocalCache(key)
// If both pl and err are empty, that means that there was no data in local cache, hence we should
// read the data from badger.
if pl != nil || err != nil {
Expand Down Expand Up @@ -388,6 +388,59 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
return pl, nil
}

// GetBatchSinglePosting is the batched counterpart of GetSinglePosting: for
// each key it returns the scalar posting list, preferring the local cache
// and issuing one batched badger read for the misses. The returned slice is
// parallel to keys (results[i] corresponds to keys[i]).
func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, error) {
	results := make([]*pb.PostingList, len(keys))
	remainingKeys := make([][]byte, 0, len(keys))
	for i, key := range keys {
		// Use the cached copy only when the read succeeded. The original
		// condition (`pl != nil && err != nil`) was inverted: it discarded
		// every cache hit and would have accepted an erroneous read.
		pl, err := lc.GetSinglePostingFromLocalCache(key)
		if err != nil {
			return nil, err
		}
		if pl != nil {
			results[i] = pl
		} else {
			remainingKeys = append(remainingKeys, key)
		}
	}

	if len(remainingKeys) > 0 {
		txn := pstore.NewTransactionAt(lc.startTs, false)
		// A read-only transaction must still be discarded to release its
		// resources; the original leaked it.
		defer txn.Discard()

		// NOTE(review): assumes GetBatch returns one item per requested key,
		// in order — confirm how it reports missing keys.
		items, err := txn.GetBatch(remainingKeys)
		if err != nil {
			return nil, err
		}

		idx := 0
		for i := range results {
			if results[i] != nil {
				continue
			}
			pl := &pb.PostingList{}
			if err := items[idx].Value(func(val []byte) error {
				return proto.Unmarshal(val, pl)
			}); err != nil {
				// Fail fast instead of overwriting err and returning only
				// the last item's error, as the original did.
				return nil, err
			}
			idx++
			results[i] = pl
		}
	}

	// Strip deletions. A delete-all tombstone clears that key's postings
	// only; the original returned (nil, nil) for the entire batch, which
	// starves callers that index into the result.
	// NOTE(review): confirm this matches GetSinglePosting's delete-all
	// semantics for a single key.
	for _, pl := range results {
		keep := 0
		for _, p := range pl.Postings {
			if hasDeleteAll(p) {
				keep = 0
				break
			}
			if p.Op != Del {
				pl.Postings[keep] = p
				keep++
			}
		}
		pl.Postings = pl.Postings[:keep]
	}

	return results, nil
}

// Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) {
return lc.getInternal(key, true)
Expand Down
12 changes: 10 additions & 2 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
"strconv"
"sync"
Expand Down Expand Up @@ -415,7 +416,7 @@
ml.removeOnUpdate = removeOnUpdate
ml.statsHolder = NewStatsHolder()
if cacheSize > 0 {
cache, err := ristretto.NewCache[[]byte, *CachePL](&ristretto.Config[[]byte, *CachePL]{
cache, err := ristretto.NewCache(&ristretto.Config[[]byte, *CachePL]{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
Expand All @@ -436,9 +437,10 @@
for range ticker.C {
// Record the posting list cache hit ratio
ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
fmt.Println("CACHE STATS: ", ml.cache.numCacheRead.Load(), ml.cache.numCacheReadFails.Load(), ml.cache.numCacheSave.Load())

if EnableDetailedMetrics {
x.NumPostingListCacheSave.M(ml.cache.numCacheRead.Load())
x.NumPostingListCacheSave.M(ml.cache.numCacheSave.Load())
x.NumPostingListCacheRead.M(ml.cache.numCacheRead.Load())
x.NumPostingListCacheReadFail.M(ml.cache.numCacheReadFails.Load())
}
Expand Down Expand Up @@ -605,8 +607,12 @@
}

l.minTs = item.Version()
if l.mutationMap == nil {
l.mutationMap = newMutableLayer()
}
// No need to do Next here. The outer loop can take care of skipping
// more versions of the same key.
l.calculateUids()

Check failure on line 615 in posting/mvcc.go

View check run for this annotation

Trunk.io / Trunk Check

golangci-lint2(errcheck)

[new] Error return value of `l.calculateUids` is not checked
return l, nil
case BitDeltaPosting:
err := item.Value(func(val []byte) error {
Expand Down Expand Up @@ -637,6 +643,8 @@
}
it.Next()
}

l.calculateUids()

Check failure on line 647 in posting/mvcc.go

View check run for this annotation

Trunk.io / Trunk Check

golangci-lint2(errcheck)

[new] Error return value of `l.calculateUids` is not checked
return l, nil
}

Expand Down
20 changes: 17 additions & 3 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
out := &pb.Result{}
outputs[start/width] = out

cache := make([]*pb.PostingList, 0)
for i := start; i < end; i++ {
select {
case <-ctx.Done():
Expand All @@ -442,9 +443,22 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored

if !getMultiplePosting {
pl, err := qs.cache.GetSinglePosting(key)
if err != nil {
return err
if len(cache) == 0 {
keys := make([][]byte, 10)
keys[0] = key
lastI := 0
for j := i + 1; j < i+10 && j < end; j++ {
keys[j-i] = x.DataKey(q.Attr, q.UidList.Uids[j])
lastI = j - i
}
cache, err = qs.cache.GetBatchSinglePosting(keys[:lastI+1])
if err != nil {
return err
}
}
pl := cache[0]
if len(cache) > 1 {
cache = cache[1:]
}
if pl == nil || len(pl.Postings) == 0 {
out.UidMatrix = append(out.UidMatrix, &pb.List{})
Expand Down
6 changes: 2 additions & 4 deletions x/x.go
Original file line number Diff line number Diff line change
// DivideAndRule splits num units of work into numGo goroutines of `width`
// items each. After this change it always chooses 64 goroutines, so width
// is simply ceil(num/64); the loop whose body unconditionally returned on
// its first iteration was dead code and has been removed.
//
// NOTE(review): this drops the previous minimum width of 256 per
// goroutine — small inputs now fan out across 64 goroutines. Confirm that
// is the intended trade-off of this perf change.
func DivideAndRule(num int) (numGo, width int) {
	numGo = 64
	width = int(math.Ceil(float64(num) / float64(numGo)))
	return numGo, width
}
Expand Down
Loading