track operations and cancel when needed (#4916)
We now track operations in the system, such as rollups,
indexing, and snapshots. We cancel a rollup whenever
indexing needs to run or a snapshot needs to be taken. We also do
not allow multiple operations to run at the same time.
mangalaman93 authored Mar 16, 2020
1 parent dc0e6eb commit 2e7d0a1
Showing 3 changed files with 141 additions and 30 deletions.
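
To make the behaviour described above concrete, here is a minimal, self-contained sketch of the pattern the commit introduces: a mutex-guarded map of running operations, each paired with a cancellation signal that is closed and then waited on. It only approximates the real startTask/stopTask code in the diff below, using a plain channel and sync.WaitGroup in place of Badger's y.Closer; the tracker and opKind names are illustrative and not from the Dgraph codebase.

```go
// Minimal sketch of mutually exclusive background operations,
// approximating the startTask/stopTask pattern in this commit.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type opKind int

const (
	opRollup opKind = iota + 1
	opSnapshot
	opIndexing
)

type op struct {
	kind   opKind
	cancel chan struct{}  // closed to ask the operation to stop
	done   sync.WaitGroup // waited on until the operation exits
}

type tracker struct {
	mu  sync.Mutex
	ops map[opKind]*op
}

// start registers an operation. A running rollup is cancelled to make
// room for a snapshot or indexing task; any other overlap is an error.
func (t *tracker) start(kind opKind) (*op, error) {
	t.mu.Lock()
	if r, ok := t.ops[opRollup]; ok && kind != opRollup {
		// Signal the rollup, then wait outside the lock so its stop()
		// can acquire the mutex and remove its entry.
		close(r.cancel)
		t.mu.Unlock()
		r.done.Wait()
		t.mu.Lock()
	}
	defer t.mu.Unlock()

	if len(t.ops) > 0 {
		return nil, errors.New("another operation is already running")
	}
	o := &op{kind: kind, cancel: make(chan struct{})}
	o.done.Add(1)
	t.ops[kind] = o
	return o, nil
}

// stop removes the operation from the map and marks it finished.
func (t *tracker) stop(o *op) {
	t.mu.Lock()
	delete(t.ops, o.kind)
	t.mu.Unlock()
	o.done.Done()
}

func main() {
	t := &tracker{ops: make(map[opKind]*op)}

	rollup, err := t.start(opRollup)
	if err != nil {
		panic(err)
	}
	go func() {
		<-rollup.cancel // a real rollup would stop its stream here
		t.stop(rollup)
	}()

	snap, err := t.start(opSnapshot) // cancels the rollup, then runs
	fmt.Println(snap.kind, err)      // 2 <nil>
	t.stop(snap)
}
```

The diff itself keeps the tracking state on the node struct in worker/draft.go, as shown next.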
101 changes: 95 additions & 6 deletions worker/draft.go
@@ -27,8 +27,11 @@ import (
"time"

humanize "github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"golang.org/x/net/trace"

ostats "go.opencensus.io/stats"
"go.opencensus.io/tag"
@@ -45,10 +48,6 @@ import (
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"

"github.com/golang/glog"
"golang.org/x/net/trace"
)

type node struct {
@@ -63,12 +62,81 @@ type node struct {

streaming int32 // Used to avoid calculating snapshot

// Used to track the ops going on in the system.
ops map[int]*operation
opsLock sync.Mutex

canCampaign bool
elog trace.EventLog

pendingSize int64
}

type operation struct {
// id of the operation.
id int
// closer is used to signal and wait for the operation to finish/cancel.
closer *y.Closer
}

const (
opRollup = iota + 1
opSnapshot
opIndexing
)

// startTask is used to check whether an op is already going on.
// If a rollup is going on, we cancel it and wait for it to complete
// before we return. If the same task is already running, we return an error.
func (n *node) startTask(id int) (*operation, error) {
n.opsLock.Lock()
defer n.opsLock.Unlock()

switch id {
case opRollup:
if len(n.ops) > 0 {
return nil, errors.Errorf("another operation is already running, ops:%v", n.ops)
}
case opSnapshot, opIndexing:
if op, has := n.ops[opRollup]; has {
glog.Info("Found a rollup going on. Cancelling rollup!")
op.closer.SignalAndWait()
glog.Info("Rollup cancelled.")
} else if len(n.ops) > 0 {
return nil, errors.Errorf("another operation is already running, ops:%v", n.ops)
}
default:
glog.Errorf("Got an unhandled operation %d. Ignoring...", id)
return nil, nil
}

op := &operation{id: id, closer: y.NewCloser(1)}
n.ops[id] = op
glog.Infof("Operation started with id: %d", id)
return op, nil
}

// stopTask deletes the entry from the map that keeps track of the ops
// and signals any waiting task that this one has been cancelled/completed.
func (n *node) stopTask(op *operation) {
op.closer.Done()

n.opsLock.Lock()
delete(n.ops, op.id)
n.opsLock.Unlock()
glog.Infof("Operation completed with id: %d", op.id)
}

func (n *node) waitForTask(id int) {
n.opsLock.Lock()
op, ok := n.ops[id]
n.opsLock.Unlock()
if !ok {
return
}
op.closer.Wait()
}

// Now that we apply txn updates via Raft, waiting based on Txn timestamps is
// sufficient. We don't need to wait for proposals to be applied.

@@ -93,6 +161,7 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
rollupCh: make(chan uint64, 3),
elog: trace.NewEventLog("Dgraph", "ApplyCh"),
closer: y.NewCloser(3), // Matches CLOSER:1
ops: make(map[int]*operation),
}
return n
}
@@ -625,6 +694,12 @@ func (n *node) Snapshot() (*pb.Snapshot, error) {
}

func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
op, err := n.startTask(opSnapshot)
if err != nil {
return err
}
defer n.stopTask(op)

// In some edge cases, the Zero leader might not have been able to update
// the status of Alpha leader. So, instead of blocking forever on waiting
// for Zero to send us the updates info about the leader, we can just use
@@ -1067,7 +1142,11 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
// rollupLists would consolidate all the deltas that constitute one posting
// list, and write back a complete posting list.
func (n *node) rollupLists(readTs uint64) error {
writer := posting.NewTxnWriter(pstore)
op, err := n.startTask(opRollup)
if err != nil {
return err
}
defer n.stopTask(op)

// We're doing rollups. We should use this opportunity to calculate the tablet sizes.
amLeader := n.AmLeader()
@@ -1098,6 +1177,7 @@ func (n *node) rollupLists(readTs uint64) error {
atomic.AddInt64(size, delta)
}

writer := posting.NewTxnWriter(pstore)
stream := pstore.NewStreamAt(readTs)
stream.LogPrefix = "Rolling up"
stream.ChooseKey = func(item *badger.Item) bool {
@@ -1132,7 +1212,16 @@ func (n *node) rollupLists(readTs uint64) error {
stream.Send = func(list *bpb.KVList) error {
return writer.Write(list)
}
if err := stream.Orchestrate(context.Background()); err != nil {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-ctx.Done():
case <-op.closer.HasBeenClosed():
cancel()
}
}()
if err := stream.Orchestrate(ctx); err != nil {
return err
}
if err := writer.Flush(); err != nil {
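
One detail of the rollupLists change above is how cancellation actually reaches the stream: stream.Orchestrate only understands contexts, so a small goroutine bridges the operation's closer signal to cancel(). A generic, self-contained sketch of that bridge follows, assuming a plain channel in place of y.Closer.HasBeenClosed() and an invented runCancellable helper; it is an illustration of the pattern, not code from this commit.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runCancellable runs work with a context that is cancelled either when
// the work returns or when the external stop channel is closed. The
// channel stands in for the closer signal used in the diff above.
func runCancellable(parent context.Context, stop <-chan struct{}, work func(context.Context) error) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	go func() {
		select {
		case <-ctx.Done(): // work finished on its own
		case <-stop: // external cancellation requested
			cancel()
		}
	}()
	return work(ctx)
}

func main() {
	stop := make(chan struct{})
	time.AfterFunc(50*time.Millisecond, func() { close(stop) })

	err := runCancellable(context.Background(), stop, func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			return errors.New("work cancelled early")
		case <-time.After(time.Second):
			return nil
		}
	})
	fmt.Println(err) // work cancelled early
}
```

Because cancel() is also deferred in the caller, the bridging goroutine always exits once the work returns, whichever branch of the select fires first.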
66 changes: 44 additions & 22 deletions worker/mutation.go
@@ -21,22 +21,22 @@ import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"
)

var (
@@ -117,6 +117,17 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e
return plist.AddMutationWithIndex(ctx, edge, txn)
}

func undoSchemaUpdate(predicate string) {
maxRetries := 10
loadErr := x.RetryUntilSuccess(maxRetries, 10*time.Millisecond, func() error {
return schema.Load(predicate)
})

if loadErr != nil {
glog.Fatalf("failed to load schema after %d retries: %v", maxRetries, loadErr)
}
}

func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs uint64) error {
// Wait until schema modification for all predicates is complete. There cannot be two
// background tasks running as this is a race condition. We typically won't propose an
@@ -131,15 +142,29 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
// not indexing, it would accept and propose the request.
// It is possible that a receiver R of the proposal is still indexing. In that case, R would
// block here and wait for indexing to be finished.
for {
gr.Node.waitForTask(opIndexing)

// done is used to ensure that we only stop the indexing task once.
var done uint32
stopIndexing := func(op *operation) {
if !schema.State().IndexingInProgress() {
break
if atomic.CompareAndSwapUint32(&done, 0, 1) {
gr.Node.stopTask(op)
}
}
glog.Infoln("waiting for indexing to complete")
time.Sleep(time.Second * 2)
}

// Ensure that rollup is not running.
op, err := gr.Node.startTask(opIndexing)
if err != nil {
return err
}
defer stopIndexing(op)

buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error {
// in case background indexing is running, we should call it here again.
defer stopIndexing(op)

wrtCtx := schema.GetWriteContext(context.Background())
if err := rebuild.BuildIndexes(wrtCtx); err != nil {
return err
@@ -153,7 +178,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
}

// This wg allows waiting until setup for all the predicates is complete
// befor running buildIndexes for any of those predicates.
// before running buildIndexes for any of those predicates.
var wg sync.WaitGroup
wg.Add(1)
defer wg.Done()
@@ -167,15 +192,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
// undo schema changes in case re-indexing fails.
if err := buildIndexesHelper(update, rebuild); err != nil {
glog.Errorf("error in building indexes, aborting :: %v\n", err)

maxRetries := 10
loadErr := x.RetryUntilSuccess(maxRetries, 10*time.Millisecond, func() error {
return schema.Load(update.Predicate)
})

if loadErr != nil {
glog.Fatalf("failed to load schema after %d retries: %v", maxRetries, loadErr)
}
undoSchemaUpdate(update.Predicate)
}
}

@@ -204,10 +221,15 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
schema.State().SetMutSchema(su.Predicate, su)

// TODO(Aman): If we return an error, we may not have right schema reflected.
if err := rebuild.DropIndexes(ctx); err != nil {
return err
setup := func() error {
if err := rebuild.DropIndexes(ctx); err != nil {
return err
}
return rebuild.BuildData(ctx)
}
if err := rebuild.BuildData(ctx); err != nil {
if err := setup(); err != nil {
glog.Errorf("error in building indexes, aborting :: %v\n", err)
undoSchemaUpdate(su.Predicate)
return err
}

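
The indexing path in worker/mutation.go has to release the opIndexing operation from more than one place: when runSchemaMutation returns, and again when a background index rebuild finishes. The diff therefore guards stopTask behind an atomic compare-and-swap so it runs exactly once. Below is a stripped-down sketch of that guard, with illustrative names rather than the actual Dgraph code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// onceGuard wraps cleanup so that it runs at most once, no matter how
// many call sites defer or invoke it, mirroring the stopIndexing guard.
func onceGuard(cleanup func()) func() {
	var done uint32
	return func() {
		if atomic.CompareAndSwapUint32(&done, 0, 1) {
			cleanup()
		}
	}
}

func main() {
	stopIndexing := onceGuard(func() { fmt.Println("indexing operation stopped") })

	defer stopIndexing() // released when the caller returns...
	stopIndexing()       // ...or earlier, when the background rebuild finishes
	// Either way, the message is printed exactly once.
}
```

sync.Once would express the same intent; the explicit CAS used in the diff keeps the guard down to a single integer next to the operation handle.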
4 changes: 2 additions & 2 deletions worker/proposal_test.go
@@ -80,7 +80,7 @@ func TestLimiterDeadlock(t *testing.T) {
"Total proposals: ", atomic.LoadInt64(&currentCount),
"Pending proposal: ", atomic.LoadInt64(&pending),
"Completed Proposals: ", atomic.LoadInt64(&completed),
"Aboted Proposals: ", atomic.LoadInt64(&aborted),
"Aborted Proposals: ", atomic.LoadInt64(&aborted),
"IOU: ", l.iou)
}
}()
@@ -109,7 +109,7 @@ func TestLimiterDeadlock(t *testing.T) {

// After trying all the proposals, (completed + aborted) should be equal to tried proposal.
require.True(t, toTry == completed+aborted,
fmt.Sprintf("Tried: %d, Compteted: %d, Aboted: %d", toTry, completed, aborted))
fmt.Sprintf("Tried: %d, Compteted: %d, Aborted: %d", toTry, completed, aborted))
}

func BenchmarkRateLimiter(b *testing.B) {
