Skip to content

Commit

Permalink
Replace TxnWriter with WriteBatch (#5007)
Browse files Browse the repository at this point in the history
* Replace txnWriter with WriteBatch
* Add benchmark file writer_test.go

Co-authored-by: Ibrahim Jarif <jarifibrahim@gmail.com>
Co-authored-by: Ibrahim Jarif <ibrahim@dgraph.io>
  • Loading branch information
3 people authored May 6, 2020
1 parent cbc764b commit 20494f9
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200421062606-cddf7c03451c
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200506081535-536fed1846d0
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453
github.com/dgraph-io/ristretto v0.0.2
github.com/dgrijalva/jwt-go v3.2.0+incompatible
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200421062606-cddf7c03451c h1:IXsBFBQ0g5JlPfu+3HotLmkej2xgyrkKceWmFlXSYIQ=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200421062606-cddf7c03451c/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200430101140-5d19cc727d87 h1:FsCl1Yg3KVeYEzE7QlvpYg9WnySjLA5vbS9TlmEeUP8=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200430101140-5d19cc727d87/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200506081535-536fed1846d0 h1:4VBIyLibX6qFfz6wSbEvp4RBfoKETvHfIln18ROLiHI=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200506081535-536fed1846d0/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 h1:DTgOrw91nMIukDm/WEvdobPLl0LgeDd/JE66+24jBks=
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
Expand Down Expand Up @@ -342,6 +344,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
12 changes: 6 additions & 6 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

// TODO(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that
// WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter
// could be replaced with WriteBatch in the code
tmpWriter := NewTxnWriter(tmpDB)
tmpWriter := tmpDB.NewManagedWriteBatch()
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
stream.Prefix = r.prefix
Expand Down Expand Up @@ -650,7 +647,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
r.attr, time.Since(start))
}()

writer := NewTxnWriter(pstore)
writer := pstore.NewManagedWriteBatch()
tmpStream := tmpDB.NewStreamAt(counter)
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr)
tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
Expand All @@ -669,14 +666,17 @@ func (r *rebuilder) Run(ctx context.Context) error {
return &bpb.KVList{Kv: kvs}, nil
}
tmpStream.Send = func(kvList *bpb.KVList) error {
// TODO (Anurag): Instead of calling SetEntryAt everytime, we can filter KVList and call Write only once.
// SetEntryAt requires a lock for every entry, whereas Write reduces lock contention.
for _, kv := range kvList.Kv {
if len(kv.Value) == 0 {
continue
}

// We choose to write the PL at r.startTs, so it won't be read by txns,
// which occurred before this schema mutation.
if err := writer.SetAt(kv.Key, kv.Value, BitCompletePosting, r.startTs); err != nil {
e := &badger.Entry{Key: kv.Key, Value: kv.Value, UserMeta: BitCompletePosting}
if err := writer.SetEntryAt(e.WithDiscard(), r.startTs); err != nil {
return errors.Wrap(err, "error in writing index to pstore")
}
}
Expand Down
240 changes: 240 additions & 0 deletions posting/writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package posting

import (
	"io/ioutil"
	"math"
	"os"
	"strconv"
	"sync"
	"testing"

	"github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/options"
	bpb "github.com/dgraph-io/badger/v2/pb"
	"github.com/stretchr/testify/require"
)

// val is the shared 128-byte payload written for every key in the benchmarks
// below; its contents (all zero bytes, from make) are irrelevant — only its size matters.
var val = make([]byte, 128)

// BenchmarkWriter compares strategies for bulk-loading a large KV set into
// badger: the vanilla TxnWriter versus the managed WriteBatch, single- and
// multi-threaded, and with a per-goroutine batch versus one shared batch.
func BenchmarkWriter(b *testing.B) {
	const (
		numKeys   = 5000000
		chunkSize = 1000000 // each worker/call handles one contiguous 1M-key chunk
	)

	createKVList := func() bpb.KVList {
		var kvList bpb.KVList
		for i := 0; i < numKeys; i++ {
			// strconv.Itoa gives every key a unique decimal representation.
			// (A bare string(i) conversion would treat i as a rune code
			// point — flagged by `go vet` — yielding duplicate/invalid keys.)
			kv := &bpb.KV{Key: []byte(strconv.Itoa(i)), Value: val, Version: 5}
			kvList.Kv = append(kvList.Kv, kv)
		}
		return kvList
	}

	// Creates a separate writer for each goroutine.
	writeInBadgerMThreadsB := func(db *badger.DB, kvList *bpb.KVList, wg *sync.WaitGroup) {
		defer wg.Done()
		wb := db.NewManagedWriteBatch()
		if err := wb.Write(kvList); err != nil {
			panic(err)
		}
		require.NoError(b, wb.Flush())
	}

	// Reuses one writer across all goroutines.
	writeInBadgerMThreadsW := func(wb *badger.WriteBatch, kvList *bpb.KVList, wg *sync.WaitGroup) {
		defer wg.Done()
		if err := wb.Write(kvList); err != nil {
			panic(err)
		}
	}

	// Creates a separate writer for each call (single-threaded).
	writeInBadgerSingleThreadB := func(db *badger.DB, kvList *bpb.KVList) {
		wb := db.NewManagedWriteBatch()
		if err := wb.Write(kvList); err != nil {
			panic(err)
		}
		require.NoError(b, wb.Flush())
	}

	// Reuses one writer across all calls (single-threaded).
	writeInBadgerSingleThreadW := func(wb *badger.WriteBatch, kvList *bpb.KVList) {
		if err := wb.Write(kvList); err != nil {
			panic(err)
		}
	}

	dbOpts := badger.DefaultOptions("").
		WithLogger(nil).
		WithSyncWrites(false).
		WithNumVersionsToKeep(math.MaxInt64).
		WithCompression(options.None)

	kvList := createKVList()

	// chunk returns the i-th contiguous chunkSize-entry slice of kvList.
	// Chunks are half-open [lo, hi) so no entry is skipped at the seams
	// (the previous hand-written bounds dropped one key per boundary).
	chunk := func(i int) *bpb.KVList {
		lo := i * chunkSize
		hi := lo + chunkSize
		if hi > len(kvList.Kv) {
			hi = len(kvList.Kv)
		}
		return &bpb.KVList{Kv: kvList.Kv[lo:hi]}
	}

	// Vanilla TxnWriter.
	b.Run("TxnWriter", func(b *testing.B) {
		tmpIndexDir, err := ioutil.TempDir("", "dgraph")
		require.NoError(b, err)
		defer os.RemoveAll(tmpIndexDir)

		dbOpts.Dir = tmpIndexDir
		dbOpts.ValueDir = tmpIndexDir
		db, err := badger.OpenManaged(dbOpts)
		require.NoError(b, err)
		defer db.Close()

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			w := NewTxnWriter(db)
			for _, kv := range kvList.Kv {
				require.NoError(b, w.SetAt(kv.Key, kv.Value, BitSchemaPosting, 1))
			}
			require.NoError(b, w.Flush())
		}
	})
	// Single-threaded WriteBatch.
	b.Run("WriteBatch1", func(b *testing.B) {
		tmpIndexDir, err := ioutil.TempDir("", "dgraph")
		require.NoError(b, err)
		defer os.RemoveAll(tmpIndexDir)

		dbOpts.Dir = tmpIndexDir
		dbOpts.ValueDir = tmpIndexDir
		db, err := badger.OpenManaged(dbOpts)
		require.NoError(b, err)
		defer db.Close()

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			wb := db.NewManagedWriteBatch()
			if err := wb.Write(&kvList); err != nil {
				panic(err)
			}
			require.NoError(b, wb.Flush())
		}
	})
	// Multi-threaded WriteBatch; contention happens inside badger's Write path,
	// each goroutine owning its own batch.
	b.Run("WriteBatchMultThreadDiffWB", func(b *testing.B) {
		tmpIndexDir, err := ioutil.TempDir("", "dgraph")
		require.NoError(b, err)
		defer os.RemoveAll(tmpIndexDir)

		dbOpts.Dir = tmpIndexDir
		dbOpts.ValueDir = tmpIndexDir
		db, err := badger.OpenManaged(dbOpts)
		require.NoError(b, err)
		defer db.Close()

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			var wg sync.WaitGroup
			wg.Add(5)
			for c := 0; c < 5; c++ {
				go writeInBadgerMThreadsB(db, chunk(c), &wg)
			}
			wg.Wait()
		}
	})
	// Multi-threaded WriteBatch; all goroutines share a single batch, so
	// contention happens on its entry-level locking.
	b.Run("WriteBatchMultThreadSameWB", func(b *testing.B) {
		tmpIndexDir, err := ioutil.TempDir("", "dgraph")
		require.NoError(b, err)
		defer os.RemoveAll(tmpIndexDir)

		dbOpts.Dir = tmpIndexDir
		dbOpts.ValueDir = tmpIndexDir
		db, err := badger.OpenManaged(dbOpts)
		require.NoError(b, err)
		defer db.Close()

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			var wg sync.WaitGroup
			wg.Add(5)
			wb := db.NewManagedWriteBatch()
			for c := 0; c < 5; c++ {
				go writeInBadgerMThreadsW(wb, chunk(c), &wg)
			}
			wg.Wait()
			require.NoError(b, wb.Flush())
		}
	})
	// Single thread, a fresh batch per chunk.
	b.Run("WriteBatchSingleThreadDiffWB", func(b *testing.B) {
		tmpIndexDir, err := ioutil.TempDir("", "dgraph")
		require.NoError(b, err)
		defer os.RemoveAll(tmpIndexDir)

		dbOpts.Dir = tmpIndexDir
		dbOpts.ValueDir = tmpIndexDir
		db, err := badger.OpenManaged(dbOpts)
		require.NoError(b, err)
		defer db.Close()

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			for c := 0; c < 5; c++ {
				writeInBadgerSingleThreadB(db, chunk(c))
			}
		}
	})
	// Single thread, one batch reused for every chunk.
	b.Run("WriteBatchSingleThreadSameWB", func(b *testing.B) {
		tmpIndexDir, err := ioutil.TempDir("", "dgraph")
		require.NoError(b, err)
		defer os.RemoveAll(tmpIndexDir)

		dbOpts.Dir = tmpIndexDir
		dbOpts.ValueDir = tmpIndexDir
		db, err := badger.OpenManaged(dbOpts)
		require.NoError(b, err)
		defer db.Close()

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			wb := db.NewManagedWriteBatch()
			for c := 0; c < 5; c++ {
				writeInBadgerSingleThreadW(wb, chunk(c))
			}
			require.NoError(b, wb.Flush())
		}
	})
}
1 change: 1 addition & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ func (n *node) processApplyCh() {
}
}

// TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused.
func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
// First let's commit all mutations to disk.
writer := posting.NewTxnWriter(pstore)
Expand Down
2 changes: 1 addition & 1 deletion worker/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) {

writer = sw
} else {
writer = posting.NewTxnWriter(pstore)
writer = pstore.NewManagedWriteBatch()
}

// We can use count to check the number of posting lists returned in tests.
Expand Down

0 comments on commit 20494f9

Please sign in to comment.