Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track operations and cancel when needed #4916

Merged
merged 12 commits into from
Mar 16, 2020
Next Next commit
cancel rollup for bgindex and snapshots
  • Loading branch information
mangalaman93 committed Mar 16, 2020
commit 232e09e60aab287e8be8aee6fdc76bd05d26aab3
110 changes: 104 additions & 6 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import (
"time"

humanize "github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"golang.org/x/net/trace"

ostats "go.opencensus.io/stats"
"go.opencensus.io/tag"
Expand All @@ -45,10 +48,6 @@ import (
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"

"github.com/golang/glog"
"golang.org/x/net/trace"
)

type node struct {
Expand All @@ -63,12 +62,99 @@ type node struct {

streaming int32 // Used to avoid calculating snapshot

// Used to track the ops going on in the system.
ops map[int]*operation
opsLock sync.Mutex

canCampaign bool
elog trace.EventLog

pendingSize int64
}

type operation struct {
// cancel is used to signal that task should be cancelled.
cancel context.CancelFunc
// done is used to wait for the task until it is either cancelled or completed.
done chan struct{}
}

const (
opRollUp = iota + 1
opSnapshot
opBGIndex
)

var (
errOpInProgress = errors.New("another operation is already running")
)

// startRollUp will check whether rollup is already going on. If not,
// it will return a context that can be used to cancel the rollup if needed.
func (n *node) startRollUp() (context.Context, error) {
n.opsLock.Lock()
defer n.opsLock.Unlock()

if len(n.ops) > 0 {
return nil, errOpInProgress
}
ctx, cancel := context.WithCancel(context.Background())
n.ops[opRollUp] = &operation{cancel: cancel, done: make(chan struct{}, 1)}
return ctx, nil
}

// startTask is used to check whether the an op is already going on.
// If rollup is going on, we cancel and wait for rollup to complete
// before we return. If the same task is already going, we return error.
func (n *node) startTask(id int) error {
n.opsLock.Lock()
defer n.opsLock.Unlock()

// For bgindex, we may not call stopTask.
if !schema.State().IndexingInProgress() {
delete(n.ops, opBGIndex)
}

switch id {
case opSnapshot, opBGIndex:
if rop, has := n.ops[opRollUp]; has {
glog.Info("Found a rollup going on. Cancelling rollup!")
rop.cancel()
<-rop.done
glog.Info("Rollup cancelled.")
} else if _, has := n.ops[id]; has {
return errOpInProgress
}
n.ops[opSnapshot] = &operation{done: make(chan struct{}, 1)}
default:
glog.Infof("Got an unhandled operation %d. Ignoring...", id)
}

return nil
}

// stopTask will delete the entry from the map that keep tracks of the ops
// and then signal that taks has been cancelled/completed for waiting task.
func (n *node) stopTask(id int) {
n.opsLock.Lock()
defer n.opsLock.Unlock()

op, ok := n.ops[id]
if !ok {
return
}

// Cancel the context and delete op from the operations map.
delete(n.ops, id)
if op.cancel != nil {
op.cancel()
}

// Signal that task is completed or cancelled.
op.done <- struct{}{}
glog.Infof("Operation complete with id: %d", id)
}

// Now that we apply txn updates via Raft, waiting based on Txn timestamps is
// sufficient. We don't need to wait for proposals to be applied.

Expand All @@ -93,6 +179,7 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
rollupCh: make(chan uint64, 3),
elog: trace.NewEventLog("Dgraph", "ApplyCh"),
closer: y.NewCloser(3), // Matches CLOSER:1
ops: make(map[int]*operation),
}
return n
}
Expand Down Expand Up @@ -625,6 +712,11 @@ func (n *node) Snapshot() (*pb.Snapshot, error) {
}

func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
if err := n.startTask(opSnapshot); err != nil {
return err
}
defer n.stopTask(opSnapshot)

// In some edge cases, the Zero leader might not have been able to update
// the status of Alpha leader. So, instead of blocking forever on waiting
// for Zero to send us the updates info about the leader, we can just use
Expand Down Expand Up @@ -1067,7 +1159,11 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
// rollupLists would consolidate all the deltas that constitute one posting
// list, and write back a complete posting list.
func (n *node) rollupLists(readTs uint64) error {
writer := posting.NewTxnWriter(pstore)
ctx, err := n.startRollUp()
if err != nil {
return err
}
defer n.stopTask(opRollUp)

// We're doing rollups. We should use this opportunity to calculate the tablet sizes.
amLeader := n.AmLeader()
Expand Down Expand Up @@ -1098,6 +1194,7 @@ func (n *node) rollupLists(readTs uint64) error {
atomic.AddInt64(size, delta)
}

writer := posting.NewTxnWriter(pstore)
stream := pstore.NewStreamAt(readTs)
stream.LogPrefix = "Rolling up"
stream.ChooseKey = func(item *badger.Item) bool {
Expand Down Expand Up @@ -1132,7 +1229,8 @@ func (n *node) rollupLists(readTs uint64) error {
stream.Send = func(list *bpb.KVList) error {
return writer.Write(list)
}
if err := stream.Orchestrate(context.Background()); err != nil {

if err := stream.Orchestrate(ctx); err != nil {
return err
}
if err := writer.Flush(); err != nil {
Expand Down
14 changes: 9 additions & 5 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,19 @@ import (
"sync"
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"
)

var (
Expand Down Expand Up @@ -139,6 +138,11 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
time.Sleep(time.Second * 2)
}

// Ensure that rollup is not running.
if err := gr.Node.startTask(opBGIndex); err != nil {
return err
}

buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error {
wrtCtx := schema.GetWriteContext(context.Background())
if err := rebuild.BuildIndexes(wrtCtx); err != nil {
Expand Down