2 changes: 1 addition & 1 deletion backup_test.go
@@ -446,7 +446,7 @@ func TestBackupLoadIncremental(t *testing.T) {
if err := txn.SetEntry(entry); err != nil {
return err
}
updates[i] = bitDiscardEarlierVersions
updates[i] = BitDiscardEarlierVersions
}
return nil
})
134 changes: 112 additions & 22 deletions db.go
@@ -763,16 +763,9 @@ var requestPool = sync.Pool{
}

func (db *DB) writeToLSM(b *request) error {
// We should check the length of b.Ptrs and b.Entries only when badger is not
// running in InMemory mode. In InMemory mode, we don't write anything to the
// value log and that's why the length of b.Ptrs will always be zero.
if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
return errors.Errorf("Ptrs and Entries don't match: %+v", b)
}

for i, entry := range b.Entries {
var err error
if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
// Will include deletion / tombstone case.
err = db.mt.Put(entry.Key,
y.ValueStruct{
@@ -818,10 +811,13 @@ func (db *DB) writeRequests(reqs []*request) error {
}
}
db.opt.Debugf("writeRequests called. Writing to value log")
err := db.vlog.write(reqs)
if err != nil {
done(err)
return err
if !db.opt.managedTxns {
// Don't do value log writes in managed mode.
err := db.vlog.write(reqs)
if err != nil {
done(err)
return err
}
}

db.opt.Debugf("Sending updates to subscribers")
@@ -834,6 +830,7 @@ func (db *DB) writeRequests(reqs []*request) error {
}
count += len(b.Entries)
var i uint64
var err error
for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
i++
if i%100 == 0 {
@@ -1010,16 +1007,61 @@ func (db *DB) ensureRoomForWrite() error {
}
}

func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
if !db.opt.managedTxns {
panic("Handover Skiplist is only available in managed mode.")
}
db.lock.Lock()
defer db.lock.Unlock()

// If we have some data in db.mt, we should push that first, so the ordering of writes is
// maintained.
if !db.mt.sl.Empty() {
sz := db.mt.sl.MemSize()
db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz)
var err error
select {
case db.flushChan <- flushTask{mt: db.mt}:
db.imm = append(db.imm, db.mt)
db.mt, err = db.newMemTable()
if err != nil {
return y.Wrapf(err, "cannot push current memtable")
}
default:
return errNoRoom
}
}

mt := &memTable{sl: skl}
select {
case db.flushChan <- flushTask{mt: mt, cb: callback}:
db.imm = append(db.imm, mt)
return nil
default:
return errNoRoom
}
}

func arenaSize(opt Options) int64 {
return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
}

func (db *DB) NewSkiplist() *skl.Skiplist {
return skl.NewSkiplist(arenaSize(db.opt))
}

// buildL0Table builds a new table from the memtable.
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
iter := ft.mt.sl.NewIterator()
var iter y.Iterator
if ft.itr != nil {
iter = ft.itr
} else {
iter = ft.mt.sl.NewUniIterator(false)
}
defer iter.Close()

b := table.NewTableBuilder(bopts)
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
for iter.Rewind(); iter.Valid(); iter.Next() {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
continue
}
@@ -1035,16 +1077,14 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {

type flushTask struct {
mt *memTable
cb func()
itr y.Iterator
dropPrefixes [][]byte
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario when an empty memtable is flushed.
if ft.mt.sl.Empty() {
return nil
}

// ft.mt could be nil with ft.itr being the valid field.
bopts := buildTableOptions(db)
builder := buildL0Table(ft, bopts)
defer builder.Close()
@@ -1080,11 +1120,52 @@ func (db *DB) handleFlushTask(ft flushTask) error {
func (db *DB) flushMemtable(lc *z.Closer) error {
defer lc.Done()

var sz int64
var itrs []y.Iterator
var mts []*memTable
var cbs []func()
slurp := func() {
for {
select {
case more := <-db.flushChan:
if more.mt == nil {
return
}
sl := more.mt.sl
itrs = append(itrs, sl.NewUniIterator(false))
mts = append(mts, more.mt)
cbs = append(cbs, more.cb)

sz += sl.MemSize()
if sz > db.opt.MemTableSize {
return
}
default:
return
}
}
}

for ft := range db.flushChan {
if ft.mt == nil {
// We close db.flushChan now, instead of sending a nil ft.mt.
continue
}
sz = ft.mt.sl.MemSize()
// itrs, mts and cbs are reset below, at the end of this loop iteration.
y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0)
itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
mts = append(mts, ft.mt)
cbs = append(cbs, ft.cb)

// Pick more memtables, so we can really fill up the L0 table.
slurp()

// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
ft.mt = nil
ft.itr = table.NewMergeIterator(itrs, false)
ft.cb = nil

for {
err := db.handleFlushTask(ft)
if err == nil {
@@ -1095,17 +1176,26 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
// which would arrive here would match db.imm[0], because we acquire a
// lock over DB when pushing to flushChan.
// TODO: This logic is dirty AF. Any change and this could easily break.
y.AssertTrue(ft.mt == db.imm[0])
db.imm = db.imm[1:]
ft.mt.DecrRef() // Return memory.
for _, mt := range mts {
y.AssertTrue(mt == db.imm[0])
db.imm = db.imm[1:]
mt.DecrRef() // Return memory.
}
db.lock.Unlock()

for _, cb := range cbs {
if cb != nil {
cb()
}
}
break
}
// Encountered error. Retry indefinitely.
db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
time.Sleep(time.Second)
}
// Reset everything.
itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0
}
return nil
}
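For reviewers, a minimal usage sketch of the new handover API (not part of this diff; it assumes the v3 module paths and the exported skl/y helpers this repo already ships): in managed mode a caller builds a skiplist off the write path, fills it with timestamped keys, and hands it to the DB, which enqueues it for a direct flush to L0.

```go
package main

import (
	badger "github.com/dgraph-io/badger/v3"
	"github.com/dgraph-io/badger/v3/y"
)

// handover sketches the HandoverSkiplist flow added in db.go. Assumptions:
// the DB was opened with badger.OpenManaged, and skl.Skiplist.Put /
// y.KeyWithTs behave as in this repo.
func handover(db *badger.DB) error {
	sl := db.NewSkiplist() // arena sized via arenaSize(db.opt)

	// Managed mode skips the value log, so entries land in the LSM tree
	// exactly as stored here; keys must carry commit timestamps.
	sl.Put(y.KeyWithTs([]byte("k1"), 10), y.ValueStruct{Value: []byte("v1")})
	sl.Put(y.KeyWithTs([]byte("k2"), 10), y.ValueStruct{Value: []byte("v2")})

	// The callback fires once the skiplist has been flushed to disk.
	// An error (errNoRoom) means the flush queue was full; retry later.
	return db.HandoverSkiplist(sl, func() {
		// e.g. recycle the buffers backing keys and values.
	})
}
```

Because the handover path reuses flushChan, and any data already in db.mt is pushed first under the DB lock, write ordering is preserved.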
2 changes: 1 addition & 1 deletion db_test.go
@@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
y.Check2(rand.Read(value))
st := 0

buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
for i := 0; i < 1000; i++ {
key := make([]byte, 8)
2 changes: 1 addition & 1 deletion iterator.go
@@ -146,7 +146,7 @@ func (item *Item) IsDeletedOrExpired() bool {
// DiscardEarlierVersions returns whether the item was created with the
// option to discard earlier versions of a key when multiple are available.
func (item *Item) DiscardEarlierVersions() bool {
return item.meta&bitDiscardEarlierVersions > 0
return item.meta&BitDiscardEarlierVersions > 0
}

func (item *Item) yieldItemValue() ([]byte, func(), error) {
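Since this diff exports the discard bit as BitDiscardEarlierVersions, here is a short sketch of how it surfaces to callers (assumed usage, not from this diff; NewEntry, Entry.WithDiscard, and the Txn API already exist in badger):

```go
package main

import badger "github.com/dgraph-io/badger/v3"

// writeAndCheck sets the discard bit on a write via the existing
// Entry.WithDiscard helper, then detects it on read.
func writeAndCheck(db *badger.DB) error {
	err := db.Update(func(txn *badger.Txn) error {
		e := badger.NewEntry([]byte("config"), []byte("v2")).WithDiscard()
		return txn.SetEntry(e)
	})
	if err != nil {
		return err
	}
	return db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte("config"))
		if err != nil {
			return err
		}
		if item.DiscardEarlierVersions() { // item.meta & BitDiscardEarlierVersions
			// Compaction may drop all older versions of "config".
		}
		return nil
	})
}
```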
4 changes: 2 additions & 2 deletions levels.go
@@ -716,7 +716,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
}
lastKey = y.SafeCopy(lastKey, it.Key())
numVersions = 0
firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0

if len(tableKr.left) == 0 {
tableKr.left = y.SafeCopy(tableKr.left, it.Key())
@@ -753,7 +753,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
// - The `discardEarlierVersions` bit is set OR
// - We've already processed `NumVersionsToKeep` number of versions
// (including the current item being processed)
lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 ||
numVersions == s.kv.opt.NumVersionsToKeep

if isExpired || lastValidVersion {
34 changes: 17 additions & 17 deletions levels_test.go
@@ -707,11 +707,11 @@ func TestDiscardFirstVersion(t *testing.T) {

runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
l0 := []keyValVersion{{"foo", "bar", 1, 0}}
l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}}
l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}}
l02 := []keyValVersion{{"foo", "bar", 3, 0}}
l03 := []keyValVersion{{"foo", "bar", 4, 0}}
l04 := []keyValVersion{{"foo", "bar", 9, 0}}
l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}}
l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}}

// Level 0 has all the tables.
createAndOpen(db, l0, 0)
@@ -742,11 +742,11 @@ func TestDiscardFirstVersion(t *testing.T) {
// - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions"
// marker so IT WILL BE REMOVED.
ExpectedKeys := []keyValVersion{
{"foo", "bar", 10, bitDiscardEarlierVersions},
{"foo", "bar", 10, BitDiscardEarlierVersions},
{"foo", "bar", 9, 0},
{"foo", "bar", 4, 0},
{"foo", "bar", 3, 0},
{"foo", "bar", 2, bitDiscardEarlierVersions}}
{"foo", "bar", 2, BitDiscardEarlierVersions}}

getAllAndCheck(t, db, ExpectedKeys)
})
@@ -1060,15 +1060,15 @@ func TestSameLevel(t *testing.T) {
opt.LmaxCompaction = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
l6 := []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
}
l61 := []keyValVersion{
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
}
l62 := []keyValVersion{
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
}
createAndOpen(db, l6, 6)
@@ -1077,11 +1077,11 @@ func TestSameLevel(t *testing.T) {
require.NoError(t, db.lc.validate())

getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
})

@@ -1097,11 +1097,11 @@ func TestSameLevel(t *testing.T) {
db.SetDiscardTs(3)
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
})

@@ -1118,9 +1118,9 @@ func TestSameLevel(t *testing.T) {
cdef.t.baseLevel = 1
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
getAllAndCheck(t, db, []keyValVersion{
{"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
{"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
{"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
{"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
require.NoError(t, db.lc.validate())
})
}
@@ -1186,7 +1186,7 @@ func TestStaleDataCleanup(t *testing.T) {
for i := count; i > 0; i-- {
var meta byte
if i == 0 {
meta = bitDiscardEarlierVersions
meta = BitDiscardEarlierVersions
}
b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0)
}