Skip to content

Commit

Permalink
Use errGroup in handleValuePostings (dgraph-io#5138)
Browse files Browse the repository at this point in the history
This PR improves the handling of goroutines in handleValuePostings
function. The function starts a bunch of goroutines but doesn't
wait for them to complete. It returns when the first goroutine
returns an error. This PR fixes that issue.
  • Loading branch information
Ibrahim Jarif authored and dna2github committed Jul 18, 2020
1 parent 9da871c commit 3b9109e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
go.opencensus.io v0.21.0
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20200116001909-b77594299b42
golang.org/x/text v0.3.2
google.golang.org/genproto v0.0.0-20190516172635-bb713bdc0e52 // indirect
Expand Down
16 changes: 8 additions & 8 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
otrace "go.opencensus.io/trace"
"golang.org/x/sync/errgroup"

"github.com/golang/protobuf/proto"
cindex "github.com/google/codesearch/index"
Expand Down Expand Up @@ -364,7 +365,6 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
x.AssertTrue(width > 0)
span.Annotatef(nil, "Width: %d. NumGo: %d", width, numGo)

errCh := make(chan error, numGo)
outputs := make([]*pb.Result, numGo)
listType := schema.State().IsList(q.Attr)

Expand Down Expand Up @@ -484,21 +484,21 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
return nil
} // End of calculate function.

var g errgroup.Group
for i := 0; i < numGo; i++ {
start := i * width
end := start + width
if end > srcFn.n {
end = srcFn.n
}
go func(start, end int) {
errCh <- calculate(start, end)
}(start, end)
g.Go(func() error {
return calculate(start, end)
})
}
for i := 0; i < numGo; i++ {
if err := <-errCh; err != nil {
return err
}
if err := g.Wait(); err != nil {
return err
}

// All goroutines are done. Now attach their results.
out := args.out
for _, chunk := range outputs {
Expand Down

0 comments on commit 3b9109e

Please sign in to comment.