Replace TxnWriter with WriteBatch #5007

Merged
merged 56 commits into from
May 6, 2020
Changes from 47 commits
666bc77
Added batchWrite in index.go
all-seeing-code Mar 23, 2020
55cfbad
Add batchWriter where the data is published to pStore and removed the…
all-seeing-code Mar 23, 2020
54f6e2b
Merge branch 'master' into anurags92/replace-txnwrite
all-seeing-code Mar 23, 2020
8eaecac
Code clean-up
all-seeing-code Mar 24, 2020
a07f87d
Edited comments on changes
all-seeing-code Mar 24, 2020
5a6985c
Addressed golangci comments
all-seeing-code Mar 24, 2020
541945d
Added benchmark file
all-seeing-code Mar 30, 2020
5fb9809
Added function to time BatchWrite
all-seeing-code Mar 30, 2020
3252046
Code cleanup
all-seeing-code Mar 30, 2020
528eca0
Bug Fix
all-seeing-code Mar 30, 2020
258173b
Remove glog initialization
all-seeing-code Mar 30, 2020
8bef464
Removed unused flags
all-seeing-code Mar 31, 2020
465a1b8
Merged benchamrk functions in one
all-seeing-code Mar 31, 2020
f289f58
Minor edits
all-seeing-code Mar 31, 2020
2355bfb
Minor edits again
all-seeing-code Mar 31, 2020
11783f4
Changed go.mod file to point to local bager
all-seeing-code Apr 6, 2020
70b7c59
Replace txnWriter with WriteBatch which takes different versions
all-seeing-code Apr 6, 2020
7379274
Temporary changes
all-seeing-code Apr 6, 2020
503d77a
Test Write Batch
all-seeing-code Apr 6, 2020
4a91d0b
Add Write Batch at one instance
all-seeing-code Apr 7, 2020
fce4ca1
Use batchset instead of txnwriter
jarifibrahim Apr 8, 2020
ea3582a
Fix go.mod
jarifibrahim Apr 8, 2020
1716481
Merge from Ibrahim's branch
all-seeing-code Apr 9, 2020
859f270
Merge remote-tracking branch 'origin/master' into anurags92/replace-t…
all-seeing-code Apr 9, 2020
20cda18
Merged Batchset with tests and benchmarks
all-seeing-code Apr 9, 2020
7f4112b
Reduce the numbers of kv pairs
all-seeing-code Apr 9, 2020
85724cd
Add wg Wait
all-seeing-code Apr 13, 2020
e355dce
merge with master
all-seeing-code Apr 23, 2020
a976e0a
resolve merge conflicts
all-seeing-code Apr 23, 2020
022a9cb
resolve merge conflicts again
all-seeing-code Apr 23, 2020
d72ba06
revert back to master for index.go
all-seeing-code Apr 23, 2020
f5fd378
revert index_test.go to master
all-seeing-code Apr 23, 2020
074bcb3
Use new batch writer api
all-seeing-code Apr 24, 2020
46074c5
Fix minor issue
Apr 28, 2020
8ee34e6
Update badger
Apr 28, 2020
26e47cc
Merge branch 'master' into anurags92/replace-txnwrite
Apr 28, 2020
8fdd64c
Merge branch 'master' into anurags92/replace-txnwrite
Apr 29, 2020
0b7e39a
Update badger
Apr 30, 2020
2818574
Update badger
Apr 30, 2020
90e0b22
Add batchwriter inplace of txn writer
all-seeing-code May 3, 2020
9b39c31
Add write batch in snapshot.go
all-seeing-code May 4, 2020
b9c3aa8
Defined new struct to handle interface definition
all-seeing-code May 4, 2020
baf5eaf
Modify writer_test.go to account for new API
all-seeing-code May 4, 2020
af79121
Code clean-up
all-seeing-code May 4, 2020
59f29d1
Add Write API for batch writer inside a function which accepts KVs list
all-seeing-code May 4, 2020
c5ee1e6
bugfixes
all-seeing-code May 4, 2020
0778ce7
Add multi-threaded version of batchwrite for benchmarking
all-seeing-code May 4, 2020
014e069
Address minor comments on PR
all-seeing-code May 4, 2020
cb8564f
Split entry in two lines
all-seeing-code May 4, 2020
12393ee
Use Write API from badger
all-seeing-code May 4, 2020
d21e7cf
Temp changes to run benchmarks
all-seeing-code May 4, 2020
4a3e5fb
Add a TODO and update writer_test.go
all-seeing-code May 5, 2020
74136f0
Add single threaded version to tests to emulate stream behaviour
all-seeing-code May 5, 2020
bbf99d1
Remove wait groups from dingle threaded version
all-seeing-code May 5, 2020
722db3c
Minor fixes
all-seeing-code May 6, 2020
78deec0
Update badger and edit comments in test file
all-seeing-code May 6, 2020
4 changes: 2 additions & 2 deletions go.mod
@@ -16,7 +16,7 @@ require (
github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
-github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200421062606-cddf7c03451c
+github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200430101140-5d19cc727d87
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453
github.com/dgraph-io/ristretto v0.0.2
github.com/dgrijalva/jwt-go v3.2.0+incompatible
@@ -67,4 +67,4 @@ require (
gopkg.in/DataDog/dd-trace-go.v1 v1.13.1 // indirect
gopkg.in/ini.v1 v1.48.0 // indirect
gopkg.in/yaml.v2 v2.2.4
-)
+)
4 changes: 2 additions & 2 deletions go.sum
@@ -76,8 +76,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
-github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200421062606-cddf7c03451c h1:IXsBFBQ0g5JlPfu+3HotLmkej2xgyrkKceWmFlXSYIQ=
-github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200421062606-cddf7c03451c/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
+github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200430101140-5d19cc727d87 h1:FsCl1Yg3KVeYEzE7QlvpYg9WnySjLA5vbS9TlmEeUP8=
+github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200430101140-5d19cc727d87/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 h1:DTgOrw91nMIukDm/WEvdobPLl0LgeDd/JE66+24jBks=
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
14 changes: 7 additions & 7 deletions posting/index.go
@@ -573,10 +573,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

-// TODO(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that
-// WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter
-// could be replaced with WriteBatch in the code
-tmpWriter := NewTxnWriter(tmpDB)
+tmpWriter := tmpDB.NewManagedWriteBatch()
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
stream.Prefix = r.prefix
@@ -625,7 +622,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
return &bpb.KVList{Kv: kvs}, nil
}
stream.Send = func(kvList *bpb.KVList) error {
-if err := tmpWriter.Write(kvList); err != nil {
+if err := x.WriteBatchWriter(tmpWriter, kvList); err != nil {
return errors.Wrap(err, "error setting entries in temp badger")
}

@@ -650,7 +647,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
r.attr, time.Since(start))
}()

-writer := NewTxnWriter(pstore)
+writer := pstore.NewManagedWriteBatch()
tmpStream := tmpDB.NewStreamAt(counter)
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr)
tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
@@ -676,7 +673,10 @@ func (r *rebuilder) Run(ctx context.Context) error {

// We choose to write the PL at r.startTs, so it won't be read by txns,
// which occurred before this schema mutation.
-if err := writer.SetAt(kv.Key, kv.Value, BitCompletePosting, r.startTs); err != nil {
+if err := writer.SetEntryAt(
+(&badger.Entry{Key: kv.Key,
+Value: kv.Value,
+UserMeta: BitCompletePosting}).WithDiscard(), r.startTs); err != nil {
return errors.Wrap(err, "error in writing index to pstore")
}
}
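The comment in this hunk says the posting list is written at r.startTs so that transactions which began before the schema mutation do not read it. A minimal sketch of that visibility rule follows. It uses a hypothetical in-memory `versionedStore` with illustrative `setAt` and `getAt` methods; none of these names come from Badger or from this PR, and real storage is far more involved.

```go
package main

import (
	"fmt"
	"sort"
)

// version is one value of a key, stamped with the timestamp it was written at.
type version struct {
	ts    uint64
	value string
}

// versionedStore is a hypothetical in-memory stand-in for a store that
// lets the caller choose the write timestamp (not Badger's API).
type versionedStore struct {
	data map[string][]version // versions per key, kept sorted by ascending ts
}

func newVersionedStore() *versionedStore {
	return &versionedStore{data: make(map[string][]version)}
}

// setAt records value for key at the caller-chosen timestamp ts.
func (s *versionedStore) setAt(key, value string, ts uint64) {
	vs := append(s.data[key], version{ts: ts, value: value})
	sort.Slice(vs, func(i, j int) bool { return vs[i].ts < vs[j].ts })
	s.data[key] = vs
}

// getAt returns the newest value whose timestamp is <= readTs.
func (s *versionedStore) getAt(key string, readTs uint64) (string, bool) {
	vs := s.data[key]
	// Index of the first version written strictly after readTs.
	i := sort.Search(len(vs), func(i int) bool { return vs[i].ts > readTs })
	if i == 0 {
		return "", false // nothing visible at this read timestamp
	}
	return vs[i-1].value, true
}

func main() {
	s := newVersionedStore()
	s.setAt("index-key", "posting-list", 10) // written at startTs = 10

	_, visibleBefore := s.getAt("index-key", 9)
	value, visibleAt := s.getAt("index-key", 10)
	fmt.Println("read at ts=9 sees the entry:", visibleBefore)
	fmt.Println("read at ts=10 sees the entry:", visibleAt, value)
}
```

A reader at timestamp 9 finds nothing, while a reader at timestamp 10 or later finds the entry, which is the property the comment relies on.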
178 changes: 178 additions & 0 deletions posting/writer_test.go
@@ -0,0 +1,178 @@
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package posting

import (
"io/ioutil"
"math"
"os"
"sync"
"testing"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/stretchr/testify/require"
)

type kv struct {
key []byte
value []byte
}

func BenchmarkWriter(b *testing.B) {
createKVList := func() []kv {
var KVList = []kv{}
for i := 0; i < 50000; i++ {
n := kv{key: []byte(string(rune(i))), value: []byte(string(rune(i)))}
KVList = append(KVList, n)
}
return KVList
}

writeInBagder := func(db *badger.DB, KVList []kv, wg *sync.WaitGroup) {
defer wg.Done()
wb := db.NewManagedWriteBatch()
for _, typ := range KVList {
e := &badger.Entry{Key: typ.key, Value: typ.value}
wb.SetEntryAt(e, 1)
}
require.NoError(b, wb.Flush())

}

writeInBagder2 := func(wb *badger.WriteBatch, KVList []kv, wg *sync.WaitGroup) {
defer wg.Done()

for _, typ := range KVList {
e := &badger.Entry{Key: typ.key, Value: typ.value}
wb.SetEntryAt(e, 1)
}

}

dbOpts := badger.DefaultOptions("").
WithLogger(nil).
WithSyncWrites(false).
WithNumVersionsToKeep(math.MaxInt64).
WithCompression(options.None)

KVList := createKVList()

//Vanilla TxnWriter
b.Run("TxnWriter", func(b *testing.B) {
tmpIndexDir, err := ioutil.TempDir("", "dgraph")
require.NoError(b, err)
defer os.RemoveAll(tmpIndexDir)

dbOpts.Dir = tmpIndexDir
dbOpts.ValueDir = tmpIndexDir
var db, err2 = badger.OpenManaged(dbOpts)
require.NoError(b, err2)
defer db.Close()

b.ResetTimer()
for i := 0; i < b.N; i++ {
w := NewTxnWriter(db)
for _, typ := range KVList {
k := typ.key
v := typ.value
w.SetAt(k, v, BitSchemaPosting, 1)
}
require.NoError(b, w.Flush())

}
})
//Single threaded BatchWriter
b.Run("WriteBatch1", func(b *testing.B) {
tmpIndexDir, err := ioutil.TempDir("", "dgraph")
require.NoError(b, err)
defer os.RemoveAll(tmpIndexDir)

dbOpts.Dir = tmpIndexDir
dbOpts.ValueDir = tmpIndexDir

var db, err2 = badger.OpenManaged(dbOpts)
require.NoError(b, err2)
defer db.Close()

b.ResetTimer()

for i := 0; i < b.N; i++ {
wb := db.NewManagedWriteBatch()
for _, typ := range KVList {
e := &badger.Entry{Key: typ.key, Value: typ.value}
wb.SetEntryAt(e, 1)
}
require.NoError(b, wb.Flush())
}
})
//Multi threaded Batchwriter with thread contention in WriteBatch
b.Run("WriteBatchMultThread1", func(b *testing.B) {
tmpIndexDir, err := ioutil.TempDir("", "dgraph")
require.NoError(b, err)
defer os.RemoveAll(tmpIndexDir)

dbOpts.Dir = tmpIndexDir
dbOpts.ValueDir = tmpIndexDir

var db, err2 = badger.OpenManaged(dbOpts)
require.NoError(b, err2)
defer db.Close()

b.ResetTimer()

for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
wg.Add(5)
go writeInBagder(db, KVList[:10000], &wg)
go writeInBagder(db, KVList[10000:20000], &wg)
go writeInBagder(db, KVList[20000:30000], &wg)
go writeInBagder(db, KVList[30000:40000], &wg)
go writeInBagder(db, KVList[40000:], &wg)
wg.Wait()

}
})
//Multi threaded Batchwriter with thread contention in SetEntry
b.Run("WriteBatchMultThread2", func(b *testing.B) {
tmpIndexDir, err := ioutil.TempDir("", "dgraph")
require.NoError(b, err)
defer os.RemoveAll(tmpIndexDir)

dbOpts.Dir = tmpIndexDir
dbOpts.ValueDir = tmpIndexDir

var db, err2 = badger.OpenManaged(dbOpts)
require.NoError(b, err2)
defer db.Close()

b.ResetTimer()

for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
wg.Add(5)
wb := db.NewManagedWriteBatch()
go writeInBagder2(wb, KVList[:10000], &wg)
go writeInBagder2(wb, KVList[10000:20000], &wg)
go writeInBagder2(wb, KVList[20000:30000], &wg)
go writeInBagder2(wb, KVList[30000:40000], &wg)
go writeInBagder2(wb, KVList[40000:], &wg)
wg.Wait()
require.NoError(b, wb.Flush())
}
})
}
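Go slice expressions are half-open: `s[a:b]` covers indices a through b-1. When a benchmark partitions its keys across goroutines, each chunk therefore has to start at the index where the previous one ended, or an element is silently dropped at every boundary. The sketch below shows one way to compute such a split. `makeKeys` and `splitChunks` are hypothetical helpers for illustration only and are not part of this PR.

```go
package main

import (
	"fmt"
	"strconv"
)

// makeKeys returns n distinct keys "0", "1", ..., using decimal formatting.
func makeKeys(n int) []string {
	keys := make([]string, n)
	for i := range keys {
		keys[i] = strconv.Itoa(i)
	}
	return keys
}

// splitChunks divides items into at most n contiguous, non-overlapping
// chunks that together cover every element exactly once.
func splitChunks(items []string, n int) [][]string {
	if n <= 0 {
		return nil
	}
	chunks := make([][]string, 0, n)
	size := (len(items) + n - 1) / n // ceiling division
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		// Half-open range: the next chunk begins exactly at end.
		chunks = append(chunks, items[start:end])
	}
	return chunks
}

func main() {
	chunks := splitChunks(makeKeys(50000), 5)
	total := 0
	for i, c := range chunks {
		fmt.Printf("chunk %d: first=%s last=%s len=%d\n", i, c[0], c[len(c)-1], len(c))
		total += len(c)
	}
	fmt.Println("total keys covered:", total)
}
```

Each chunk could then be handed to its own goroutine, with the guarantee that no key is skipped or written twice.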
1 change: 1 addition & 0 deletions worker/draft.go
@@ -711,6 +711,7 @@ func (n *node) processApplyCh() {
}
}

+// TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused.
func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
// First let's commit all mutations to disk.
writer := posting.NewTxnWriter(pstore)
21 changes: 20 additions & 1 deletion worker/snapshot.go
@@ -27,6 +27,7 @@ import (
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
+"github.com/dgraph-io/dgraph/x"
)

const (
@@ -38,6 +39,24 @@ type badgerWriter interface {
Write(kvs *bpb.KVList) error
Flush() error
}
+type newWriteBatch struct {
+wb *badger.WriteBatch
+}
+
+func newWriteBatchWriter(db *badger.DB) *newWriteBatch {
+return &newWriteBatch{wb: db.NewManagedWriteBatch()}
+}
+
+func (nwb *newWriteBatch) Write(kvs *bpb.KVList) error {
+if err := x.WriteBatchWriter(nwb.wb, kvs); err != nil {
+return err
+}
+return nil
+}
+
+func (nwb *newWriteBatch) Flush() error {
+return nwb.wb.Flush()
+}

// populateSnapshot gets data for a shard from the leader and writes it to BadgerDB on the follower.
func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) {
@@ -65,7 +84,7 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) {

writer = sw
} else {
-writer = posting.NewTxnWriter(pstore)
+writer = newWriteBatchWriter(pstore)
}

// We can use count to check the number of posting lists returned in tests.
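The `newWriteBatch` type in this file is an adapter: it wraps a batch that accepts one entry at a time so that it satisfies the `badgerWriter` interface, which takes a whole list per `Write` call. The pattern can be sketched with stdlib-only stand-ins. Everything here (`kv`, `listWriter`, `memBatch`, `batchAdapter`) is hypothetical and only mirrors the shape of the code in the diff, not Badger's actual types.

```go
package main

import (
	"errors"
	"fmt"
)

// kv is a hypothetical key/value pair used only for this sketch.
type kv struct{ key, value string }

// listWriter mirrors the shape of badgerWriter: write a list, then flush.
type listWriter interface {
	Write(kvs []kv) error
	Flush() error
}

// memBatch is a hypothetical stand-in for a write batch that buffers
// single entries until flush is called.
type memBatch struct {
	pending []kv
	stored  map[string]string
}

func newMemBatch() *memBatch {
	return &memBatch{stored: make(map[string]string)}
}

// set buffers one entry and rejects empty keys.
func (b *memBatch) set(e kv) error {
	if e.key == "" {
		return errors.New("empty key")
	}
	b.pending = append(b.pending, e)
	return nil
}

// flush moves all buffered entries into the store.
func (b *memBatch) flush() error {
	for _, e := range b.pending {
		b.stored[e.key] = e.value
	}
	b.pending = nil
	return nil
}

// batchAdapter wraps memBatch so that it satisfies listWriter.
type batchAdapter struct{ b *memBatch }

// Write forwards every entry to the batch and stops at the first error.
func (a *batchAdapter) Write(kvs []kv) error {
	for _, e := range kvs {
		if err := a.b.set(e); err != nil {
			return err
		}
	}
	return nil
}

// Flush delegates to the wrapped batch.
func (a *batchAdapter) Flush() error { return a.b.flush() }

func main() {
	b := newMemBatch()
	var w listWriter = &batchAdapter{b: b}
	if err := w.Write([]kv{{"a", "1"}, {"b", "2"}}); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	if err := w.Flush(); err != nil {
		fmt.Println("flush failed:", err)
		return
	}
	fmt.Println("stored entries:", len(b.stored))
}
```

Because callers depend only on the interface, a stream writer and a batch adapter can be assigned to the same variable, which is how `populateSnapshot` chooses between them.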
18 changes: 17 additions & 1 deletion x/x.go
@@ -38,10 +38,10 @@ import (
"time"

"github.com/dgraph-io/badger/v2"
+bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v200"
"github.com/dgraph-io/dgo/v200/protos/api"
-
"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/spf13/viper"
@@ -850,6 +850,22 @@ func AskUserPassword(userid string, pwdType string, times int) (string, error) {
return password, nil
}

+// WriteBatchWriter writes every KV in kvList to the given WriteBatch at the KV's own version.
+func WriteBatchWriter(writer *badger.WriteBatch, kvList *bpb.KVList) error {
+for _, kv := range kvList.Kv {
+e := &badger.Entry{Key: kv.Key, Value: kv.Value}
+// bpb.KV carries UserMeta as a byte slice; badger.Entry takes a single byte.
+if len(kv.UserMeta) > 0 {
+e.UserMeta = kv.UserMeta[0]
+}
+if err := writer.SetEntryAt(e, kv.Version); err != nil {
+return err
+}
+}
+return nil
+
+}
+
// GetPassAndLogin uses the given credentials and client to perform the login operation.
func GetPassAndLogin(dg *dgo.Dgraph, opt *CredOpt) error {
password := opt.Conf.GetString(opt.PasswordOpt)
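WriteBatchWriter in this file does two things for each KV: it narrows the UserMeta byte slice to the single byte an entry holds, and it stops at the first write error. A stdlib-only sketch of that logic is below. `protoKV`, `entry`, `toEntry`, `writeAll` and `errStop` are hypothetical names introduced for illustration; they stand in for the Badger types and are not its API.

```go
package main

import (
	"errors"
	"fmt"
)

// errStop is a sentinel error used to demonstrate early termination.
var errStop = errors.New("stop")

// protoKV is a hypothetical stand-in for a serialized KV, where UserMeta
// is a byte slice and each KV carries its own version.
type protoKV struct {
	Key, Value, UserMeta []byte
	Version              uint64
}

// entry is a hypothetical stand-in for a storage entry with one meta byte.
type entry struct {
	Key, Value []byte
	UserMeta   byte
}

// toEntry copies key and value and keeps only the first UserMeta byte.
// An empty UserMeta leaves the meta byte at zero.
func toEntry(kv protoKV) entry {
	e := entry{Key: kv.Key, Value: kv.Value}
	if len(kv.UserMeta) > 0 {
		e.UserMeta = kv.UserMeta[0]
	}
	return e
}

// writeAll hands each converted entry to setAt at the KV's own version
// and returns the first error without processing the remaining KVs.
func writeAll(kvs []protoKV, setAt func(e entry, ts uint64) error) error {
	for _, kv := range kvs {
		if err := setAt(toEntry(kv), kv.Version); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	kvs := []protoKV{
		{Key: []byte("a"), Value: []byte("1"), UserMeta: []byte{0x08}, Version: 5},
		{Key: []byte("b"), Value: []byte("2"), Version: 7},
	}
	err := writeAll(kvs, func(e entry, ts uint64) error {
		fmt.Printf("key=%s meta=%#x ts=%d\n", e.Key, e.UserMeta, ts)
		return nil
	})
	fmt.Println("err:", err)
}
```

Passing the sink as a function keeps the conversion testable without a database; in the PR the sink is the batch's SetEntryAt method.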