Skip to content

Commit ff5c908

Browse files
Revert "feat(Skiplist): Introduce a way to hand over skiplists to Badger (#1696)"
This reverts commit b21f591.
1 parent 2b93727 commit ff5c908

File tree

15 files changed

+47
-216
lines changed

15 files changed

+47
-216
lines changed

backup_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ func TestBackupLoadIncremental(t *testing.T) {
446446
if err := txn.SetEntry(entry); err != nil {
447447
return err
448448
}
449-
updates[i] = BitDiscardEarlierVersions
449+
updates[i] = bitDiscardEarlierVersions
450450
}
451451
return nil
452452
})

db.go

Lines changed: 18 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -758,11 +758,16 @@ var requestPool = sync.Pool{
758758
}
759759

760760
func (db *DB) writeToLSM(b *request) error {
761-
db.lock.RLock()
762-
defer db.lock.RUnlock()
761+
// We should check the length of b.Ptrs and b.Entries only when badger is not
762+
// running in InMemory mode. In InMemory mode, we don't write anything to the
763+
// value log and that's why the length of b.Ptrs will always be zero.
764+
if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
765+
return errors.Errorf("Ptrs and Entries don't match: %+v", b)
766+
}
767+
763768
for i, entry := range b.Entries {
764769
var err error
765-
if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
770+
if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
766771
// Will include deletion / tombstone case.
767772
err = db.mt.Put(entry.Key,
768773
y.ValueStruct{
@@ -824,7 +829,6 @@ func (db *DB) writeRequests(reqs []*request) error {
824829
}
825830
count += len(b.Entries)
826831
var i uint64
827-
var err error
828832
for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
829833
i++
830834
if i%100 == 0 {
@@ -1003,62 +1007,16 @@ func (db *DB) ensureRoomForWrite() error {
10031007
}
10041008
}
10051009

1006-
func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
1007-
if !db.opt.managedTxns {
1008-
panic("Handover Skiplist is only available in managed mode.")
1009-
}
1010-
db.lock.Lock()
1011-
defer db.lock.Unlock()
1012-
1013-
// If we have some data in db.mt, we should push that first, so the ordering of writes is
1014-
// maintained.
1015-
if !db.mt.sl.Empty() {
1016-
sz := db.mt.sl.MemSize()
1017-
db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz)
1018-
var err error
1019-
select {
1020-
case db.flushChan <- flushTask{mt: db.mt}:
1021-
db.imm = append(db.imm, db.mt)
1022-
db.mt, err = db.newMemTable()
1023-
if err != nil {
1024-
return y.Wrapf(err, "cannot push current memtable")
1025-
}
1026-
default:
1027-
return errNoRoom
1028-
}
1029-
}
1030-
1031-
mt := &memTable{sl: skl}
1032-
1033-
select {
1034-
case db.flushChan <- flushTask{mt: mt, cb: callback}:
1035-
db.imm = append(db.imm, mt)
1036-
return nil
1037-
default:
1038-
return errNoRoom
1039-
}
1040-
}
1041-
10421010
func arenaSize(opt Options) int64 {
10431011
return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
10441012
}
10451013

1046-
func (db *DB) NewSkiplist() *skl.Skiplist {
1047-
return skl.NewSkiplist(arenaSize(db.opt))
1048-
}
1049-
10501014
// buildL0Table builds a new table from the memtable.
10511015
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
1052-
var iter y.Iterator
1053-
if ft.itr != nil {
1054-
iter = ft.itr
1055-
} else {
1056-
iter = ft.mt.sl.NewUniIterator(false)
1057-
}
1016+
iter := ft.mt.sl.NewIterator()
10581017
defer iter.Close()
1059-
10601018
b := table.NewTableBuilder(bopts)
1061-
for iter.Rewind(); iter.Valid(); iter.Next() {
1019+
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
10621020
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
10631021
continue
10641022
}
@@ -1074,14 +1032,16 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
10741032

10751033
type flushTask struct {
10761034
mt *memTable
1077-
cb func()
1078-
itr y.Iterator
10791035
dropPrefixes [][]byte
10801036
}
10811037

10821038
// handleFlushTask must be run serially.
10831039
func (db *DB) handleFlushTask(ft flushTask) error {
1084-
// ft.mt could be nil with ft.itr being the valid field.
1040+
// There can be a scenario where an empty memtable is flushed.
1041+
if ft.mt.sl.Empty() {
1042+
return nil
1043+
}
1044+
10851045
bopts := buildTableOptions(db)
10861046
builder := buildL0Table(ft, bopts)
10871047
defer builder.Close()
@@ -1117,52 +1077,11 @@ func (db *DB) handleFlushTask(ft flushTask) error {
11171077
func (db *DB) flushMemtable(lc *z.Closer) error {
11181078
defer lc.Done()
11191079

1120-
var sz int64
1121-
var itrs []y.Iterator
1122-
var mts []*memTable
1123-
var cbs []func()
1124-
slurp := func() {
1125-
for {
1126-
select {
1127-
case more := <-db.flushChan:
1128-
if more.mt == nil {
1129-
return
1130-
}
1131-
sl := more.mt.sl
1132-
itrs = append(itrs, sl.NewUniIterator(false))
1133-
mts = append(mts, more.mt)
1134-
cbs = append(cbs, more.cb)
1135-
1136-
sz += sl.MemSize()
1137-
if sz > db.opt.MemTableSize {
1138-
return
1139-
}
1140-
default:
1141-
return
1142-
}
1143-
}
1144-
}
1145-
11461080
for ft := range db.flushChan {
11471081
if ft.mt == nil {
11481082
// We close db.flushChan now, instead of sending a nil ft.mt.
11491083
continue
11501084
}
1151-
sz = ft.mt.sl.MemSize()
1152-
// Reset of itrs, mts etc. is being done below.
1153-
y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0)
1154-
itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
1155-
mts = append(mts, ft.mt)
1156-
cbs = append(cbs, ft.cb)
1157-
1158-
// Pick more memtables, so we can really fill up the L0 table.
1159-
slurp()
1160-
1161-
// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
1162-
ft.mt = nil
1163-
ft.itr = table.NewMergeIterator(itrs, false)
1164-
ft.cb = nil
1165-
11661085
for {
11671086
err := db.handleFlushTask(ft)
11681087
if err == nil {
@@ -1173,26 +1092,17 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
11731092
// which would arrive here would match db.imm[0], because we acquire a
11741093
// lock over DB when pushing to flushChan.
11751094
// TODO: This logic is dirty AF. Any change and this could easily break.
1176-
for _, mt := range mts {
1177-
y.AssertTrue(mt == db.imm[0])
1178-
db.imm = db.imm[1:]
1179-
mt.DecrRef() // Return memory.
1180-
}
1095+
y.AssertTrue(ft.mt == db.imm[0])
1096+
db.imm = db.imm[1:]
1097+
ft.mt.DecrRef() // Return memory.
11811098
db.lock.Unlock()
11821099

1183-
for _, cb := range cbs {
1184-
if cb != nil {
1185-
cb()
1186-
}
1187-
}
11881100
break
11891101
}
11901102
// Encountered error. Retry indefinitely.
11911103
db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
11921104
time.Sleep(time.Second)
11931105
}
1194-
// Reset everything.
1195-
itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0
11961106
}
11971107
return nil
11981108
}

db_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
20882088
y.Check2(rand.Read(value))
20892089
st := 0
20902090

2091-
buf := z.NewBuffer(10<<20, "test")
2091+
buf := z.NewBuffer(10 << 20, "test")
20922092
defer buf.Release()
20932093
for i := 0; i < 1000; i++ {
20942094
key := make([]byte, 8)

iterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (item *Item) IsDeletedOrExpired() bool {
146146
// DiscardEarlierVersions returns whether the item was created with the
147147
// option to discard earlier versions of a key when multiple are available.
148148
func (item *Item) DiscardEarlierVersions() bool {
149-
return item.meta&BitDiscardEarlierVersions > 0
149+
return item.meta&bitDiscardEarlierVersions > 0
150150
}
151151

152152
func (item *Item) yieldItemValue() ([]byte, func(), error) {

levels.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
717717
}
718718
lastKey = y.SafeCopy(lastKey, it.Key())
719719
numVersions = 0
720-
firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0
720+
firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
721721

722722
if len(tableKr.left) == 0 {
723723
tableKr.left = y.SafeCopy(tableKr.left, it.Key())
@@ -754,7 +754,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
754754
// - The `discardEarlierVersions` bit is set OR
755755
// - We've already processed `NumVersionsToKeep` number of versions
756756
// (including the current item being processed)
757-
lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 ||
757+
lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
758758
numVersions == s.kv.opt.NumVersionsToKeep
759759

760760
if isExpired || lastValidVersion {

levels_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -724,11 +724,11 @@ func TestDiscardFirstVersion(t *testing.T) {
724724

725725
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
726726
l0 := []keyValVersion{{"foo", "bar", 1, 0}}
727-
l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}}
727+
l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}}
728728
l02 := []keyValVersion{{"foo", "bar", 3, 0}}
729729
l03 := []keyValVersion{{"foo", "bar", 4, 0}}
730730
l04 := []keyValVersion{{"foo", "bar", 9, 0}}
731-
l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}}
731+
l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}}
732732

733733
// Level 0 has all the tables.
734734
createAndOpen(db, l0, 0)
@@ -759,11 +759,11 @@ func TestDiscardFirstVersion(t *testing.T) {
759759
// - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions"
760760
// marker so IT WILL BE REMOVED.
761761
ExpectedKeys := []keyValVersion{
762-
{"foo", "bar", 10, BitDiscardEarlierVersions},
762+
{"foo", "bar", 10, bitDiscardEarlierVersions},
763763
{"foo", "bar", 9, 0},
764764
{"foo", "bar", 4, 0},
765765
{"foo", "bar", 3, 0},
766-
{"foo", "bar", 2, BitDiscardEarlierVersions}}
766+
{"foo", "bar", 2, bitDiscardEarlierVersions}}
767767

768768
getAllAndCheck(t, db, ExpectedKeys)
769769
})
@@ -1077,15 +1077,15 @@ func TestSameLevel(t *testing.T) {
10771077
opt.LmaxCompaction = true
10781078
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
10791079
l6 := []keyValVersion{
1080-
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
1080+
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
10811081
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
10821082
}
10831083
l61 := []keyValVersion{
1084-
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
1084+
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
10851085
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
10861086
}
10871087
l62 := []keyValVersion{
1088-
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
1088+
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
10891089
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
10901090
}
10911091
createAndOpen(db, l6, 6)
@@ -1094,11 +1094,11 @@ func TestSameLevel(t *testing.T) {
10941094
require.NoError(t, db.lc.validate())
10951095

10961096
getAllAndCheck(t, db, []keyValVersion{
1097-
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
1097+
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
10981098
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
1099-
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
1099+
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
11001100
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
1101-
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
1101+
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
11021102
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
11031103
})
11041104

@@ -1114,11 +1114,11 @@ func TestSameLevel(t *testing.T) {
11141114
db.SetDiscardTs(3)
11151115
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
11161116
getAllAndCheck(t, db, []keyValVersion{
1117-
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
1117+
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
11181118
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
1119-
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
1119+
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
11201120
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
1121-
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
1121+
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
11221122
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
11231123
})
11241124

@@ -1135,9 +1135,9 @@ func TestSameLevel(t *testing.T) {
11351135
cdef.t.baseLevel = 1
11361136
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
11371137
getAllAndCheck(t, db, []keyValVersion{
1138-
{"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
1139-
{"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
1140-
{"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
1138+
{"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
1139+
{"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
1140+
{"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
11411141
require.NoError(t, db.lc.validate())
11421142
})
11431143
}
@@ -1203,7 +1203,7 @@ func TestStaleDataCleanup(t *testing.T) {
12031203
for i := count; i > 0; i-- {
12041204
var meta byte
12051205
if i == 0 {
1206-
meta = BitDiscardEarlierVersions
1206+
meta = bitDiscardEarlierVersions
12071207
}
12081208
b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0)
12091209
}

managed_db_test.go

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -771,54 +771,6 @@ func TestWriteBatchDuplicate(t *testing.T) {
771771
})
772772
}
773773

774-
func TestWriteViaSkip(t *testing.T) {
775-
key := func(i int) []byte {
776-
return []byte(fmt.Sprintf("%10d", i))
777-
}
778-
val := func(i int) []byte {
779-
return []byte(fmt.Sprintf("%128d", i))
780-
}
781-
opt := DefaultOptions("")
782-
opt.managedTxns = true
783-
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
784-
s := db.NewSkiplist()
785-
for i := 0; i < 100; i++ {
786-
s.Put(y.KeyWithTs(key(i), math.MaxUint64), y.ValueStruct{Value: val(i)})
787-
}
788-
{
789-
// Update key timestamps by directly changing them in the skiplist.
790-
itr := s.NewUniIterator(false)
791-
defer itr.Close()
792-
itr.Rewind()
793-
for itr.Valid() {
794-
y.SetKeyTs(itr.Key(), 101)
795-
itr.Next()
796-
}
797-
}
798-
799-
// Hand over skiplist to Badger.
800-
require.NoError(t, db.HandoverSkiplist(s, nil))
801-
802-
// Read the data back.
803-
txn := db.NewTransactionAt(101, false)
804-
defer txn.Discard()
805-
itr := txn.NewIterator(DefaultIteratorOptions)
806-
defer itr.Close()
807-
808-
i := 0
809-
for itr.Rewind(); itr.Valid(); itr.Next() {
810-
item := itr.Item()
811-
require.Equal(t, string(key(i)), string(item.Key()))
812-
require.Equal(t, item.Version(), uint64(101))
813-
valcopy, err := item.ValueCopy(nil)
814-
require.NoError(t, err)
815-
require.Equal(t, val(i), valcopy)
816-
i++
817-
}
818-
require.Equal(t, 100, i)
819-
})
820-
}
821-
822774
func TestZeroDiscardStats(t *testing.T) {
823775
N := uint64(10000)
824776
populate := func(t *testing.T, db *DB) {

0 commit comments

Comments
 (0)