Skip to content

Commit

Permalink
Merge pull request #223 from bmeg/kvindex-bulkwrite-as
Browse files Browse the repository at this point in the history
KV BulkWrite
  • Loading branch information
kellrott authored Dec 9, 2019
2 parents 63f9631 + 4cc7482 commit b2d89a9
Show file tree
Hide file tree
Showing 56 changed files with 3,516 additions and 2,242 deletions.
139 changes: 66 additions & 73 deletions cmd/kvload/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,27 @@ package kvload

import (
"fmt"
"strings"
"sync"
"time"

"github.com/bmeg/golib"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvgraph"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util"
log "github.com/sirupsen/logrus"
"github.com/paulbellamy/ratecounter"
"github.com/spf13/cobra"
)

var dbPath = "grip.db"
var kvDriver = "badger"

var graph string
var vertexFile string
var edgeFile string
var vertexManifestFile string
var edgeManifestFile string

var batchSize = 1000

func found(set []string, val string) bool {
for _, i := range set {
if i == val {
return true
}
}
return false
}

// Cmd is the declaration of the command line
var Cmd = &cobra.Command{
Use: "kvload <graph>",
Expand All @@ -45,6 +36,8 @@ var Cmd = &cobra.Command{

graph = args[0]

log.GetLogger().SetLevel(log.DebugLevel)

// Create the graph if it doesn't already exist.
// Creating the graph also results in the creation of indices
// for the edge/vertex collections.
Expand All @@ -53,8 +46,14 @@ var Cmd = &cobra.Command{
return err
}
db := kvgraph.NewKVGraph(kv)
defer db.Close()

db.AddGraph(graph)
err = db.AddGraph(graph)
if err != nil {
if strings.Contains(err.Error(), "invalid graph name") {
return err
}
}
kgraph, err := db.Graph(graph)
if err != nil {
return err
Expand All @@ -64,17 +63,24 @@ var Cmd = &cobra.Command{
edgeFileArray := []string{}

if vertexManifestFile != "" {
reader, err := golib.ReadFileLines(vertexManifestFile)
if err == nil {
for line := range reader {
reader, err := util.StreamLines(vertexManifestFile, 10)
if err != nil {
return err
}
for line := range reader {
if line != "" {
vertexFileArray = append(vertexFileArray, string(line))
}
}
}

if edgeManifestFile != "" {
reader, err := golib.ReadFileLines(edgeManifestFile)
if err == nil {
for line := range reader {
reader, err := util.StreamLines(edgeManifestFile, 10)
if err != nil {
return err
}
for line := range reader {
if line != "" {
edgeFileArray = append(edgeFileArray, string(line))
}
}
Expand All @@ -87,70 +93,58 @@ var Cmd = &cobra.Command{
edgeFileArray = append(edgeFileArray, edgeFile)
}

graphChan := make(chan *gripql.GraphElement, 10)
wg := &sync.WaitGroup{}
go func() {
wg.Add(1)
if err := kgraph.BulkAdd(graphChan); err != nil {
log.Errorf("BulkdAdd: %v", err)
}
wg.Done()
}()

vertexCounter := ratecounter.NewRateCounter(10 * time.Second)
for _, vertexFile := range vertexFileArray {
log.Infof("Loading %s", vertexFile)
count := 0
vertexChan := make(chan []*gripql.Vertex, 100)
vertexBatch := make([]*gripql.Vertex, 0, batchSize)
go func() {
for v := range util.StreamVerticesFromFile(vertexFile) {
vertexBatch = append(vertexBatch, v)
if len(vertexBatch) >= batchSize {
vertexChan <- vertexBatch
vertexBatch = make([]*gripql.Vertex, 0, batchSize)
}
count++
if count%10000 == 0 {
log.Infof("Loaded %d vertices", count)
}
}
if len(vertexBatch) > 0 {
vertexChan <- vertexBatch
}
log.Infof("Loaded %d vertices", count)
close(vertexChan)
}()

for batch := range vertexChan {
//serialize and store vertex
if err := kgraph.AddVertex(batch); err != nil {
log.Errorf("%s", err)
vertChan, err := util.StreamVerticesFromFile(vertexFile)
if err != nil {
log.WithFields(log.Fields{"error": err}).Errorf("Error reading file: %s", vertexFile)
continue
}
for v := range vertChan {
graphChan <- &gripql.GraphElement{Graph: graph, Vertex: v}
count++
vertexCounter.Incr(1)
if count%10000 == 0 {
log.Infof("Loaded %d vertices (%d/sec)", count, vertexCounter.Rate()/10)
}
}
log.Infof("Loaded %d vertices (%d/sec)", count, vertexCounter.Rate()/10)
}

edgeCounter := ratecounter.NewRateCounter(10 * time.Second)
for _, edgeFile := range edgeFileArray {
log.Printf("Loading %s", edgeFile)
log.Infof("Loading %s", edgeFile)
count := 0
edgeChan := make(chan []*gripql.Edge, 100)
edgeBatch := make([]*gripql.Edge, 0, batchSize)
go func() {
for e := range util.StreamEdgesFromFile(edgeFile) {
edgeBatch = append(edgeBatch, e)
if len(edgeBatch) >= batchSize {
edgeChan <- edgeBatch
edgeBatch = make([]*gripql.Edge, 0, batchSize)
}
count++
if count%10000 == 0 {
log.Infof("Loaded %d edges", count)
}
}
if len(edgeBatch) > 0 {
edgeChan <- edgeBatch
}
log.Infof("Loaded %d edges", count)
close(edgeChan)
}()
for batch := range edgeChan {
//serialize and store vertex
if err := kgraph.AddEdge(batch); err != nil {
log.Errorf("%s", err)
edgeChan, err := util.StreamEdgesFromFile(edgeFile)
if err != nil {
log.WithFields(log.Fields{"error": err}).Errorf("Error reading file: %s", edgeFile)
continue
}
for e := range edgeChan {
graphChan <- &gripql.GraphElement{Graph: graph, Edge: e}
count++
edgeCounter.Incr(1)
if count%10000 == 0 {
log.Infof("Loaded %d edges (%d/sec)", count, edgeCounter.Rate()/10)
}
}
log.Infof("Loaded %d edges (%d/sec)", count, edgeCounter.Rate()/10)
}

db.Close()
close(graphChan)
wg.Wait()
return nil
},
}
Expand All @@ -163,5 +157,4 @@ func init() {
flags.StringVar(&edgeFile, "edge", "", "edge file")
flags.StringVar(&vertexManifestFile, "vertex-manifest", "", "vertex manifest file")
flags.StringVar(&edgeManifestFile, "edge-manifest", "", "edge manifest file")
flags.IntVar(&batchSize, "batch-size", batchSize, "bulk load batch size")
}
2 changes: 1 addition & 1 deletion cmd/load/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/gripql/example"

"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand Down
16 changes: 12 additions & 4 deletions cmd/load/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (

"github.com/bmeg/grip/cmd/load/example"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util"
"github.com/bmeg/grip/util/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -61,15 +61,19 @@ var Cmd = &cobra.Command{
wait := make(chan bool)
go func() {
if err := conn.BulkAdd(elemChan); err != nil {
log.Printf("bulk add error: %v", err)
log.Errorf("bulk add error: %v", err)
}
wait <- false
}()

if vertexFile != "" {
log.Infof("Loading vertex file: %s", vertexFile)
count := 0
for v := range util.StreamVerticesFromFile(vertexFile) {
vertChan, err := util.StreamVerticesFromFile(vertexFile)
if err != nil {
return err
}
for v := range vertChan {
count++
if count%1000 == 0 {
log.Infof("Loaded %d vertices", count)
Expand All @@ -82,7 +86,11 @@ var Cmd = &cobra.Command{
if edgeFile != "" {
log.Infof("Loading edge file: %s", edgeFile)
count := 0
for e := range util.StreamEdgesFromFile(edgeFile) {
edgeChan, err := util.StreamEdgesFromFile(edgeFile)
if err != nil {
return err
}
for e := range edgeChan {
count++
if count%1000 == 0 {
log.Infof("Loaded %d edges", count)
Expand Down
Loading

0 comments on commit b2d89a9

Please sign in to comment.