Do not stall Level 1
Ibrahim Jarif authored and coocood committed Feb 25, 2020
1 parent 264a464 commit 6985932
Showing 4 changed files with 35 additions and 31 deletions.
6 changes: 3 additions & 3 deletions db.go
@@ -134,7 +134,7 @@ func replayFunction(out *DB) func(Entry) error {
first := true
return func(e Entry) error { // Function for replaying.
if first {
log.Infof("First key=%s\n", e.Key)
log.Infof("First key=%s", e.Key)
}
first = false

@@ -787,7 +787,7 @@ func (db *DB) flushMemTable() *sync.WaitGroup {
db.mtbls.Store(newTbls)
ft := newFlushTask(mTbls.getMutable(), db.logOff)
db.flushChan <- ft
log.Infof("Flushing memtable, mt.size=%d, size of flushChan: %d\n",
log.Infof("Flushing memtable, mt.size=%d, size of flushChan: %d",
mTbls.getMutable().MemSize(), len(db.flushChan))

// New memtable is empty. We certainly have room.
@@ -891,7 +891,7 @@ func (db *DB) runFlushMemTable(c *y.Closer) error {
LogOffset: ft.off.offset,
}
// Store badger head even if vptr is zero, need it for readTs
log.Infof("Storing offset: %+v\n", ft.off)
log.Infof("Storing offset: %+v", ft.off)
}

fileID := db.lc.reserveFileID()
3 changes: 2 additions & 1 deletion db_test.go
@@ -1215,7 +1215,8 @@ func TestCompactionFilter(t *testing.T) {
return nil
})
require.True(t, deleteNotFoundCount > 0)
require.True(t, dropAppearOldCount > 0)
// Since compaction always picks the upper level first, the old value never has a chance to reappear.
// require.True(t, dropAppearOldCount > 0)
}

func (f *testFilter) Guards() []Guard {
2 changes: 2 additions & 0 deletions level_handler.go
@@ -198,6 +198,8 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
// Need lock as we may be deleting the first table during a level 0 compaction.
s.Lock()
defer s.Unlock()
// Return false only if the number of tables is more than
// NumLevelZeroTablesStall. For the on-disk L0, we should just add the tables to the level.
if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
return false
}
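
For context, a minimal self-contained sketch (not the unistore code; the threshold value below is an assumed placeholder for NumLevelZeroTablesStall) of the admission rule the comment above describes: flushed tables keep being added to L0 until the table count reaches the stall threshold, and only then does the caller have to wait.

package main

import "fmt"

// numLevelZeroTablesStall stands in for the configurable
// NumLevelZeroTablesStall option; the value here is hypothetical.
const numLevelZeroTablesStall = 10

// tryAddLevel0Table admits a new table unless the level already holds the
// stall threshold's worth of tables, mirroring the check above.
func tryAddLevel0Table(currentTables int) bool {
	return currentTables < numLevelZeroTablesStall
}

func main() {
	for n := 8; n <= 11; n++ {
		fmt.Printf("tables=%d admitted=%v\n", n, tryAddLevel0Table(n))
	}
}
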
55 changes: 28 additions & 27 deletions levels.go
@@ -67,7 +67,7 @@ func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
// 2. Delete files that shouldn't exist.
for id := range idMap {
if _, ok := mf.Tables[id]; !ok {
log.Infof("Table file %d not referenced in MANIFEST\n", id)
log.Infof("Table file %d not referenced in MANIFEST", id)
filename := table.NewFilename(id, kv.opt.Dir)
if err := os.Remove(filename); err != nil {
return y.Wrapf(err, "While removing table %d", id)
@@ -256,9 +256,10 @@ func (lc *levelsController) pickCompactLevels() (prios []compactionPriority) {
prios = append(prios, pri)
}
}
sort.Slice(prios, func(i, j int) bool {
return prios[i].score > prios[j].score
})
// We used to sort compaction priorities based on the score. But, we
// decided to compact based on the level, not the priority. So, upper
// levels (level 0, level 1, etc) always get compacted first, before the
// lower levels -- this allows us to avoid stalls.
return prios
}
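
To illustrate the ordering the new comment relies on, here is a small self-contained sketch (the scores slice and struct are hypothetical, not the unistore API): priorities are appended while walking the levels from 0 upward, so once the sort-by-score step is gone, the slice stays ordered by level and level 0 and level 1 always win.

package main

import "fmt"

type compactionPriority struct {
	level int
	score float64
}

// pickCompactLevels appends a priority for every level whose score is at
// least 1.0, walking levels in ascending order. Without the old
// sort-by-score step, the result stays ordered by level, so upper levels
// are compacted first even when a lower level has a higher score.
func pickCompactLevels(scores []float64) (prios []compactionPriority) {
	for level, score := range scores {
		if score >= 1.0 {
			prios = append(prios, compactionPriority{level: level, score: score})
		}
	}
	return prios
}

func main() {
	// Hypothetical scores: level 3 is the fullest, but level 0 still comes first.
	fmt.Println(pickCompactLevels([]float64{1.2, 1.1, 0.4, 2.5}))
	// Output: [{0 1.2} {1 1.1} {3 2.5}]
}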

@@ -482,7 +483,7 @@ func (lc *levelsController) compactBuildTables(level int, cd compactDef,
}
// It was true that it.Valid() at least once in the loop above, which means we
// called Add() at least once, and builder is not Empty().
log.Infof("LOG Compact. Iteration took: %v\n", time.Since(timeStart))
log.Infof("LOG Compact. Iteration took: %v", time.Since(timeStart))
if err = builder.Finish(); err != nil {
return
}
@@ -750,7 +751,7 @@ func (lc *levelsController) runCompactDef(l int, cd compactDef, limiter *rate.Li
// Note: For level 0, while doCompact is running, it is possible that new tables are added.
// However, the tables are added only to the end, so it is ok to just delete the first table.

log.Infof("LOG Compact %d->%d, del %d tables, add %d tables, took %v\n",
log.Infof("LOG Compact %d->%d, del %d tables, add %d tables, took %v",
l, l+1, len(cd.top)+len(cd.bot), len(newTables), time.Since(timeStart))
return nil
}
@@ -771,12 +772,12 @@ func (lc *levelsController) doCompact(p compactionPriority, guard *epoch.Guard)
// remain unchanged.
if l == 0 {
if !lc.fillTablesL0(&cd) {
log.Infof("fillTables failed for level: %d\n", l)
log.Infof("fillTables failed for level: %d", l)
return false, nil
}
} else {
if !lc.fillTables(&cd) {
log.Infof("fillTables failed for level: %d\n", l)
log.Infof("fillTables failed for level: %d", l)
return false, nil
}
}
@@ -807,34 +808,34 @@ func (lc *levelsController) addLevel0Table(t *table.Table, head *protos.HeadInfo

for !lc.levels[0].tryAddLevel0Table(t) {
// Stall. Make sure all levels are healthy before we unstall.
log.Warnf("STALLED STALLED STALLED STALLED STALLED STALLED STALLED STALLED: %v\n",
time.Since(lastUnstalled))
lc.cstatus.RLock()
for i := 0; i < lc.kv.opt.TableBuilderOptions.MaxLevels; i++ {
log.Infof("level=%d. Status=%s Size=%d\n",
i, lc.cstatus.levels[i].debug(), lc.levels[i].getTotalSize())
}
lc.cstatus.RUnlock()
timeStart := time.Now()
// Before we unstall, we need to make sure that level 0 and 1 are healthy. Otherwise, we
// will very quickly fill up level 0 again and if the compaction strategy favors level 0,
// then level 1 is going to be super full.
var timeStart time.Time
{
log.Warnf("STALLED STALLED STALLED: %v", time.Since(lastUnstalled))
lc.cstatus.RLock()
for i := 0; i < lc.kv.opt.TableBuilderOptions.MaxLevels; i++ {
log.Warnf("level=%d. Status=%s Size=%d",
i, lc.cstatus.levels[i].debug(), lc.levels[i].getTotalSize())
}
lc.cstatus.RUnlock()
timeStart = time.Now()
}
// Before we unstall, we need to make sure that level 0 is healthy. Otherwise, we
// will very quickly fill up level 0 again.
for i := 0; ; i++ {
// Passing 0 for deltaSize to compactable means we're treating incomplete compactions as
// not having finished -- we wait for them to finish. Also, it's crucial this behavior
// replicates pickCompactLevels' behavior in computing compactability in order to
// guarantee progress.
if !lc.isL0Compactable() && !lc.levels[1].isCompactable(0) {
// It's crucial that this behavior replicates pickCompactLevels' behavior in
// computing compactability in order to guarantee progress.
// Break the loop once L0 has enough space to accommodate new tables.
if !lc.isL0Compactable() {
break
}
time.Sleep(10 * time.Millisecond)
if i%100 == 0 {
prios := lc.pickCompactLevels()
log.Warnf("Waiting to add level 0 table. Compaction priorities: %+v\n", prios)
log.Warnf("Waiting to add level 0 table. Compaction priorities: %+v", prios)
i = 0
}
}
log.Infof("UNSTALLED UNSTALLED UNSTALLED UNSTALLED UNSTALLED UNSTALLED: %v\n",
log.Infof("UNSTALLED UNSTALLED UNSTALLED UNSTALLED UNSTALLED UNSTALLED: %v",
time.Since(timeStart))
lastUnstalled = time.Now()
}
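
Finally, a simplified sketch of what the revised stall loop boils down to (waitForL0 and the isL0Compactable callback are hypothetical stand-ins, not the unistore functions): the writer now blocks only until level 0 itself can take new tables again, whereas the old condition also waited for level 1 to stop being compactable, which is the Level 1 stall this commit removes.

package main

import (
	"fmt"
	"time"
)

// waitForL0 blocks until level 0 is no longer compactable, i.e. until it has
// room for new tables. Level 1 is deliberately not part of the condition.
func waitForL0(isL0Compactable func() bool) {
	start := time.Now()
	for i := 0; ; i++ {
		if !isL0Compactable() {
			break
		}
		time.Sleep(10 * time.Millisecond)
		if i%100 == 0 {
			fmt.Println("still waiting to add a level 0 table")
			i = 0
		}
	}
	fmt.Printf("unstalled after %v\n", time.Since(start))
}

func main() {
	// Simulate L0 staying "too full" for roughly 50ms, then draining.
	deadline := time.Now().Add(50 * time.Millisecond)
	waitForL0(func() bool { return time.Now().Before(deadline) })
}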
