Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace TxnWriter with WriteBatch #5007

Merged
merged 56 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
666bc77
Added batchWrite in index.go
all-seeing-code Mar 23, 2020
55cfbad
Add batchWriter where the data is published to pStore and removed the…
all-seeing-code Mar 23, 2020
54f6e2b
Merge branch 'master' into anurags92/replace-txnwrite
all-seeing-code Mar 23, 2020
8eaecac
Code clean-up
all-seeing-code Mar 24, 2020
a07f87d
Edited comments on changes
all-seeing-code Mar 24, 2020
5a6985c
Addressed golangci comments
all-seeing-code Mar 24, 2020
541945d
Added benchmark file
all-seeing-code Mar 30, 2020
5fb9809
Added function to time BatchWrite
all-seeing-code Mar 30, 2020
3252046
Code cleanup
all-seeing-code Mar 30, 2020
528eca0
Bug Fix
all-seeing-code Mar 30, 2020
258173b
Remove glog initialization
all-seeing-code Mar 30, 2020
8bef464
Removed unused flags
all-seeing-code Mar 31, 2020
465a1b8
Merged benchmark functions into one
all-seeing-code Mar 31, 2020
f289f58
Minor edits
all-seeing-code Mar 31, 2020
2355bfb
Minor edits again
all-seeing-code Mar 31, 2020
11783f4
Changed go.mod file to point to local badger
all-seeing-code Apr 6, 2020
70b7c59
Replace txnWriter with WriteBatch which takes different versions
all-seeing-code Apr 6, 2020
7379274
Temporary changes
all-seeing-code Apr 6, 2020
503d77a
Test Write Batch
all-seeing-code Apr 6, 2020
4a91d0b
Add Write Batch at one instance
all-seeing-code Apr 7, 2020
fce4ca1
Use batchset instead of txnwriter
jarifibrahim Apr 8, 2020
ea3582a
Fix go.mod
jarifibrahim Apr 8, 2020
1716481
Merge from Ibrahim's branch
all-seeing-code Apr 9, 2020
859f270
Merge remote-tracking branch 'origin/master' into anurags92/replace-t…
all-seeing-code Apr 9, 2020
20cda18
Merged Batchset with tests and benchmarks
all-seeing-code Apr 9, 2020
7f4112b
Reduce the numbers of kv pairs
all-seeing-code Apr 9, 2020
85724cd
Add wg Wait
all-seeing-code Apr 13, 2020
e355dce
merge with master
all-seeing-code Apr 23, 2020
a976e0a
resolve merge conflicts
all-seeing-code Apr 23, 2020
022a9cb
resolve merge conflicts again
all-seeing-code Apr 23, 2020
d72ba06
revert back to master for index.go
all-seeing-code Apr 23, 2020
f5fd378
revert index_test.go to master
all-seeing-code Apr 23, 2020
074bcb3
Use new batch writer api
all-seeing-code Apr 24, 2020
46074c5
Fix minor issue
Apr 28, 2020
8ee34e6
Update badger
Apr 28, 2020
26e47cc
Merge branch 'master' into anurags92/replace-txnwrite
Apr 28, 2020
8fdd64c
Merge branch 'master' into anurags92/replace-txnwrite
Apr 29, 2020
0b7e39a
Update badger
Apr 30, 2020
2818574
Update badger
Apr 30, 2020
90e0b22
Add batchwriter inplace of txn writer
all-seeing-code May 3, 2020
9b39c31
Add write batch in snapshot.go
all-seeing-code May 4, 2020
b9c3aa8
Defined new struct to handle interface definition
all-seeing-code May 4, 2020
baf5eaf
Modify writer_test.go to account for new API
all-seeing-code May 4, 2020
af79121
Code clean-up
all-seeing-code May 4, 2020
59f29d1
Add Write API for batch writer inside a function which accepts KVs list
all-seeing-code May 4, 2020
c5ee1e6
bugfixes
all-seeing-code May 4, 2020
0778ce7
Add multi-threaded version of batchwrite for benchmarking
all-seeing-code May 4, 2020
014e069
Address minor comments on PR
all-seeing-code May 4, 2020
cb8564f
Split entry in two lines
all-seeing-code May 4, 2020
12393ee
Use Write API from badger
all-seeing-code May 4, 2020
d21e7cf
Temp changes to run benchmarks
all-seeing-code May 4, 2020
4a3e5fb
Add a TODO and update writer_test.go
all-seeing-code May 5, 2020
74136f0
Add single threaded version to tests to emulate stream behaviour
all-seeing-code May 5, 2020
bbf99d1
Remove wait groups from single threaded version
all-seeing-code May 5, 2020
722db3c
Minor fixes
all-seeing-code May 6, 2020
78deec0
Update badger and edit comments in test file
all-seeing-code May 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,10 @@ func (r *rebuilder) Run(ctx context.Context) error {
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

// TODO(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that
// WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter
// could be replaced with WriteBatch in the code
// WriteBatch cannot be used here because it doesn't have an API to allow writing
// multiple versions. We wish to store the same keys with different versions/timestamps
// to ensure that we get all of them back when doing roll-up. WriteBatch can only be
// used when we want to write all txns at the same timestamp.
tmpWriter := NewTxnWriter(tmpDB)
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
Expand Down Expand Up @@ -644,7 +645,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
r.attr, time.Since(start))
}()

writer := NewTxnWriter(pstore)
batchWriter := pstore.NewWriteBatchAt(r.startTs)
tmpStream := tmpDB.NewStreamAt(counter)
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr)
tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
Expand All @@ -670,7 +671,8 @@ func (r *rebuilder) Run(ctx context.Context) error {

// We choose to write the PL at r.startTs, so it won't be read by txns,
// which occurred before this schema mutation.
if err := writer.SetAt(kv.Key, kv.Value, BitCompletePosting, r.startTs); err != nil {
e := &badger.Entry{Key: kv.Key, Value: kv.Value, UserMeta: BitCompletePosting}
if err := batchWriter.SetEntry(e); err != nil {
return errors.Wrap(err, "error in writing index to pstore")
}
}
Expand All @@ -682,7 +684,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n", r.attr)
return writer.Flush()
return batchWriter.Flush()
}

// IndexRebuild holds the info needed to initiate a rebuild of the indices.
Expand Down
89 changes: 89 additions & 0 deletions posting/writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2019 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package posting

import (
	"fmt"
	"io/ioutil"
	"math"
	"strconv"
	"testing"

	"github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/options"
	"github.com/dgraph-io/dgraph/x"
)

// kv is one key/value pair of the benchmark workload built by createKVList.
type kv struct {
	key, value []byte
}

var tmpIndexDir, err = ioutil.TempDir("", "dgraph_index_")

var dbOpts = badger.DefaultOptions(tmpIndexDir).
WithSyncWrites(false).
WithNumVersionsToKeep(math.MaxInt64).
WithLogger(&x.ToGlog{}).
all-seeing-code marked this conversation as resolved.
Show resolved Hide resolved
WithCompression(options.None).
all-seeing-code marked this conversation as resolved.
Show resolved Hide resolved
WithEventLogging(false).
WithLogRotatesToFlush(10).
WithMaxCacheSize(50) // TODO(Aman): Disable cache altogether

var db, err2 = badger.OpenManaged(dbOpts)

// createKVList builds the fixed workload shared by the benchmarks: 50000
// entries whose keys are the decimal strings "0" through "49999" and whose
// value is always "Check Value".
//
// Keys are produced with strconv.Itoa rather than string(i). Converting an int
// to a string yields the UTF-8 encoding of that code point, not its digits, and
// every i in the surrogate range (0xD800-0xDFFF) becomes the same "\uFFFD"
// key, so the list would silently contain duplicate keys.
func createKVList() []kv {
	const numKVs = 50000

	// The final length is known, so allocate the backing array once.
	list := make([]kv, 0, numKVs)
	for i := 0; i < numKVs; i++ {
		list = append(list, kv{
			key:   []byte(strconv.Itoa(i)),
			value: []byte("Check Value"),
		})
	}
	return list
}

func BenchmarkTxnWriter(b *testing.B) {
KVList := createKVList()
w := NewTxnWriter(db)
all-seeing-code marked this conversation as resolved.
Show resolved Hide resolved
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, typ := range KVList {
k := typ.key
v := typ.value
x.Check(w.SetAt(k, v, BitSchemaPosting, 1))
}
if err := w.Flush(); err != nil {
fmt.Printf("Got error while flushing txnwriter: %v\n", err)
}
}

}

func BenchmarkWriteBatch(b *testing.B) {
KVList := createKVList()
batch := db.NewWriteBatchAt(1)
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, typ := range KVList {
mangalaman93 marked this conversation as resolved.
Show resolved Hide resolved
k := typ.key
v := typ.value
x.Check(batch.Set(k, v))
}
if err := batch.Flush(); err != nil {
all-seeing-code marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("Got error while flushing batch: %v\n", err)
}
}
mangalaman93 marked this conversation as resolved.
Show resolved Hide resolved

}
1 change: 1 addition & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ func (n *node) processApplyCh() {
}
}

// TODO(Anurag): Are we using pkey? Remove if unused.
func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
// First let's commit all mutations to disk.
writer := posting.NewTxnWriter(pstore)
Expand Down