Skip to content

Commit

Permalink
Starting to code some of the path processors
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed Oct 8, 2019
1 parent 9547dd2 commit 59f40c5
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 34 deletions.
19 changes: 0 additions & 19 deletions grids/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package grids

import (
"fmt"
"context"
"github.com/bmeg/grip/engine/core"
"github.com/bmeg/grip/engine/inspect"
"github.com/bmeg/grip/gdbi"
Expand Down Expand Up @@ -55,21 +54,3 @@ func (comp GridsCompiler) Compile(stmts []*gripql.GraphStatement) (gdbi.Pipeline
}
return core.NewPipeline(procs, ps), nil
}

type PathStatement struct {

}

func (path *PathStatement) GetProcessor(db gdbi.GraphInterface, ps *gdbi.PipelineState) (gdbi.Processor, error) {
out := PathProcessor{}

return &out, nil
}

type PathProcessor struct {
}

func (pp *PathProcessor) Process(ctx context.Context, man gdbi.Manager, in gdbi.InPipe, out gdbi.OutPipe) context.Context {
//TODO: need actual code here
return ctx
}
90 changes: 75 additions & 15 deletions grids/graph_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,43 @@ type RawPathProcessor struct {

}

type PathTraveler struct {
current *RawDataElement
traveler *gdbi.Traveler
}

type RawDataElement struct {
Gid uint64
To uint64
From uint64
Label uint64
}

// ElementLookup request to look up data
type RawElementLookup struct {
ID uint64
Ref interface{}
Data *RawDataElement
}

// AddCurrent creates a new copy of the travel with new 'current' value
func (t *PathTraveler) AddCurrent(r *RawDataElement) *PathTraveler {
o := t.traveler.AddCurrent(nil)
a := PathTraveler{ current: r, traveler: o}
return &a
}


type RawProcessor interface {
Process(ctx context.Context, in chan *PathTraveler, out chan *PathTraveler) context.Context
}

type RawPipeline []*RawProcessor

func RawPathCompile(stmts []*gripql.GraphStatement) gdbi.Processor {

//s := RawPipeline{}

for _, s := range stmts {
switch s.GetStatement().(type) {
case *gripql.GraphStatement_V:
Expand All @@ -28,7 +63,6 @@ func RawPathCompile(stmts []*gripql.GraphStatement) gdbi.Processor {
fmt.Printf("Unknown command: %T\n", s.GetStatement())
}
}

return &RawPathProcessor{}
}

Expand All @@ -37,22 +71,48 @@ func (pc *RawPathProcessor) Process(ctx context.Context, man gdbi.Manager, in gd
return ctx
}

type PathVProc struct {
db *GridsGraph
}

func (r *PathVProc) Process(ctx context.Context, in chan *PathTraveler, out chan *PathTraveler) context.Context {
for elem := range r.db.RawGetVertexList(ctx) {
out <- &PathTraveler{ current: elem }
}
return ctx
}

type RawDataElement struct {
Gid uint64
To uint64
From uint64
Label uint64
type PathOutProc struct {
db *GridsGraph
labels []string
}

// ElementLookup request to look up data
type RawElementLookup struct {
ID uint64
Ref interface{}
Data *RawDataElement
func (r *PathOutProc) Process(ctx context.Context, in chan *PathTraveler, out chan *PathTraveler) context.Context {
queryChan := make(chan *RawElementLookup, 100)
go func() {
defer close(queryChan)
for i := range in {
queryChan <- &RawElementLookup{
ID: i.current.Gid,
Ref: i,
}
}
}()
go func() {
defer close(out)
for ov := range r.db.RawGetOutChannel(queryChan, r.labels) {
i := ov.Ref.(*PathTraveler)
out <- i.AddCurrent(&RawDataElement{
Gid: ov.Data.Gid,
Label: ov.Data.Label,
})
}
}()
return ctx
}



func (rd *RawDataElement) VertexDataElement(ggraph *GridsGraph) *gdbi.DataElement {
Gid := ggraph.kdb.keyMap.GetVertexID(rd.Gid)
Label := ggraph.kdb.keyMap.GetLabelID(rd.Label)
Expand Down Expand Up @@ -120,7 +180,7 @@ func (ggraph *GridsGraph) RawGetEdgeList(ctx context.Context) <-chan *RawDataEle
return o
}

func (ggraph *GridsGraph) RawGetOutChannel(reqChan chan *RawElementLookup, load bool, edgeLabels []string) chan *RawElementLookup {
func (ggraph *GridsGraph) RawGetOutChannel(reqChan chan *RawElementLookup, edgeLabels []string) chan *RawElementLookup {
o := make(chan *RawElementLookup, 100)
edgeLabelKeys := make([]uint64, 0, len(edgeLabels))
for i := range edgeLabels {
Expand Down Expand Up @@ -156,7 +216,7 @@ func (ggraph *GridsGraph) RawGetOutChannel(reqChan chan *RawElementLookup, load
return o
}

func (ggraph *GridsGraph) RawGetInChannel(reqChan chan *RawElementLookup, load bool, edgeLabels []string) chan *RawElementLookup {
func (ggraph *GridsGraph) RawGetInChannel(reqChan chan *RawElementLookup, edgeLabels []string) chan *RawElementLookup {
o := make(chan *RawElementLookup, 100)
edgeLabelKeys := make([]uint64, 0, len(edgeLabels))
for i := range edgeLabels {
Expand Down Expand Up @@ -192,7 +252,7 @@ func (ggraph *GridsGraph) RawGetInChannel(reqChan chan *RawElementLookup, load b
return o
}

func (ggraph *GridsGraph) RawGetOutEdgeChannel(reqChan chan *RawElementLookup, load bool, edgeLabels []string) chan *RawElementLookup {
func (ggraph *GridsGraph) RawGetOutEdgeChannel(reqChan chan *RawElementLookup, edgeLabels []string) chan *RawElementLookup {
o := make(chan *RawElementLookup, 100)
edgeLabelKeys := make([]uint64, 0, len(edgeLabels))
for i := range edgeLabels {
Expand Down Expand Up @@ -229,7 +289,7 @@ func (ggraph *GridsGraph) RawGetOutEdgeChannel(reqChan chan *RawElementLookup, l
return o
}

func (ggraph *GridsGraph) RawGetInEdgeChannel(reqChan chan *RawElementLookup, load bool, edgeLabels []string) chan *RawElementLookup {
func (ggraph *GridsGraph) RawGetInEdgeChannel(reqChan chan *RawElementLookup, edgeLabels []string) chan *RawElementLookup {
o := make(chan *RawElementLookup, 100)
edgeLabelKeys := make([]uint64, 0, len(edgeLabels))
for i := range edgeLabels {
Expand Down

0 comments on commit 59f40c5

Please sign in to comment.