Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KV BulkWrite #223

Merged
merged 45 commits into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6b5e2af
Changing KV index code so that it doesn't do a 'set(get+1)' increment…
kellrott Jul 13, 2019
0b934bf
Adding bulk write interface to KV interface. It only does something d…
kellrott Jul 16, 2019
db0f614
Fixing bug in the aggrigation processor.
kellrott Jul 16, 2019
639abcf
Removing RocksDB from driver list and testing
kellrott Jul 16, 2019
920e365
Adding rate counter to KVLoading logging
kellrott Jul 19, 2019
55b75f6
Adding a BulkAdd method to all of the drivers. Reworking the server A…
kellrott Jul 19, 2019
3079c11
Merge remote-tracking branch 'origin/master' into kvindex-bulkwrite
kellrott Jul 19, 2019
7b062a6
Making kvgraph bulk add use the bulk transaction
kellrott Jul 19, 2019
b6e00c2
Removing redundant code
kellrott Jul 19, 2019
8fa93a8
Fixing lint issue
kellrott Jul 19, 2019
b7a8598
Removing redundant code
kellrott Jul 22, 2019
cd0ed62
Having internal bulkAdd call errors get copied if if they occur
kellrott Jul 23, 2019
c9b3e09
Addressing issues raised in PR
kellrott Sep 11, 2019
e5241d0
Merge remote-tracking branch 'origin/master' into kvindex-bulkwrite
kellrott Sep 11, 2019
c49018b
Adding some tests related to StreamBatch method
kellrott Sep 11, 2019
02cb246
Adding graph name validation in SteamBatch
kellrott Sep 11, 2019
fff653f
Updating bulk upload behavior and unit tests
kellrott Sep 11, 2019
13b748c
Cleaning linting issues
kellrott Sep 11, 2019
0b358ef
Updating badger link
kellrott Sep 11, 2019
755bb93
Fixing lint issue
kellrott Sep 11, 2019
86a7bd5
Fixing flake issue
kellrott Sep 11, 2019
ff79aa3
Fixing offline KV bulk command line loader to use new bulk loading API
kellrott Sep 12, 2019
cb30d36
Adding some error logging
kellrott Sep 13, 2019
6a6b38f
Adding logging edge insert rates
kellrott Sep 13, 2019
1a05b77
more error handling for BulkAdd
adamstruck Sep 17, 2019
d0440f1
cmd/kvload: cleanup
adamstruck Sep 24, 2019
02c2fb7
pass logrus logger to badger
adamstruck Sep 24, 2019
c353877
cleanup badger opts
adamstruck Sep 24, 2019
3bc6496
kvgraph: fix error handling in BulkAdd
adamstruck Sep 24, 2019
891e204
wip
adamstruck Sep 26, 2019
43b0361
create grip logger package, allows passing configured logger to subpr…
adamstruck Oct 1, 2019
0ff4dab
set logger to debug for kvload
adamstruck Oct 1, 2019
390b6b1
add a log sub method
adamstruck Oct 1, 2019
b037525
log: fix log level in methods
adamstruck Oct 1, 2019
f958057
updated file stream methods
adamstruck Nov 22, 2019
944711e
increase buffer size used in StreamFile
adamstruck Nov 22, 2019
d0cac8b
debug wip
adamstruck Nov 22, 2019
c1a64fc
Merge remote-tracking branch 'origin/master' into kvindex-bulkwrite-as
adamstruck Dec 3, 2019
773dd0e
Merge branch 'kvindex-bulkwrite-as' of github.com:bmeg/grip into kvin…
adamstruck Dec 3, 2019
0e6805e
fix broken tests
adamstruck Dec 4, 2019
27765ed
server: fix BulkAdd
adamstruck Dec 5, 2019
b0f2008
added global --pprof flag to enable profiling
adamstruck Dec 6, 2019
6815731
addressing PR comments
adamstruck Dec 6, 2019
c412c8d
lint
adamstruck Dec 6, 2019
4cc7482
use badger v2.0.0
adamstruck Dec 9, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 67 additions & 73 deletions cmd/kvload/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,28 @@ package kvload

import (
"fmt"
"strings"
"sync"
"time"

"github.com/bmeg/golib"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvgraph"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util"
log "github.com/sirupsen/logrus"
"github.com/paulbellamy/ratecounter"
"github.com/sirupsen/logrus"
adamstruck marked this conversation as resolved.
Show resolved Hide resolved
"github.com/spf13/cobra"
)

var dbPath = "grip.db"
var kvDriver = "badger"

var graph string
var vertexFile string
var edgeFile string
var vertexManifestFile string
var edgeManifestFile string

var batchSize = 1000

func found(set []string, val string) bool {
for _, i := range set {
if i == val {
return true
}
}
return false
}

// Cmd is the declaration of the command line
var Cmd = &cobra.Command{
Use: "kvload <graph>",
Expand All @@ -45,6 +37,8 @@ var Cmd = &cobra.Command{

graph = args[0]

log.GetLogger().SetLevel(logrus.DebugLevel)

// Create the graph if it doesn't already exist.
// Creating the graph also results in the creation of indices
// for the edge/vertex collections.
Expand All @@ -53,8 +47,14 @@ var Cmd = &cobra.Command{
return err
}
db := kvgraph.NewKVGraph(kv)
defer db.Close()

db.AddGraph(graph)
err = db.AddGraph(graph)
if err != nil {
if strings.Contains(err.Error(), "invalid graph name") {
return err
}
}
kgraph, err := db.Graph(graph)
if err != nil {
return err
Expand All @@ -64,17 +64,24 @@ var Cmd = &cobra.Command{
edgeFileArray := []string{}

if vertexManifestFile != "" {
reader, err := golib.ReadFileLines(vertexManifestFile)
if err == nil {
for line := range reader {
reader, err := util.StreamLines(vertexManifestFile, 10)
if err != nil {
return err
}
for line := range reader {
if line != "" {
vertexFileArray = append(vertexFileArray, string(line))
}
}
}

if edgeManifestFile != "" {
reader, err := golib.ReadFileLines(edgeManifestFile)
if err == nil {
for line := range reader {
reader, err := util.StreamLines(edgeManifestFile, 10)
if err != nil {
return err
}
for line := range reader {
if line != "" {
edgeFileArray = append(edgeFileArray, string(line))
}
}
Expand All @@ -87,70 +94,58 @@ var Cmd = &cobra.Command{
edgeFileArray = append(edgeFileArray, edgeFile)
}

graphChan := make(chan *gripql.GraphElement, 10)
wg := &sync.WaitGroup{}
go func() {
wg.Add(1)
if err := kgraph.BulkAdd(graphChan); err != nil {
log.Errorf("BulkdAdd: %v", err)
}
wg.Done()
}()

vertexCounter := ratecounter.NewRateCounter(10 * time.Second)
for _, vertexFile := range vertexFileArray {
log.Infof("Loading %s", vertexFile)
count := 0
vertexChan := make(chan []*gripql.Vertex, 100)
vertexBatch := make([]*gripql.Vertex, 0, batchSize)
go func() {
for v := range util.StreamVerticesFromFile(vertexFile) {
vertexBatch = append(vertexBatch, v)
if len(vertexBatch) >= batchSize {
vertexChan <- vertexBatch
vertexBatch = make([]*gripql.Vertex, 0, batchSize)
}
count++
if count%10000 == 0 {
log.Infof("Loaded %d vertices", count)
}
}
if len(vertexBatch) > 0 {
vertexChan <- vertexBatch
}
log.Infof("Loaded %d vertices", count)
close(vertexChan)
}()

for batch := range vertexChan {
//serialize and store vertex
if err := kgraph.AddVertex(batch); err != nil {
log.Errorf("%s", err)
vertChan, err := util.StreamVerticesFromFile(vertexFile)
if err != nil {
log.WithFields(log.Fields{"error": err}).Errorf("Error reading file: %s", vertexFile)
continue
}
for v := range vertChan {
graphChan <- &gripql.GraphElement{Graph: graph, Vertex: v}
count++
vertexCounter.Incr(1)
if count%10000 == 0 {
log.Infof("Loaded %d vertices (%d/sec)", count, vertexCounter.Rate()/10)
}
}
log.Infof("Loaded %d vertices (%d/sec)", count, vertexCounter.Rate()/10)
}

edgeCounter := ratecounter.NewRateCounter(10 * time.Second)
for _, edgeFile := range edgeFileArray {
log.Printf("Loading %s", edgeFile)
log.Infof("Loading %s", edgeFile)
count := 0
edgeChan := make(chan []*gripql.Edge, 100)
edgeBatch := make([]*gripql.Edge, 0, batchSize)
go func() {
for e := range util.StreamEdgesFromFile(edgeFile) {
edgeBatch = append(edgeBatch, e)
if len(edgeBatch) >= batchSize {
edgeChan <- edgeBatch
edgeBatch = make([]*gripql.Edge, 0, batchSize)
}
count++
if count%10000 == 0 {
log.Infof("Loaded %d edges", count)
}
}
if len(edgeBatch) > 0 {
edgeChan <- edgeBatch
}
log.Infof("Loaded %d edges", count)
close(edgeChan)
}()
for batch := range edgeChan {
//serialize and store vertex
if err := kgraph.AddEdge(batch); err != nil {
log.Errorf("%s", err)
edgeChan, err := util.StreamEdgesFromFile(edgeFile)
if err != nil {
log.WithFields(log.Fields{"error": err}).Errorf("Error reading file: %s", edgeFile)
continue
}
for e := range edgeChan {
graphChan <- &gripql.GraphElement{Graph: graph, Edge: e}
count++
edgeCounter.Incr(1)
if count%10000 == 0 {
log.Infof("Loaded %d edges (%d/sec)", count, edgeCounter.Rate()/10)
}
}
log.Infof("Loaded %d edges (%d/sec)", count, edgeCounter.Rate()/10)
}

db.Close()
close(graphChan)
wg.Wait()
return nil
},
}
Expand All @@ -163,5 +158,4 @@ func init() {
flags.StringVar(&edgeFile, "edge", "", "edge file")
flags.StringVar(&vertexManifestFile, "vertex-manifest", "", "vertex manifest file")
flags.StringVar(&edgeManifestFile, "edge-manifest", "", "edge manifest file")
flags.IntVar(&batchSize, "batch-size", batchSize, "bulk load batch size")
}
2 changes: 1 addition & 1 deletion cmd/load/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/gripql/example"

"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand Down
16 changes: 12 additions & 4 deletions cmd/load/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (

"github.com/bmeg/grip/cmd/load/example"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util"
"github.com/bmeg/grip/util/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -61,15 +61,19 @@ var Cmd = &cobra.Command{
wait := make(chan bool)
go func() {
if err := conn.BulkAdd(elemChan); err != nil {
log.Printf("bulk add error: %v", err)
log.Errorf("bulk add error: %v", err)
}
wait <- false
}()

if vertexFile != "" {
log.Infof("Loading vertex file: %s", vertexFile)
count := 0
for v := range util.StreamVerticesFromFile(vertexFile) {
vertChan, err := util.StreamVerticesFromFile(vertexFile)
if err != nil {
return err
}
for v := range vertChan {
count++
if count%1000 == 0 {
log.Infof("Loaded %d vertices", count)
Expand All @@ -82,7 +86,11 @@ var Cmd = &cobra.Command{
if edgeFile != "" {
log.Infof("Loading edge file: %s", edgeFile)
count := 0
for e := range util.StreamEdgesFromFile(edgeFile) {
edgeChan, err := util.StreamEdgesFromFile(edgeFile)
if err != nil {
return err
}
for e := range edgeChan {
count++
if count%1000 == 0 {
log.Infof("Loaded %d edges", count)
Expand Down
Loading