Perform incremental rollups instead of rollups at snapshot (#4410)
parasssh authored Jan 9, 2020
1 parent 04eafb1 commit 42b2a60
Showing 2 changed files with 142 additions and 71 deletions.
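
In outline, the change moves rollups off the snapshot path: ReadPostingList (posting/mvcc.go) counts the deltas it decodes and, past a small threshold, queues the key for rollup, while a dedicated goroutine (started from InitAndStartNode in worker/draft.go) drains those keys in batches of 64, throttled to one batch per 100 ms. The sketch below is a minimal, self-contained illustration of that hand-off pattern (a pooled batch plus a non-blocking channel send); it is not the actual Dgraph code, and the printed message stands in for the real posting-list rollup and badger write.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const batchSize = 64

var (
	keysCh   = make(chan *[][]byte)
	keysPool = sync.Pool{New: func() interface{} { return new([][]byte) }}
)

// queueForRollup is the read-path hook: accumulate keys into a pooled batch
// and hand a full batch to the worker without ever blocking the reader.
func queueForRollup(key []byte) {
	batch := keysPool.Get().(*[][]byte)
	*batch = append(*batch, key)
	if len(*batch) < batchSize {
		keysPool.Put(batch)
		return
	}
	select {
	case keysCh <- batch:
	default:
		// Worker is busy: drop the batch. Lossy on purpose; the same keys
		// will be queued again the next time they are read.
		*batch = (*batch)[:0]
		keysPool.Put(batch)
	}
}

// rollupWorker drains batches in the background, at most one batch per 100 ms.
func rollupWorker() {
	limiter := time.NewTicker(100 * time.Millisecond)
	defer limiter.Stop()
	for batch := range keysCh {
		for _, key := range *batch {
			fmt.Printf("rolling up %q\n", key) // stand-in for the real rollup + badger write
		}
		*batch = (*batch)[:0]
		keysPool.Put(batch)
		<-limiter.C
	}
}

func main() {
	go rollupWorker()
	for i := 0; i < 200; i++ {
		queueForRollup([]byte(fmt.Sprintf("key-%03d", i)))
	}
	time.Sleep(300 * time.Millisecond) // give the worker time to drain a batch or two
}
```
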
93 changes: 93 additions & 0 deletions posting/mvcc.go
@@ -21,20 +21,105 @@ import (
"encoding/hex"
"math"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/badger/v2"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
)

// incrRollupi is used to batch keys for rollup incrementally.
type incrRollupi struct {
// keysCh is populated with batches of 64 keys that need to be rolled up during reads
keysCh chan *[][]byte
// keysPool is a sync.Pool used to share the batched keys to roll up.
keysPool *sync.Pool
}

var (
// ErrTsTooOld is returned when a transaction is too old to be applied.
ErrTsTooOld = errors.Errorf("Transaction is too old")

// IncrRollup is used to batch keys for rollup incrementally.
IncrRollup = &incrRollupi{
keysCh: make(chan *[][]byte),
keysPool: &sync.Pool{
New: func() interface{} {
return new([][]byte)
},
},
}
)

// rollUpKey takes the given key's posting list, rolls it up, and writes it back to badger
func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
l, err := GetNoStore(key)
if err != nil {
return err
}

kvs, err := l.Rollup()
if err != nil {
return err
}

return writer.Write(&bpb.KVList{Kv: kvs})
}

func (ir *incrRollupi) addKeyToBatch(key []byte) {
batch := ir.keysPool.Get().(*[][]byte)
*batch = append(*batch, key)
if len(*batch) < 64 {
ir.keysPool.Put(batch)
return
}

select {
case ir.keysCh <- batch:
default:
// Drop keys and build the batch again. Lossy behavior.
*batch = (*batch)[:0]
ir.keysPool.Put(batch)
}
}

// Process rolls up batches of 64 keys in a goroutine.
func (ir *incrRollupi) Process() {
m := make(map[uint64]int64) // maps hash(key) to the last rollup ts; hashing the key bounds the size of the map.
limiter := time.NewTicker(100 * time.Millisecond)
writer := NewTxnWriter(pstore)

for batch := range ir.keysCh {
currTs := time.Now().Unix()
for _, key := range *batch {
hash := z.MemHash(key)
if elem, ok := m[hash]; !ok || (currTs-elem >= 10) {
// Key not present, or present but last rolled up more than 10 sec ago.
// Add/Update map and rollup.
m[hash] = currTs
if err := ir.rollUpKey(writer, key); err != nil {
glog.Warningf("Error %v rolling up key %v\n", err, key)
continue
}
}
}
// Clear the batch and put it back in the sync.Pool (keysPool).
*batch = (*batch)[:0]
ir.keysPool.Put(batch)

// throttle to 1 batch = 64 rollups per 100 ms.
<-limiter.C
}
// keysCh is closed. This should never happen.
}

// ShouldAbort returns whether the transaction should be aborted.
func (txn *Txn) ShouldAbort() bool {
if txn == nil {
@@ -144,6 +229,8 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
l := new(List)
l.key = key
l.plist = new(pb.PostingList)
const maxDeltaCount = 2
deltaCount := 0

// Iterates from highest Ts to lowest Ts
for it.Valid() {
@@ -188,6 +275,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
if err != nil {
return nil, err
}
deltaCount++
case BitSchemaPosting:
return nil, errors.Errorf(
"Trying to read schema in ReadPostingList for key: %s", hex.Dump(key))
@@ -200,6 +288,11 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}
it.Next()
}

if deltaCount >= maxDeltaCount {
IncrRollup.addKeyToBatch(key)
}

return l, nil
}
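
The Process loop above avoids re-rolling the same key by mapping z.MemHash(key) to the Unix time of its last rollup and skipping keys rolled up within the last 10 seconds; hashing bounds the map's memory instead of retaining whole keys, at the cost of an occasional collision suppressing a rollup. A standalone sketch of that bookkeeping, with hash/fnv standing in for z.MemHash:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

// lastRollup maps hash(key) -> unix seconds of the last rollup.
var lastRollup = make(map[uint64]int64)

func hashKey(key []byte) uint64 {
	h := fnv.New64a()
	h.Write(key)
	return h.Sum64()
}

// shouldRollup reports whether key should be rolled up now: either it has not
// been seen before, or its last rollup was at least 10 seconds ago. It records
// the new rollup time only when it answers true, as the commit does.
func shouldRollup(key []byte, now int64) bool {
	last, ok := lastRollup[hashKey(key)]
	if ok && now-last < 10 {
		return false
	}
	lastRollup[hashKey(key)] = now
	return true
}

func main() {
	key := []byte("0x01-name")
	now := time.Now().Unix()
	fmt.Println(shouldRollup(key, now))    // true: first sighting
	fmt.Println(shouldRollup(key, now+5))  // false: rolled up 5s ago
	fmt.Println(shouldRollup(key, now+15)) // true: the 10s window has elapsed
}
```
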

120 changes: 49 additions & 71 deletions worker/draft.go
@@ -420,13 +420,13 @@ func (n *node) processRollups() {
if readTs <= last {
break // Break out of the select case.
}
if err := n.rollupLists(readTs); err != nil {
if err := n.calcTabletSizes(readTs); err != nil {
// If we encounter error here, we don't need to do anything about
// it. Just let the user know.
glog.Errorf("Error while rolling up lists at %d: %v\n", readTs, err)
} else {
last = readTs // Update last only if we succeeded.
glog.Infof("List rollup at Ts %d: OK.\n", readTs)
glog.Infof("Last rollup at Ts %d: OK.\n", readTs)
}
}
}
@@ -1008,20 +1008,19 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
return &bpb.KVList{Kv: []*bpb.KV{kv}}
}

// rollupLists would consolidate all the deltas that constitute one posting
// list, and write back a complete posting list.
func (n *node) rollupLists(readTs uint64) error {
writer := posting.NewTxnWriter(pstore)
// calcTabletSizes updates the tablet sizes for the keys.
func (n *node) calcTabletSizes(readTs uint64) error {
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(readTs)

if !n.AmLeader() {
// Only leader needs to calculate the tablet sizes.
return nil
}

// We're doing rollups. We should use this opportunity to calculate the tablet sizes.
amLeader := n.AmLeader()
m := new(sync.Map)

addTo := func(key []byte, delta int64) {
if !amLeader {
// Only leader needs to calculate the tablet sizes.
return
}
pk, err := x.Parse(key)

// Type keys should not count for tablet size calculations.
@@ -1043,7 +1042,7 @@ func (n *node) rollupLists(readTs uint64) error {
}

stream := pstore.NewStreamAt(readTs)
stream.LogPrefix = "Rolling up"
stream.LogPrefix = "Tablet Size Calculation"
stream.ChooseKey = func(item *badger.Item) bool {
switch item.UserMeta() {
case posting.BitSchemaPosting, posting.BitCompletePosting, posting.BitEmptyPosting:
@@ -1052,75 +1051,53 @@ func (n *node) rollupLists(readTs uint64) error {
case x.ByteUnused:
return false
default:
return true
// not doing rollups anymore.
return false
}
}
var numKeys uint64
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
l, err := posting.ReadPostingList(key, itr)
if err != nil {
return nil, err
}
atomic.AddUint64(&numKeys, 1)
kvs, err := l.Rollup()

// If there are multiple keys, the posting list was split into multiple
// parts. The key of the first part is the right key to use for tablet
// size calculations.
for _, kv := range kvs {
addTo(kvs[0].Key, int64(kv.Size()))
}

return &bpb.KVList{Kv: kvs}, err
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
return nil, nil // no-op
}
stream.Send = func(list *bpb.KVList) error {
return writer.Write(list)
return nil
}
if err := stream.Orchestrate(context.Background()); err != nil {
return err
}
if err := writer.Flush(); err != nil {
return err
}
// For all the keys, let's see if they're in the LRU cache. If so, we can roll them up.
glog.Infof("Rolled up %d keys. Done", atomic.LoadUint64(&numKeys))

// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(readTs)

if amLeader {
// Only leader sends the tablet size updates to Zero. No one else does.
// doSendMembership is also being concurrently called from another goroutine.
go func() {
tablets := make(map[string]*pb.Tablet)
var total int64
m.Range(func(key, val interface{}) bool {
pred := key.(string)
size := atomic.LoadInt64(val.(*int64))
tablets[pred] = &pb.Tablet{
GroupId: n.gid,
Predicate: pred,
Space: size,
}
total += size
return true
})
// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
// this group, it would send instruction to delete that tablet. There's an edge case
// here if the followers are still running Rollup, and happen to read a key before and
// write after the tablet deletion, causing that tablet key to resurface. Then, only the
// follower would have that key, not the leader.
// However, if the follower then becomes the leader, we'd be able to get rid of that
// key then. Alternatively, we could look into cancelling the Rollup if we see a
// predicate deletion.
if err := groups().doSendMembership(tablets); err != nil {
glog.Warningf("While sending membership to Zero. Error: %v", err)
} else {
glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
humanize.Bytes(uint64(total)))
// Only leader sends the tablet size updates to Zero. No one else does.
// doSendMembership is also being concurrently called from another goroutine.
go func() {
tablets := make(map[string]*pb.Tablet)
var total int64
m.Range(func(key, val interface{}) bool {
pred := key.(string)
size := atomic.LoadInt64(val.(*int64))
tablets[pred] = &pb.Tablet{
GroupId: n.gid,
Predicate: pred,
Space: size,
}
}()
}
total += size
return true
})
// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
// this group, it would send instruction to delete that tablet. There's an edge case
// here if the followers are still running Rollup, and happen to read a key before and
// write after the tablet deletion, causing that tablet key to resurface. Then, only the
// follower would have that key, not the leader.
// However, if the follower then becomes the leader, we'd be able to get rid of that
// key then. Alternatively, we could look into cancelling the Rollup if we see a
// predicate deletion.
if err := groups().doSendMembership(tablets); err != nil {
glog.Warningf("While sending membership to Zero. Error: %v", err)
} else {
glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
humanize.Bytes(uint64(total)))
}
}()

return nil
}
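
calcTabletSizes above accumulates per-predicate sizes into a sync.Map of atomically updated counters so that concurrent stream workers can add to it without a shared lock, and the leader then reports the totals to Zero. Below is a stripped-down sketch of that accumulation pattern; the hard-coded predicate name stands in for the one x.Parse extracts from the key.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tabletSizes maps predicate -> *int64 byte count; safe for concurrent use.
var tabletSizes sync.Map

// addTo mirrors the addTo closure in calcTabletSizes: load or create the
// counter for the predicate, then add to it atomically.
func addTo(pred string, delta int64) {
	val, _ := tabletSizes.LoadOrStore(pred, new(int64))
	atomic.AddInt64(val.(*int64), delta)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			addTo("name", 128) // e.g. the estimated size of one key/value pair
		}()
	}
	wg.Wait()

	tabletSizes.Range(func(key, val interface{}) bool {
		fmt.Printf("tablet %q: %d bytes\n", key, atomic.LoadInt64(val.(*int64)))
		return true
	})
}
```
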

@@ -1455,6 +1432,7 @@ func (n *node) InitAndStartNode() {
go n.processRollups()
go n.processApplyCh()
go n.BatchAndSendMessages()
go posting.IncrRollup.Process()
go n.Run()
}

