From 20494f9cae244152f075c2cfcca7aa4b7e249ba5 Mon Sep 17 00:00:00 2001 From: Anurag Date: Wed, 6 May 2020 15:07:27 +0530 Subject: [PATCH] Replace TxnWriter with WriteBatch (#5007) * Replace txnWriter with WriteBatch * Add benchmark file writer_test.go Co-authored-by: Ibrahim Jarif Co-authored-by: Ibrahim Jarif --- go.mod | 2 +- go.sum | 7 +- posting/index.go | 12 +-- posting/writer_test.go | 240 +++++++++++++++++++++++++++++++++++++++++ worker/draft.go | 1 + worker/snapshot.go | 2 +- 6 files changed, 254 insertions(+), 10 deletions(-) create mode 100644 posting/writer_test.go diff --git a/go.mod b/go.mod index 0be9cfc949b..3b3f565ff4a 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200421062606-cddf7c03451c + github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200506081535-536fed1846d0 github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 github.com/dgraph-io/ristretto v0.0.2 github.com/dgrijalva/jwt-go v3.2.0+incompatible diff --git a/go.sum b/go.sum index f38a59b6ada..b69ad0f8b7c 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200421062606-cddf7c03451c h1:IXsBFBQ0g5JlPfu+3HotLmkej2xgyrkKceWmFlXSYIQ= -github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200421062606-cddf7c03451c/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/badger/v2 
v2.0.1-rc1.0.20200430101140-5d19cc727d87 h1:FsCl1Yg3KVeYEzE7QlvpYg9WnySjLA5vbS9TlmEeUP8= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200430101140-5d19cc727d87/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200506081535-536fed1846d0 h1:4VBIyLibX6qFfz6wSbEvp4RBfoKETvHfIln18ROLiHI= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200506081535-536fed1846d0/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 h1:DTgOrw91nMIukDm/WEvdobPLl0LgeDd/JE66+24jBks= github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig= github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= @@ -342,6 +344,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/posting/index.go b/posting/index.go index 41f04339da2..1d08760013e 100644 --- a/posting/index.go +++ b/posting/index.go @@ -573,10 +573,7 @@ func (r *rebuilder) Run(ctx context.Context) error { // We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0. var counter uint64 = 1 - // TODO(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that - // WriteBatch has a mechanism for throttling. 
Also, find other places where TxnWriter - // could be replaced with WriteBatch in the code - tmpWriter := NewTxnWriter(tmpDB) + tmpWriter := tmpDB.NewManagedWriteBatch() stream := pstore.NewStreamAt(r.startTs) stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr) stream.Prefix = r.prefix @@ -650,7 +647,7 @@ func (r *rebuilder) Run(ctx context.Context) error { r.attr, time.Since(start)) }() - writer := NewTxnWriter(pstore) + writer := pstore.NewManagedWriteBatch() tmpStream := tmpDB.NewStreamAt(counter) tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr) tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { @@ -669,6 +666,8 @@ func (r *rebuilder) Run(ctx context.Context) error { return &bpb.KVList{Kv: kvs}, nil } tmpStream.Send = func(kvList *bpb.KVList) error { + // TODO (Anurag): Instead of calling SetEntryAt every time, we can filter KVList and call Write only once. + // SetEntryAt requires a lock for every entry, whereas Write reduces lock contention. for _, kv := range kvList.Kv { if len(kv.Value) == 0 { continue @@ -676,7 +675,8 @@ func (r *rebuilder) Run(ctx context.Context) error { // We choose to write the PL at r.startTs, so it won't be read by txns, // which occurred before this schema mutation. - if err := writer.SetAt(kv.Key, kv.Value, BitCompletePosting, r.startTs); err != nil { + e := &badger.Entry{Key: kv.Key, Value: kv.Value, UserMeta: BitCompletePosting} + if err := writer.SetEntryAt(e.WithDiscard(), r.startTs); err != nil { return errors.Wrap(err, "error in writing index to pstore") } } diff --git a/posting/writer_test.go b/posting/writer_test.go new file mode 100644 index 00000000000..3518943379d --- /dev/null +++ b/posting/writer_test.go @@ -0,0 +1,240 @@ +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package posting + +import ( + "io/ioutil" + "math" + "os" + "sync" + "testing" + + "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/badger/v2/options" + bpb "github.com/dgraph-io/badger/v2/pb" + "github.com/stretchr/testify/require" +) + +var val = make([]byte, 128) + +func BenchmarkWriter(b *testing.B) { + createKVList := func() bpb.KVList { + var KVList bpb.KVList + for i := 0; i < 5000000; i++ { + n := &bpb.KV{Key: []byte(string(i)), Value: val, Version: 5} + KVList.Kv = append(KVList.Kv, n) + } + return KVList + } + + // Creates separate writer for each thread + writeInBadgerMThreadsB := func(db *badger.DB, KVList *bpb.KVList, wg *sync.WaitGroup) { + defer wg.Done() + wb := db.NewManagedWriteBatch() + if err := wb.Write(KVList); err != nil { + panic(err) + } + require.NoError(b, wb.Flush()) + + } + + // Reuses one writer for all threads + writeInBadgerMThreadsW := func(wb *badger.WriteBatch, KVList *bpb.KVList, wg *sync.WaitGroup) { + defer wg.Done() + + if err := wb.Write(KVList); err != nil { + panic(err) + } + + } + // Creates separate writer for each thread + writeInBadgerSingleThreadB := func(db *badger.DB, KVList *bpb.KVList) { + wb := db.NewManagedWriteBatch() + if err := wb.Write(KVList); err != nil { + panic(err) + } + require.NoError(b, wb.Flush()) + + } + // Reuses one writer for all threads + writeInBadgerSingleThreadW := func(wb *badger.WriteBatch, KVList *bpb.KVList) { + if err := wb.Write(KVList); err != nil { + panic(err) + } + + } + + dbOpts := badger.DefaultOptions(""). + WithLogger(nil). 
+ WithSyncWrites(false). + WithNumVersionsToKeep(math.MaxInt64). + WithCompression(options.None) + + KVList := createKVList() + + // Vanilla TxnWriter + b.Run("TxnWriter", func(b *testing.B) { + tmpIndexDir, err := ioutil.TempDir("", "dgraph") + require.NoError(b, err) + defer os.RemoveAll(tmpIndexDir) + + dbOpts.Dir = tmpIndexDir + dbOpts.ValueDir = tmpIndexDir + var db, err2 = badger.OpenManaged(dbOpts) + require.NoError(b, err2) + defer db.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w := NewTxnWriter(db) + for _, typ := range KVList.Kv { + k := typ.Key + v := typ.Value + err := w.SetAt(k, v, BitSchemaPosting, 1) + require.NoError(b, err) + } + require.NoError(b, w.Flush()) + + } + }) + // Single threaded BatchWriter + b.Run("WriteBatch1", func(b *testing.B) { + tmpIndexDir, err := ioutil.TempDir("", "dgraph") + require.NoError(b, err) + defer os.RemoveAll(tmpIndexDir) + + dbOpts.Dir = tmpIndexDir + dbOpts.ValueDir = tmpIndexDir + + var db, err2 = badger.OpenManaged(dbOpts) + require.NoError(b, err2) + defer db.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + wb := db.NewManagedWriteBatch() + if err := wb.Write(&KVList); err != nil { + panic(err) + } + require.NoError(b, wb.Flush()) + } + }) + // Multi threaded Batchwriter with thread contention in WriteBatch + b.Run("WriteBatchMultThreadDiffWB", func(b *testing.B) { + tmpIndexDir, err := ioutil.TempDir("", "dgraph") + require.NoError(b, err) + defer os.RemoveAll(tmpIndexDir) + + dbOpts.Dir = tmpIndexDir + dbOpts.ValueDir = tmpIndexDir + + var db, err2 = badger.OpenManaged(dbOpts) + require.NoError(b, err2) + defer db.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + wg.Add(5) + + go writeInBadgerMThreadsB(db, &bpb.KVList{Kv: KVList.Kv[:1000000]}, &wg) + go writeInBadgerMThreadsB(db, &bpb.KVList{Kv: KVList.Kv[1000001:2000000]}, &wg) + go writeInBadgerMThreadsB(db, &bpb.KVList{Kv: KVList.Kv[2000001:3000000]}, &wg) + go writeInBadgerMThreadsB(db, 
&bpb.KVList{Kv: KVList.Kv[3000001:4000000]}, &wg) + go writeInBadgerMThreadsB(db, &bpb.KVList{Kv: KVList.Kv[4000001:]}, &wg) + wg.Wait() + + } + }) + // Multi threaded Batchwriter with thread contention in SetEntry + b.Run("WriteBatchMultThreadSameWB", func(b *testing.B) { + tmpIndexDir, err := ioutil.TempDir("", "dgraph") + require.NoError(b, err) + defer os.RemoveAll(tmpIndexDir) + + dbOpts.Dir = tmpIndexDir + dbOpts.ValueDir = tmpIndexDir + + var db, err2 = badger.OpenManaged(dbOpts) + require.NoError(b, err2) + defer db.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + wg.Add(5) + wb := db.NewManagedWriteBatch() + go writeInBadgerMThreadsW(wb, &bpb.KVList{Kv: KVList.Kv[:1000000]}, &wg) + go writeInBadgerMThreadsW(wb, &bpb.KVList{Kv: KVList.Kv[1000001:2000000]}, &wg) + go writeInBadgerMThreadsW(wb, &bpb.KVList{Kv: KVList.Kv[2000001:3000000]}, &wg) + go writeInBadgerMThreadsW(wb, &bpb.KVList{Kv: KVList.Kv[3000001:4000000]}, &wg) + go writeInBadgerMThreadsW(wb, &bpb.KVList{Kv: KVList.Kv[4000001:]}, &wg) + + wg.Wait() + require.NoError(b, wb.Flush()) + } + }) + b.Run("WriteBatchSingleThreadDiffWB", func(b *testing.B) { + tmpIndexDir, err := ioutil.TempDir("", "dgraph") + require.NoError(b, err) + defer os.RemoveAll(tmpIndexDir) + + dbOpts.Dir = tmpIndexDir + dbOpts.ValueDir = tmpIndexDir + + var db, err2 = badger.OpenManaged(dbOpts) + require.NoError(b, err2) + defer db.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + writeInBadgerSingleThreadB(db, &bpb.KVList{Kv: KVList.Kv[:1000000]}) + writeInBadgerSingleThreadB(db, &bpb.KVList{Kv: KVList.Kv[1000001:2000000]}) + writeInBadgerSingleThreadB(db, &bpb.KVList{Kv: KVList.Kv[2000001:3000000]}) + writeInBadgerSingleThreadB(db, &bpb.KVList{Kv: KVList.Kv[3000001:4000000]}) + writeInBadgerSingleThreadB(db, &bpb.KVList{Kv: KVList.Kv[4000001:]}) + } + }) + b.Run("WriteBatchSingleThreadSameWB", func(b *testing.B) { + tmpIndexDir, err := ioutil.TempDir("", "dgraph") + 
require.NoError(b, err) + defer os.RemoveAll(tmpIndexDir) + + dbOpts.Dir = tmpIndexDir + dbOpts.ValueDir = tmpIndexDir + + var db, err2 = badger.OpenManaged(dbOpts) + require.NoError(b, err2) + defer db.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + wb := db.NewManagedWriteBatch() + writeInBadgerSingleThreadW(wb, &bpb.KVList{Kv: KVList.Kv[:1000000]}) + writeInBadgerSingleThreadW(wb, &bpb.KVList{Kv: KVList.Kv[1000001:2000000]}) + writeInBadgerSingleThreadW(wb, &bpb.KVList{Kv: KVList.Kv[2000001:3000000]}) + writeInBadgerSingleThreadW(wb, &bpb.KVList{Kv: KVList.Kv[3000001:4000000]}) + writeInBadgerSingleThreadW(wb, &bpb.KVList{Kv: KVList.Kv[4000001:]}) + require.NoError(b, wb.Flush()) + } + }) +} diff --git a/worker/draft.go b/worker/draft.go index 5a390facb8f..4398ca98550 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -711,6 +711,7 @@ func (n *node) processApplyCh() { } } +// TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused. func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error { // First let's commit all mutations to disk. writer := posting.NewTxnWriter(pstore) diff --git a/worker/snapshot.go b/worker/snapshot.go index 7cd78948414..518f8382abd 100644 --- a/worker/snapshot.go +++ b/worker/snapshot.go @@ -65,7 +65,7 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) { writer = sw } else { - writer = posting.NewTxnWriter(pstore) + writer = pstore.NewManagedWriteBatch() } // We can use count to check the number of posting lists returned in tests.