Skip to content

Commit

Permalink
Improve write stalling on level 0 and 1
Browse files Browse the repository at this point in the history
We don't need to stall writes if Level 1 does not have enough space.
Level 1 is stored on the disk and it should be okay to have more
number of tables (more size) on Level 1 than the `max level 1 size`.
These tables will eventually be compacted to lower levels.

This commit changes the following
- We no longer stall writes if L1 doesn't have enough space.
- We stall writes on level 0 only if `KeepL0InMemory` is true.
- Upper levels (L0, L1, etc) get priority in compaction (previously,
    level with higher priority score would get preference)
  • Loading branch information
Ibrahim Jarif authored Jan 15, 2020
1 parent 5870b7b commit 3747be5
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 12 deletions.
4 changes: 3 additions & 1 deletion level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
// Need lock as we may be deleting the first table during a level 0 compaction.
s.Lock()
defer s.Unlock()
if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
// Return false only if L0 is in memory and number of tables is more than number of
// ZeroTableStall. For on disk L0, we should just add the tables to the level.
if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
return false
}

Expand Down
21 changes: 10 additions & 11 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,10 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
prios = append(prios, pri)
}
}
sort.Slice(prios, func(i, j int) bool {
return prios[i].score > prios[j].score
})
// We used to sort compaction priorities based on the score. But, we
// decided to compact based on the level, not the priority. So, upper
// levels (level 0, level 1, etc) always get compacted first, before the
// lower levels -- this allows us to avoid stalls.
return prios
}

Expand Down Expand Up @@ -937,15 +938,13 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
s.cstatus.RUnlock()
timeStart = time.Now()
}
// Before we unstall, we need to make sure that level 0 and 1 are healthy. Otherwise, we
// will very quickly fill up level 0 again and if the compaction strategy favors level 0,
// then level 1 is going to super full.
// Before we unstall, we need to make sure that level 0 is healthy. Otherwise, we
// will very quickly fill up level 0 again.
for i := 0; ; i++ {
// Passing 0 for delSize to compactable means we're treating incomplete compactions as
// not having finished -- we wait for them to finish. Also, it's crucial this behavior
// replicates pickCompactLevels' behavior in computing compactability in order to
// guarantee progress.
if !s.isLevel0Compactable() && !s.levels[1].isCompactable(0) {
// It's crucial that this behavior replicates pickCompactLevels' behavior in
// computing compactability in order to guarantee progress.
// Break the loop once L0 has enough space to accommodate new tables.
if !s.isLevel0Compactable() {
break
}
time.Sleep(10 * time.Millisecond)
Expand Down
138 changes: 138 additions & 0 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package badger
import (
"math"
"testing"
"time"

"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
Expand Down Expand Up @@ -439,3 +440,140 @@ func TestDiscardFirstVersion(t *testing.T) {
getAllAndCheck(t, db, ExpectedKeys)
})
}

// This test ensures we don't stall when L1's size is greater than opt.LevelOneSize.
// We should stall only when L0 tables more than the opt.NumLevelZeroTableStall.
func TestL1Stall(t *testing.T) {
opt := DefaultOptions("")
// Disable all compactions.
opt.NumCompactors = 0
// Number of level zero tables.
opt.NumLevelZeroTables = 3
// Addition of new tables will stall if there are 4 or more L0 tables.
opt.NumLevelZeroTablesStall = 4
// Level 1 size is 10 bytes.
opt.LevelOneSize = 10

runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
// Level 0 has 4 tables.
db.lc.levels[0].Lock()
db.lc.levels[0].tables = []*table.Table{createEmptyTable(db), createEmptyTable(db),
createEmptyTable(db), createEmptyTable(db)}
db.lc.levels[0].Unlock()

timeout := time.After(5 * time.Second)
done := make(chan bool)

// This is important. Set level 1 size more than the opt.LevelOneSize (we've set it to 10).
db.lc.levels[1].totalSize = 100
go func() {
tab := createEmptyTable(db)
db.lc.addLevel0Table(tab)
tab.DecrRef()
done <- true
}()
time.Sleep(time.Second)

db.lc.levels[0].Lock()
// Drop two tables from Level 0 so that addLevel0Table can make progress. Earlier table
// count was 4 which is equal to L0 stall count.
toDrop := db.lc.levels[0].tables[:2]
decrRefs(toDrop)
db.lc.levels[0].tables = db.lc.levels[0].tables[2:]
db.lc.levels[0].Unlock()

select {
case <-timeout:
t.Fatal("Test didn't finish in time")
case <-done:
}
})
}

func createEmptyTable(db *DB) *table.Table {
opts := table.Options{
BloomFalsePositive: db.opt.BloomFalsePositive,
LoadingMode: options.LoadToRAM,
ChkMode: options.NoVerification,
}
b := table.NewTableBuilder(opts)
// Add one key so that we can open this table.
b.Add(y.KeyWithTs([]byte("foo"), 1), y.ValueStruct{}, 0)
fd, err := y.CreateSyncedFile(table.NewFilename(db.lc.reserveFileID(), db.opt.Dir), true)
if err != nil {
panic(err)
}

if _, err := fd.Write(b.Finish()); err != nil {
panic(err)
}
tab, err := table.OpenTable(fd, table.Options{})
if err != nil {
panic(err)
}
// Add dummy entry to manifest file so that it doesn't complain during compaction.
if err := db.manifest.addChanges([]*pb.ManifestChange{
newCreateChange(tab.ID(), 0, 0, tab.CompressionType()),
}); err != nil {
panic(err)
}

return tab
}

func TestL0Stall(t *testing.T) {
test := func(t *testing.T, opt *Options) {
runBadgerTest(t, opt, func(t *testing.T, db *DB) {
db.lc.levels[0].Lock()
// Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level
// zero and all new additions are expected to stall if L0 is in memory.
for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
}
db.lc.levels[0].Unlock()

timeout := time.After(5 * time.Second)
done := make(chan bool)

go func() {
tab := createEmptyTable(db)
db.lc.addLevel0Table(tab)
tab.DecrRef()
done <- true
}()
// Let it stall for a second.
time.Sleep(time.Second)

select {
case <-timeout:
if opt.KeepL0InMemory {
t.Log("Timeout triggered")
// Mark this test as successful since L0 is in memory and the
// addition of new table to L0 is supposed to stall.
} else {
t.Fatal("Test didn't finish in time")
}
case <-done:
// The test completed before 5 second timeout. Mark it as successful.
}
})
}

opt := DefaultOptions("")
opt.EventLogging = false
// Disable all compactions.
opt.NumCompactors = 0
// Number of level zero tables.
opt.NumLevelZeroTables = 3
// Addition of new tables will stall if there are 4 or more L0 tables.
opt.NumLevelZeroTablesStall = 4

t.Run("with KeepL0InMemory", func(t *testing.T) {
opt.KeepL0InMemory = true
test(t, &opt)
})
t.Run("with L0 on disk", func(t *testing.T) {
opt.KeepL0InMemory = false
test(t, &opt)
})
}

0 comments on commit 3747be5

Please sign in to comment.