Incremental Rollup and Tablet Size Calculation #4972

Merged
merged 25 commits on Mar 19, 2020
Changes from 8 commits

Commits (25)
5f07dc9
Bring incremental rollup commit
parasssh Jan 9, 2020
75233b5
Incremental Rollups -- let's track versions of keys.
manishrjain Mar 16, 2020
76d2838
Merge master
manishrjain Mar 16, 2020
fe10d20
Build works with master merge
manishrjain Mar 16, 2020
527a082
Fix map ini
manishrjain Mar 17, 2020
50e7544
Some evidence to show it's not just writes, but write in the range of…
manishrjain Mar 17, 2020
700b54b
Some more tries to understand the behavior
manishrjain Mar 17, 2020
53b1ae5
Merge branch 'master' into mrjn/incremental
Mar 18, 2020
55de90c
Using Martin's fix.
manishrjain Mar 18, 2020
eabfe9a
Make rollup goroutine work with the task tracking
manishrjain Mar 18, 2020
3bbde8c
Cleaned up a bit of the log messages by using strings and converting …
manishrjain Mar 18, 2020
401d6a7
Merge branch 'master' into mrjn/incremental
Mar 18, 2020
7f2a9d7
Merge branch 'mrjn/incremental' of https://github.com/dgraph-io/dgrap…
Mar 18, 2020
1656b38
Fix an issue where we were only sending the keys for rollup which did…
manishrjain Mar 18, 2020
ace03ac
Switch tablet size calculation to use DB.Tables method.
manishrjain Mar 19, 2020
000727f
Ensure that when closing, all registered tasks are properly stopped.
manishrjain Mar 19, 2020
2743576
Revert unnecessary change in list.go
manishrjain Mar 19, 2020
ea1081b
Better handle if another task is going on.
manishrjain Mar 19, 2020
280c634
Call rollup even if we find one delta posting
manishrjain Mar 19, 2020
42fb5e8
Add a note about iterating over tables
manishrjain Mar 19, 2020
71d89dd
fix GoLang CI comments
Mar 19, 2020
a4a7c59
Merge branch 'mrjn/incremental' of https://github.com/dgraph-io/dgrap…
Mar 19, 2020
d4cea37
Revert CI changes done by Paras. Add comment about why.
manishrjain Mar 19, 2020
b4a07c0
Remove question, add comment
manishrjain Mar 19, 2020
50e5b72
Merge master
manishrjain Mar 19, 2020
2 changes: 1 addition & 1 deletion posting/list.go
@@ -813,7 +813,7 @@ func (l *List) SingleListRollup() (*bpb.KV, error) {
l.RLock()
defer l.RUnlock()

out, err := l.rollup(math.MaxUint64, true)
out, err := l.rollup(math.MaxUint64, false)
if err != nil {
return nil, errors.Wrapf(err, "failed when calling List.rollup")
}
23 changes: 15 additions & 8 deletions posting/mvcc.go
@@ -42,6 +42,7 @@ type incrRollupi struct {
keysCh chan *[][]byte
// keysPool is sync.Pool to share the batched keys to rollup.
keysPool *sync.Pool
count uint64
}

var (
@@ -73,6 +74,12 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
if err != nil {
return err
}
const N = uint64(1000)
if glog.V(2) {
if count := atomic.AddUint64(&ir.count, 1); count%N == 0 {
glog.V(2).Infof("Rolled up %d keys", count)
}
}
return writer.Write(&bpb.KVList{Kv: kvs})
}
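For reference, here is a minimal standalone sketch of the progress counter added to rollUpKey above: an atomic counter that emits one log line per N rolled-up keys instead of one per key. fmt stands in for glog in this sketch.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	const N = uint64(1000)
	var count uint64
	for i := 0; i < 3000; i++ {
		// Mirrors the pattern above: only every N-th increment produces a log line.
		if c := atomic.AddUint64(&count, 1); c%N == 0 {
			fmt.Printf("Rolled up %d keys\n", c)
		}
	}
}
```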

@@ -110,13 +117,12 @@ func (ir *incrRollupi) Process(closer *y.Closer) {
currTs := time.Now().Unix()
for _, key := range *batch {
hash := z.MemHash(key)
if elem, ok := m[hash]; !ok || (currTs-elem >= 10) {
if elem := m[hash]; currTs-elem >= 10 {
// Key not present or Key present but last roll up was more than 10 sec ago.
// Add/Update map and rollup.
m[hash] = currTs
if err := ir.rollUpKey(writer, key); err != nil {
glog.Warningf("Error %v rolling up key %v\n", err, key)
continue
}
}
}
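As a standalone illustration of the throttle above, here is a minimal sketch with hash/fnv standing in for z.MemHash: each key is rolled up at most once every 10 seconds. The simplified condition in the new code works because a key absent from the map yields the zero value 0, so currTs-elem >= 10 holds for keys that have never been rolled up.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

// memHash is a stand-in for z.MemHash.
func memHash(key []byte) uint64 {
	h := fnv.New64a()
	h.Write(key)
	return h.Sum64()
}

func main() {
	lastRollup := make(map[uint64]int64) // key hash -> unix seconds of last rollup
	maybeRollUp := func(key []byte) {
		currTs := time.Now().Unix()
		if elem := lastRollup[memHash(key)]; currTs-elem >= 10 {
			// Key never rolled up, or last rollup was more than 10 seconds ago.
			lastRollup[memHash(key)] = currTs
			fmt.Printf("rolling up %q\n", key)
		}
	}
	maybeRollUp([]byte("name"))
	maybeRollUp([]byte("name")) // skipped: rolled up less than 10 seconds ago
}
```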
@@ -257,8 +263,14 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
l := new(List)
l.key = key
l.plist = new(pb.PostingList)
const maxDeltaCount = 2

// We use the following block of code to trigger incremental rollup on this key.
deltaCount := 0
defer func() {
if deltaCount > 0 {
Review comment (Contributor Author):
Didn't we want to do this if deltaCount >= 2?

IncrRollup.addKeyToBatch(key)
}
}()

// Iterates from highest Ts to lowest Ts
for it.Valid() {
@@ -317,11 +329,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}
it.Next()
}

if deltaCount >= maxDeltaCount {
IncrRollup.addKeyToBatch(key)
}

return l, nil
}
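Commit 280c634 ("Call rollup even if we find one delta posting") is what replaces the post-loop maxDeltaCount check with the deferred enqueue seen above. A minimal sketch of that pattern, where enqueueForRollup is a hypothetical stand-in for IncrRollup.addKeyToBatch:

```go
package main

import "fmt"

// readPostingListSketch mimics the deferred trigger above: count delta postings
// while iterating and, on the way out, queue the key for incremental rollup if
// at least one delta was seen.
func readPostingListSketch(key []byte, deltas [][]byte, enqueueForRollup func([]byte)) {
	deltaCount := 0
	defer func() {
		// Commit 280c634: schedule a rollup even if only one delta was found.
		if deltaCount > 0 {
			enqueueForRollup(key)
		}
	}()
	for range deltas {
		deltaCount++ // each delta posting encountered during iteration bumps the count
	}
}

func main() {
	readPostingListSketch([]byte("name"), [][]byte{{1}}, func(k []byte) {
		fmt.Printf("queued %q for incremental rollup\n", k)
	})
}
```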

171 changes: 74 additions & 97 deletions worker/draft.go
@@ -19,15 +19,13 @@ package worker
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
"sort"
"sync"
"sync/atomic"
"time"

humanize "github.com/dustin/go-humanize"
"github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
@@ -38,7 +36,6 @@ import (
"go.opencensus.io/tag"
otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/conn"
@@ -83,7 +80,7 @@ func (id op) String() string {
case opIndexing:
return "opIndexing"
default:
return "unknown"
return "opUnknown"
}
}

@@ -114,7 +111,6 @@ func (n *node) startTask(id op) (*y.Closer, error) {
}

closer := y.NewCloser(1)

switch id {
case opRollup:
if len(n.ops) > 0 {
@@ -123,10 +119,12 @@
go posting.IncrRollup.Process(closer)

case opSnapshot, opIndexing:
if roCloser, has := n.ops[opRollup]; has {
roCloser.SignalAndWait()
} else if len(n.ops) > 0 {
return nil, errors.Errorf("another operation is already running")
for otherId, otherCloser := range n.ops {
if otherId == opRollup {
otherCloser.SignalAndWait()
Review comment by parasssh (Contributor Author), Mar 19, 2020:
Shouldn't we pause rollup only when we know for sure there are no other operations? Let's say n.ops has opRollup and opIndexing. Then opRollup will be stopped, but since there is also an opIndexing, we will return from line 126 having paused opRollup.

} else {
return nil, errors.Errorf("operation %s is already running", otherId)
}
}
default:
glog.Errorf("Got an unhandled operation %s. Ignoring...", id)
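The review comment above is about this loop: with both opRollup and opIndexing registered, whichever entry the map iteration visits first decides whether the rollup gets paused before the error is returned. A small sketch, with simplified names and a fixed iteration order, walking the scenario the comment describes:

```go
package main

import (
	"errors"
	"fmt"
)

type op string

// startSnapshot mimics the opSnapshot/opIndexing branch above, but iterates a
// slice in a fixed order instead of a map, to make the scenario reproducible.
func startSnapshot(running []op, pauseRollup func()) error {
	for _, other := range running {
		if other == "opRollup" {
			pauseRollup() // rollup gets paused here...
		} else {
			// ...but we still bail out because another operation is running.
			return errors.New("operation " + string(other) + " is already running")
		}
	}
	return nil
}

func main() {
	err := startSnapshot([]op{"opRollup", "opIndexing"}, func() { fmt.Println("rollup paused") })
	fmt.Println(err) // rollup was paused even though the snapshot never starts
}
```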
@@ -152,6 +150,23 @@ func (n *node) waitForTask(id op) {
closer.Wait()
}

func (n *node) stopAllTasks() {
defer n.closer.Done() // CLOSER:1
<-n.closer.HasBeenClosed()

var closers []*y.Closer
n.opsLock.Lock()
for _, closer := range n.ops {
closers = append(closers, closer)
}
n.opsLock.Unlock()

for _, closer := range closers {
closer.SignalAndWait()
}
glog.Infof("Stopped all ongoing registered tasks.")
}
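As a rough illustration of how stopAllTasks interacts with a registered task, the sketch below uses a tiny stand-in for badger's y.Closer: the task loops until its closer is signalled and calls Done on exit, so SignalAndWait returns only after the task has actually stopped.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tinyCloser is a simplified stand-in for badger's y.Closer.
type tinyCloser struct {
	done chan struct{}
	wg   sync.WaitGroup
}

func newCloser(n int) *tinyCloser {
	c := &tinyCloser{done: make(chan struct{})}
	c.wg.Add(n)
	return c
}

func (c *tinyCloser) HasBeenClosed() <-chan struct{} { return c.done }
func (c *tinyCloser) Done()                          { c.wg.Done() }
func (c *tinyCloser) SignalAndWait()                 { close(c.done); c.wg.Wait() }

// rollupTask stands in for a registered background task such as the rollup goroutine.
func rollupTask(c *tinyCloser) {
	defer c.Done()
	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-c.HasBeenClosed():
			fmt.Println("rollup task stopped")
			return
		case <-tick.C:
			// process a batch of keys here
		}
	}
}

func main() {
	c := newCloser(1)
	go rollupTask(c)
	time.Sleep(30 * time.Millisecond)
	c.SignalAndWait() // blocks until rollupTask has returned
}
```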

// Now that we apply txn updates via Raft, waiting based on Txn timestamps is
// sufficient. We don't need to wait for proposals to be applied.

@@ -174,7 +189,7 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
// to maintain quorum health.
applyCh: make(chan []*pb.Proposal, 1000),
elog: trace.NewEventLog("Dgraph", "ApplyCh"),
closer: y.NewCloser(3), // Matches CLOSER:1
closer: y.NewCloser(4), // Matches CLOSER:1
ops: make(map[op]*y.Closer),
}
return n
@@ -526,19 +541,15 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {

func (n *node) processTabletSizes() {
defer n.closer.Done() // CLOSER:1
tick := time.NewTicker(5 * time.Minute) // Rolling up once every 5 minutes seems alright.
tick := time.NewTicker(5 * time.Minute) // Once every 5 minutes seems alright.
defer tick.Stop()

for {
select {
case <-n.closer.HasBeenClosed():
return
case <-tick.C:
if err := n.calcTabletSizes(); err != nil {
// If we encounter error here, we don't need to do anything about
// it. Just let the user know.
glog.Errorf("Error while calculating tablet sizes: %v", err)
}
n.calculateTabletSizes()
}
}
}
@@ -1144,98 +1155,63 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
return &bpb.KVList{Kv: []*bpb.KV{kv}}
}

// calcTabletSizes updates the tablet sizes for the keys.
func (n *node) calcTabletSizes() error {
// calculateTabletSizes updates the tablet sizes for the keys.
func (n *node) calculateTabletSizes() {
if !n.AmLeader() {
// Only leader needs to calculate the tablet sizes.
return nil
// Only leader sends the tablet size updates to Zero. No one else does.
return
}
var total int64
tablets := make(map[string]*pb.Tablet)

m := new(sync.Map)

addTo := func(key []byte, delta int64) {
pk, err := x.Parse(key)

// Type keys should not count for tablet size calculations.
if pk.IsType() {
return
}

tableInfos := pstore.Tables(false)
glog.V(2).Infof("Calculating tablet sizes. Found %d tables\n", len(tableInfos))
for _, tinfo := range tableInfos {
left, err := x.Parse(tinfo.Left)
if err != nil {
glog.Errorf("Error while parsing key %s: %v", hex.Dump(key), err)
return
glog.V(2).Infof("Unable to parse key: %v", err)
continue
}
val, ok := m.Load(pk.Attr)
if !ok {
sz := new(int64)
val, _ = m.LoadOrStore(pk.Attr, sz)
right, err := x.Parse(tinfo.Right)
if err != nil {
glog.V(2).Infof("Unable to parse key: %v", err)
continue
}
size := val.(*int64)
atomic.AddInt64(size, delta)
}

// TODO: Remove all this.
stream := pstore.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "Tablet Size Calculation"
stream.Prefix = []byte{x.DefaultPrefix}
stream.ChooseKey = func(item *badger.Item) bool {
switch item.UserMeta() {
case posting.BitSchemaPosting, posting.BitCompletePosting, posting.BitEmptyPosting:
addTo(item.Key(), item.EstimatedSize())
return false
case x.ByteUnused:
return false
default:
// not doing rollups anymore.
return false
if left.Attr != right.Attr {
// Skip all tables not fully owned by one predicate.
// We could later specifically iterate over these tables to get their estimated sizes.
glog.V(2).Info("Skipping table not owned by one predicate")
continue
}
}

stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
return nil, nil // no-op
}
stream.Send = func(list *bpb.KVList) error {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := stream.Orchestrate(ctx); err != nil {
return err
}

// Only leader sends the tablet size updates to Zero. No one else does.
// doSendMembership is also being concurrently called from another goroutine.
go func() {
tablets := make(map[string]*pb.Tablet)
var total int64
m.Range(func(key, val interface{}) bool {
pred := key.(string)
size := atomic.LoadInt64(val.(*int64))
pred := left.Attr
if tablet, ok := tablets[pred]; ok {
tablet.Space += int64(tinfo.EstimatedSz)
} else {
tablets[pred] = &pb.Tablet{
GroupId: n.gid,
Predicate: pred,
Space: size,
Space: int64(tinfo.EstimatedSz),
}
total += size
return true
})
// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
// this group, it would send instruction to delete that tablet. There's an edge case
// here if the followers are still running Rollup, and happen to read a key before and
// write after the tablet deletion, causing that tablet key to resurface. Then, only the
// follower would have that key, not the leader.
// However, if the follower then becomes the leader, we'd be able to get rid of that
// key then. Alternatively, we could look into cancelling the Rollup if we see a
// predicate deletion.
if err := groups().doSendMembership(tablets); err != nil {
glog.Warningf("While sending membership to Zero. Error: %v", err)
} else {
glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
humanize.Bytes(uint64(total)))
}
}()

return nil
total += int64(tinfo.EstimatedSz)
}
if len(tablets) == 0 {
return
}
// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
// this group, it would send instruction to delete that tablet. There's an edge case
// here if the followers are still running Rollup, and happen to read a key before and
// write after the tablet deletion, causing that tablet key to resurface. Then, only the
// follower would have that key, not the leader.
// However, if the follower then becomes the leader, we'd be able to get rid of that
// key then. Alternatively, we could look into cancelling the Rollup if we see a
// predicate deletion.
if err := groups().doSendMembership(tablets); err != nil {
glog.Warningf("While sending membership to Zero. Error: %v", err)
} else {
glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
humanize.Bytes(uint64(total)))
}
}

var errNoConnection = errors.New("No connection exists")
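A rough sketch of the per-predicate size estimation that calculateTabletSizes now performs, with TableInfo and parseAttr as simplified stand-ins for badger's table metadata and x.Parse: every table whose boundary keys belong to the same predicate contributes its estimated size to that predicate, and tables spanning predicates are skipped.

```go
package main

import (
	"fmt"
	"strings"
)

// TableInfo is a simplified stand-in for badger's table metadata.
type TableInfo struct {
	Left, Right string // first and last key stored in the table
	EstimatedSz uint64
}

// parseAttr pretends keys look like "<predicate>|<rest>"; x.Parse does the real work.
func parseAttr(key string) string {
	return strings.SplitN(key, "|", 2)[0]
}

func tabletSizes(tables []TableInfo) map[string]int64 {
	sizes := make(map[string]int64)
	for _, t := range tables {
		left, right := parseAttr(t.Left), parseAttr(t.Right)
		if left != right {
			// Skip tables not fully owned by one predicate; attributing their
			// size would require iterating over their keys.
			continue
		}
		sizes[left] += int64(t.EstimatedSz)
	}
	return sizes
}

func main() {
	tables := []TableInfo{
		{Left: "name|a", Right: "name|z", EstimatedSz: 4096},
		{Left: "name|z", Right: "friend|b", EstimatedSz: 1024}, // spans two predicates: skipped
		{Left: "friend|c", Right: "friend|x", EstimatedSz: 2048},
	}
	fmt.Println(tabletSizes(tables)) // map[friend:2048 name:4096]
}
```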
@@ -1570,6 +1546,7 @@ func (n *node) InitAndStartNode() {
go n.processApplyCh()
go n.BatchAndSendMessages()
x.Check2(n.startTask(opRollup))
go n.stopAllTasks()
go n.Run()
}
