Skip to content

Commit

Permalink
Replace traces with spans from OpenCensus (#2781)
Browse files Browse the repository at this point in the history
* Replace traces with spans from OpenCensus.
  • Loading branch information
manishrjain authored Nov 27, 2018
1 parent 87d66e2 commit c4bb411
Show file tree
Hide file tree
Showing 19 changed files with 93 additions and 260 deletions.
1 change: 1 addition & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ func run() {
}()

if Alpha.Conf.GetBool("expose_trace") {
// TODO: Remove this once we get rid of event logs.
trace.AuthRequest = func(req *http.Request) (any, sensitive bool) {
return true, true
}
Expand Down
6 changes: 0 additions & 6 deletions dgraph/cmd/alpha/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"io/ioutil"
"net/http"

"golang.org/x/net/trace"

"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -54,12 +52,8 @@ func shareHandler(w http.ResponseWriter, r *http.Request) {
x.SetStatus(w, x.ErrorInvalidMethod, "Invalid method")
return
}
ctx := context.Background()
defer r.Body.Close()
if rawQuery, err = ioutil.ReadAll(r.Body); err != nil || len(rawQuery) == 0 {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while reading the stringified query payload: %+v", err)
}
x.SetStatus(w, x.ErrorInvalidRequest, "Invalid request encountered.")
return
}
Expand Down
12 changes: 3 additions & 9 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/golang/glog"
"github.com/google/uuid"
"golang.org/x/net/context"
"golang.org/x/net/trace"
)

type node struct {
Expand Down Expand Up @@ -70,6 +69,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
if ctx.Err() != nil {
return ctx.Err()
}
span := otrace.FromContext(ctx)

propose := func(timeout time.Duration) error {
if !n.AmLeader() {
Expand All @@ -88,17 +88,14 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
x.AssertTruef(n.Proposals.Store(key, pctx), "Found existing proposal with key: [%v]", key)
defer n.Proposals.Delete(key)
proposal.Key = key

// TODO: Remove this and use OpenCensus spans.
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Proposing with key: %X", key)
if span != nil {
span.Annotatef(nil, "Proposing with key: %s. Timeout: %v", key, timeout)
}

data, err := proposal.Marshal()
if err != nil {
return err
}

// Propose the change.
if err := n.Raft().Propose(cctx, data); err != nil {
return x.Wrapf(err, "While proposing")
Expand Down Expand Up @@ -505,9 +502,6 @@ func (n *node) trySnapshot(skip uint64) {
data, err := n.server.MarshalMembershipState()
x.Check(err)

if tr, ok := trace.FromContext(n.ctx); ok {
tr.LazyPrintf("Taking snapshot of state at watermark: %d\n", idx)
}
err = n.Store.CreateSnapshot(idx, n.ConfState(), data)
x.Checkf(err, "While creating snapshot")
glog.Infof("Writing snapshot at index: %d, applied mark: %d\n", idx, n.Applied.DoneUntil())
Expand Down
1 change: 1 addition & 0 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func run() {
}

if Zero.Conf.GetBool("expose_trace") {
// TODO: Remove this once we get rid of event logs.
trace.AuthRequest = func(req *http.Request) (any, sensitive bool) {
return true, true
}
Expand Down
13 changes: 0 additions & 13 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/golang/glog"
otrace "go.opencensus.io/trace"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
Expand Down Expand Up @@ -279,9 +278,6 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
}
empty := &api.Payload{}
if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
}
return empty, err
}
if !isMutationAllowed(ctx) {
Expand Down Expand Up @@ -448,9 +444,6 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
defer span.End()

if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
}
return resp, err
}

Expand Down Expand Up @@ -522,9 +515,6 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx
defer span.End()

if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
}
return &api.TxnContext{}, err
}

Expand All @@ -546,9 +536,6 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx

func (s *Server) CheckVersion(ctx context.Context, c *api.Check) (v *api.Version, err error) {
if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("request rejected %v", err)
}
return v, err
}

Expand Down
23 changes: 5 additions & 18 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import (
"math"
"time"

"golang.org/x/net/trace"

"github.com/dgraph-io/badger"
"github.com/golang/glog"
otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/tok"
Expand Down Expand Up @@ -111,10 +110,6 @@ func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge,

x.AssertTrue(plist != nil)
if err = plist.AddMutation(ctx, txn, edge); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error adding/deleting %s for attr %s entity %d: %v",
token, edge.Attr, edge.Entity, err)
}
return err
}
x.PredicateStats.Add("i."+edge.Attr, 1)
Expand Down Expand Up @@ -182,10 +177,6 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro
hasCountIndex := schema.State().HasCount(t.Attr)
cp, err := txn.addReverseMutationHelper(ctx, plist, hasCountIndex, edge)
if err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error adding/deleting reverse edge for attr %s entity %d: %v",
t.Attr, t.Entity, err)
}
return err
}
x.PredicateStats.Add(fmt.Sprintf("r.%s", edge.Attr), 1)
Expand Down Expand Up @@ -260,10 +251,6 @@ func (txn *Txn) addCountMutation(ctx context.Context, t *pb.DirectedEdge, count
x.AssertTruef(plist != nil, "plist is nil [%s] %d",
t.Attr, t.ValueId)
if err = plist.AddMutation(ctx, txn, t); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error adding/deleting count edge for attr %s count %d dst %d: %v",
t.Attr, count, t.ValueId, err)
}
return err
}
x.PredicateStats.Add(fmt.Sprintf("c.%s", t.Attr), 1)
Expand Down Expand Up @@ -302,9 +289,9 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
l.Lock()
defer l.Unlock()
if dur := time.Since(t1); dur > time.Millisecond {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("acquired lock %v %v %v", dur, t.Attr, t.Entity)
}
span := otrace.FromContext(ctx)
span.Annotatef([]otrace.Attribute{otrace.BoolAttribute("slow-lock", true)},
"Acquired lock %v %v %v", dur, t.Attr, t.Entity)
}

if doUpdateIndex {
Expand Down
5 changes: 0 additions & 5 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"sync/atomic"
"unsafe"

"golang.org/x/net/trace"

"github.com/dgryski/go-farm"
"github.com/golang/glog"

Expand Down Expand Up @@ -310,9 +308,6 @@ func fingerprintEdge(t *pb.DirectedEdge) uint64 {

func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error {
if atomic.LoadInt32(&l.deleteMe) == 1 {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("DELETEME set to true. Temporary error.")
}
return ErrRetry
}
if txn.ShouldAbort() {
Expand Down
4 changes: 0 additions & 4 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"strings"
"time"

"golang.org/x/net/trace"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/y"
"github.com/golang/glog"
Expand All @@ -39,7 +37,6 @@ import (

var (
emptyPostingList []byte // Used for indexing.
elog trace.EventLog
)

const (
Expand All @@ -65,7 +62,6 @@ func init() {
emptyPostingList, err = pl.Marshal()
x.Check(err)
})
elog = trace.NewEventLog("Memory", "")
}

func getMemUsage() int {
Expand Down
16 changes: 4 additions & 12 deletions query/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"fmt"
"strings"

otrace "go.opencensus.io/trace"

"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
"golang.org/x/net/trace"
)

func ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) {
Expand All @@ -40,9 +39,6 @@ func ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, erro
return nil, x.Wrapf(err, "While adding pb.edges")
}
m.Edges = edges
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Added Internal edges")
}
} else {
for _, mu := range m.Edges {
if mu.Attr == x.Star && !worker.Config.ExpandEdge {
Expand All @@ -53,9 +49,8 @@ func ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, erro
}
tctx, err := worker.MutateOverNetwork(ctx, m)
if err != nil {
glog.Errorf("MutateOverNetwork Error: %v. Mutation: %v.", err, m)
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while MutateOverNetwork: %+v", err)
if span := otrace.FromContext(ctx); span != nil {
span.Annotatef(nil, "MutateOverNetwork Error: %v. Mutation: %v.", err, m)
}
}
return tctx, err
Expand Down Expand Up @@ -168,9 +163,6 @@ func AssignUids(ctx context.Context, nquads []*api.NQuad) (map[string]uint64, er
// TODO: Optimize later by prefetching. Also consolidate all the UID requests into a single
// pending request from this server to zero.
if res, err = worker.AssignUidsOverNetwork(ctx, num); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while AssignUidsOverNetwork for newUids: %+v", err)
}
return newUids, err
}
curId := res.StartId
Expand Down
26 changes: 2 additions & 24 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/golang/glog"
otrace "go.opencensus.io/trace"
"golang.org/x/net/trace"
"google.golang.org/grpc/metadata"

"github.com/dgraph-io/dgo/protos/api"
Expand Down Expand Up @@ -1902,17 +1901,11 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
} else {
taskQuery, err := createTaskQuery(sg)
if err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while processing task: %+v", err)
}
rch <- err
return
}
result, err := worker.ProcessTaskOverNetwork(ctx, taskQuery)
if err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while processing task: %+v", err)
}
rch <- err
return
}
Expand All @@ -1927,9 +1920,6 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
if sg.Params.DoCount {
if len(sg.Filters) == 0 {
// If there is a filter, we need to do more work to get the actual count.
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Zero uids. Only count requested")
}
rch <- nil
return
}
Expand All @@ -1951,8 +1941,8 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {

if sg.DestUIDs == nil || len(sg.DestUIDs.Uids) == 0 {
// Looks like we're done here. Be careful with nil srcUIDs!
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Zero uids for %q. Num attr children: %v", sg.Attr, len(sg.Children))
if span != nil {
span.Annotatef(nil, "Zero uids for %q", sg.Attr)
}
out := sg.Children[:0]
for _, child := range sg.Children {
Expand Down Expand Up @@ -1993,9 +1983,6 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
// Store error in a variable and wait for all filters to run
// before returning. Else tracing causes crashes.
filterErr = err
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while processing filter task: %+v", err)
}
}
}

Expand Down Expand Up @@ -2116,9 +2103,6 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
}
if err = <-childChan; err != nil {
childErr = err
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while processing child task: %+v", err)
}
}
}
rch <- childErr
Expand Down Expand Up @@ -2533,19 +2517,13 @@ func (req *QueryRequest) ProcessQuery(ctx context.Context) (err error) {
} else {
go ProcessGraph(ctx, sg, nil, errChan)
}
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Graph processed")
}
}

var ferr error
// Wait for the execution that was started in this iteration.
for i := 0; i < len(idxList); i++ {
if err = <-errChan; err != nil {
ferr = err
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while processing Query: %+v", err)
}
continue
}
}
Expand Down
Loading

0 comments on commit c4bb411

Please sign in to comment.