Skip to content

Commit

Permalink
Rework DB.DropPrefix (#1381) (#1514)
Browse files Browse the repository at this point in the history
Fixes three issues with the current implementation:

- It can generate compaction requests that break the invariant that bottom
  tables need to be consecutive (issue #1380). See
  #1380 (comment)
- It performs the same level compactions in increasing order of levels
  (starting from L0) which leads to old versions of keys for the prefix
  re-surfacing to active transactions.
- When you have to drop multiple prefixes, the API forces you to drop one
  prefix at a time and go through the whole expensive table rewriting multiple
  times.

Fixes #1381

Co-authored-by: Ibrahim Jarif <ibrahim@dgraph.io>
(cherry picked from commit e013bfd)

Co-authored-by: Damien Tournoud <damien@platform.sh>
  • Loading branch information
Ibrahim Jarif and damz authored Sep 10, 2020
1 parent 149918c commit 058a414
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 59 deletions.
17 changes: 9 additions & 8 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ func writeLevel0Table(ft flushTask, f io.Writer) error {
b := table.NewTableBuilder()
defer b.Close()
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) {
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
continue
}
b.Add(iter.Key(), iter.Value())
Expand All @@ -866,9 +866,9 @@ func writeLevel0Table(ft flushTask, f io.Writer) error {
}

type flushTask struct {
mt *skl.Skiplist
vptr valuePointer
dropPrefix []byte
mt *skl.Skiplist
vptr valuePointer
dropPrefixes [][]byte
}

func (db *DB) pushHead(ft flushTask) error {
Expand Down Expand Up @@ -1464,7 +1464,8 @@ func (db *DB) dropAll() (func(), error) {
// - Compact L0->L1, skipping over Kp.
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefix []byte) error {
func (db *DB) DropPrefix(prefixes ...[]byte) error {
db.opt.Infof("DropPrefix Called")
f := db.prepareToDrop()
defer f()
// Block all foreign interactions with memory tables.
Expand All @@ -1480,8 +1481,8 @@ func (db *DB) DropPrefix(prefix []byte) error {
task := flushTask{
mt: memtable,
// Ensure that the head of value log gets persisted to disk.
vptr: db.vhead,
dropPrefix: prefix,
vptr: db.vhead,
dropPrefixes: prefixes,
}
db.opt.Debugf("Flushing memtable")
if err := db.handleFlushTask(task); err != nil {
Expand All @@ -1496,7 +1497,7 @@ func (db *DB) DropPrefix(prefix []byte) error {
db.mt = skl.NewSkiplist(arenaSize(db.opt))

// Drop prefixes from the levels.
if err := db.lc.dropPrefix(prefix); err != nil {
if err := db.lc.dropPrefixes(prefixes); err != nil {
return err
}
db.opt.Infof("DropPrefix done")
Expand Down
141 changes: 92 additions & 49 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,25 @@ func (s *levelsController) dropTree() (int, error) {
// tables who only have keys with this prefix are quickly dropped. The ones which have other keys
// are run through MergeIterator and compacted to create new tables. All the mechanisms of
// compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
func (s *levelsController) dropPrefix(prefix []byte) error {
func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
// Internal move keys related to the given prefix should also be skipped.
for _, prefix := range prefixes {
key := make([]byte, 0, len(badgerMove)+len(prefix))
key = append(key, badgerMove...)
key = append(key, prefix...)
prefixes = append(prefixes, key)
}

opt := s.kv.opt
for _, l := range s.levels {
// Iterate levels in the reverse order because if we were to iterate from
// lower level (say level 0) to a higher level (say level 3) we could have
// a state in which level 0 is compacted and an older version of a key exists in lower level.
// At this point, if someone creates an iterator, they would see an old
// value for a key from lower levels. Iterating in reverse order ensures we
// drop the oldest data first so that lookups never return stale data.
for i := len(s.levels) - 1; i >= 0; i-- {
l := s.levels[i]

l.RLock()
if l.level == 0 {
size := len(l.tables)
Expand All @@ -276,7 +292,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
score: 1.74,
// A unique number greater than 1.0 does two things. Helps identify this
// function in logs, and forces a compaction.
dropPrefix: prefix,
dropPrefixes: prefixes,
}
if err := s.doCompact(cp); err != nil {
opt.Warningf("While compacting level 0: %v", err)
Expand All @@ -286,39 +302,49 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
continue
}

var tables []*table.Table
// Internal move keys related to the given prefix should also be skipped.
moveKeyForPrefix := append(badgerMove, prefix...)
prefixesToSkip := [][]byte{prefix, moveKeyForPrefix}
for _, table := range l.tables {
var absent bool
switch {
case hasAnyPrefixes(table.Smallest(), prefixesToSkip):
case hasAnyPrefixes(table.Biggest(), prefixesToSkip):
case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip):
default:
absent = true
// Build a list of compaction tableGroups affecting all the prefixes we
// need to drop. We need to build tableGroups that satisfy the invariant that
// bottom tables are consecutive.
// tableGroup contains groups of consecutive tables.
var tableGroups [][]*table.Table
var tableGroup []*table.Table

finishGroup := func() {
if len(tableGroup) > 0 {
tableGroups = append(tableGroups, tableGroup)
tableGroup = nil
}
if !absent {
tables = append(tables, table)
}

for _, table := range l.tables {
if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) {
tableGroup = append(tableGroup, table)
} else {
finishGroup()
}
}
finishGroup()

l.RUnlock()
if len(tables) == 0 {

if len(tableGroups) == 0 {
continue
}

cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
thisLevel: l,
nextLevel: l,
top: []*table.Table{},
bot: tables,
dropPrefix: prefix,
}
if err := s.runCompactDef(l.level, cd); err != nil {
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
return err
opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
for _, operation := range tableGroups {
cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
thisLevel: l,
nextLevel: l,
top: nil,
bot: operation,
dropPrefixes: prefixes,
}
if err := s.runCompactDef(l.level, cd); err != nil {
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
return err
}
}
}
return nil
Expand Down Expand Up @@ -380,9 +406,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool {
}

type compactionPriority struct {
level int
score float64
dropPrefix []byte
level int
score float64
dropPrefixes [][]byte
}

// pickCompactLevel determines which level to compact.
Expand Down Expand Up @@ -467,13 +493,19 @@ func (s *levelsController) compactBuildTables(

// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
var valid []*table.Table

nextTable:
for _, table := range botTables {
if len(cd.dropPrefix) > 0 &&
bytes.HasPrefix(table.Smallest(), cd.dropPrefix) &&
bytes.HasPrefix(table.Biggest(), cd.dropPrefix) {
// All the keys in this table have the dropPrefix. So, this table does not need to be
// in the iterator and can be dropped immediately.
continue
if len(cd.dropPrefixes) > 0 {
for _, prefix := range cd.dropPrefixes {
if bytes.HasPrefix(table.Smallest(), prefix) &&
bytes.HasPrefix(table.Biggest(), prefix) {
// All the keys in this table have the dropPrefix. So, this
// table does not need to be in the iterator and can be
// dropped immediately.
continue nextTable
}
}
}
valid = append(valid, table)
}
Expand All @@ -500,12 +532,9 @@ func (s *levelsController) compactBuildTables(
timeStart := time.Now()
builder := table.NewTableBuilder()
var numKeys, numSkips uint64
// Internal move keys related to the given prefix should also be skipped.
moveKeyForPrefix := append(badgerMove, cd.dropPrefix...)
prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix}
for ; it.Valid(); it.Next() {
// See if we need to skip the prefix.
if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) {
if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
numSkips++
updateStats(it.Value())
continue
Expand Down Expand Up @@ -669,10 +698,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
return false
}

func containsPrefix(smallValue, largeValue, prefix []byte) bool {
if bytes.HasPrefix(smallValue, prefix) {
return true
}
if bytes.HasPrefix(largeValue, prefix) {
return true
}
if bytes.Compare(prefix, smallValue) > 0 &&
bytes.Compare(prefix, largeValue) < 0 {
return true
}

return false
}

func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool {
for _, prefix := range listOfPrefixes {
if bytes.Compare(prefix, smallValue) > 0 &&
bytes.Compare(prefix, largeValue) < 0 {
if containsPrefix(smallValue, largeValue, prefix) {
return true
}
}
Expand All @@ -694,7 +737,7 @@ type compactDef struct {

thisSize int64

dropPrefix []byte
dropPrefixes [][]byte
}

func (cd *compactDef) lockLevels() {
Expand Down Expand Up @@ -863,10 +906,10 @@ func (s *levelsController) doCompact(p compactionPriority) error {
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

cd := compactDef{
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
dropPrefix: p.dropPrefix,
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
thisLevel: s.levels[l],
nextLevel: s.levels[l+1],
dropPrefixes: p.dropPrefixes,
}
cd.elog.SetMaxEvents(100)
defer cd.elog.Finish()
Expand Down
40 changes: 40 additions & 0 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,3 +672,43 @@ func TestDiscardFirstVersion(t *testing.T) {
getAllAndCheck(t, db, ExpectedKeys)
})
}

// Regression test for https://github.com/dgraph-io/dgraph/issues/5573
func TestDropPrefixMoveBug(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// l1 is used to verify that drop prefix actually drops move keys from all the levels.
l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}}
createAndOpen(db, l1, 1)

// Mutiple levels can have the exact same move key with version.
l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}, {"A", "", 0, 0}}
l21 := []keyValVersion{{"B", "", 0, 0}, {"C", "", 0, 0}}
l22 := []keyValVersion{{"F", "", 0, 0}, {"G", "", 0, 0}}

// Level 2 has all the tables.
createAndOpen(db, l2, 2)
createAndOpen(db, l21, 2)
createAndOpen(db, l22, 2)

require.NoError(t, db.lc.validate())
require.NoError(t, db.DropPrefix([]byte("F")))

db.View(func(txn *Txn) error {
iopt := DefaultIteratorOptions
iopt.AllVersions = true

it := txn.NewIterator(iopt)
defer it.Close()

specialKey := []byte("F")
droppedPrefixes := [][]byte{specialKey, append(badgerMove, specialKey...)}
for it.Rewind(); it.Valid(); it.Next() {
key := it.Item().Key()
// Ensure we don't have any "F" or "!badger!move!F" left
require.False(t, hasAnyPrefixes(key, droppedPrefixes))
}
return nil
})
require.NoError(t, db.lc.validate())
})
}
4 changes: 2 additions & 2 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (s *levelHandler) validate() error {

if y.CompareKeys(s.tables[j].Smallest(), s.tables[j].Biggest()) > 0 {
return errors.Errorf(
"Intra: %q vs %q: level=%d j=%d numTables=%d",
s.tables[j].Smallest(), s.tables[j].Biggest(), s.level, j, numTables)
"Intra: \n%s\n vs \n%s\n: level=%d j=%d numTables=%d",
hex.Dump(s.tables[j].Smallest()), hex.Dump(s.tables[j].Biggest()), s.level, j, numTables)
}
}
return nil
Expand Down

0 comments on commit 058a414

Please sign in to comment.