Skip to content

Commit b21f591

Browse files
authored
feat(Skiplist): Introduce a way to hand over skiplists to Badger (#1696)
In Dgraph, we already use a Raft write-ahead log. Also, when we commit transactions, we update tens of thousands of keys in one go. To optimize this write path, this PR introduces a way to directly hand over a Skiplist to Badger, short-circuiting Badger's Value Log and WAL. This feature allows Dgraph to generate Skiplists while processing mutations and just hand them over to Badger during commits. It also accepts a callback which can be run when the Skiplist is written to disk. This is useful for determining when to create a snapshot in Dgraph.
1 parent 84267c4 commit b21f591

File tree

15 files changed

+220
-51
lines changed

15 files changed

+220
-51
lines changed

backup_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ func TestBackupLoadIncremental(t *testing.T) {
446446
if err := txn.SetEntry(entry); err != nil {
447447
return err
448448
}
449-
updates[i] = bitDiscardEarlierVersions
449+
updates[i] = BitDiscardEarlierVersions
450450
}
451451
return nil
452452
})

db.go

Lines changed: 112 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -763,16 +763,9 @@ var requestPool = sync.Pool{
763763
}
764764

765765
func (db *DB) writeToLSM(b *request) error {
766-
// We should check the length of b.Prts and b.Entries only when badger is not
767-
// running in InMemory mode. In InMemory mode, we don't write anything to the
768-
// value log and that's why the length of b.Ptrs will always be zero.
769-
if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
770-
return errors.Errorf("Ptrs and Entries don't match: %+v", b)
771-
}
772-
773766
for i, entry := range b.Entries {
774767
var err error
775-
if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
768+
if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
776769
// Will include deletion / tombstone case.
777770
err = db.mt.Put(entry.Key,
778771
y.ValueStruct{
@@ -818,10 +811,13 @@ func (db *DB) writeRequests(reqs []*request) error {
818811
}
819812
}
820813
db.opt.Debugf("writeRequests called. Writing to value log")
821-
err := db.vlog.write(reqs)
822-
if err != nil {
823-
done(err)
824-
return err
814+
if !db.opt.managedTxns {
815+
// Don't do value log writes in managed mode.
816+
err := db.vlog.write(reqs)
817+
if err != nil {
818+
done(err)
819+
return err
820+
}
825821
}
826822

827823
db.opt.Debugf("Sending updates to subscribers")
@@ -834,6 +830,7 @@ func (db *DB) writeRequests(reqs []*request) error {
834830
}
835831
count += len(b.Entries)
836832
var i uint64
833+
var err error
837834
for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
838835
i++
839836
if i%100 == 0 {
@@ -1010,16 +1007,61 @@ func (db *DB) ensureRoomForWrite() error {
10101007
}
10111008
}
10121009

1010+
func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
1011+
if !db.opt.managedTxns {
1012+
panic("Handover Skiplist is only available in managed mode.")
1013+
}
1014+
db.lock.Lock()
1015+
defer db.lock.Unlock()
1016+
1017+
// If we have some data in db.mt, we should push that first, so the ordering of writes is
1018+
// maintained.
1019+
if !db.mt.sl.Empty() {
1020+
sz := db.mt.sl.MemSize()
1021+
db.opt.Infof("Handover found %d B data in current memtable. Pushing to flushChan.", sz)
1022+
var err error
1023+
select {
1024+
case db.flushChan <- flushTask{mt: db.mt}:
1025+
db.imm = append(db.imm, db.mt)
1026+
db.mt, err = db.newMemTable()
1027+
if err != nil {
1028+
return y.Wrapf(err, "cannot push current memtable")
1029+
}
1030+
default:
1031+
return errNoRoom
1032+
}
1033+
}
1034+
1035+
mt := &memTable{sl: skl}
1036+
select {
1037+
case db.flushChan <- flushTask{mt: mt, cb: callback}:
1038+
db.imm = append(db.imm, mt)
1039+
return nil
1040+
default:
1041+
return errNoRoom
1042+
}
1043+
}
1044+
10131045
func arenaSize(opt Options) int64 {
10141046
return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
10151047
}
10161048

1049+
func (db *DB) NewSkiplist() *skl.Skiplist {
1050+
return skl.NewSkiplist(arenaSize(db.opt))
1051+
}
1052+
10171053
// buildL0Table builds a new table from the memtable.
10181054
func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
1019-
iter := ft.mt.sl.NewIterator()
1055+
var iter y.Iterator
1056+
if ft.itr != nil {
1057+
iter = ft.itr
1058+
} else {
1059+
iter = ft.mt.sl.NewUniIterator(false)
1060+
}
10201061
defer iter.Close()
1062+
10211063
b := table.NewTableBuilder(bopts)
1022-
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
1064+
for iter.Rewind(); iter.Valid(); iter.Next() {
10231065
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
10241066
continue
10251067
}
@@ -1035,16 +1077,14 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
10351077

10361078
type flushTask struct {
10371079
mt *memTable
1080+
cb func()
1081+
itr y.Iterator
10381082
dropPrefixes [][]byte
10391083
}
10401084

10411085
// handleFlushTask must be run serially.
10421086
func (db *DB) handleFlushTask(ft flushTask) error {
1043-
// There can be a scenario, when empty memtable is flushed.
1044-
if ft.mt.sl.Empty() {
1045-
return nil
1046-
}
1047-
1087+
// ft.mt could be nil with ft.itr being the valid field.
10481088
bopts := buildTableOptions(db)
10491089
builder := buildL0Table(ft, bopts)
10501090
defer builder.Close()
@@ -1080,11 +1120,52 @@ func (db *DB) handleFlushTask(ft flushTask) error {
10801120
func (db *DB) flushMemtable(lc *z.Closer) error {
10811121
defer lc.Done()
10821122

1123+
var sz int64
1124+
var itrs []y.Iterator
1125+
var mts []*memTable
1126+
var cbs []func()
1127+
slurp := func() {
1128+
for {
1129+
select {
1130+
case more := <-db.flushChan:
1131+
if more.mt == nil {
1132+
return
1133+
}
1134+
sl := more.mt.sl
1135+
itrs = append(itrs, sl.NewUniIterator(false))
1136+
mts = append(mts, more.mt)
1137+
cbs = append(cbs, more.cb)
1138+
1139+
sz += sl.MemSize()
1140+
if sz > db.opt.MemTableSize {
1141+
return
1142+
}
1143+
default:
1144+
return
1145+
}
1146+
}
1147+
}
1148+
10831149
for ft := range db.flushChan {
10841150
if ft.mt == nil {
10851151
// We close db.flushChan now, instead of sending a nil ft.mt.
10861152
continue
10871153
}
1154+
sz = ft.mt.sl.MemSize()
1155+
// Reset of itrs, mts etc. is being done below.
1156+
y.AssertTrue(len(itrs) == 0 && len(mts) == 0 && len(cbs) == 0)
1157+
itrs = append(itrs, ft.mt.sl.NewUniIterator(false))
1158+
mts = append(mts, ft.mt)
1159+
cbs = append(cbs, ft.cb)
1160+
1161+
// Pick more memtables, so we can really fill up the L0 table.
1162+
slurp()
1163+
1164+
// db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
1165+
ft.mt = nil
1166+
ft.itr = table.NewMergeIterator(itrs, false)
1167+
ft.cb = nil
1168+
10881169
for {
10891170
err := db.handleFlushTask(ft)
10901171
if err == nil {
@@ -1095,17 +1176,26 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
10951176
// which would arrive here would match db.imm[0], because we acquire a
10961177
// lock over DB when pushing to flushChan.
10971178
// TODO: This logic is dirty AF. Any change and this could easily break.
1098-
y.AssertTrue(ft.mt == db.imm[0])
1099-
db.imm = db.imm[1:]
1100-
ft.mt.DecrRef() // Return memory.
1179+
for _, mt := range mts {
1180+
y.AssertTrue(mt == db.imm[0])
1181+
db.imm = db.imm[1:]
1182+
mt.DecrRef() // Return memory.
1183+
}
11011184
db.lock.Unlock()
11021185

1186+
for _, cb := range cbs {
1187+
if cb != nil {
1188+
cb()
1189+
}
1190+
}
11031191
break
11041192
}
11051193
// Encountered error. Retry indefinitely.
11061194
db.opt.Errorf("Failure while flushing memtable to disk: %v. Retrying...\n", err)
11071195
time.Sleep(time.Second)
11081196
}
1197+
// Reset everything.
1198+
itrs, mts, cbs, sz = itrs[:0], mts[:0], cbs[:0], 0
11091199
}
11101200
return nil
11111201
}

db_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
20882088
y.Check2(rand.Read(value))
20892089
st := 0
20902090

2091-
buf := z.NewBuffer(10 << 20, "test")
2091+
buf := z.NewBuffer(10<<20, "test")
20922092
defer buf.Release()
20932093
for i := 0; i < 1000; i++ {
20942094
key := make([]byte, 8)

iterator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (item *Item) IsDeletedOrExpired() bool {
146146
// DiscardEarlierVersions returns whether the item was created with the
147147
// option to discard earlier versions of a key when multiple are available.
148148
func (item *Item) DiscardEarlierVersions() bool {
149-
return item.meta&bitDiscardEarlierVersions > 0
149+
return item.meta&BitDiscardEarlierVersions > 0
150150
}
151151

152152
func (item *Item) yieldItemValue() ([]byte, func(), error) {

levels.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
716716
}
717717
lastKey = y.SafeCopy(lastKey, it.Key())
718718
numVersions = 0
719-
firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
719+
firstKeyHasDiscardSet = it.Value().Meta&BitDiscardEarlierVersions > 0
720720

721721
if len(tableKr.left) == 0 {
722722
tableKr.left = y.SafeCopy(tableKr.left, it.Key())
@@ -753,7 +753,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
753753
// - The `discardEarlierVersions` bit is set OR
754754
// - We've already processed `NumVersionsToKeep` number of versions
755755
// (including the current item being processed)
756-
lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
756+
lastValidVersion := vs.Meta&BitDiscardEarlierVersions > 0 ||
757757
numVersions == s.kv.opt.NumVersionsToKeep
758758

759759
if isExpired || lastValidVersion {

levels_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -707,11 +707,11 @@ func TestDiscardFirstVersion(t *testing.T) {
707707

708708
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
709709
l0 := []keyValVersion{{"foo", "bar", 1, 0}}
710-
l01 := []keyValVersion{{"foo", "bar", 2, bitDiscardEarlierVersions}}
710+
l01 := []keyValVersion{{"foo", "bar", 2, BitDiscardEarlierVersions}}
711711
l02 := []keyValVersion{{"foo", "bar", 3, 0}}
712712
l03 := []keyValVersion{{"foo", "bar", 4, 0}}
713713
l04 := []keyValVersion{{"foo", "bar", 9, 0}}
714-
l05 := []keyValVersion{{"foo", "bar", 10, bitDiscardEarlierVersions}}
714+
l05 := []keyValVersion{{"foo", "bar", 10, BitDiscardEarlierVersions}}
715715

716716
// Level 0 has all the tables.
717717
createAndOpen(db, l0, 0)
@@ -742,11 +742,11 @@ func TestDiscardFirstVersion(t *testing.T) {
742742
// - Version 1 is below DiscardTS and below the first "bitDiscardEarlierVersions"
743743
// marker so IT WILL BE REMOVED.
744744
ExpectedKeys := []keyValVersion{
745-
{"foo", "bar", 10, bitDiscardEarlierVersions},
745+
{"foo", "bar", 10, BitDiscardEarlierVersions},
746746
{"foo", "bar", 9, 0},
747747
{"foo", "bar", 4, 0},
748748
{"foo", "bar", 3, 0},
749-
{"foo", "bar", 2, bitDiscardEarlierVersions}}
749+
{"foo", "bar", 2, BitDiscardEarlierVersions}}
750750

751751
getAllAndCheck(t, db, ExpectedKeys)
752752
})
@@ -1060,15 +1060,15 @@ func TestSameLevel(t *testing.T) {
10601060
opt.LmaxCompaction = true
10611061
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
10621062
l6 := []keyValVersion{
1063-
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
1063+
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
10641064
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
10651065
}
10661066
l61 := []keyValVersion{
1067-
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
1067+
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
10681068
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
10691069
}
10701070
l62 := []keyValVersion{
1071-
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
1071+
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
10721072
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
10731073
}
10741074
createAndOpen(db, l6, 6)
@@ -1077,11 +1077,11 @@ func TestSameLevel(t *testing.T) {
10771077
require.NoError(t, db.lc.validate())
10781078

10791079
getAllAndCheck(t, db, []keyValVersion{
1080-
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
1080+
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
10811081
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
1082-
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
1082+
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
10831083
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
1084-
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
1084+
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
10851085
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
10861086
})
10871087

@@ -1097,11 +1097,11 @@ func TestSameLevel(t *testing.T) {
10971097
db.SetDiscardTs(3)
10981098
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
10991099
getAllAndCheck(t, db, []keyValVersion{
1100-
{"A", "bar", 4, bitDiscardEarlierVersions}, {"A", "bar", 3, 0},
1100+
{"A", "bar", 4, BitDiscardEarlierVersions}, {"A", "bar", 3, 0},
11011101
{"A", "bar", 2, 0}, {"Afoo", "baz", 2, 0},
1102-
{"B", "bar", 4, bitDiscardEarlierVersions}, {"B", "bar", 3, 0},
1102+
{"B", "bar", 4, BitDiscardEarlierVersions}, {"B", "bar", 3, 0},
11031103
{"B", "bar", 2, 0}, {"Bfoo", "baz", 2, 0},
1104-
{"C", "bar", 4, bitDiscardEarlierVersions}, {"C", "bar", 3, 0},
1104+
{"C", "bar", 4, BitDiscardEarlierVersions}, {"C", "bar", 3, 0},
11051105
{"C", "bar", 2, 0}, {"Cfoo", "baz", 2, 0},
11061106
})
11071107

@@ -1118,9 +1118,9 @@ func TestSameLevel(t *testing.T) {
11181118
cdef.t.baseLevel = 1
11191119
require.NoError(t, db.lc.runCompactDef(-1, 6, cdef))
11201120
getAllAndCheck(t, db, []keyValVersion{
1121-
{"A", "bar", 4, bitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
1122-
{"B", "bar", 4, bitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
1123-
{"C", "bar", 4, bitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
1121+
{"A", "bar", 4, BitDiscardEarlierVersions}, {"Afoo", "baz", 2, 0},
1122+
{"B", "bar", 4, BitDiscardEarlierVersions}, {"Bfoo", "baz", 2, 0},
1123+
{"C", "bar", 4, BitDiscardEarlierVersions}, {"Cfoo", "baz", 2, 0}})
11241124
require.NoError(t, db.lc.validate())
11251125
})
11261126
}
@@ -1186,7 +1186,7 @@ func TestStaleDataCleanup(t *testing.T) {
11861186
for i := count; i > 0; i-- {
11871187
var meta byte
11881188
if i == 0 {
1189-
meta = bitDiscardEarlierVersions
1189+
meta = BitDiscardEarlierVersions
11901190
}
11911191
b.AddStaleKey(y.KeyWithTs(key, i), y.ValueStruct{Meta: meta, Value: val}, 0)
11921192
}

0 commit comments

Comments (0)