Incremental Rollup and Tablet Size Calculation (#4972)
parasssh authored Mar 19, 2020
1 parent cbbdee8 commit dd0728f
Showing 4 changed files with 246 additions and 189 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -18,6 +18,7 @@ require (
    github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
    github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200316175624-91c31ebe8c22
    github.com/dgraph-io/dgo/v2 v2.2.1-0.20200319183917-53c7d5bc32a7
    github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3
    github.com/dgrijalva/jwt-go v3.2.0+incompatible
    github.com/dgryski/go-farm v0.0.0-20191112170834-c2139c5d712b
    github.com/dgryski/go-groupvarint v0.0.0-20190318181831-5ce5df8ca4e1
107 changes: 107 additions & 0 deletions posting/mvcc.go
@@ -21,23 +21,121 @@ import (
    "encoding/hex"
    "math"
    "strconv"
    "sync"
    "sync/atomic"
    "time"

    "github.com/dgraph-io/badger/v2"
    bpb "github.com/dgraph-io/badger/v2/pb"
    "github.com/dgraph-io/badger/v2/y"
    "github.com/dgraph-io/dgo/v2/protos/api"
    "github.com/dgraph-io/dgraph/protos/pb"
    "github.com/dgraph-io/dgraph/x"
    "github.com/dgraph-io/ristretto/z"
    "github.com/golang/glog"
    "github.com/pkg/errors"
)

// incrRollupi is used to batch keys for rollup incrementally.
type incrRollupi struct {
    // keysCh is populated with batches of 64 keys that need to be rolled up during reads.
    keysCh chan *[][]byte
    // keysPool is a sync.Pool used to share the batched keys to roll up.
    keysPool *sync.Pool
    count    uint64
}

var (
    // ErrTsTooOld is returned when a transaction is too old to be applied.
    ErrTsTooOld = errors.Errorf("Transaction is too old")
    // ErrInvalidKey is returned when trying to read a posting list using
    // an invalid key (e.g., the key to a single part of a larger multi-part list).
    ErrInvalidKey = errors.Errorf("cannot read posting list using multi-part list key")

    // IncrRollup is used to batch keys for rollup incrementally.
    IncrRollup = &incrRollupi{
        keysCh: make(chan *[][]byte),
        keysPool: &sync.Pool{
            New: func() interface{} {
                return new([][]byte)
            },
        },
    }
)
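
As an aside (not part of this diff), the batching above relies on a sync.Pool of *[][]byte so that partially filled key batches can be parked and reused without allocating a new slice each time. The following standalone sketch, with purely illustrative names, shows that pattern in isolation:

package main

import (
    "fmt"
    "sync"
)

// batchPool hands out reusable *[][]byte values, mirroring keysPool above.
var batchPool = sync.Pool{
    New: func() interface{} { return new([][]byte) },
}

// addKey appends key to a pooled batch. A full batch (of the given size) is
// returned to the caller; a partial batch is parked back in the pool.
func addKey(key []byte, size int) *[][]byte {
    batch := batchPool.Get().(*[][]byte)
    *batch = append(*batch, key)
    if len(*batch) < size {
        batchPool.Put(batch)
        return nil
    }
    return batch
}

func main() {
    for i := 0; i < 5; i++ {
        if b := addKey([]byte{byte(i)}, 3); b != nil {
            fmt.Printf("full batch: %v\n", *b)
            *b = (*b)[:0] // reset before returning to the pool, as the real code does
            batchPool.Put(b)
        }
    }
}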

// rollUpKey takes the given key's posting list, rolls it up, and writes it back to Badger.
func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
    l, err := GetNoStore(key)
    if err != nil {
        return err
    }

    kvs, err := l.Rollup()
    if err != nil {
        return err
    }
    const N = uint64(1000)
    if glog.V(2) {
        if count := atomic.AddUint64(&ir.count, 1); count%N == 0 {
            glog.V(2).Infof("Rolled up %d keys", count)
        }
    }
    return writer.Write(&bpb.KVList{Kv: kvs})
}

func (ir *incrRollupi) addKeyToBatch(key []byte) {
    batch := ir.keysPool.Get().(*[][]byte)
    *batch = append(*batch, key)
    if len(*batch) < 64 {
        ir.keysPool.Put(batch)
        return
    }

    select {
    case ir.keysCh <- batch:
    default:
        // Drop keys and build the batch again. Lossy behavior.
        *batch = (*batch)[:0]
        ir.keysPool.Put(batch)
    }
}

// Process will roll up batches of 64 keys in a goroutine.
func (ir *incrRollupi) Process(closer *y.Closer) {
    defer closer.Done()

    writer := NewTxnWriter(pstore)
    defer writer.Flush()

    m := make(map[uint64]int64) // Map hash(key) to ts. We use hash(key) to limit the size of the map.
    limiter := time.NewTicker(100 * time.Millisecond)
    for {
        select {
        case <-closer.HasBeenClosed():
            return
        case batch := <-ir.keysCh:
            currTs := time.Now().Unix()
            for _, key := range *batch {
                hash := z.MemHash(key)
                if elem := m[hash]; currTs-elem >= 10 {
                    // Key not present, or key present but the last rollup was more than 10 sec ago.
                    // Add/update the map and roll up.
                    m[hash] = currTs
                    if err := ir.rollUpKey(writer, key); err != nil {
                        glog.Warningf("Error %v rolling up key %v\n", err, key)
                    }
                }
            }
            // Clear the batch and put it back into keysPool.
            *batch = (*batch)[:0]
            ir.keysPool.Put(batch)

            // Throttle to 1 batch = 64 rollups per 100 ms.
            <-limiter.C
        }
    }
}
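
For orientation, here is a hedged sketch of how Process might be started and stopped by a caller; the helper below and its placement are illustrative assumptions, not part of this commit, though IncrRollup, Process, and badger's y.Closer are taken from the code above:

package rollupexample // illustrative package, not part of this commit

import (
    "github.com/dgraph-io/badger/v2/y"
    "github.com/dgraph-io/dgraph/posting"
)

// startIncrRollup launches the rollup loop and returns the closer that stops
// it. On shutdown, closer.SignalAndWait() makes HasBeenClosed() fire and
// waits for Process to call Done().
func startIncrRollup() *y.Closer {
    closer := y.NewCloser(1) // one goroutine: Process calls Done() once on exit
    go posting.IncrRollup.Process(closer)
    return closer
}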

// ShouldAbort returns whether the transaction should be aborted.
func (txn *Txn) ShouldAbort() bool {
    if txn == nil {
@@ -166,6 +264,14 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
    l.key = key
    l.plist = new(pb.PostingList)

    // We use the following block of code to trigger incremental rollup on this key.
    deltaCount := 0
    defer func() {
        if deltaCount > 0 {
            IncrRollup.addKeyToBatch(key)
        }
    }()

    // Iterates from highest Ts to lowest Ts
    for it.Valid() {
        item := it.Item()
@@ -210,6 +316,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
            if err != nil {
                return nil, err
            }
            deltaCount++
        case BitSchemaPosting:
            return nil, errors.Errorf(
                "Trying to read schema in ReadPostingList for key: %s", hex.Dump(key))