Skip to content

Commit

Permalink
Merge pull request #150 from bmeg/kv-dev-2
Browse files Browse the repository at this point in the history
KV Bulk Loading Improvements
  • Loading branch information
adamstruck authored Oct 5, 2018
2 parents 484f367 + f64a3ba commit a73e197
Show file tree
Hide file tree
Showing 20 changed files with 235 additions and 51 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ depends:
with-rocksdb: depends
@go install -tags 'rocksdb' -ldflags '$(VERSION_LDFLAGS)' .

local-rocksdb: rocksdb-lib
@CGO_LDFLAGS="-L$(shell pwd)/rocksdb-lib" CGO_CFLAGS="-I$(shell pwd)/rocksdb-lib/include/" go install -tags 'rocksdb' -ldflags '$(VERSION_LDFLAGS)' .

# Fetch the RocksDB sources into ./rocksdb-lib and build the static library
# that the `local-rocksdb` target links against.
# The sub-build uses `$(MAKE) -C` rather than pushd/popd: make runs recipes
# with /bin/sh, where the bash-only pushd/popd builtins may not exist, and
# `$(MAKE)` also forwards command-line flags and the jobserver to the sub-make.
rocksdb-lib:
	@git clone https://github.com/facebook/rocksdb.git rocksdb-lib
	@$(MAKE) -C rocksdb-lib static_lib

# --------------------------
# Compile Protobuf Schemas
# --------------------------
Expand Down
5 changes: 3 additions & 2 deletions benchmark/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"fmt"
"testing"

"github.com/bmeg/grip/badgerdb"
"github.com/bmeg/grip/engine"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvgraph"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/kvi/badgerdb"
)

// Dead simple baseline tests: get all vertices from a memory-backed graph.
func BenchmarkBaselineV(b *testing.B) {
kv, _ := badgerdb.NewKVInterface("test-badger.db")
kv, _ := badgerdb.NewKVInterface("test-badger.db", kvi.Options{})
db, err := kvgraph.NewKVGraph(kv).Graph("test-graph")
if err != nil {
b.Fatal(err)
Expand Down
7 changes: 4 additions & 3 deletions benchmark/insertion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (

"github.com/bmeg/grip/gripql"
//"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/badgerdb"
"github.com/bmeg/grip/kvgraph"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/kvi/badgerdb"
)

var idRunes = []rune("abcdefghijklmnopqrstuvwxyz")
Expand All @@ -22,7 +23,7 @@ func randID() string {
}

func BenchmarkVertexInsert(b *testing.B) {
kv, _ := badgerdb.NewKVInterface("test_1.db")
kv, _ := badgerdb.NewKVInterface("test_1.db", kvi.Options{})
graphDB := kvgraph.NewKVGraph(kv)
graphDB.AddGraph("test")
graph, err := graphDB.Graph("test")
Expand All @@ -45,7 +46,7 @@ func BenchmarkVertexInsert(b *testing.B) {
}

func BenchmarkEdgeInsert(b *testing.B) {
kv, _ := badgerdb.NewKVInterface("test_1.db")
kv, _ := badgerdb.NewKVInterface("test_1.db", kvi.Options{})
graphDB := kvgraph.NewKVGraph(kv)
graphDB.AddGraph("test")
graph, err := graphDB.Graph("test")
Expand Down
42 changes: 38 additions & 4 deletions cmd/kvload/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"log"

"github.com/bmeg/golib"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvgraph"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/util"
"github.com/spf13/cobra"
)
Expand All @@ -16,6 +18,8 @@ var kvDriver = "badger"
var graph string
var vertexFile string
var edgeFile string
var vertexManifestFile string
var edgeManifestFile string

var batchSize = 1000

Expand All @@ -35,7 +39,7 @@ var Cmd = &cobra.Command{
Long: ``,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
if vertexFile == "" && edgeFile == "" {
if vertexFile == "" && edgeFile == "" && vertexManifestFile == "" && edgeManifestFile == "" {
return fmt.Errorf("no edge or vertex files were provided")
}

Expand All @@ -44,18 +48,46 @@ var Cmd = &cobra.Command{
// Create the graph if it doesn't already exist.
// Creating the graph also results in the creation of indices
// for the edge/vertex collections.
db, err := kvgraph.NewKVGraphDB(kvDriver, dbPath)
kv, err := kvgraph.NewKVInterface(kvDriver, dbPath, &kvi.Options{BulkLoad: true})
if err != nil {
return err
}
db := kvgraph.NewKVGraph(kv)

db.AddGraph(graph)
kgraph, err := db.Graph(graph)
if err != nil {
return err
}

vertexFileArray := []string{}
edgeFileArray := []string{}

// Expand each manifest into the list of files to load; every non-empty
// line read from a manifest is treated as one data-file path. A manifest
// that cannot be read is reported to the caller rather than skipped, since
// skipping it would let the command finish "successfully" with nothing loaded.
if vertexManifestFile != "" {
	reader, err := golib.ReadFileLines(vertexManifestFile)
	if err != nil {
		return fmt.Errorf("reading vertex manifest %s: %v", vertexManifestFile, err)
	}
	for line := range reader {
		// Skip blank lines so a stray empty line does not queue an empty path.
		if len(line) == 0 {
			continue
		}
		vertexFileArray = append(vertexFileArray, string(line))
	}
}
if edgeManifestFile != "" {
	reader, err := golib.ReadFileLines(edgeManifestFile)
	if err != nil {
		return fmt.Errorf("reading edge manifest %s: %v", edgeManifestFile, err)
	}
	for line := range reader {
		// Skip blank lines so a stray empty line does not queue an empty path.
		if len(line) == 0 {
			continue
		}
		edgeFileArray = append(edgeFileArray, string(line))
	}
}

if vertexFile != "" {
vertexFileArray = append(vertexFileArray, vertexFile)
}
if edgeFile != "" {
edgeFileArray = append(edgeFileArray, edgeFile)
}

for _, vertexFile := range vertexFileArray {
log.Printf("Loading %s", vertexFile)
count := 0
vertexChan := make(chan []*gripql.Vertex, 100)
Expand Down Expand Up @@ -87,7 +119,7 @@ var Cmd = &cobra.Command{
}
}

if edgeFile != "" {
for _, edgeFile := range edgeFileArray {
log.Printf("Loading %s", edgeFile)
count := 0
edgeChan := make(chan []*gripql.Edge, 100)
Expand Down Expand Up @@ -129,5 +161,7 @@ func init() {
flags.StringVar(&kvDriver, "driver", kvDriver, "KV Driver")
flags.StringVar(&vertexFile, "vertex", "", "vertex file")
flags.StringVar(&edgeFile, "edge", "", "edge file")
flags.IntVar(&batchSize, "batch-size", batchSize, "mongo bulk load batch size")
flags.StringVar(&vertexManifestFile, "vertex-manifest", "", "vertex manifest file")
flags.StringVar(&edgeManifestFile, "edge-manifest", "", "edge manifest file")
flags.IntVar(&batchSize, "batch-size", batchSize, "bulk load batch size")
}
8 changes: 4 additions & 4 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"os/signal"
"strings"

_ "github.com/bmeg/grip/badgerdb" // import so badger will register itself
_ "github.com/bmeg/grip/boltdb" // import so bolt will register itself
"github.com/bmeg/grip/config"
"github.com/bmeg/grip/elastic"
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/kvgraph"
_ "github.com/bmeg/grip/leveldb" // import so level will register itself
_ "github.com/bmeg/grip/kvi/badgerdb" // import so badger will register itself
_ "github.com/bmeg/grip/kvi/boltdb" // import so bolt will register itself
_ "github.com/bmeg/grip/kvi/leveldb" // import so level will register itself
_ "github.com/bmeg/grip/kvi/rocksdb" // import so rocks will register itself
"github.com/bmeg/grip/mongo"
_ "github.com/bmeg/grip/rocksdb" // import so rocks will register itself
"github.com/bmeg/grip/server"
"github.com/bmeg/grip/sql"
_ "github.com/go-sql-driver/mysql" //import so mysql will register as a sql driver
Expand Down
4 changes: 2 additions & 2 deletions engine/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"io/ioutil"
"os"

"github.com/bmeg/grip/badgerdb"
"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/kvi"
"github.com/bmeg/grip/kvi/badgerdb"
)

// NewManager creates a resource manager
Expand All @@ -22,7 +22,7 @@ type manager struct {

func (bm *manager) GetTempKV() kvi.KVInterface {
td, _ := ioutil.TempDir(bm.workDir, "kvTmp")
kv, _ := badgerdb.NewKVInterface(td)
kv, _ := badgerdb.NewKVInterface(td, kvi.Options{})

bm.kvs = append(bm.kvs, kv)
bm.paths = append(bm.paths, td)
Expand Down
6 changes: 0 additions & 6 deletions kvgraph/graphdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kvgraph

import (
"bytes"
"context"
"fmt"

"github.com/bmeg/grip/gdbi"
Expand Down Expand Up @@ -80,8 +79,3 @@ func (kgraph *KVGraph) ListGraphs() []string {
})
return out
}

// GetSchema returns the schema of a specific graph in the database
func (kgraph *KVGraph) GetSchema(ctx context.Context, graph string, sampleN uint32, random bool) (*gripql.GraphSchema, error) {
return nil, fmt.Errorf("not implemented")
}
9 changes: 6 additions & 3 deletions kvgraph/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,20 @@ func AddKVDriver(name string, builder kvi.KVBuilder) error {

// NewKVInterface initializes a new key value interface given the name of the
// driver and path to create the database
func NewKVInterface(name string, dbPath string) (kvi.KVInterface, error) {
func NewKVInterface(name string, dbPath string, opts *kvi.Options) (kvi.KVInterface, error) {
if builder, ok := kvMap[name]; ok {
return builder(dbPath)
if opts != nil {
return builder(dbPath, *opts)
}
return builder(dbPath, kvi.Options{})
}
return nil, fmt.Errorf("driver %s Not Found", name)
}

// NewKVGraphDB initializes a new key value graph driver given the name of the
// driver and path/url to create the database at
func NewKVGraphDB(name string, dbPath string) (gdbi.GraphDB, error) {
kv, err := NewKVInterface(name, dbPath)
kv, err := NewKVInterface(name, dbPath, nil)
if err != nil {
return nil, err
}
Expand Down
13 changes: 13 additions & 0 deletions kvgraph/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kvgraph

import (
"context"
"fmt"

"github.com/bmeg/grip/gripql"
)

// GetSchema is a placeholder for schema retrieval on key/value backed graphs.
// It performs no work: every call returns a nil schema together with a
// "not implemented" error, and none of the arguments are consulted.
func (kgraph *KVGraph) GetSchema(ctx context.Context, graph string, sampleN uint32, random bool) (*gripql.GraphSchema, error) {
	var schema *gripql.GraphSchema
	return schema, fmt.Errorf("KV Schema not implemented")
}
12 changes: 6 additions & 6 deletions kvgraph/test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"os"
"testing"

_ "github.com/bmeg/grip/badgerdb" // import so badger will register itself
_ "github.com/bmeg/grip/boltdb" // import so bolt will register itself
"github.com/bmeg/grip/kvgraph"
"github.com/bmeg/grip/kvi"
_ "github.com/bmeg/grip/leveldb" // import so level will register itself
_ "github.com/bmeg/grip/rocksdb" // import so rocks will register itself
_ "github.com/bmeg/grip/kvi/badgerdb" // import so badger will register itself
_ "github.com/bmeg/grip/kvi/boltdb" // import so bolt will register itself
_ "github.com/bmeg/grip/kvi/leveldb" // import so level will register itself
_ "github.com/bmeg/grip/kvi/rocksdb" // import so rocks will register itself
"github.com/bmeg/grip/util"
)

Expand All @@ -25,7 +25,7 @@ func resetKVInterface() {
panic(err)
}
dbpath = "test.db." + util.RandomString(6)
kvdriver, err = kvgraph.NewKVInterface(dbname, dbpath)
kvdriver, err = kvgraph.NewKVInterface(dbname, dbpath, nil)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestMain(m *testing.M) {
for _, dbname = range []string{"badger", "bolt", "level", "rocks"} {
dbpath = "test.db." + util.RandomString(6)

kvdriver, err = kvgraph.NewKVInterface(dbname, dbpath)
kvdriver, err = kvgraph.NewKVInterface(dbname, dbpath, nil)
if err != nil {
if dbname == "rocks" {
fmt.Println(`Warning: rocks driver not found; run test with "-tags rocksdb"`)
Expand Down
7 changes: 6 additions & 1 deletion badgerdb/badger_store.go → kvi/badgerdb/badger_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
var loaded = kvgraph.AddKVDriver("badger", NewKVInterface)

// NewKVInterface creates new BadgerDB backed KVInterface at `path`
func NewKVInterface(path string) (kvi.KVInterface, error) {
func NewKVInterface(path string, kopts kvi.Options) (kvi.KVInterface, error) {
log.Info("Starting BadgerDB")
_, err := os.Stat(path)
if os.IsNotExist(err) {
Expand All @@ -31,6 +31,10 @@ func NewKVInterface(path string) (kvi.KVInterface, error) {
opts.TableLoadingMode = options.MemoryMap
opts.Dir = path
opts.ValueDir = path
if kopts.BulkLoad {
opts.SyncWrites = false
opts.DoNotCompact = true // NOTE: this is a test value, it may need to be removed
}
db, err := badger.Open(opts)
if err != nil {
return nil, err
Expand All @@ -45,6 +49,7 @@ type BadgerKV struct {

// Close logs that the store is shutting down and then closes the underlying
// badger database handle, returning whatever error that close reports.
func (badgerkv *BadgerKV) Close() error {
	log.Info("Closing BadgerDB")
	err := badgerkv.db.Close()
	return err
}

Expand Down
2 changes: 1 addition & 1 deletion boltdb/bolt_store.go → kvi/boltdb/bolt_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var loaded = kvgraph.AddKVDriver("bolt", NewKVInterface)
var graphBucket = []byte("graph")

// NewKVInterface creates new BoltDB backed KVInterface at `path`
func NewKVInterface(path string) (kvi.KVInterface, error) {
func NewKVInterface(path string, opts kvi.Options) (kvi.KVInterface, error) {
log.Info("Starting BoltDB")
db, err := bolt.Open(path, 0600, nil)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion kvi/interface.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package kvi

// Options are the options for loading the KeyValue driver
type Options struct {
// BulkLoad asks the driver to open the store in a mode suited to bulk
// loading. How the flag is honoured is up to each driver; the Badger driver
// in this change turns off synchronous writes and compaction when it is set.
BulkLoad bool
}

// KVBuilder is function implemented by the various key/value storage drivers
// that returns an initialized KVInterface given a file/path argument
type KVBuilder func(path string) (KVInterface, error)
type KVBuilder func(path string, opts Options) (KVInterface, error)

// KVInterface is the base interface for key/value based graph driver
type KVInterface interface {
Expand Down
Loading

0 comments on commit a73e197

Please sign in to comment.