Skip to content

Commit

Permalink
Merge branch 'kvindex-bulkwrite' into grids-bulkload
Browse files Browse the repository at this point in the history
  • Loading branch information
kellrott committed Sep 19, 2019
2 parents 616978c + cd0ed62 commit 62071da
Show file tree
Hide file tree
Showing 14 changed files with 445 additions and 241 deletions.
6 changes: 5 additions & 1 deletion cmd/kvload/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package kvload

import (
"fmt"
"time"

"github.com/bmeg/golib"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvgraph"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/util"
"github.com/paulbellamy/ratecounter"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -87,6 +89,7 @@ var Cmd = &cobra.Command{
edgeFileArray = append(edgeFileArray, edgeFile)
}

vertexCounter := ratecounter.NewRateCounter(10 * time.Second)
for _, vertexFile := range vertexFileArray {
log.Infof("Loading %s", vertexFile)
count := 0
Expand All @@ -100,8 +103,9 @@ var Cmd = &cobra.Command{
vertexBatch = make([]*gripql.Vertex, 0, batchSize)
}
count++
vertexCounter.Incr(1)
if count%10000 == 0 {
log.Infof("Loaded %d vertices", count)
log.Infof("Loaded %d vertices (%d/sec)", count, vertexCounter.Rate()/10)
}
}
if len(vertexBatch) > 0 {
Expand Down
13 changes: 9 additions & 4 deletions elastic/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/timestamp"
"github.com/bmeg/grip/util"
"github.com/golang/protobuf/jsonpb"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -60,9 +61,9 @@ func (es *Graph) AddEdge(edges []*gripql.Edge) error {
}
pe := PackEdge(e)
script := elastic.NewScript(`ctx._source.gid = params.gid;
ctx._source.label = params.label;
ctx._source.from = params.from;
ctx._source.to = params.to;
ctx._source.label = params.label;
ctx._source.from = params.from;
ctx._source.to = params.to;
ctx._source.data = params.data;`).Params(pe)
req := elastic.NewBulkUpdateRequest().
Index(es.edgeIndex).
Expand Down Expand Up @@ -95,7 +96,7 @@ func (es *Graph) AddVertex(vertices []*gripql.Vertex) error {
}
pv := PackVertex(v)
script := elastic.NewScript(`ctx._source.gid = params.gid;
ctx._source.label = params.label;
ctx._source.label = params.label;
ctx._source.data = params.data;`).Params(pv)
req := elastic.NewBulkUpdateRequest().
Index(es.vertexIndex).
Expand All @@ -113,6 +114,10 @@ func (es *Graph) AddVertex(vertices []*gripql.Vertex) error {
return nil
}

func (es *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error {
return util.SteamBatch(stream, es.AddVertex, es.AddEdge)
}

// DelEdge deletes edge `eid`
func (es *Graph) DelEdge(eid string) error {
ctx := context.Background()
Expand Down
4 changes: 4 additions & 0 deletions existing-sql/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (g *Graph) AddEdge(edges []*gripql.Edge) error {
return errors.New("not implemented")
}

func (g *Graph) BulkAdd(stream <-chan *gripql.GraphElement) error {
return errors.New("not implemented")
}

// DelVertex is not implemented in the SQL driver
func (g *Graph) DelVertex(key string) error {
return errors.New("not implemented")
Expand Down
2 changes: 2 additions & 0 deletions gdbi/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type GraphInterface interface {
AddVertex(vertex []*gripql.Vertex) error
AddEdge(edge []*gripql.Edge) error

BulkAdd(<-chan *gripql.GraphElement) error

DelVertex(key string) error
DelEdge(key string) error

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/lib/pq v1.1.0
github.com/logrusorgru/aurora v0.0.0-20190428105938-cea283e61946
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/paulbellamy/ratecounter v0.2.0
github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d
github.com/segmentio/ksuid v1.0.2
github.com/sirupsen/logrus v1.4.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pelletier/go-toml v1.1.0 h1:cmiOvKzEunMsAxyhXSzpL5Q1CRKpVv0KQsnAIcSEVYM=
github.com/pelletier/go-toml v1.1.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
198 changes: 142 additions & 56 deletions grids/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package grids
import (
"bytes"
"context"
"sync"
"fmt"

"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvindex"
"github.com/bmeg/grip/kvi"
proto "github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -44,35 +46,88 @@ type kvAddData struct {
doc map[string]interface{}
}

func insertVertex(tx kvi.KVBulkWrite, keyMap *KeyMap, graphKey uint64, vertex *gripql.Vertex) error {
value, err := proto.Marshal(vertex)
if err != nil {
return err
}
vertexKey := keyMap.GetVertexKey(vertex.Gid)
key := VertexKey(graphKey, vertexKey)
if err != nil {
return err
}
if err := tx.Set(key, value); err != nil {
return fmt.Errorf("AddVertex Error %s", err)
}
return nil
}

func indexVertex(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, vertex *gripql.Vertex) error {
doc := map[string]interface{}{graph: vertexIdxStruct(vertex)}
if err := idx.AddDocTx(tx, vertex.Gid, doc); err != nil {
return fmt.Errorf("AddVertex Error %s", err)
}
return nil
}

func insertEdge(tx kvi.KVBulkWrite, keyMap *KeyMap, graphKey uint64, edge *gripql.Edge) error {
var err error
var data []byte

data, err = proto.Marshal(edge)
if err != nil {
return err
}

label := keyMap.GetLabelKey(edge.Label)
eid := keyMap.GetEdgeKey(edge.Gid) //TODO: fill in black key?
src := keyMap.GetVertexKey(edge.From)
dst := keyMap.GetVertexKey(edge.To)

ekey := EdgeKey(graphKey, eid, src, dst, label)
skey := SrcEdgeKey(graphKey, eid, src, dst, label)
dkey := DstEdgeKey(graphKey, eid, src, dst, label)

err = tx.Set(ekey, data)
if err != nil {
return err
}
err = tx.Set(skey, []byte{})
if err != nil {
return err
}
err = tx.Set(dkey, []byte{})
if err != nil {
return err
}
return nil
}

func indexEdge(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, edge *gripql.Edge) error {
err := idx.AddDocTx(tx, edge.Gid, map[string]interface{}{graph: edgeIdxStruct(edge)})
return err
}

// AddVertex adds an edge to the graph, if it already exists
// in the graph, it is replaced
func (ggraph *GridsGraph) AddVertex(vertices []*gripql.Vertex) error {
dataChan := make(chan *kvAddData, 100)
go func() {
for _, vertex := range vertices {
d, err := proto.Marshal(vertex)
vertexKey := ggraph.kdb.keyMap.GetVertexKey(vertex.Gid)
k := VertexKey(ggraph.graphKey, vertexKey)
if err == nil {
doc := map[string]interface{}{ggraph.graphID: vertexIdxStruct(vertex)}
dataChan <- &kvAddData{key: k, value: d, vertex: vertex, doc: doc}
}
}
close(dataChan)
}()

//TODO: split index out to other transation
err := ggraph.kdb.graphkv.BulkWrite(func(tx kvi.KVBulkWrite) error {
var anyErr error
for kv := range dataChan {
if err := tx.Set(kv.key, kv.value); err != nil {
for _, vert := range vertices {
if err := insertVertex(tx, ggraph.kdb.keyMap, ggraph.graphKey, vert); err != nil {
anyErr = err
log.Errorf("AddVertex Error %s", err)
} else {
if err := ggraph.kdb.idx.AddDocTx(tx, kv.vertex.Gid, kv.doc); err != nil {
anyErr = err
log.Errorf("AddVertex Error %s", err)
}
}
}
ggraph.kdb.ts.Touch(ggraph.graphID)
return anyErr
})
err = ggraph.kdb.indexkv.BulkWrite(func(tx kvi.KVBulkWrite) error {
var anyErr error
for _, vert := range vertices {
if err := indexVertex(tx, ggraph.kdb.idx, ggraph.graphID, vert); err != nil {
anyErr = err
log.Errorf("IndexVertex Error %s", err)
}
}
ggraph.kdb.ts.Touch(ggraph.graphID)
Expand All @@ -86,47 +141,78 @@ func (ggraph *GridsGraph) AddVertex(vertices []*gripql.Vertex) error {
func (ggraph *GridsGraph) AddEdge(edges []*gripql.Edge) error {
err := ggraph.kdb.graphkv.BulkWrite(func(tx kvi.KVBulkWrite) error {
for _, edge := range edges {
var err error
var data []byte

data, err = proto.Marshal(edge)
if err != nil {
return err
}

label := ggraph.kdb.keyMap.GetLabelKey(edge.Label)
eid := ggraph.kdb.keyMap.GetEdgeKey(edge.Gid) //TODO: fill in black key?
src := ggraph.kdb.keyMap.GetVertexKey(edge.From)
dst := ggraph.kdb.keyMap.GetVertexKey(edge.To)

ekey := EdgeKey(ggraph.graphKey, eid, src, dst, label)
skey := SrcEdgeKey(ggraph.graphKey, eid, src, dst, label)
dkey := DstEdgeKey(ggraph.graphKey, eid, src, dst, label)

err = tx.Set(ekey, data)
if err != nil {
return err
}
err = tx.Set(skey, []byte{})
if err != nil {
return err
}
err = tx.Set(dkey, []byte{})
if err != nil {
return err
}
//TODO: change this to a different TX
err = ggraph.kdb.idx.AddDocTx(tx, edge.Gid, map[string]interface{}{ggraph.graphID: edgeIdxStruct(edge)})
if err != nil {
return err
}
insertEdge(tx, ggraph.kdb.keyMap, ggraph.graphKey, edge)
}
ggraph.kdb.ts.Touch(ggraph.graphID)
return nil
})
err = ggraph.kdb.indexkv.BulkWrite(func(tx kvi.KVBulkWrite) error {
for _, edge := range edges {
indexEdge(tx, ggraph.kdb.idx, ggraph.graphID, edge)
}
ggraph.kdb.ts.Touch(ggraph.graphID)
return nil
})
return err

}


func (ggraph *GridsGraph) BulkAdd(stream <-chan *gripql.GraphElement) error {
var anyErr error
insertStream := make(chan *gripql.GraphElement, 100)
indexStream := make(chan *gripql.GraphElement, 100)
s := &sync.WaitGroup{}
s.Add(2)
go func () {
ggraph.kdb.graphkv.BulkWrite(func(tx kvi.KVBulkWrite) error {
for elem := range insertStream {
if elem.Vertex != nil {
if err := insertVertex(tx, ggraph.kdb.keyMap, ggraph.graphKey, elem.Vertex); err != nil {
anyErr = err
}
}
if elem.Edge != nil {
if err := insertEdge(tx, ggraph.kdb.keyMap, ggraph.graphKey, elem.Edge); err != nil {
anyErr = err
}
}
}
s.Done()
return anyErr
})
}()

go func () {
ggraph.kdb.indexkv.BulkWrite(func(tx kvi.KVBulkWrite) error {
for elem := range indexStream {
if elem.Vertex != nil {
if err := indexVertex(tx, ggraph.kdb.idx, ggraph.graphID, elem.Vertex); err != nil {
anyErr = err
}
}
if elem.Edge != nil {
if err := indexEdge(tx, ggraph.kdb.idx, ggraph.graphID, elem.Edge); err != nil {
anyErr = err
}
}
}
s.Done()
return anyErr
})
}()

for i := range stream {
insertStream <- i
indexStream <- i
}
close(insertStream)
close(indexStream)
s.Wait()
return anyErr
}


// DelEdge deletes edge with id `key`
func (ggraph *GridsGraph) DelEdge(eid string) error {
edgeKey := ggraph.kdb.keyMap.GetEdgeKey(eid)
Expand Down
Loading

0 comments on commit 62071da

Please sign in to comment.