Skip to content

Commit

Permalink
fix(tests): Writebatch, Stream, Vlog tests (hypermodeinc#1577)
Browse files Browse the repository at this point in the history
This PR fixes the following issues/tests
 - Deadlock in write batch - Use an atomic value to set `WriteBatch.err`
 - Vlog Truncate test - Fix issues with empty memtables
 - Test options - Set memtable size.
 - Compaction tests - Acquire lock before updating level tables
 - Vlog Write - Grow the file (via truncate) if the transaction cannot fit in the current vlog size
 - TestPersistLFDiscardStats - Set numLevelZeroTables=1 to force compaction.

This PR also fixes the failing bank test by adding an index cache to the bank test.
  • Loading branch information
Ibrahim Jarif authored Nov 2, 2020
1 parent 45bca18 commit 0506f78
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 40 deletions.
3 changes: 2 additions & 1 deletion badger/cmd/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ func runTest(cmd *cobra.Command, args []string) error {
WithNumVersionsToKeep(int(math.MaxInt32)).
WithValueThreshold(1). // Make all values go to value log
WithCompression(options.ZSTD).
WithBlockCacheSize(10 << 20)
WithBlockCacheSize(10 << 20).
WithIndexCacheSize(10 << 20)

if verbose {
opts = opts.WithLoggingLevel(badger.DEBUG)
Expand Down
36 changes: 17 additions & 19 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package badger

import (
"sync"
"sync/atomic"

"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
Expand All @@ -30,7 +31,7 @@ type WriteBatch struct {
txn *Txn
db *DB
throttle *y.Throttle
err error
err atomic.Value

isManaged bool
commitTs uint64
Expand Down Expand Up @@ -88,13 +89,10 @@ func (wb *WriteBatch) callback(err error) {
if err == nil {
return
}

wb.Lock()
defer wb.Unlock()
if wb.err != nil {
if err := wb.Error(); err != nil {
return
}
wb.err = err
wb.err.Store(err)
}

func (wb *WriteBatch) Write(kvList *pb.KVList) error {
Expand Down Expand Up @@ -136,7 +134,7 @@ func (wb *WriteBatch) handleEntry(e *Entry) error {
// This time the error must not be ErrTxnTooBig, otherwise, we make the
// error permanent.
if err := wb.txn.SetEntry(e); err != nil {
wb.err = err
wb.err.Store(err)
return err
}
return nil
Expand Down Expand Up @@ -173,28 +171,28 @@ func (wb *WriteBatch) Delete(k []byte) error {
return err
}
if err := wb.txn.Delete(k); err != nil {
wb.err = err
wb.err.Store(err)
return err
}
return nil
}

// Caller to commit must hold a write lock.
func (wb *WriteBatch) commit() error {
if wb.err != nil {
return wb.err
if err := wb.Error(); err != nil {
return err
}
if wb.finished {
return y.ErrCommitAfterFinish
}
if err := wb.throttle.Do(); err != nil {
wb.err = err
return wb.err
wb.err.Store(err)
return err
}
wb.txn.CommitWith(wb.callback)
wb.txn = wb.db.newTransaction(true, wb.isManaged)
wb.txn.commitTs = wb.commitTs
return wb.err
return wb.Error()
}

// Flush must be called at the end to ensure that any pending writes get committed to Badger. Flush
Expand All @@ -211,18 +209,18 @@ func (wb *WriteBatch) Flush() error {
wb.Unlock()

if err := wb.throttle.Finish(); err != nil {
if wb.err != nil {
return errors.Errorf("wb.err: %s err: %s", wb.err, err)
if wb.Error() != nil {
return errors.Errorf("wb.err: %s err: %s", wb.Error(), err)
}
return err
}

return wb.err
return wb.Error()
}

// Error returns any errors encountered so far. No commits would be run once an error is detected.
func (wb *WriteBatch) Error() error {
wb.Lock()
defer wb.Unlock()
return wb.err
// If the interface conversion fails, the err will be nil.
err, _ := wb.err.Load().(error)
return err
}
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func Open(opt Options) (*DB, error) {

if opt.IndexCacheSize > 0 {
// Index size is around 5% of the table size.
indexSz := int64(float64(opt.BaseTableSize) * 0.05)
indexSz := int64(float64(opt.MemTableSize) * 0.05)
numInCache := opt.IndexCacheSize / indexSz
if numInCache == 0 {
// Make the value of this variable at least one since the cache requires
Expand Down
1 change: 1 addition & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (s *DB) validate() error { return s.lc.validate() }

func getTestOptions(dir string) Options {
opt := DefaultOptions(dir).
WithMemTableSize(1 << 15).
WithBaseTableSize(1 << 15). // Force more compaction.
WithBaseLevelSize(4 << 15). // Force more compaction.
WithSyncWrites(false)
Expand Down
2 changes: 2 additions & 0 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
}); err != nil {
panic(err)
}
db.lc.levels[level].Lock()
// Add table to the given level.
db.lc.levels[level].tables = append(db.lc.levels[level].tables, tab)
db.lc.levels[level].Unlock()
}

type keyValVersion struct {
Expand Down
6 changes: 6 additions & 0 deletions memtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ func (db *DB) openMemTables(opt Options) error {
if err != nil {
return y.Wrapf(err, "while opening fid: %d", fid)
}
// If this memtable is empty we don't need to add it. This is a
// memtable that was completely truncated.
if mt.sl.Empty() {
mt.DecrRef()
continue
}
// These should no longer be written to. So, make them part of the imm.
db.imm = append(db.imm, mt)
}
Expand Down
31 changes: 17 additions & 14 deletions stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,23 @@ func TestStreamWriter5(t *testing.T) {
func TestStreamWriter6(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
list := &pb.KVList{}
str := []string{"a", "a", "b", "b", "c", "c"}
ver := 1
str := []string{"a", "b", "c"}
ver := uint64(0)
// The baseTable size is 32 KB (1<<15) and the max table size for level
// 6 table is 1 mb (look at newWrite function). Since all the tables
// will be written to level 6, we need to insert at least 1 mb of data.
// Setting keycount below 32 would cause this test to fail.
keyCount := 40
for i := range str {
kv := &pb.KV{
Key: bytes.Repeat([]byte(str[i]), int(db.opt.BaseTableSize)),
Value: []byte("val"),
Version: uint64(ver),
for j := 0; j < keyCount; j++ {
ver++
kv := &pb.KV{
Key: bytes.Repeat([]byte(str[i]), int(db.opt.BaseTableSize)),
Value: []byte("val"),
Version: uint64(keyCount - j),
}
list.Kv = append(list.Kv, kv)
}
list.Kv = append(list.Kv, kv)
ver = (ver + 1) % 2
}

// list has 3 pairs for equal keys. Since each Key has size equal to MaxTableSize
Expand All @@ -347,12 +354,8 @@ func TestStreamWriter6(t *testing.T) {
tables := db.Tables()
require.Equal(t, 3, len(tables), "Count of tables not matching")
for _, tab := range tables {
if tab.Level > 0 {
require.Equal(t, 2, int(tab.KeyCount),
fmt.Sprintf("failed for level: %d", tab.Level))
} else {
require.Equal(t, 1, int(tab.KeyCount)) // level 0 table will have head key
}
require.Equal(t, keyCount, int(tab.KeyCount),
fmt.Sprintf("failed for level: %d", tab.Level))
}
require.NoError(t, db.Close())
db, err := Open(db.opt)
Expand Down
3 changes: 2 additions & 1 deletion value.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,8 +798,9 @@ func (vlog *valueLog) write(reqs []*request) error {

n := uint32(buf.Len())
endOffset := atomic.AddUint32(&vlog.writableLogOffset, n)
// Increase the file size if we cannot accommodate this entry.
if int(endOffset) >= len(curlf.Data) {
return y.Wrapf(ErrTxnTooBig, "endOffset: %d len: %d\n", endOffset, len(curlf.Data))
curlf.Truncate(int64(endOffset))
}

start := int(endOffset - n)
Expand Down
8 changes: 4 additions & 4 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,14 @@ func TestValueGC4(t *testing.T) {
}

func TestPersistLFDiscardStats(t *testing.T) {
// TODO(ibrahim): This test is failing because compactions are not
// happening and so no discard stats are generated.
t.Skip()
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)
opt := getTestOptions(dir)
// Force more compaction by reducing the number of L0 tables.
opt.NumLevelZeroTables = 1
opt.ValueLogFileSize = 1 << 20
// avoid compaction on close, so that discard map remains same
// Avoid compaction on close so that the discard map remains the same.
opt.CompactL0OnClose = false

db, err := Open(opt)
Expand Down Expand Up @@ -960,6 +959,7 @@ func BenchmarkReadWrite(b *testing.B) {
}

// Regression test for https://github.com/dgraph-io/badger/issues/817
// This test verifies if fully corrupted memtables are deleted on reopen.
func TestValueLogTruncate(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
Expand Down

0 comments on commit 0506f78

Please sign in to comment.