Perform incremental rollups instead of rollups at snapshot #4410

Merged
merged 17 commits
Jan 9, 2020
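Summary (from the diff below): posting-list rollups move off the snapshot path and onto the read path. ReadPostingList now queues any key whose list had to merge two or more deltas, and a background goroutine (posting.IncrRollup.Process) rolls those keys up in throttled batches of 64. rollupLists in worker/draft.go no longer writes rolled-up lists; it only advances Badger's discard timestamp and, on the leader, computes tablet sizes to report to Zero.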
93 changes: 93 additions & 0 deletions posting/mvcc.go
@@ -21,20 +21,105 @@ import (
"encoding/hex"
"math"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/badger/v2"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
)

// incrRollupi is used to batch keys for rollup incrementally.
type incrRollupi struct {
// keysCh is populated with batches of 64 keys that need to be rolled up during reads
keysCh chan *[][]byte
// keysPool is a sync.Pool used to recycle the batches of keys to be rolled up.
keysPool *sync.Pool
}

var (
// ErrTsTooOld is returned when a transaction is too old to be applied.
ErrTsTooOld = errors.Errorf("Transaction is too old")

// IncrRollup is used to batch keys for rollup incrementally.
IncrRollup = &incrRollupi{
keysCh: make(chan *[][]byte),
keysPool: &sync.Pool{
New: func() interface{} {
return new([][]byte)
},
},
}
)

// rollUpKey takes the given key's posting list, rolls it up, and writes it back to badger.
func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
l, err := GetNoStore(key)
if err != nil {
return err
}

kvs, err := l.Rollup()
if err != nil {
return err
}

return writer.Write(&bpb.KVList{Kv: kvs})
}

func (ir *incrRollupi) addKeyToBatch(key []byte) {
batch := ir.keysPool.Get().(*[][]byte)
*batch = append(*batch, key)
if len(*batch) < 64 {
ir.keysPool.Put(batch)
return
}

select {
case ir.keysCh <- batch:
default:
// Drop keys and build the batch again. Lossy behavior.
*batch = (*batch)[:0]
ir.keysPool.Put(batch)
}
}
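For context, the lossy batching above follows a common Go pattern: a sync.Pool hands out reusable batch slices, and a full batch is passed to the consumer with a non-blocking channel send, so a busy consumer simply causes the keys to be dropped rather than stalling the read path. Below is a minimal, self-contained sketch of that pattern; the names (batcher, flushCh, batchSize) and the string payload are illustrative, not part of this PR.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const batchSize = 4 // the PR uses 64

type batcher struct {
	flushCh chan *[]string // full batches are handed off to a consumer here
	pool    *sync.Pool     // recycles batch slices between uses
}

func newBatcher() *batcher {
	return &batcher{
		flushCh: make(chan *[]string),
		pool:    &sync.Pool{New: func() interface{} { return new([]string) }},
	}
}

// add appends item to a pooled batch. A full batch is sent to flushCh with a
// non-blocking send; if the consumer is busy, the keys are dropped (lossy by design).
func (b *batcher) add(item string) {
	batch := b.pool.Get().(*[]string)
	*batch = append(*batch, item)
	if len(*batch) < batchSize {
		b.pool.Put(batch)
		return
	}
	select {
	case b.flushCh <- batch:
	default:
		*batch = (*batch)[:0] // drop the items and recycle the slice
		b.pool.Put(batch)
	}
}

func main() {
	b := newBatcher()
	go func() {
		for batch := range b.flushCh {
			fmt.Println("flushing", len(*batch), "items")
			*batch = (*batch)[:0]
			b.pool.Put(batch) // the consumer returns the slice for reuse
		}
	}()
	for i := 0; i < 20; i++ {
		b.add(fmt.Sprintf("key-%d", i))
	}
	time.Sleep(50 * time.Millisecond) // give the consumer a moment to drain
}
```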

// Process rolls up batches of 64 keys. It is meant to be run in a goroutine.
func (ir *incrRollupi) Process() {
m := make(map[uint64]int64) // maps hash(key) to the last rollup ts; hashing keeps the map's keys small.
limiter := time.NewTicker(100 * time.Millisecond)
writer := NewTxnWriter(pstore)

for batch := range ir.keysCh {
currTs := time.Now().Unix()
for _, key := range *batch {
hash := z.MemHash(key)
if elem, ok := m[hash]; !ok || (currTs-elem >= 10) {
// Key not present, or present but last rolled up more than 10 seconds ago.
// Update the map and roll up the key.
m[hash] = currTs
if err := ir.rollUpKey(writer, key); err != nil {
glog.Warningf("Error %v rolling up key %v\n", err, key)
continue
}
}
}
// Clear the batch and put it back into the keysPool.
*batch = (*batch)[:0]
ir.keysPool.Put(batch)

// throttle to 1 batch = 64 rollups per 100 ms.
<-limiter.C
}
// Reaching here means keysCh was closed, which should never happen.
}
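The consumer above deduplicates by a 64-bit hash of the key (z.MemHash), skips keys rolled up within the last 10 seconds, and waits on a 100 ms ticker between batches, so the incremental path is capped at roughly 64 rollups per 100 ms, i.e. about 640 per second per process. A reduced, runnable sketch of that consumer loop follows; memHash (FNV-1a here) and the rollUp callback are stand-ins for z.MemHash and rollUpKey.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

// memHash is a stand-in for z.MemHash used in the PR.
func memHash(key []byte) uint64 {
	h := fnv.New64a()
	h.Write(key)
	return h.Sum64()
}

// consume drains batches from ch, skips keys rolled up within the last 10 seconds,
// and handles at most one batch per 100 ms tick.
func consume(ch <-chan [][]byte, rollUp func([]byte) error) {
	lastRollup := make(map[uint64]int64) // hash(key) -> unix ts of last rollup
	limiter := time.NewTicker(100 * time.Millisecond)
	defer limiter.Stop()

	for batch := range ch {
		now := time.Now().Unix()
		for _, key := range batch {
			h := memHash(key)
			if ts, ok := lastRollup[h]; !ok || now-ts >= 10 {
				lastRollup[h] = now
				if err := rollUp(key); err != nil {
					fmt.Printf("error %v rolling up key %q\n", err, key)
				}
			}
		}
		<-limiter.C // throttle: one batch (64 rollups in the PR) per 100 ms
	}
}

func main() {
	ch := make(chan [][]byte, 1)
	ch <- [][]byte{[]byte("a"), []byte("b"), []byte("a")} // the second "a" is skipped
	close(ch)
	consume(ch, func(k []byte) error { fmt.Printf("rolled up %s\n", k); return nil })
}
```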

// ShouldAbort returns whether the transaction should be aborted.
func (txn *Txn) ShouldAbort() bool {
if txn == nil {
@@ -144,6 +229,8 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
l := new(List)
l.key = key
l.plist = new(pb.PostingList)
const maxDeltaCount = 2
deltaCount := 0

// Iterates from highest Ts to lowest Ts
for it.Valid() {
@@ -188,6 +275,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
if err != nil {
return nil, err
}
deltaCount++
case BitSchemaPosting:
return nil, errors.Errorf(
"Trying to read schema in ReadPostingList for key: %s", hex.Dump(key))
@@ -200,6 +288,11 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}
it.Next()
}

if deltaCount >= maxDeltaCount {
IncrRollup.addKeyToBatch(key)
}

return l, nil
}
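In effect, any posting list that had to merge maxDeltaCount (2) or more deltas during a read is queued for an incremental rollup, so frequently read, frequently mutated keys converge back to a complete posting list without waiting for a snapshot-time rollup.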

109 changes: 46 additions & 63 deletions worker/draft.go
@@ -426,7 +426,7 @@ func (n *node) processRollups() {
glog.Errorf("Error while rolling up lists at %d: %v\n", readTs, err)
} else {
last = readTs // Update last only if we succeeded.
glog.Infof("List rollup at Ts %d: OK.\n", readTs)
glog.Infof("Last rollup at Ts %d: OK.\n", readTs)
}
}
}
@@ -1011,17 +1011,18 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
// rollupLists would consolidate all the deltas that constitute one posting
// list, and write back a complete posting list.
func (n *node) rollupLists(readTs uint64) error {
writer := posting.NewTxnWriter(pstore)
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(readTs)

// We're doing rollups. We should use this opportunity to calculate the tablet sizes.
amLeader := n.AmLeader()
if !n.AmLeader() {
// Only leader needs to calculate the tablet sizes.
return nil
}

m := new(sync.Map)

addTo := func(key []byte, delta int64) {
if !amLeader {
// Only leader needs to calculate the tablet sizes.
return
}
pk, err := x.Parse(key)
if err != nil {
glog.Errorf("Error while parsing key %s: %v", hex.Dump(key), err)
@@ -1037,7 +1038,7 @@ func (n *node) rollupLists(readTs uint64) error {
}

stream := pstore.NewStreamAt(readTs)
stream.LogPrefix = "Rolling up"
stream.LogPrefix = "Tablet Size Calculation"
stream.ChooseKey = func(item *badger.Item) bool {
switch item.UserMeta() {
case posting.BitSchemaPosting, posting.BitCompletePosting, posting.BitEmptyPosting:
@@ -1046,75 +1047,56 @@
case x.ByteUnused:
return false
default:
return true
// not doing rollups anymore.
return false
}
}
var numKeys uint64
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
l, err := posting.ReadPostingList(key, itr)
if err != nil {
return nil, err
}
atomic.AddUint64(&numKeys, 1)
kvs, err := l.Rollup()

// If there are multiple keys, the posting list was split into multiple
// parts. The key of the first part is the right key to use for tablet
// size calculations.
for _, kv := range kvs {
addTo(kvs[0].Key, int64(kv.Size()))
}

return &bpb.KVList{Kv: kvs}, err
return nil, nil // no-op
}
stream.Send = func(list *bpb.KVList) error {
return writer.Write(list)
return nil
}
if err := stream.Orchestrate(context.Background()); err != nil {
return err
}
if err := writer.Flush(); err != nil {
return err
}

// For all the keys, let's see if they're in the LRU cache. If so, we can roll them up.
glog.Infof("Rolled up %d keys. Done", atomic.LoadUint64(&numKeys))

// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(readTs)

if amLeader {
// Only leader sends the tablet size updates to Zero. No one else does.
// doSendMembership is also being concurrently called from another goroutine.
go func() {
tablets := make(map[string]*pb.Tablet)
var total int64
m.Range(func(key, val interface{}) bool {
pred := key.(string)
size := atomic.LoadInt64(val.(*int64))
tablets[pred] = &pb.Tablet{
GroupId: n.gid,
Predicate: pred,
Space: size,
}
total += size
return true
})
// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
// this group, it would send instruction to delete that tablet. There's an edge case
// here if the followers are still running Rollup, and happen to read a key before and
// write after the tablet deletion, causing that tablet key to resurface. Then, only the
// follower would have that key, not the leader.
// However, if the follower then becomes the leader, we'd be able to get rid of that
// key then. Alternatively, we could look into cancelling the Rollup if we see a
// predicate deletion.
if err := groups().doSendMembership(tablets); err != nil {
glog.Warningf("While sending membership to Zero. Error: %v", err)
} else {
glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
humanize.Bytes(uint64(total)))
// Only leader sends the tablet size updates to Zero. No one else does.
// doSendMembership is also being concurrently called from another goroutine.
go func() {
tablets := make(map[string]*pb.Tablet)
var total int64
m.Range(func(key, val interface{}) bool {
pred := key.(string)
size := atomic.LoadInt64(val.(*int64))
tablets[pred] = &pb.Tablet{
GroupId: n.gid,
Predicate: pred,
Space: size,
}
}()
}
total += size
return true
})
// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
// this group, it would send instruction to delete that tablet. There's an edge case
// here if the followers are still running Rollup, and happen to read a key before and
// write after the tablet deletion, causing that tablet key to resurface. Then, only the
// follower would have that key, not the leader.
// However, if the follower then becomes the leader, we'd be able to get rid of that
// key then. Alternatively, we could look into cancelling the Rollup if we see a
// predicate deletion.
if err := groups().doSendMembership(tablets); err != nil {
glog.Warningf("While sending membership to Zero. Error: %v", err)
} else {
glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
humanize.Bytes(uint64(total)))
}
}()

return nil
}
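With the incremental path in place, rollupLists no longer writes rolled-up lists at all: it sets Badger's discard timestamp and, on the leader only, runs a stream (LogPrefix "Tablet Size Calculation") whose per-predicate sizes are reported to Zero via doSendMembership.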

@@ -1449,6 +1431,7 @@ func (n *node) InitAndStartNode() {
go n.processRollups()
go n.processApplyCh()
go n.BatchAndSendMessages()
go posting.IncrRollup.Process()
go n.Run()
}
