Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform incremental rollups instead of rollups at snapshot #4410

Merged
merged 17 commits into from
Jan 9, 2020
Next Next commit
best-effort / lossy on sender; batch on receiver; use sync.pool
  • Loading branch information
Paras Shah committed Nov 21, 2019
commit 31d62e4a2f4adafc9e76b5de5e34a633a25a09a2
29 changes: 29 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/hex"
"math"
"strconv"
"sync"
"sync/atomic"

"github.com/dgraph-io/badger"
Expand All @@ -33,6 +34,17 @@ import (
var (
	// ErrTsTooOld is returned when a transaction is too old to be applied.
	ErrTsTooOld = errors.Errorf("Transaction is too old")
	// IncrRollupCh is populated with keys that need to be rolled up during
	// reads. Sends are best-effort (lossy): when the buffered channel is
	// full, the sender drops the key instead of blocking.
	IncrRollupCh = make(chan *bytes.Buffer, 16)
	// RollUpPool recycles the *bytes.Buffer values that carry keys over
	// IncrRollupCh, avoiding an allocation per key.
	RollUpPool = sync.Pool{
		New: func() interface{} {
			// The Pool's New function should generally only return pointer
			// types, since a pointer can be put into the return interface
			// value without an allocation:
			return new(bytes.Buffer)
		},
	}
)

// ShouldAbort returns whether the transaction should be aborted.
Expand Down Expand Up @@ -145,6 +157,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
l.key = key
l.mutationMap = make(map[uint64]*pb.PostingList)
l.plist = new(pb.PostingList)
start := it

// Iterates from highest Ts to lowest Ts
for it.Valid() {
Expand Down Expand Up @@ -186,6 +199,22 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
if err != nil {
return nil, err
}
if (it == start) {
// If, while reading a posting list, the first item is a delta posting, then
// this key needs to go through a rollup. Add the key to IncrRollupCh.
// This channel is drained by the goroutine which performs incremental rollups.
k := RollUpPool.Get().(*bytes.Buffer)
k.Write(key)

// The send to the channel is best-effort (lossy) so we don't block here.
// Hence, if the channel is blocked or full, drop the key and move on.
select {
case IncrRollupCh <- k:
default:
// return the buffer to the sync.Pool
RollUpPool.Put(k)
}
}
case BitSchemaPosting:
return nil, errors.Errorf(
"Trying to read schema in ReadPostingList for key: %s", hex.Dump(key))
Expand Down
32 changes: 31 additions & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,35 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
return nil
}

// handleIncrementalRollups drains posting.IncrRollupCh, accumulating keys into
// batches of 16 and rolling up each batch once it is full. Each pooled buffer
// is reset and returned to posting.RollUpPool after its key has been copied
// out, so the sender side can reuse it without reallocating.
//
// Runs as a long-lived goroutine started from InitAndStartNode.
func (n *node) handleIncrementalRollups() {
	const batchSize = 16
	batch := make([]*bytes.Buffer, 0, batchSize)
	for buf := range posting.IncrRollupCh {
		batch = append(batch, buf)
		if len(batch) < batchSize {
			continue
		}

		// Process the full batch.
		for _, b := range batch {
			// Copy the key out of the pooled buffer before releasing it.
			// (bytes.Buffer.Read into a nil slice reads zero bytes, which
			// is the bug this copy replaces.)
			key := append([]byte(nil), b.Bytes()...)

			// Reset before Put so the next Get starts from an empty buffer
			// instead of accumulating stale key bytes.
			b.Reset()
			posting.RollUpPool.Put(b)

			// TODO: roll up key here.
			_ = key
		}

		// Reset the batch, keeping the backing array.
		batch = batch[:0]
	}

	// posting.IncrRollupCh is closed. This should never happen.
}

func (n *node) processRollups() {
defer n.closer.Done() // CLOSER:1
tick := time.NewTicker(5 * time.Minute) // Rolling up once every 5 minutes seems alright.
Expand All @@ -419,7 +448,7 @@ func (n *node) processRollups() {
glog.Errorf("Error while rolling up lists at %d: %v\n", readTs, err)
} else {
last = readTs // Update last only if we succeeded.
glog.Infof("List rollup at Ts %d: OK.\n", readTs)
glog.Infof("Last rollup at Ts %d: OK.\n", readTs)
}
}
}
Expand Down Expand Up @@ -1419,6 +1448,7 @@ func (n *node) InitAndStartNode() {
go n.processRollups()
go n.processApplyCh()
go n.BatchAndSendMessages()
go n.handleIncrementalRollups()
go n.Run()
}

Expand Down