Skip to content

Commit

Permalink
Various optimizations for Geo queries (#3805)
Browse files Browse the repository at this point in the history
- Remove query cache which is causing contention.
- Make geo filtering execute in parallel.
- Fix up convertGeom to also take geometries, not just coordinates.

Changes:
* Remove query cache which is causing contention. Make geo filtering execute in parallel. Fix up convertGeom to also take geometries, not just coordinates.
* Add another simpler case for dealing with no facets tree in handleUidPostings
* Use spans instead of glog.Infofs
* Only output to span if numGo > 1. Also, simplify logic a bit.
* Create a new span for processTask
* Refactor checking closed loop into a single func.
* Fix test failure: Handle another facet condition.
  • Loading branch information
manishrjain authored Aug 13, 2019
1 parent dc1d25e commit 77f024a
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 76 deletions.
2 changes: 1 addition & 1 deletion edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Respo
if req.StartTs == 0 {
req.StartTs = posting.Oracle().MaxAssigned()
}
queryRequest.Cache = worker.NoTxnCache
queryRequest.Cache = worker.NoCache
}

if req.StartTs == 0 {
Expand Down
10 changes: 5 additions & 5 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (
ErrRetry = errors.New("Temporary error. Please retry")
// ErrNoValue would be returned if no value was found in the posting list.
ErrNoValue = errors.New("No value found")
errStopIteration = errors.New("Stop iteration")
ErrStopIteration = errors.New("Stop iteration")
emptyPosting = &pb.Posting{}
maxListSize = mb / 2
)
Expand Down Expand Up @@ -660,7 +660,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
log.Fatalf("Unhandled case during iteration of posting list.")
}
}
if err == errStopIteration {
if err == ErrStopIteration {
return nil
}
return err
Expand All @@ -673,7 +673,7 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
var count int
err := l.iterate(readTs, afterUid, func(p *pb.Posting) error {
count++
return errStopIteration
return ErrStopIteration
})
if err != nil {
return false, err
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func (l *List) postingForLangs(readTs uint64, langs []string) (pos *pb.Posting,
if p.PostingType == pb.Posting_VALUE_LANG {
pos = p
found = true
return errStopIteration
return ErrStopIteration
}
return nil
})
Expand Down Expand Up @@ -1112,7 +1112,7 @@ func (l *List) findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posti
pos = p
found = true
}
return errStopIteration
return ErrStopIteration
})

return found, pos, err
Expand Down
2 changes: 1 addition & 1 deletion posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestAddMutation(t *testing.T) {
func getFirst(l *List, readTs uint64) (res pb.Posting) {
l.Iterate(readTs, 0, func(p *pb.Posting) error {
res = *p
return errStopIteration
return ErrStopIteration
})
return res
}
Expand Down
70 changes: 43 additions & 27 deletions types/s2.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,52 @@ func Intersects(l1 *s2.Loop, l2 *s2.Loop) bool {
return intersects(l1, l2)
}

func closed(coords []geom.Coord) bool {
l := len(coords)
return coords[0][0] == coords[l-1][0] && coords[0][1] == coords[l-1][1]
}

func convertToGeom(str string) (geom.T, error) {
// validate would ensure that we have a closed loop for all the polygons. We don't support open
// loop polygons.
closed := func(p *geom.Polygon) error {
coords := p.Coords()
if len(coords) == 0 {
return errors.Errorf("Got empty polygon.")
}
// Check that first ring is closed.
c := coords[0]
l := len(c)
if c[0][0] == c[l-1][0] && c[0][1] == c[l-1][1] {
return nil
}
return errors.Errorf("Last coord not same as first")
}

validate := func(g geom.T) (geom.T, error) {
switch v := g.(type) {
case *geom.MultiPolygon:
for i := 0; i < v.NumPolygons(); i++ {
if err := closed(v.Polygon(i)); err != nil {
return nil, err
}
}
case *geom.Polygon:
if err := closed(v); err != nil {
return nil, err
}
}
return g, nil
}

var g geojson.Geometry
if err := json.Unmarshal([]byte(str), &g); err == nil {
t, err := g.Decode()
if err != nil {
return nil, err
}
return validate(t)
}

s := x.WhiteSpace.Replace(str)
if len(s) < 5 { // [1,2]
return nil, errors.Errorf("Invalid coordinates")
}
var g geojson.Geometry
var m json.RawMessage
var err error

Expand All @@ -156,18 +191,7 @@ func convertToGeom(str string) (geom.T, error) {
if err != nil {
return nil, errors.Wrapf(err, "Invalid coordinates")
}
mp := g1.(*geom.MultiPolygon)
for i := 0; i < mp.NumPolygons(); i++ {
coords := mp.Polygon(i).Coords()
if len(coords) == 0 {
return nil, errors.Errorf("Got empty polygon inside multi-polygon.")
}
// Check that first ring is closed.
if !closed(mp.Polygon(i).Coords()[0]) {
return nil, errors.Errorf("Last coord not same as first")
}
}
return g1, nil
return validate(g1)
}

if s[0:3] == "[[[" {
Expand All @@ -181,15 +205,7 @@ func convertToGeom(str string) (geom.T, error) {
if err != nil {
return nil, errors.Wrapf(err, "Invalid coordinates")
}
coords := g1.(*geom.Polygon).Coords()
if len(coords) == 0 {
return nil, errors.Errorf("Got empty polygon.")
}
// Check that first ring is closed.
if !closed(coords[0]) {
return nil, errors.Errorf("Last coord not same as first")
}
return g1, nil
return validate(g1)
}

if s[0] == '[' {
Expand Down
119 changes: 77 additions & 42 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,11 +632,10 @@ func (qs *queryState) handleUidPostings(
tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}}
out.UidMatrix = append(out.UidMatrix, tlist)
}
default:
case q.FacetParam != nil || facetsTree != nil:
if i == 0 {
span.Annotate(nil, "default")
span.Annotate(nil, "default with facets")
}

uidList := &pb.List{
Uids: make([]uint64, 0, pl.ApproxLen()),
}
Expand Down Expand Up @@ -668,6 +667,15 @@ func (qs *queryState) handleUidPostings(
if q.FacetParam != nil {
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{FacetsList: fcsList})
}
default:
if i == 0 {
span.Annotate(nil, "default no facets")
}
uidList, err := pl.Uids(opts)
if err != nil {
return err
}
out.UidMatrix = append(out.UidMatrix, uidList)
}
}
return nil
Expand Down Expand Up @@ -695,19 +703,26 @@ func (qs *queryState) handleUidPostings(
out.Counts = append(out.Counts, chunk.Counts...)
out.UidMatrix = append(out.UidMatrix, chunk.UidMatrix...)
}
var total int
for _, list := range out.UidMatrix {
total += len(list.Uids)
}
span.Annotatef(nil, "Total number of elements in matrix: %d", total)
return nil
}

const (
// UseTxnCache indicates the transaction cache should be used.
UseTxnCache = iota
// NoTxnCache indicates no transaction caches should be used.
NoTxnCache
// NoCache indicates no caches should be used.
NoCache
)

// processTask processes the query, accumulates and returns the result.
func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) {
span := otrace.FromContext(ctx)
ctx, span := otrace.StartSpan(ctx, "processTask."+q.Attr)
defer span.End()

stop := x.SpanTimer(span, "processTask"+q.Attr)
defer stop()

Expand Down Expand Up @@ -743,9 +758,8 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro
if q.Cache == UseTxnCache {
qs.cache = posting.Oracle().CacheAt(q.ReadTs)
}
if qs.cache == nil {
qs.cache = posting.NewLocalCache(q.ReadTs)
}
// For now, remove the query level cache. It is causing contention for queries with high
// fan-out.

out, err := qs.helpProcessTask(ctx, q, gid)
if err != nil {
Expand Down Expand Up @@ -867,7 +881,7 @@ func (qs *queryState) helpProcessTask(
// If geo filter, do value check for correctness.
if srcFn.geoQuery != nil {
span.Annotate(nil, "handleGeoFunction")
if err := qs.filterGeoFunction(funcArgs{q, gid, srcFn, out}); err != nil {
if err := qs.filterGeoFunction(ctx, funcArgs{q, gid, srcFn, out}); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -1236,49 +1250,70 @@ func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) err
return nil
}

func (qs *queryState) filterGeoFunction(arg funcArgs) error {
func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error {
span := otrace.FromContext(ctx)
stop := x.SpanTimer(span, "filterGeoFunction")
defer stop()

attr := arg.q.Attr
uids := algo.MergeSorted(arg.out.UidMatrix)
isList := schema.State().IsList(attr)
filtered := &pb.List{}
for _, uid := range uids.Uids {
pl, err := qs.cache.Get(x.DataKey(attr, uid))
if err != nil {
return err
}
if !isList {
val, err := pl.Value(arg.q.ReadTs)
if err == posting.ErrNoValue {
continue
} else if err != nil {
numGo, width := x.DivideAndRule(len(uids.Uids))
if span != nil && numGo > 1 {
span.Annotatef(nil, "Number of uids: %d. NumGo: %d. Width: %d\n",
len(uids.Uids), numGo, width)
}

filtered := make([]*pb.List, numGo)
filter := func(idx, start, end int) error {
filtered[idx] = &pb.List{}
out := filtered[idx]
for _, uid := range uids.Uids[start:end] {
pl, err := qs.cache.Get(x.DataKey(attr, uid))
if err != nil {
return err
}
newValue := &pb.TaskValue{ValType: val.Tid.Enum(), Val: val.Value.([]byte)}
if types.MatchGeo(newValue, arg.srcFn.geoQuery) {
filtered.Uids = append(filtered.Uids, uid)
var tv pb.TaskValue
err = pl.Iterate(arg.q.ReadTs, 0, func(p *pb.Posting) error {
tv.ValType = p.ValType
tv.Val = p.Value
if types.MatchGeo(&tv, arg.srcFn.geoQuery) {
out.Uids = append(out.Uids, uid)
return posting.ErrStopIteration
}
return nil
})
if err != nil {
return err
}

continue
}
return nil
}

// list type
vals, err := pl.AllValues(arg.q.ReadTs)
if err == posting.ErrNoValue {
continue
} else if err != nil {
return err
errCh := make(chan error, numGo)
for i := 0; i < numGo; i++ {
start := i * width
end := start + width
if end > len(uids.Uids) {
end = len(uids.Uids)
}
for _, val := range vals {
newValue := &pb.TaskValue{ValType: val.Tid.Enum(), Val: val.Value.([]byte)}
if types.MatchGeo(newValue, arg.srcFn.geoQuery) {
filtered.Uids = append(filtered.Uids, uid)
break
}
go func(idx, start, end int) {
errCh <- filter(idx, start, end)
}(i, start, end)
}
for i := 0; i < numGo; i++ {
if err := <-errCh; err != nil {
return err
}
}

final := &pb.List{}
for _, out := range filtered {
final.Uids = append(final.Uids, out.Uids...)
}
if span != nil && numGo > 1 {
span.Annotatef(nil, "Total uids after filtering geo: %d", len(final.Uids))
}
for i := 0; i < len(arg.out.UidMatrix); i++ {
algo.IntersectWith(arg.out.UidMatrix[i], filtered, arg.out.UidMatrix[i])
algo.IntersectWith(arg.out.UidMatrix[i], final, arg.out.UidMatrix[i])
}
return nil
}
Expand Down

0 comments on commit 77f024a

Please sign in to comment.