Rework DB.DropPrefix #1381

Merged · 4 commits · Jun 27, 2020
16 changes: 8 additions & 8 deletions db.go
@@ -974,7 +974,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
defer b.Close()
var vp valuePointer
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
continue
}
vs := iter.Value()
@@ -987,9 +987,9 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
}

type flushTask struct {
mt *skl.Skiplist
vptr valuePointer
dropPrefix []byte
mt *skl.Skiplist
vptr valuePointer
dropPrefixes [][]byte
}

// handleFlushTask must be run serially.
@@ -1618,7 +1618,7 @@ func (db *DB) dropAll() (func(), error) {
// - Compact L0->L1, skipping over Kp.
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefix []byte) error {
func (db *DB) DropPrefix(prefixes ...[]byte) error {
db.opt.Infof("DropPrefix Called")
f, err := db.prepareToDrop()
if err != nil {
@@ -1638,8 +1638,8 @@ func (db *DB) DropPrefix(prefix []byte) error {
task := flushTask{
mt: memtable,
// Ensure that the head of value log gets persisted to disk.
vptr: db.vhead,
dropPrefix: prefix,
vptr: db.vhead,
dropPrefixes: prefixes,
}
db.opt.Debugf("Flushing memtable")
if err := db.handleFlushTask(task); err != nil {
@@ -1654,7 +1654,7 @@ func (db *DB) DropPrefix(prefix []byte) error {
db.mt = skl.NewSkiplist(arenaSize(db.opt))

// Drop prefixes from the levels.
if err := db.lc.dropPrefix(prefix); err != nil {
if err := db.lc.dropPrefixes(prefixes); err != nil {
return err
}
db.opt.Infof("DropPrefix done")
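With this change DropPrefix becomes variadic, so callers can drop several prefixes in one pass instead of stopping memtable flushes, compactions and writes once per prefix. A minimal usage sketch, not taken from the PR itself — the directory path and the "user/" and "session/" prefixes are made up, and the import path assumes the v2 module:

package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v2"
)

func main() {
	// Hypothetical on-disk location; any writable directory works.
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-drop-prefix"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// One call drops every key starting with "user/" or "session/".
	// Internally the matching !badger!move! keys are dropped as well.
	if err := db.DropPrefix([]byte("user/"), []byte("session/")); err != nil {
		log.Fatal(err)
	}
}

Because prepareToDrop pauses memtable flushes, compactions and writes for the duration of the call, batching prefixes into a single DropPrefix call should be cheaper than looping over them.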
141 changes: 92 additions & 49 deletions levels.go
@@ -274,9 +274,25 @@ func (s *levelsController) dropTree() (int, error) {
// tables which only have keys with this prefix are quickly dropped. The ones which have other keys
// are run through MergeIterator and compacted to create new tables. All the mechanisms of
// compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
func (s *levelsController) dropPrefix(prefix []byte) error {
func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
// Internal move keys related to the given prefix should also be skipped.
for _, prefix := range prefixes {
key := make([]byte, 0, len(badgerMove)+len(prefix))
key = append(key, badgerMove...)
key = append(key, prefix...)
prefixes = append(prefixes, key)
}

opt := s.kv.opt
for _, l := range s.levels {
// Iterate levels in reverse order because if we were to iterate from a
// lower level (say level 0) to a higher level (say level 3), we could reach
// a state in which level 0 is already compacted while an older version of a
// key still exists at a higher level. An iterator created at that point
// would see the stale value from the higher level. Iterating in reverse
// order drops the oldest data first, so lookups never return stale data.
for i := len(s.levels) - 1; i >= 0; i-- {
l := s.levels[i]

l.RLock()
if l.level == 0 {
size := len(l.tables)
@@ -288,7 +304,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
score: 1.74,
// A unique number greater than 1.0 does two things. Helps identify this
// function in logs, and forces a compaction.
dropPrefix: prefix,
dropPrefixes: prefixes,
}
if err := s.doCompact(cp); err != nil {
opt.Warningf("While compacting level 0: %v", err)
@@ -298,39 +314,49 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
continue
}

var tables []*table.Table
// Internal move keys related to the given prefix should also be skipped.
moveKeyForPrefix := append(badgerMove, prefix...)
prefixesToSkip := [][]byte{prefix, moveKeyForPrefix}
for _, table := range l.tables {
var absent bool
switch {
case hasAnyPrefixes(table.Smallest(), prefixesToSkip):
case hasAnyPrefixes(table.Biggest(), prefixesToSkip):
case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip):
default:
absent = true
// Build a list of compaction tableGroups covering all the prefixes we
// need to drop. Each compaction must satisfy the invariant that its
// bottom tables are consecutive, so tableGroups holds runs of consecutive
// tables and tableGroup accumulates the current run.
var tableGroups [][]*table.Table
var tableGroup []*table.Table

finishGroup := func() {
if len(tableGroup) > 0 {
tableGroups = append(tableGroups, tableGroup)
tableGroup = nil
}
if !absent {
tables = append(tables, table)
}

for _, table := range l.tables {
if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) {
tableGroup = append(tableGroup, table)
} else {
finishGroup()
}
}
finishGroup()

l.RUnlock()
if len(tables) == 0 {

if len(tableGroups) == 0 {
continue
}

cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
thisLevel: l,
nextLevel: l,
top: []*table.Table{},
bot: tables,
dropPrefix: prefix,
}
if err := s.runCompactDef(l.level, cd); err != nil {
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
return err
opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
for _, operation := range tableGroups {
cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
thisLevel: l,
nextLevel: l,
top: nil,
bot: operation,
dropPrefixes: prefixes,
}
if err := s.runCompactDef(l.level, cd); err != nil {
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
return err
}
}
}
return nil
@@ -395,9 +421,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool {
}

type compactionPriority struct {
level int
score float64
dropPrefix []byte
level int
score float64
dropPrefixes [][]byte
}

// pickCompactLevel determines which level to compact.
@@ -491,13 +517,19 @@ func (s *levelsController) compactBuildTables(

// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
var valid []*table.Table

nextTable:
for _, table := range botTables {
if len(cd.dropPrefix) > 0 &&
bytes.HasPrefix(table.Smallest(), cd.dropPrefix) &&
bytes.HasPrefix(table.Biggest(), cd.dropPrefix) {
// All the keys in this table have the dropPrefix. So, this table does not need to be
// in the iterator and can be dropped immediately.
continue
if len(cd.dropPrefixes) > 0 {
for _, prefix := range cd.dropPrefixes {
if bytes.HasPrefix(table.Smallest(), prefix) &&
bytes.HasPrefix(table.Biggest(), prefix) {
// All the keys in this table start with this prefix, so the
// table does not need to be in the iterator and can be
// dropped immediately.
continue nextTable
}
}
}
valid = append(valid, table)
}
Expand Down Expand Up @@ -535,12 +567,9 @@ func (s *levelsController) compactBuildTables(
bopts.BfCache = s.kv.bfCache
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
// Internal move keys related to the given prefix should also be skipped.
moveKeyForPrefix := append(badgerMove, cd.dropPrefix...)
prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix}
for ; it.Valid(); it.Next() {
// See if we need to skip the prefix.
if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) {
if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
numSkips++
updateStats(it.Value())
continue
@@ -719,10 +748,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
return false
}

func containsPrefix(smallValue, largeValue, prefix []byte) bool {
if bytes.HasPrefix(smallValue, prefix) {
return true
}
if bytes.HasPrefix(largeValue, prefix) {
return true
}
if bytes.Compare(prefix, smallValue) > 0 &&
bytes.Compare(prefix, largeValue) < 0 {
return true
}

return false
}

func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool {
for _, prefix := range listOfPrefixes {
if bytes.Compare(prefix, smallValue) > 0 &&
bytes.Compare(prefix, largeValue) < 0 {
if containsPrefix(smallValue, largeValue, prefix) {
return true
}
}
@@ -744,7 +787,7 @@ type compactDef struct {

thisSize int64

dropPrefix []byte
dropPrefixes [][]byte
}

func (cd *compactDef) lockLevels() {
@@ -918,10 +961,10 @@ func (s *levelsController) doCompact(p compactionPriority) error {
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
dropPrefix: p.dropPrefix,
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
dropPrefixes: p.dropPrefixes,
}
cd.elog.SetMaxEvents(100)
defer cd.elog.Finish()
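The skipping logic rests on two small helpers: hasAnyPrefixes, which checks a single key, and containsPrefix/containsAnyPrefixes, which check whether a table's [Smallest, Biggest] range can hold matching keys at all. Below is a self-contained sketch of the same idea — not the badger code itself; badgerMove here is an illustrative stand-in and the sample keys are invented — showing how caller prefixes are expanded with their move-key counterparts and how a table's key range is tested:

package main

import (
	"bytes"
	"fmt"
)

// Illustrative stand-in for badger's internal move-key prefix.
var badgerMove = []byte("!badger!move!")

// hasAnyPrefixes reports whether s starts with any of the given prefixes.
func hasAnyPrefixes(s []byte, prefixes [][]byte) bool {
	for _, p := range prefixes {
		if bytes.HasPrefix(s, p) {
			return true
		}
	}
	return false
}

// containsPrefix reports whether a table whose keys span [smallest, biggest]
// may hold keys starting with prefix: either boundary carries the prefix, or
// the prefix sorts strictly between the two boundaries.
func containsPrefix(smallest, biggest, prefix []byte) bool {
	if bytes.HasPrefix(smallest, prefix) || bytes.HasPrefix(biggest, prefix) {
		return true
	}
	return bytes.Compare(prefix, smallest) > 0 && bytes.Compare(prefix, biggest) < 0
}

func main() {
	// Expand the caller-supplied prefixes with their move-key counterparts,
	// mirroring what dropPrefixes does before scanning the levels.
	userPrefixes := [][]byte{[]byte("F")}
	prefixes := append([][]byte{}, userPrefixes...)
	for _, p := range userPrefixes {
		moveKey := append(append([]byte{}, badgerMove...), p...)
		prefixes = append(prefixes, moveKey)
	}

	fmt.Println(hasAnyPrefixes([]byte("!badger!move!F"), prefixes))    // true: move key for "F"
	fmt.Println(hasAnyPrefixes([]byte("G"), prefixes))                 // false: unrelated key
	fmt.Println(containsPrefix([]byte("B"), []byte("G"), []byte("F"))) // true: "F" sorts inside [B, G]
	fmt.Println(containsPrefix([]byte("B"), []byte("C"), []byte("F"))) // false: no "F..." key can live here
}

The strictly-between comparison is what catches tables like [B, G] above: neither boundary starts with "F", yet the table can still contain "F..." keys, so it has to go through the prefix-dropping compaction instead of being skipped outright.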
40 changes: 40 additions & 0 deletions levels_test.go
@@ -778,3 +778,43 @@ func TestL0Stall(t *testing.T) {
test(t, &opt)
})
}

// Regression test for https://github.com/dgraph-io/dgraph/issues/5573
func TestDropPrefixMoveBug(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// l1 is used to verify that drop prefix actually drops move keys from all the levels.
l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}}
createAndOpen(db, l1, 1)

// Multiple levels can have the exact same move key with the same version.
l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}, {"A", "", 0, 0}}
l21 := []keyValVersion{{"B", "", 0, 0}, {"C", "", 0, 0}}
l22 := []keyValVersion{{"F", "", 0, 0}, {"G", "", 0, 0}}

// Level 2 has all the tables.
createAndOpen(db, l2, 2)
createAndOpen(db, l21, 2)
createAndOpen(db, l22, 2)

require.NoError(t, db.lc.validate())
require.NoError(t, db.DropPrefix([]byte("F")))

db.View(func(txn *Txn) error {
iopt := DefaultIteratorOptions
iopt.AllVersions = true

it := txn.NewIterator(iopt)
defer it.Close()

specialKey := []byte("F")
droppedPrefixes := [][]byte{specialKey, append(badgerMove, specialKey...)}
for it.Rewind(); it.Valid(); it.Next() {
key := it.Item().Key()
// Ensure we don't have any "F" or "!badger!move!F" left
require.False(t, hasAnyPrefixes(key, droppedPrefixes))
}
return nil
})
require.NoError(t, db.lc.validate())
})
}
4 changes: 2 additions & 2 deletions util.go
@@ -60,8 +60,8 @@ func (s *levelHandler) validate() error {

if y.CompareKeys(s.tables[j].Smallest(), s.tables[j].Biggest()) > 0 {
return errors.Errorf(
"Intra: %q vs %q: level=%d j=%d numTables=%d",
s.tables[j].Smallest(), s.tables[j].Biggest(), s.level, j, numTables)
"Intra: \n%s\n vs \n%s\n: level=%d j=%d numTables=%d",
hex.Dump(s.tables[j].Smallest()), hex.Dump(s.tables[j].Biggest()), s.level, j, numTables)
}
}
return nil