Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental Rollup and Tablet Size Calculation #4972

Merged
merged 25 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5f07dc9
Bring incremental rollup commit
parasssh Jan 9, 2020
75233b5
Incremental Rollups -- let's track versions of keys.
manishrjain Mar 16, 2020
76d2838
Merge master
manishrjain Mar 16, 2020
fe10d20
Build works with master merge
manishrjain Mar 16, 2020
527a082
Fix map ini
manishrjain Mar 17, 2020
50e7544
Some evidence to show it's not just writes, but write in the range of…
manishrjain Mar 17, 2020
700b54b
Some more tries to understand the behavior
manishrjain Mar 17, 2020
53b1ae5
Merge branch 'master' into mrjn/incremental
Mar 18, 2020
55de90c
Using Martin's fix.
manishrjain Mar 18, 2020
eabfe9a
Make rollup goroutine work with the task tracking
manishrjain Mar 18, 2020
3bbde8c
Cleaned up a bit of the log messages by using strings and converting …
manishrjain Mar 18, 2020
401d6a7
Merge branch 'master' into mrjn/incremental
Mar 18, 2020
7f2a9d7
Merge branch 'mrjn/incremental' of https://github.com/dgraph-io/dgrap…
Mar 18, 2020
1656b38
Fix an issue where we were only sending the keys for rollup which did…
manishrjain Mar 18, 2020
ace03ac
Switch tablet size calculation to use DB.Tables method.
manishrjain Mar 19, 2020
000727f
Ensure that when closing, all registered tasks are properly stopped.
manishrjain Mar 19, 2020
2743576
Revert unnecessary change in list.go
manishrjain Mar 19, 2020
ea1081b
Better handle if another task is going on.
manishrjain Mar 19, 2020
280c634
Call rollup even if we find one delta posting
manishrjain Mar 19, 2020
42fb5e8
Add a note about iterating over tables
manishrjain Mar 19, 2020
71d89dd
fix GoLang CI comments
Mar 19, 2020
a4a7c59
Merge branch 'mrjn/incremental' of https://github.com/dgraph-io/dgrap…
Mar 19, 2020
d4cea37
Revert CI changes done by Paras. Add comment about why.
manishrjain Mar 19, 2020
b4a07c0
Remove question, add comment
manishrjain Mar 19, 2020
50e5b72
Merge master
manishrjain Mar 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make rollup goroutine work with the task tracking
  • Loading branch information
manishrjain committed Mar 18, 2020
commit eabfe9ad318c0a5e3bc8b62ea1fdd9bce61ab5c7
58 changes: 29 additions & 29 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/dgraph-io/badger/v2"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -90,13 +91,6 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
for _, kv := range kvs {
hash := z.MemHash(kv.Key)
ir.versions[hash] = kv.Version

// Just to see if writes are causing the issue or if it is writes in the range of reads.
// Artificially set the value to nil and version to really low.
// The following three lines can be taken out.
// kv.Value = nil
// kv.UserMeta = []byte{BitEmptyPosting}
// kv.Version = 1 // Artificially set it to really low.
}
return writer.Write(&bpb.KVList{Kv: kvs})
}
Expand All @@ -119,34 +113,40 @@ func (ir *incrRollupi) addKeyToBatch(key []byte) {
}

// Process will rollup batches of 64 keys in a go routine.
func (ir *incrRollupi) Process() {
m := make(map[uint64]int64) // map hash(key) to ts. hash(key) to limit the size of the map.
limiter := time.NewTicker(100 * time.Millisecond)
func (ir *incrRollupi) Process(closer *y.Closer) {
defer closer.Done()

writer := NewTxnWriter(pstore)
defer writer.Flush()

for batch := range ir.keysCh {
currTs := time.Now().Unix()
for _, key := range *batch {
hash := z.MemHash(key)
if elem, ok := m[hash]; !ok || (currTs-elem >= 10) {
// Key not present or Key present but last rollup was more than 10 sec ago.
// Add/Update map and rollup.
m[hash] = currTs
glog.Infof("Rolling up key: %x %q\n", key, key)
if err := ir.rollUpKey(writer, key); err != nil {
glog.Warningf("Error %v rolling up key %v\n", err, key)
continue
m := make(map[uint64]int64) // map hash(key) to ts. hash(key) to limit the size of the map.
limiter := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-closer.HasBeenClosed():
return
case batch := <-ir.keysCh:
currTs := time.Now().Unix()
for _, key := range *batch {
hash := z.MemHash(key)
if elem, ok := m[hash]; !ok || (currTs-elem >= 10) {
// Key not present or Key present but last rollup was more than 10 sec ago.
// Add/Update map and rollup.
m[hash] = currTs
if err := ir.rollUpKey(writer, key); err != nil {
glog.Warningf("Error %v rolling up key %v\n", err, key)
continue
}
}
}
}
// clear the batch and put it back in Sync keysPool
*batch = (*batch)[:0]
ir.keysPool.Put(batch)
// clear the batch and put it back in Sync keysPool
*batch = (*batch)[:0]
ir.keysPool.Put(batch)

// throttle to 1 batch = 64 rollups per 100 ms.
<-limiter.C
// throttle to 1 batch = 64 rollups per 100 ms.
<-limiter.C
}
}
// keysCh is closed. This should never happen.
}

// ShouldAbort returns whether the transaction should be aborted.
Expand Down
119 changes: 53 additions & 66 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -54,16 +55,15 @@ type node struct {
*conn.Node

// Fields which are never changed after init.
applyCh chan []*pb.Proposal
rollupCh chan uint64 // Channel to run posting list rollups.
ctx context.Context
gid uint32
closer *y.Closer
applyCh chan []*pb.Proposal
ctx context.Context
gid uint32
closer *y.Closer

streaming int32 // Used to avoid calculating snapshot

// Used to track the ops going on in the system.
ops map[int]*operation
ops map[int]*y.Closer
opsLock sync.Mutex

canCampaign bool
Expand All @@ -72,13 +72,6 @@ type node struct {
pendingSize int64
}

type operation struct {
// id of the operation.
id int
// closer is used to signal and wait for the operation to finish/cancel.
closer *y.Closer
}

const (
opRollup = iota + 1
opSnapshot
Expand All @@ -88,53 +81,59 @@ const (
// startTask is used to check whether an op is already going on.
// If rollup is going on, we cancel and wait for rollup to complete
// before we return. If the same task is already going, we return error.
func (n *node) startTask(id int) (*operation, error) {
func (n *node) startTask(id int) (*y.Closer, error) {
n.opsLock.Lock()
defer n.opsLock.Unlock()

stopTask := func(id int) {
n.opsLock.Lock()
delete(n.ops, id)
n.opsLock.Unlock()
glog.Infof("Operation completed with id: %v", id)

// If we were doing any other operation, let's restart rollups.
if id != opRollup {
x.Check2(n.startTask(opRollup))
}
}

closer := y.NewCloser(1)

switch id {
case opRollup:
if len(n.ops) > 0 {
return nil, errors.Errorf("another operation is already running, ops:%v", n.ops)
return nil, errors.Errorf("another operation is already running")
}
go posting.IncrRollup.Process(closer)

case opSnapshot, opIndexing:
if op, has := n.ops[opRollup]; has {
glog.Info("Found a rollup going on. Cancelling rollup!")
op.closer.SignalAndWait()
glog.Info("Rollup cancelled.")
if roCloser, has := n.ops[opRollup]; has {
roCloser.SignalAndWait()
} else if len(n.ops) > 0 {
return nil, errors.Errorf("another operation is already running, ops:%v", n.ops)
return nil, errors.Errorf("another operation is already running")
}
default:
glog.Errorf("Got an unhandled operation %d. Ignoring...", id)
glog.Errorf("Got an unhandled operation %v. Ignoring...", id)
return nil, nil
}

op := &operation{id: id, closer: y.NewCloser(1)}
n.ops[id] = op
glog.Infof("Operation started with id: %d", id)
return op, nil
}

// stopTask will delete the entry from the map that keep tracks of the ops
// and then signal that tasks has been cancelled/completed for waiting task.
func (n *node) stopTask(op *operation) {
op.closer.Done()

n.opsLock.Lock()
delete(n.ops, op.id)
n.opsLock.Unlock()
glog.Infof("Operation completed with id: %d", op.id)
n.ops[id] = closer
glog.Infof("Operation started with id: %v", id)
go func(id int, closer *y.Closer) {
closer.Wait()
stopTask(id)
}(id, closer)
return closer, nil
}

func (n *node) waitForTask(id int) {
n.opsLock.Lock()
op, ok := n.ops[id]
closer, ok := n.ops[id]
n.opsLock.Unlock()
if !ok {
return
}
op.closer.Wait()
closer.Wait()
}

// Now that we apply txn updates via Raft, waiting based on Txn timestamps is
Expand All @@ -157,11 +156,10 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
// We need a generous size for applyCh, because raft.Tick happens every
// 10ms. If we restrict the size here, then Raft goes into a loop trying
// to maintain quorum health.
applyCh: make(chan []*pb.Proposal, 1000),
rollupCh: make(chan uint64, 3),
elog: trace.NewEventLog("Dgraph", "ApplyCh"),
closer: y.NewCloser(3), // Matches CLOSER:1
ops: make(map[int]*operation),
applyCh: make(chan []*pb.Proposal, 1000),
elog: trace.NewEventLog("Dgraph", "ApplyCh"),
closer: y.NewCloser(3), // Matches CLOSER:1
ops: make(map[int]*y.Closer),
}
return n
}
Expand Down Expand Up @@ -502,37 +500,28 @@ func (n *node) applyCommitted(proposal *pb.Proposal) error {
}
glog.Warningf("Error while calling CreateSnapshot: %v. Retrying...", err)
}
// Roll up all posting lists as a best-effort operation.
n.rollupCh <- snap.ReadTs
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(snap.ReadTs)
return nil
}
x.Fatalf("Unknown proposal: %+v", proposal)
return nil
}

func (n *node) processRollups() {
func (n *node) processTabletSizes() {
defer n.closer.Done() // CLOSER:1
tick := time.NewTicker(5 * time.Minute) // Rolling up once every 5 minutes seems alright.
defer tick.Stop()

var readTs, last uint64
for {
select {
case <-n.closer.HasBeenClosed():
return
case readTs = <-n.rollupCh:
case <-tick.C:
glog.V(3).Infof("Evaluating rollup readTs:%d last:%d rollup:%v", readTs, last, readTs > last)
if readTs <= last {
break // Break out of the select case.
}
if err := n.calcTabletSizes(readTs); err != nil {
if err := n.calcTabletSizes(); err != nil {
// If we encounter error here, we don't need to do anything about
// it. Just let the user know.
glog.Errorf("Error while rolling up lists at %d: %v\n", readTs, err)
} else {
last = readTs // Update last only if we succeeded.
glog.Infof("Last rollup at Ts %d: OK.\n", readTs)
glog.Errorf("Error while calculating tablet sizes: %v", err)
}
}
}
Expand Down Expand Up @@ -694,11 +683,11 @@ func (n *node) Snapshot() (*pb.Snapshot, error) {
}

func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
op, err := n.startTask(opSnapshot)
closer, err := n.startTask(opSnapshot)
if err != nil {
return err
}
defer n.stopTask(op)
defer closer.Done()

// In some edge cases, the Zero leader might not have been able to update
// the status of Alpha leader. So, instead of blocking forever on waiting
Expand Down Expand Up @@ -1140,10 +1129,7 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
}

// calcTabletSizes updates the tablet sizes for the keys.
func (n *node) calcTabletSizes(readTs uint64) error {
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(readTs)

func (n *node) calcTabletSizes() error {
if !n.AmLeader() {
// Only leader needs to calculate the tablet sizes.
return nil
Expand Down Expand Up @@ -1172,7 +1158,8 @@ func (n *node) calcTabletSizes(readTs uint64) error {
atomic.AddInt64(size, delta)
}

stream := pstore.NewStreamAt(readTs)
// TODO: Remove all this.
stream := pstore.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "Tablet Size Calculation"
stream.Prefix = []byte{x.DefaultPrefix}
stream.ChooseKey = func(item *badger.Item) bool {
Expand Down Expand Up @@ -1563,10 +1550,10 @@ func (n *node) InitAndStartNode() {
n.canCampaign = true
}
}
go n.processRollups()
go n.processTabletSizes()
go n.processApplyCh()
go n.BatchAndSendMessages()
go posting.IncrRollup.Process()
n.startTask(opRollup)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of n.startTask is not checked (from errcheck)

go n.Run()
}

Expand Down
12 changes: 7 additions & 5 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/conn"
Expand Down Expand Up @@ -146,24 +147,25 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs

// done is used to ensure that we only stop the indexing task once.
var done uint32
stopIndexing := func(op *operation) {
stopIndexing := func(closer *y.Closer) {
if !schema.State().IndexingInProgress() {
if atomic.CompareAndSwapUint32(&done, 0, 1) {
gr.Node.stopTask(op)
// Q: Shouldn't this done be called via defer in this func?
closer.Done()
}
}
}

// Ensure that rollup is not running.
op, err := gr.Node.startTask(opIndexing)
closer, err := gr.Node.startTask(opIndexing)
if err != nil {
return err
}
defer stopIndexing(op)
defer stopIndexing(closer)

buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error {
// in case background indexing is running, we should call it here again.
defer stopIndexing(op)
defer stopIndexing(closer)

wrtCtx := schema.GetWriteContext(context.Background())
if err := rebuild.BuildIndexes(wrtCtx); err != nil {
Expand Down