diff --git a/Makefile b/Makefile index e748260b..688d56bc 100644 --- a/Makefile +++ b/Makefile @@ -38,6 +38,13 @@ depends: with-rocksdb: depends @go install -tags 'rocksdb' -ldflags '$(VERSION_LDFLAGS)' . +local-rocksdb: rocksdb-lib + @CGO_LDFLAGS="-L$(shell pwd)/rocksdb-lib" CGO_CFLAGS="-I$(shell pwd)/rocksdb-lib/include/" go install -tags 'rocksdb' -ldflags '$(VERSION_LDFLAGS)' . + +rocksdb-lib: + @git clone https://github.com/facebook/rocksdb.git rocksdb-lib + @pushd rocksdb-lib && make static_lib && popd + # -------------------------- # Complile Protobuf Schemas # -------------------------- diff --git a/benchmark/engine_test.go b/benchmark/engine_test.go index c1672328..181c1790 100644 --- a/benchmark/engine_test.go +++ b/benchmark/engine_test.go @@ -5,15 +5,16 @@ import ( "fmt" "testing" - "github.com/bmeg/grip/badgerdb" "github.com/bmeg/grip/engine" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/kvgraph" + "github.com/bmeg/grip/kvi" + "github.com/bmeg/grip/kvi/badgerdb" ) // Dead simple baseline tests: get all vertices from a memory-backed graph. func BenchmarkBaselineV(b *testing.B) { - kv, _ := badgerdb.NewKVInterface("test-badger.db") + kv, _ := badgerdb.NewKVInterface("test-badger.db", kvi.Options{}) db, err := kvgraph.NewKVGraph(kv).Graph("test-graph") if err != nil { b.Fatal(err) diff --git a/benchmark/insertion_test.go b/benchmark/insertion_test.go index e7c57841..d2c29fb7 100644 --- a/benchmark/insertion_test.go +++ b/benchmark/insertion_test.go @@ -7,8 +7,9 @@ import ( "github.com/bmeg/grip/gripql" //"github.com/bmeg/grip/gdbi" - "github.com/bmeg/grip/badgerdb" "github.com/bmeg/grip/kvgraph" + "github.com/bmeg/grip/kvi" + "github.com/bmeg/grip/kvi/badgerdb" ) var idRunes = []rune("abcdefghijklmnopqrstuvwxyz") @@ -22,7 +23,7 @@ func randID() string { } func BenchmarkVertexInsert(b *testing.B) { - kv, _ := badgerdb.NewKVInterface("test_1.db") + kv, _ := badgerdb.NewKVInterface("test_1.db", kvi.Options{}) graphDB := kvgraph.NewKVGraph(kv) graphDB.AddGraph("test") graph, err := graphDB.Graph("test") @@ -45,7 +46,7 @@ func BenchmarkVertexInsert(b *testing.B) { } func BenchmarkEdgeInsert(b *testing.B) { - kv, _ := badgerdb.NewKVInterface("test_1.db") + kv, _ := badgerdb.NewKVInterface("test_1.db", kvi.Options{}) graphDB := kvgraph.NewKVGraph(kv) graphDB.AddGraph("test") graph, err := graphDB.Graph("test") diff --git a/cmd/kvload/main.go b/cmd/kvload/main.go index 98040fa4..0fa3b08b 100644 --- a/cmd/kvload/main.go +++ b/cmd/kvload/main.go @@ -4,8 +4,10 @@ import ( "fmt" "log" + "github.com/bmeg/golib" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/kvgraph" + "github.com/bmeg/grip/kvi" "github.com/bmeg/grip/util" "github.com/spf13/cobra" ) @@ -16,6 +18,8 @@ var kvDriver = "badger" var graph string var vertexFile string var edgeFile string +var vertexManifestFile string +var edgeManifestFile string var batchSize = 1000 @@ -35,7 +39,7 @@ var Cmd = &cobra.Command{ Long: ``, Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - if vertexFile == "" && edgeFile == "" { + if vertexFile == "" && edgeFile == "" && vertexManifestFile == "" && edgeManifestFile == "" { return fmt.Errorf("no edge or vertex files were provided") } @@ -44,10 +48,11 @@ var Cmd = &cobra.Command{ // Create the graph if it doesn't already exist. // Creating the graph also results in the creation of indices // for the edge/vertex collections. - db, err := kvgraph.NewKVGraphDB(kvDriver, dbPath) + kv, err := kvgraph.NewKVInterface(kvDriver, dbPath, &kvi.Options{BulkLoad: true}) if err != nil { return err } + db := kvgraph.NewKVGraph(kv) db.AddGraph(graph) kgraph, err := db.Graph(graph) @@ -55,7 +60,34 @@ var Cmd = &cobra.Command{ return err } + vertexFileArray := []string{} + edgeFileArray := []string{} + + if vertexManifestFile != "" { + reader, err := golib.ReadFileLines(vertexManifestFile) + if err == nil { + for line := range reader { + vertexFileArray = append(vertexFileArray, string(line)) + } + } + } + if edgeManifestFile != "" { + reader, err := golib.ReadFileLines(edgeManifestFile) + if err == nil { + for line := range reader { + edgeFileArray = append(edgeFileArray, string(line)) + } + } + } + if vertexFile != "" { + vertexFileArray = append(vertexFileArray, vertexFile) + } + if edgeFile != "" { + edgeFileArray = append(edgeFileArray, edgeFile) + } + + for _, vertexFile := range vertexFileArray { log.Printf("Loading %s", vertexFile) count := 0 vertexChan := make(chan []*gripql.Vertex, 100) @@ -87,7 +119,7 @@ var Cmd = &cobra.Command{ } } - if edgeFile != "" { + for _, edgeFile := range edgeFileArray { log.Printf("Loading %s", edgeFile) count := 0 edgeChan := make(chan []*gripql.Edge, 100) @@ -129,5 +161,7 @@ func init() { flags.StringVar(&kvDriver, "driver", kvDriver, "KV Driver") flags.StringVar(&vertexFile, "vertex", "", "vertex file") flags.StringVar(&edgeFile, "edge", "", "edge file") - flags.IntVar(&batchSize, "batch-size", batchSize, "mongo bulk load batch size") + flags.StringVar(&vertexManifestFile, "vertex-manifest", "", "vertex manifest file") + flags.StringVar(&edgeManifestFile, "edge-manifest", "", "edge manifest file") + flags.IntVar(&batchSize, "batch-size", batchSize, "bulk load batch size") } diff --git a/cmd/server/main.go b/cmd/server/main.go index 6660bea9..a4a9dd5c 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -7,15 +7,15 @@ import ( "os/signal" "strings" - _ "github.com/bmeg/grip/badgerdb" // import so badger will register itself - _ "github.com/bmeg/grip/boltdb" // import so bolt will register itself "github.com/bmeg/grip/config" "github.com/bmeg/grip/elastic" "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/kvgraph" - _ "github.com/bmeg/grip/leveldb" // import so level will register itself + _ "github.com/bmeg/grip/kvi/badgerdb" // import so badger will register itself + _ "github.com/bmeg/grip/kvi/boltdb" // import so bolt will register itself + _ "github.com/bmeg/grip/kvi/leveldb" // import so level will register itself + _ "github.com/bmeg/grip/kvi/rocksdb" // import so rocks will register itself "github.com/bmeg/grip/mongo" - _ "github.com/bmeg/grip/rocksdb" // import so rocks will register itself "github.com/bmeg/grip/server" "github.com/bmeg/grip/sql" _ "github.com/go-sql-driver/mysql" //import so mysql will register as a sql driver diff --git a/engine/manager.go b/engine/manager.go index ce6e89df..9317e2f5 100644 --- a/engine/manager.go +++ b/engine/manager.go @@ -4,9 +4,9 @@ import ( "io/ioutil" "os" - "github.com/bmeg/grip/badgerdb" "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/kvi" + "github.com/bmeg/grip/kvi/badgerdb" ) // NewManager creates a resource manager @@ -22,7 +22,7 @@ type manager struct { func (bm *manager) GetTempKV() kvi.KVInterface { td, _ := ioutil.TempDir(bm.workDir, "kvTmp") - kv, _ := badgerdb.NewKVInterface(td) + kv, _ := badgerdb.NewKVInterface(td, kvi.Options{}) bm.kvs = append(bm.kvs, kv) bm.paths = append(bm.paths, td) diff --git a/kvgraph/graphdb.go b/kvgraph/graphdb.go index b07daf6b..5c6869e0 100644 --- a/kvgraph/graphdb.go +++ b/kvgraph/graphdb.go @@ -2,7 +2,6 @@ package kvgraph import ( "bytes" - "context" "fmt" "github.com/bmeg/grip/gdbi" @@ -80,8 +79,3 @@ func (kgraph *KVGraph) ListGraphs() []string { }) return out } - -// GetSchema returns the schema of a specific graph in the database -func (kgraph *KVGraph) GetSchema(ctx context.Context, graph string, sampleN uint32, random bool) (*gripql.GraphSchema, error) { - return nil, fmt.Errorf("not implemented") -} diff --git a/kvgraph/new.go b/kvgraph/new.go index 311d47dd..a63a5b2b 100644 --- a/kvgraph/new.go +++ b/kvgraph/new.go @@ -34,9 +34,12 @@ func AddKVDriver(name string, builder kvi.KVBuilder) error { // NewKVInterface intitalize a new key value interface given the name of the // driver and path to create the database -func NewKVInterface(name string, dbPath string) (kvi.KVInterface, error) { +func NewKVInterface(name string, dbPath string, opts *kvi.Options) (kvi.KVInterface, error) { if builder, ok := kvMap[name]; ok { - return builder(dbPath) + if opts != nil { + return builder(dbPath, *opts) + } + return builder(dbPath, kvi.Options{}) } return nil, fmt.Errorf("driver %s Not Found", name) } @@ -44,7 +47,7 @@ func NewKVInterface(name string, dbPath string) (kvi.KVInterface, error) { // NewKVGraphDB intitalize a new key value graph driver given the name of the // driver and path/url to create the database at func NewKVGraphDB(name string, dbPath string) (gdbi.GraphDB, error) { - kv, err := NewKVInterface(name, dbPath) + kv, err := NewKVInterface(name, dbPath, nil) if err != nil { return nil, err } diff --git a/kvgraph/schema.go b/kvgraph/schema.go new file mode 100644 index 00000000..918ee1d0 --- /dev/null +++ b/kvgraph/schema.go @@ -0,0 +1,13 @@ +package kvgraph + +import ( + "context" + "fmt" + + "github.com/bmeg/grip/gripql" +) + +// GetSchema gets schema of the graph +func (kgraph *KVGraph) GetSchema(ctx context.Context, graph string, sampleN uint32, random bool) (*gripql.GraphSchema, error) { + return nil, fmt.Errorf("KV Schema not implemented") +} diff --git a/kvgraph/test/main_test.go b/kvgraph/test/main_test.go index 1f9427fc..32ee8bb8 100644 --- a/kvgraph/test/main_test.go +++ b/kvgraph/test/main_test.go @@ -5,12 +5,12 @@ import ( "os" "testing" - _ "github.com/bmeg/grip/badgerdb" // import so badger will register itself - _ "github.com/bmeg/grip/boltdb" // import so bolt will register itself "github.com/bmeg/grip/kvgraph" "github.com/bmeg/grip/kvi" - _ "github.com/bmeg/grip/leveldb" // import so level will register itself - _ "github.com/bmeg/grip/rocksdb" // import so rocks will register itself + _ "github.com/bmeg/grip/kvi/badgerdb" // import so badger will register itself + _ "github.com/bmeg/grip/kvi/boltdb" // import so bolt will register itself + _ "github.com/bmeg/grip/kvi/leveldb" // import so level will register itself + _ "github.com/bmeg/grip/kvi/rocksdb" // import so rocks will register itself "github.com/bmeg/grip/util" ) @@ -25,7 +25,7 @@ func resetKVInterface() { panic(err) } dbpath = "test.db." + util.RandomString(6) - kvdriver, err = kvgraph.NewKVInterface(dbname, dbpath) + kvdriver, err = kvgraph.NewKVInterface(dbname, dbpath, nil) if err != nil { panic(err) } @@ -55,7 +55,7 @@ func TestMain(m *testing.M) { for _, dbname = range []string{"badger", "bolt", "level", "rocks"} { dbpath = "test.db." + util.RandomString(6) - kvdriver, err = kvgraph.NewKVInterface(dbname, dbpath) + kvdriver, err = kvgraph.NewKVInterface(dbname, dbpath, nil) if err != nil { if dbname == "rocks" { fmt.Println(`Warning: rocks driver not found; run test with "-tags rocksdb"`) diff --git a/badgerdb/badger_store.go b/kvi/badgerdb/badger_store.go similarity index 96% rename from badgerdb/badger_store.go rename to kvi/badgerdb/badger_store.go index f677a35d..5b99c8dd 100644 --- a/badgerdb/badger_store.go +++ b/kvi/badgerdb/badger_store.go @@ -19,7 +19,7 @@ import ( var loaded = kvgraph.AddKVDriver("badger", NewKVInterface) // NewKVInterface creates new BoltDB backed KVInterface at `path` -func NewKVInterface(path string) (kvi.KVInterface, error) { +func NewKVInterface(path string, kopts kvi.Options) (kvi.KVInterface, error) { log.Info("Starting BadgerDB") _, err := os.Stat(path) if os.IsNotExist(err) { @@ -31,6 +31,10 @@ func NewKVInterface(path string) (kvi.KVInterface, error) { opts.TableLoadingMode = options.MemoryMap opts.Dir = path opts.ValueDir = path + if kopts.BulkLoad { + opts.SyncWrites = false + opts.DoNotCompact = true // NOTE: this is a test value, it may need to be removed + } db, err := badger.Open(opts) if err != nil { return nil, err @@ -45,6 +49,7 @@ type BadgerKV struct { // Close closes the badger connection func (badgerkv *BadgerKV) Close() error { + log.Info("Closing BadgerDB") return badgerkv.db.Close() } diff --git a/boltdb/bolt_store.go b/kvi/boltdb/bolt_store.go similarity index 98% rename from boltdb/bolt_store.go rename to kvi/boltdb/bolt_store.go index 6eb96244..0035aa05 100644 --- a/boltdb/bolt_store.go +++ b/kvi/boltdb/bolt_store.go @@ -19,7 +19,7 @@ var loaded = kvgraph.AddKVDriver("bolt", NewKVInterface) var graphBucket = []byte("graph") // NewKVInterface creates new BoltDB backed KVInterface at `path` -func NewKVInterface(path string) (kvi.KVInterface, error) { +func NewKVInterface(path string, opts kvi.Options) (kvi.KVInterface, error) { log.Info("Starting BoltDB") db, err := bolt.Open(path, 0600, nil) if err != nil { diff --git a/kvi/interface.go b/kvi/interface.go index b8ae5ba7..cb8706d7 100644 --- a/kvi/interface.go +++ b/kvi/interface.go @@ -1,8 +1,13 @@ package kvi +// Options are the options for loading the KeyValue driver +type Options struct { + BulkLoad bool +} + // KVBuilder is function implemented by the various key/value storage drivers // that returns an initialized KVInterface given a file/path argument -type KVBuilder func(path string) (KVInterface, error) +type KVBuilder func(path string, opts Options) (KVInterface, error) // KVInterface is the base interface for key/value based graph driver type KVInterface interface { diff --git a/leveldb/level_store.go b/kvi/leveldb/level_store.go similarity index 87% rename from leveldb/level_store.go rename to kvi/leveldb/level_store.go index ca2e1f83..fbdfa3a2 100644 --- a/leveldb/level_store.go +++ b/kvi/leveldb/level_store.go @@ -13,27 +13,42 @@ import ( log "github.com/sirupsen/logrus" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" ) var loaded = kvgraph.AddKVDriver("level", NewKVInterface) // NewKVInterface creates new LevelDB backed KVInterface at `path` -func NewKVInterface(path string) (kvi.KVInterface, error) { +func NewKVInterface(path string, opts kvi.Options) (kvi.KVInterface, error) { log.Info("Starting LevelDB") - db, err := leveldb.OpenFile(path, nil) + + var db *leveldb.DB + var err error + if opts.BulkLoad { + o := opt.Options{} + o.CompactionL0Trigger = 1 << 31 + db, err = leveldb.OpenFile(path, &o) + } else { + db, err = leveldb.OpenFile(path, nil) + } if err != nil { return &LevelKV{}, err } - return &LevelKV{db: db}, nil + return &LevelKV{db: db, opts: opts}, nil } // LevelKV implements the generic key value interface using the leveldb library type LevelKV struct { - db *leveldb.DB + db *leveldb.DB + opts kvi.Options } // Close database func (l *LevelKV) Close() error { + if l.opts.BulkLoad { + l.db.CompactRange(util.Range{Start: nil, Limit: nil}) + } return l.db.Close() } @@ -82,17 +97,18 @@ func (l *LevelKV) Set(id []byte, val []byte) error { // Update runs an alteration transaction of the kvstore func (l *LevelKV) Update(u func(tx kvi.KVTransaction) error) error { tx, _ := l.db.OpenTransaction() - ktx := levelTransaction{tx} + ktx := levelTransaction{tx, l.db} defer tx.Commit() return u(ktx) } type levelTransaction struct { tx *leveldb.Transaction + db *leveldb.DB } func (ltx levelTransaction) Set(key, val []byte) error { - return ltx.tx.Put(key, val, nil) + return ltx.tx.Put(key, val, nil) //&opt.WriteOptions{NoWriteMerge: true}) } // Delete removes key `id` from the kv store diff --git a/rocksdb/rocks.go b/kvi/rocksdb/rocks.go similarity index 94% rename from rocksdb/rocks.go rename to kvi/rocksdb/rocks.go index be47139a..57064454 100644 --- a/rocksdb/rocks.go +++ b/kvi/rocksdb/rocks.go @@ -19,7 +19,7 @@ import ( var loaded = kvgraph.AddKVDriver("rocks", NewKVInterface) // NewKVInterface creates new RocksDB backed KVInterface at `path` -func NewKVInterface(path string) (kvi.KVInterface, error) { +func NewKVInterface(path string, kopts kvi.Options) (kvi.KVInterface, error) { log.Info("Starting RocksDB") bbto := gorocksdb.NewDefaultBlockBasedTableOptions() @@ -29,6 +29,9 @@ func NewKVInterface(path string) (kvi.KVInterface, error) { opts := gorocksdb.NewDefaultOptions() opts.SetBlockBasedTableFactory(bbto) opts.SetCreateIfMissing(true) + if kopts.BulkLoad { + opts.PrepareForBulkLoad() + } db, err := gorocksdb.OpenDb(opts, path) if err != nil { @@ -40,21 +43,26 @@ func NewKVInterface(path string) (kvi.KVInterface, error) { //wo.SetSync(true) return &RocksKV{ - db: db, - ro: ro, - wo: wo, + db: db, + ro: ro, + wo: wo, + opts: kopts, }, nil } // RocksKV is an implementation of the KVStore for rocksdb type RocksKV struct { - db *gorocksdb.DB - ro *gorocksdb.ReadOptions - wo *gorocksdb.WriteOptions + db *gorocksdb.DB + ro *gorocksdb.ReadOptions + wo *gorocksdb.WriteOptions + opts kvi.Options } // Close closes the rocksdb connection func (rockskv *RocksKV) Close() error { + if rockskv.opts.BulkLoad { + rockskv.db.CompactRange(gorocksdb.Range{nil, nil}) + } rockskv.db.Close() return nil } diff --git a/rocksdb/stub.go b/kvi/rocksdb/stub.go similarity index 100% rename from rocksdb/stub.go rename to kvi/rocksdb/stub.go diff --git a/server/auth_test.go b/server/auth_test.go index e95aa113..250eec4e 100644 --- a/server/auth_test.go +++ b/server/auth_test.go @@ -7,9 +7,9 @@ import ( "testing" "time" - _ "github.com/bmeg/grip/badgerdb" // import so badger will register itself "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/kvgraph" + _ "github.com/bmeg/grip/kvi/badgerdb" // import so badger will register itself "github.com/bmeg/grip/util" "github.com/bmeg/grip/util/rpc" ) diff --git a/test/kvbench/bagder_test.go b/test/kvbench/bagder_test.go new file mode 100644 index 00000000..933514aa --- /dev/null +++ b/test/kvbench/bagder_test.go @@ -0,0 +1,48 @@ +package kvbench + +import ( + "log" + "os" + "testing" + "time" + + "github.com/bmeg/grip/util" + "github.com/dgraph-io/badger" + "github.com/dgraph-io/badger/options" +) + +func badgerInit() *badger.DB { + path := "test.badger" + os.RemoveAll(path) + os.Mkdir(path, 0700) + + opts := badger.Options{} + opts = badger.DefaultOptions + opts.TableLoadingMode = options.MemoryMap + opts.SyncWrites = false + opts.Dir = path + opts.ValueDir = path + db, err := badger.Open(opts) + + if err != nil { + log.Printf("Error: %s", err) + } + return db +} + +func BenchmarkBadgerPut(b *testing.B) { + + db := badgerInit() + + time.Sleep(5 * time.Second) + + b.ResetTimer() + for i := 0; i < b.N*10000; i++ { + id := util.RandomString(10) + val := []byte("testing") + db.Update(func(tx *badger.Txn) error { + return tx.Set([]byte(id), val) + }) + } + db.Close() +} diff --git a/test/kvbench/level_test.go b/test/kvbench/level_test.go new file mode 100644 index 00000000..13fecb75 --- /dev/null +++ b/test/kvbench/level_test.go @@ -0,0 +1,49 @@ +package kvbench + +import ( + "log" + "os" + "testing" + + "github.com/bmeg/grip/util" + + "github.com/syndtr/goleveldb/leveldb" + //"github.com/syndtr/goleveldb/leveldb/iterator" +) + +func BenchmarkLevelDBPut(b *testing.B) { + path := "test.level" + os.RemoveAll(path) + db, err := leveldb.OpenFile(path, nil) + if err != nil { + log.Printf("Error: %s", err) + } + b.ResetTimer() + for i := 0; i < b.N*10000; i++ { + id := util.RandomString(10) + val := []byte("testing") + db.Put([]byte(id), val, nil) + } + db.Close() +} + +func BenchmarkLevelDBBatch(b *testing.B) { + path := "test.level" + os.RemoveAll(path) + db, err := leveldb.OpenFile(path, nil) + if err != nil { + log.Printf("Error: %s", err) + } + b.ResetTimer() + + for i := 0; i < b.N; i++ { + batch := new(leveldb.Batch) + for j := 0; j < 10000; j++ { + id := util.RandomString(10) + val := []byte("testing") + batch.Put([]byte(id), val) + } + err = db.Write(batch, nil) + } + db.Close() +} diff --git a/test/main_test.go b/test/main_test.go index bde5c7c1..ab767a89 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -8,16 +8,16 @@ import ( "strings" "testing" - _ "github.com/bmeg/grip/badgerdb" // import so badger will register itself - _ "github.com/bmeg/grip/boltdb" // import so bolt will register itself "github.com/bmeg/grip/config" "github.com/bmeg/grip/elastic" "github.com/bmeg/grip/gdbi" "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/kvgraph" - _ "github.com/bmeg/grip/leveldb" // import so level will register itself + _ "github.com/bmeg/grip/kvi/badgerdb" // import so badger will register itself + _ "github.com/bmeg/grip/kvi/boltdb" // import so bolt will register itself + _ "github.com/bmeg/grip/kvi/leveldb" // import so level will register itself + _ "github.com/bmeg/grip/kvi/rocksdb" // import so rocks will register itself "github.com/bmeg/grip/mongo" - _ "github.com/bmeg/grip/rocksdb" // import so rocks will register itself "github.com/bmeg/grip/sql" "github.com/bmeg/grip/util" _ "github.com/lib/pq" // import so postgres will register as a sql driver