Skip to content

Commit

Permalink
Reworking the optimizer so that it processes gripql.GraphStatements r…
Browse files Browse the repository at this point in the history
…ather then the processors
  • Loading branch information
kellrott committed Sep 27, 2019
1 parent 99a7b95 commit 6b8c1f7
Show file tree
Hide file tree
Showing 7 changed files with 562 additions and 272 deletions.
151 changes: 17 additions & 134 deletions engine/core/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type DefaultPipeline struct {
markTypes map[string]gdbi.DataType
}

func NewPipeline(procs []gdbi.Processor, ps *PipelineState) *DefaultPipeline {
return &DefaultPipeline{procs, ps.LastType, ps.MarkTypes}
}

// DataType return the datatype
func (pipe *DefaultPipeline) DataType() gdbi.DataType {
return pipe.dataType
Expand Down Expand Up @@ -65,8 +69,8 @@ func (ps *PipelineState) StepLoadData() bool {
}

func NewPipelineState(stmts []*gripql.GraphStatement) *PipelineState {
steps := engine.PipelineSteps(stmts)
stepOut := engine.PipelineStepOutputs(stmts)
steps := inspect.PipelineSteps(stmts)
stepOut := inspect.PipelineStepOutputs(stmts)

return &PipelineState{
LastType: gdbi.NoData,
Expand All @@ -84,10 +88,13 @@ func (comp DefaultCompiler) Compile(stmts []*gripql.GraphStatement) (gdbi.Pipeli

stmts = Flatten(stmts)

if err := validate(stmts); err != nil {
if err := Validate(stmts); err != nil {
return &DefaultPipeline{}, fmt.Errorf("invalid statments: %s", err)
}

stmts = IndexStartOptimize(stmts)


ps := NewPipelineState(stmts)

procs := make([]gdbi.Processor, 0, len(stmts))
Expand All @@ -101,8 +108,6 @@ func (comp DefaultCompiler) Compile(stmts []*gripql.GraphStatement) (gdbi.Pipeli
procs = append(procs, p)
}

procs = indexStartOptimize(procs)

return &DefaultPipeline{procs, ps.LastType, ps.MarkTypes}, nil
}

Expand Down Expand Up @@ -302,143 +307,21 @@ func StatementProcessor(gs *gripql.GraphStatement, db gdbi.GraphInterface, ps *P
ps.LastType = gdbi.AggregationData
return &aggregate{stmt.Aggregate.Aggregations}, nil

//Custom graph statements
case *gripql.GraphStatement_LookupVertsIndex:
ps.LastType = gdbi.VertexData
return &LookupVertsIndex{db:db, labels:stmt.Labels, loadData:ps.StepLoadData()}, nil

default:
return nil, fmt.Errorf("unknown statement type")
}
}

// For V().Has(Eq("$.label", "Person")) and V().Has(Eq("$.gid", "1")) queries, streamline into a single index lookup
func indexStartOptimize(pipe []gdbi.Processor) []gdbi.Processor {
optimized := []gdbi.Processor{}

var lookupV *LookupVerts
hasIDIdx := []int{}
hasLabelIdx := []int{}
isDone := false
for i, step := range pipe {
if isDone {
break
}
if i == 0 {
if lv, ok := step.(*LookupVerts); ok {
lookupV = lv
} else {
break
}
continue
}
switch s := step.(type) {
case *HasID:
hasIDIdx = append(hasIDIdx, i)
case *HasLabel:
hasLabelIdx = append(hasLabelIdx, i)
case *Has:
if and := s.stmt.GetAnd(); and != nil {
stmts := and.GetExpressions()
newPipe := []gdbi.Processor{}
newPipe = append(newPipe, pipe[:i]...)
for _, stmt := range stmts {
newPipe = append(newPipe, &Has{stmt: stmt})
}
newPipe = append(newPipe, pipe[i+1:]...)
return indexStartOptimize(newPipe)
}
if cond := s.stmt.GetCondition(); cond != nil {
path := jsonpath.GetJSONPath(cond.Key)
switch path {
case "$.gid":
hasIDIdx = append(hasIDIdx, i)
case "$.label":
hasLabelIdx = append(hasLabelIdx, i)
default:
// do nothing
}
}
default:
isDone = true
}
}

idOpt := false
if len(hasIDIdx) > 0 {
ids := []string{}
idx := hasIDIdx[0]
if has, ok := pipe[idx].(*Has); ok {
ids = append(ids, extractHasVals(has)...)
}
if has, ok := pipe[idx].(*HasID); ok {
ids = append(ids, has.ids...)
}
if len(ids) > 0 {
idOpt = true
hIdx := &LookupVerts{ids: ids, db: lookupV.db}
optimized = append(optimized, hIdx)
}
}

labelOpt := false
if len(hasLabelIdx) > 0 && !idOpt {
labels := []string{}
idx := hasLabelIdx[0]
if has, ok := pipe[idx].(*Has); ok {
labels = append(labels, extractHasVals(has)...)
}
if has, ok := pipe[idx].(*HasLabel); ok {
labels = append(labels, has.labels...)
}
if len(labels) > 0 {
labelOpt = true
hIdx := &LookupVertsIndex{labels: labels, db: lookupV.db}
optimized = append(optimized, hIdx)
}
}

for i, step := range pipe {
if idOpt || labelOpt {
if i == 0 {
continue
}
} else {
optimized = append(optimized, step)
}
if idOpt {
if i != hasIDIdx[0] {
optimized = append(optimized, step)
}
}
if labelOpt {
if i != hasLabelIdx[0] {
optimized = append(optimized, step)
}
}
}

return optimized
}

func extractHasVals(h *Has) []string {
vals := []string{}
if cond := h.stmt.GetCondition(); cond != nil {
// path := jsonpath.GetJSONPath(cond.Key)
val := protoutil.UnWrapValue(cond.Value)
switch cond.Condition {
case gripql.Condition_EQ:
if l, ok := val.(string); ok {
vals = []string{l}
}
case gripql.Condition_WITHIN:
v := val.([]interface{})
for _, x := range v {
vals = append(vals, x.(string))
}
default:
// do nothing
}
}
return vals
}

func validate(stmts []*gripql.GraphStatement) error {
//Validate checks pipeline for chains of statements that won't work
func Validate(stmts []*gripql.GraphStatement) error {
for i, gs := range stmts {
// Validate that the first statement is V() or E()
if i == 0 {
Expand Down
143 changes: 143 additions & 0 deletions engine/core/optimize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@

package core

import (
"github.com/bmeg/grip/jsonpath"
"github.com/bmeg/grip/protoutil"
"github.com/bmeg/grip/gripql"
)

//IndexStartOptimize looks at processor pipeline for queries like
// V().Has(Eq("$.label", "Person")) and V().Has(Eq("$.gid", "1")),
// streamline into a single index lookup
func IndexStartOptimize(pipe []*gripql.GraphStatement) []*gripql.GraphStatement {
optimized := []*gripql.GraphStatement{}

//var lookupV *gripql.GraphStatement_V
hasIDIdx := []int{}
hasLabelIdx := []int{}
isDone := false
for i, step := range pipe {
if isDone {
break
}
if i == 0 {
if _, ok := step.GetStatement().(*gripql.GraphStatement_V); ok {
//lookupV = lv
} else {
break
}
continue
}
switch s := step.GetStatement().(type) {
case *gripql.GraphStatement_HasId:
hasIDIdx = append(hasIDIdx, i)
case *gripql.GraphStatement_HasLabel:
hasLabelIdx = append(hasLabelIdx, i)
case *gripql.GraphStatement_Has:
if and := s.Has.GetAnd(); and != nil {
stmts := and.GetExpressions()
newPipe := []*gripql.GraphStatement{}
newPipe = append(newPipe, pipe[:i]...)
for _, stmt := range stmts {
newPipe = append(newPipe, &gripql.GraphStatement{Statement:&gripql.GraphStatement_Has{Has: stmt}})
}
newPipe = append(newPipe, pipe[i+1:]...)
return IndexStartOptimize(newPipe)
}
if cond := s.Has.GetCondition(); cond != nil {
path := jsonpath.GetJSONPath(cond.Key)
switch path {
case "$.gid":
hasIDIdx = append(hasIDIdx, i)
case "$.label":
hasLabelIdx = append(hasLabelIdx, i)
default:
// do nothing
}
}
default:
isDone = true
}
}

idOpt := false
if len(hasIDIdx) > 0 {
ids := []string{}
idx := hasIDIdx[0]
if has, ok := pipe[idx].GetStatement().(*gripql.GraphStatement_Has); ok {
ids = append(ids, extractHasVals(has)...)
}
if has, ok := pipe[idx].GetStatement().(*gripql.GraphStatement_HasId); ok {
ids = append(ids, protoutil.AsStringList(has.HasId)...)
}
if len(ids) > 0 {
idOpt = true
hIdx := &gripql.GraphStatement_V{V: protoutil.AsListValue(ids)}
optimized = append(optimized, &gripql.GraphStatement{Statement:hIdx})
}
}

labelOpt := false
if len(hasLabelIdx) > 0 && !idOpt {
labels := []string{}
idx := hasLabelIdx[0]
if has, ok := pipe[idx].GetStatement().(*gripql.GraphStatement_Has); ok {
labels = append(labels, extractHasVals(has)...)
}
if has, ok := pipe[idx].GetStatement().(*gripql.GraphStatement_HasLabel); ok {
labels = append(labels, protoutil.AsStringList(has.HasLabel)...)
}
if len(labels) > 0 {
labelOpt = true
hIdx := &gripql.GraphStatement_LookupVertsIndex{Labels: labels}
optimized = append(optimized, &gripql.GraphStatement{hIdx})
}
}

for i, step := range pipe {
if idOpt || labelOpt {
if i == 0 {
continue
}
} else {
optimized = append(optimized, step)
}
if idOpt {
if i != hasIDIdx[0] {
optimized = append(optimized, step)
}
}
if labelOpt {
if i != hasLabelIdx[0] {
optimized = append(optimized, step)
}
}
}

return optimized
}



func extractHasVals(h *gripql.GraphStatement_Has) []string {
vals := []string{}
if cond := h.Has.GetCondition(); cond != nil {
// path := jsonpath.GetJSONPath(cond.Key)
val := protoutil.UnWrapValue(cond.Value)
switch cond.Condition {
case gripql.Condition_EQ:
if l, ok := val.(string); ok {
vals = []string{l}
}
case gripql.Condition_WITHIN:
v := val.([]interface{})
for _, x := range v {
vals = append(vals, x.(string))
}
default:
// do nothing
}
}
return vals
}
Loading

0 comments on commit 6b8c1f7

Please sign in to comment.