diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 0d43255999a..a2a1672176a 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -636,7 +636,7 @@ func (n *node) Run() { go n.updateZeroMembershipPeriodically(closer) go n.checkQuorum(closer) go n.RunReadIndexLoop(closer, readStateCh) - if x.WorkerConfig.LudicrousMode { + if opts.LudicrousMode { closer.AddRunning(1) go x.StoreSync(n.Store, closer) } diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 5667b800269..bec1ac87f2b 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -54,6 +54,7 @@ type options struct { peer string w string rebalanceInterval time.Duration + LudicrousMode bool } var opts options @@ -180,6 +181,7 @@ func run() { peer: Zero.Conf.GetString("peer"), w: Zero.Conf.GetString("wal"), rebalanceInterval: Zero.Conf.GetDuration("rebalance_interval"), + LudicrousMode: Zero.Conf.GetBool("ludicrous_mode"), } x.WorkerConfig = x.WorkerOptions{ diff --git a/posting/list.go b/posting/list.go index 0ceb43d037c..7877d6447d7 100644 --- a/posting/list.go +++ b/posting/list.go @@ -463,6 +463,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed } if x.WorkerConfig.LudicrousMode { + // Conflict detection is not required for ludicrous mode. return nil } diff --git a/worker/background_mutation.go b/worker/background_mutation.go new file mode 100644 index 00000000000..464c09cee26 --- /dev/null +++ b/worker/background_mutation.go @@ -0,0 +1,120 @@ +/* + * Copyright 2016-2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package worker contains code for pb.worker communication to perform +// queries and mutations. +package worker + +import ( + "context" + "sync" + + "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/protos/pb" + "github.com/golang/glog" +) + +type subMutation struct { + edges []*pb.DirectedEdge + ctx context.Context + startTs uint64 +} + +type executor struct { + sync.RWMutex + predChan map[string]chan *subMutation +} + +func newExecutor() *executor { + return &executor{ + predChan: make(map[string]chan *subMutation), + } +} + +func (e *executor) processMutationCh(ch chan *subMutation) { + writer := posting.NewTxnWriter(pstore) + for payload := range ch { + select { + case <-ShutdownCh: + // Ignore all the unfinished mutation after shutdown signal. + glog.Infof("Ignoring further unfinished mutations") + return + default: + } + ptxn := posting.NewTxn(payload.startTs) + for _, edge := range payload.edges { + for { + err := runMutation(payload.ctx, edge, ptxn) + if err == nil { + break + } + if err != posting.ErrRetry { + glog.Errorf("Error while mutating: %v", err) + break + } + } + } + ptxn.Update() + if err := ptxn.CommitToDisk(writer, payload.startTs); err != nil { + glog.Errorf("Error while commiting to disk: %v", err) + } + // TODO(Animesh): We might not need this wait. + if err := writer.Wait(); err != nil { + glog.Errorf("Error while waiting for writes: %v", err) + } + } +} + +func (e *executor) getChannel(pred string) (ch chan *subMutation) { + e.RLock() + ch, ok := e.predChan[pred] + e.RUnlock() + if ok { + return ch + } + + // Create a new channel for `pred`. + e.Lock() + defer e.Unlock() + ch, ok = e.predChan[pred] + if ok { + return ch + } + ch = make(chan *subMutation, 1000) + e.predChan[pred] = ch + go e.processMutationCh(ch) + return ch +} + +func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) { + payloadMap := make(map[string]*subMutation) + + for _, edge := range edges { + payload, ok := payloadMap[edge.Attr] + if !ok { + payloadMap[edge.Attr] = &subMutation{ + ctx: ctx, + startTs: startTs, + } + payload = payloadMap[edge.Attr] + } + payload.edges = append(payload.edges, edge) + } + + for attr, payload := range payloadMap { + e.getChannel(attr) <- payload + } +} diff --git a/worker/draft.go b/worker/draft.go index 2cb80575ff2..f47100ec019 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -67,6 +67,8 @@ type node struct { elog trace.EventLog pendingSize int64 + + ex *executor } type op int @@ -193,6 +195,9 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) * closer: y.NewCloser(4), // Matches CLOSER:1 ops: make(map[op]*y.Closer), } + if x.WorkerConfig.LudicrousMode { + n.ex = newExecutor() + } return n } @@ -379,14 +384,6 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr } m := proposal.Mutations - txn := posting.Oracle().RegisterStartTs(m.StartTs) - if txn.ShouldAbort() { - span.Annotatef(nil, "Txn %d should abort.", m.StartTs) - return zero.ErrConflict - } - - // Discard the posting lists from cache to release memory at the end. - defer txn.Update() // It is possible that the user gives us multiple versions of the same edge, one with no facets // and another with facets. In that case, use stable sort to maintain the ordering given to us @@ -402,6 +399,19 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr return ei.GetEntity() < ej.GetEntity() }) + if x.WorkerConfig.LudicrousMode { + n.ex.addEdges(ctx, m.StartTs, m.Edges) + return nil + } + + txn := posting.Oracle().RegisterStartTs(m.StartTs) + if txn.ShouldAbort() { + span.Annotatef(nil, "Txn %d should abort.", m.StartTs) + return zero.ErrConflict + } + // Discard the posting lists from cache to release memory at the end. + defer txn.Update() + process := func(edges []*pb.DirectedEdge) error { var retries int for _, edge := range edges {