Revert "Perform incremental rollups instead of rollups at snapshot (#…
Browse files Browse the repository at this point in the history
…4410)"

This reverts commit 42b2a60.

The incremental rollups PR is causing Jepsen failures. Reverting it for
now, until we figure out a fix.
manishrjain committed Jan 12, 2020
1 parent 4833c85 commit 57474e0
Showing 2 changed files with 71 additions and 142 deletions.
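
For context, the reverted change had reads enqueue keys whose posting lists had accumulated multiple deltas, and rolled them up in a background goroutine instead of rolling everything up at snapshot time. Below is a minimal, self-contained sketch of that read-time batching pattern, with placeholder names, a no-op rollup callback, and hash/fnv standing in for ristretto's z.MemHash; the actual removed code appears in the posting/mvcc.go diff that follows.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
	"time"
)

// incrRollup batches keys seen during reads and rolls them up in the background.
type incrRollup struct {
	keysCh   chan *[][]byte // full batches handed to the processor
	keysPool *sync.Pool     // reuses batch slices between flushes
}

func newIncrRollup() *incrRollup {
	return &incrRollup{
		keysCh: make(chan *[][]byte),
		keysPool: &sync.Pool{
			New: func() interface{} { return new([][]byte) },
		},
	}
}

// addKeyToBatch buffers a key; once 64 keys accumulate, the batch is offered to
// the processor. If the processor is busy, the batch is dropped: deliberately
// lossy, since a missed rollup only delays compaction of that posting list.
func (ir *incrRollup) addKeyToBatch(key []byte) {
	batch := ir.keysPool.Get().(*[][]byte)
	*batch = append(*batch, key)
	if len(*batch) < 64 {
		ir.keysPool.Put(batch)
		return
	}
	select {
	case ir.keysCh <- batch:
	default:
		*batch = (*batch)[:0]
		ir.keysPool.Put(batch)
	}
}

// process drains batches, rolling up each key at most once every 10 seconds and
// handling at most one batch per 100 ms.
func (ir *incrRollup) process(rollUp func(key []byte) error) {
	lastSeen := make(map[uint64]int64) // hash(key) -> unix ts of last rollup
	limiter := time.NewTicker(100 * time.Millisecond)
	defer limiter.Stop()

	for batch := range ir.keysCh {
		now := time.Now().Unix()
		for _, key := range *batch {
			h := fnv.New64a()
			h.Write(key)
			if ts, ok := lastSeen[h.Sum64()]; !ok || now-ts >= 10 {
				lastSeen[h.Sum64()] = now
				if err := rollUp(key); err != nil {
					fmt.Printf("rollup of %q failed: %v\n", key, err)
				}
			}
		}
		*batch = (*batch)[:0]
		ir.keysPool.Put(batch)
		<-limiter.C // throttle: one batch of 64 rollups per 100 ms
	}
}

func main() {
	ir := newIncrRollup()
	// The real rollup rewrites the key's posting list in Badger; here it is a no-op.
	go ir.process(func(key []byte) error { return nil })
	for i := 0; i < 200; i++ {
		ir.addKeyToBatch([]byte(fmt.Sprintf("key-%03d", i%80)))
	}
	time.Sleep(300 * time.Millisecond) // let the background processor drain
}
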
93 changes: 0 additions & 93 deletions posting/mvcc.go
@@ -21,105 +21,20 @@ import (
"encoding/hex"
"math"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/badger/v2"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/pkg/errors"
)

// incrRollupi is used to batch keys for rollup incrementally.
type incrRollupi struct {
// keysCh is populated with batch of 64 keys that needs to be rolled up during reads
keysCh chan *[][]byte
// keysPool is sync.Pool to share the batched keys to rollup.
keysPool *sync.Pool
}

var (
// ErrTsTooOld is returned when a transaction is too old to be applied.
ErrTsTooOld = errors.Errorf("Transaction is too old")

// IncrRollup is used to batch keys for rollup incrementally.
IncrRollup = &incrRollupi{
keysCh: make(chan *[][]byte),
keysPool: &sync.Pool{
New: func() interface{} {
return new([][]byte)
},
},
}
)

// rollUpKey takes the given key's posting lists, rolls it up and writes back to badger
func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
l, err := GetNoStore(key)
if err != nil {
return err
}

kvs, err := l.Rollup()
if err != nil {
return err
}

return writer.Write(&bpb.KVList{Kv: kvs})
}

func (ir *incrRollupi) addKeyToBatch(key []byte) {
batch := ir.keysPool.Get().(*[][]byte)
*batch = append(*batch, key)
if len(*batch) < 64 {
ir.keysPool.Put(batch)
return
}

select {
case ir.keysCh <- batch:
default:
// Drop keys and build the batch again. Lossy behavior.
*batch = (*batch)[:0]
ir.keysPool.Put(batch)
}
}

// Process will rollup batches of 64 keys in a go routine.
func (ir *incrRollupi) Process() {
m := make(map[uint64]int64) // map hash(key) to ts. hash(key) to limit the size of the map.
limiter := time.NewTicker(100 * time.Millisecond)
writer := NewTxnWriter(pstore)

for batch := range ir.keysCh {
currTs := time.Now().Unix()
for _, key := range *batch {
hash := z.MemHash(key)
if elem, ok := m[hash]; !ok || (currTs-elem >= 10) {
// Key not present or Key present but last roll up was more than 10 sec ago.
// Add/Update map and rollup.
m[hash] = currTs
if err := ir.rollUpKey(writer, key); err != nil {
glog.Warningf("Error %v rolling up key %v\n", err, key)
continue
}
}
}
// clear the batch and put it back in Sync keysPool
*batch = (*batch)[:0]
ir.keysPool.Put(batch)

// throttle to 1 batch = 64 rollups per 100 ms.
<-limiter.C
}
// keysCh is closed. This should never happen.
}

// ShouldAbort returns whether the transaction should be aborted.
func (txn *Txn) ShouldAbort() bool {
if txn == nil {
@@ -229,8 +144,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
l := new(List)
l.key = key
l.plist = new(pb.PostingList)
const maxDeltaCount = 2
deltaCount := 0

// Iterates from highest Ts to lowest Ts
for it.Valid() {
@@ -275,7 +188,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
if err != nil {
return nil, err
}
deltaCount++
case BitSchemaPosting:
return nil, errors.Errorf(
"Trying to read schema in ReadPostingList for key: %s", hex.Dump(key))
@@ -288,11 +200,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}
it.Next()
}

if deltaCount >= maxDeltaCount {
IncrRollup.addKeyToBatch(key)
}

return l, nil
}

120 changes: 71 additions & 49 deletions worker/draft.go
@@ -420,13 +420,13 @@ func (n *node) processRollups() {
if readTs <= last {
break // Break out of the select case.
}
if err := n.calcTabletSizes(readTs); err != nil {
if err := n.rollupLists(readTs); err != nil {
// If we encounter error here, we don't need to do anything about
// it. Just let the user know.
glog.Errorf("Error while rolling up lists at %d: %v\n", readTs, err)
} else {
last = readTs // Update last only if we succeeded.
glog.Infof("Last rollup at Ts %d: OK.\n", readTs)
glog.Infof("List rollup at Ts %d: OK.\n", readTs)
}
}
}
@@ -1008,19 +1008,20 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
return &bpb.KVList{Kv: []*bpb.KV{kv}}
}

// calcTabletSizes updates the tablet sizes for the keys.
func (n *node) calcTabletSizes(readTs uint64) error {
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(readTs)

if !n.AmLeader() {
// Only leader needs to calculate the tablet sizes.
return nil
}
// rollupLists would consolidate all the deltas that constitute one posting
// list, and write back a complete posting list.
func (n *node) rollupLists(readTs uint64) error {
writer := posting.NewTxnWriter(pstore)

// We're doing rollups. We should use this opportunity to calculate the tablet sizes.
amLeader := n.AmLeader()
m := new(sync.Map)

addTo := func(key []byte, delta int64) {
if !amLeader {
// Only leader needs to calculate the tablet sizes.
return
}
pk, err := x.Parse(key)

// Type keys should not count for tablet size calculations.
@@ -1042,7 +1043,7 @@ func (n *node) calcTabletSizes(readTs uint64) error {
}

stream := pstore.NewStreamAt(readTs)
stream.LogPrefix = "Tablet Size Calculation"
stream.LogPrefix = "Rolling up"
stream.ChooseKey = func(item *badger.Item) bool {
switch item.UserMeta() {
case posting.BitSchemaPosting, posting.BitCompletePosting, posting.BitEmptyPosting:
@@ -1051,53 +1052,75 @@ func (n *node) calcTabletSizes(readTs uint64) error {
case x.ByteUnused:
return false
default:
// not doing rollups anymore.
return false
return true
}
}

var numKeys uint64
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
return nil, nil // no-op
l, err := posting.ReadPostingList(key, itr)
if err != nil {
return nil, err
}
atomic.AddUint64(&numKeys, 1)
kvs, err := l.Rollup()

// If there are multiple keys, the posting list was split into multiple
// parts. The key of the first part is the right key to use for tablet
// size calculations.
for _, kv := range kvs {
addTo(kvs[0].Key, int64(kv.Size()))
}

return &bpb.KVList{Kv: kvs}, err
}
stream.Send = func(list *bpb.KVList) error {
return nil
return writer.Write(list)
}
if err := stream.Orchestrate(context.Background()); err != nil {
return err
}
if err := writer.Flush(); err != nil {
return err
}
// For all the keys, let's see if they're in the LRU cache. If so, we can roll them up.
glog.Infof("Rolled up %d keys. Done", atomic.LoadUint64(&numKeys))

// Only leader sends the tablet size updates to Zero. No one else does.
// doSendMembership is also being concurrently called from another goroutine.
go func() {
tablets := make(map[string]*pb.Tablet)
var total int64
m.Range(func(key, val interface{}) bool {
pred := key.(string)
size := atomic.LoadInt64(val.(*int64))
tablets[pred] = &pb.Tablet{
GroupId: n.gid,
Predicate: pred,
Space: size,
}
total += size
return true
})
// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
// this group, it would send instruction to delete that tablet. There's an edge case
// here if the followers are still running Rollup, and happen to read a key before and
// write after the tablet deletion, causing that tablet key to resurface. Then, only the
// follower would have that key, not the leader.
// However, if the follower then becomes the leader, we'd be able to get rid of that
// key then. Alternatively, we could look into cancelling the Rollup if we see a
// predicate deletion.
if err := groups().doSendMembership(tablets); err != nil {
glog.Warningf("While sending membership to Zero. Error: %v", err)
} else {
glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
humanize.Bytes(uint64(total)))
}
}()
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(readTs)

if amLeader {
// Only leader sends the tablet size updates to Zero. No one else does.
// doSendMembership is also being concurrently called from another goroutine.
go func() {
tablets := make(map[string]*pb.Tablet)
var total int64
m.Range(func(key, val interface{}) bool {
pred := key.(string)
size := atomic.LoadInt64(val.(*int64))
tablets[pred] = &pb.Tablet{
GroupId: n.gid,
Predicate: pred,
Space: size,
}
total += size
return true
})
// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
// this group, it would send instruction to delete that tablet. There's an edge case
// here if the followers are still running Rollup, and happen to read a key before and
// write after the tablet deletion, causing that tablet key to resurface. Then, only the
// follower would have that key, not the leader.
// However, if the follower then becomes the leader, we'd be able to get rid of that
// key then. Alternatively, we could look into cancelling the Rollup if we see a
// predicate deletion.
if err := groups().doSendMembership(tablets); err != nil {
glog.Warningf("While sending membership to Zero. Error: %v", err)
} else {
glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
humanize.Bytes(uint64(total)))
}
}()
}
return nil
}

@@ -1432,7 +1455,6 @@ func (n *node) InitAndStartNode() {
go n.processRollups()
go n.processApplyCh()
go n.BatchAndSendMessages()
go posting.IncrRollup.Process()
go n.Run()
}
