Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support sub compaction to speed up large compaction #70

Merged
merged 6 commits into from
Dec 20, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,15 @@ type levelHandlerRLocked struct{}
// This function should already have acquired a read lock, and this is so important the caller must
// pass an empty parameter declaring such.
func (s *levelHandler) overlappingTables(_ levelHandlerRLocked, kr keyRange) (int, int) {
left := sort.Search(len(s.tables), func(i int) bool {
return y.CompareKeys(kr.left, s.tables[i].Biggest()) <= 0
return getTablesInRange(s.tables, kr.left, kr.right)
}

func getTablesInRange(tbls []*table.Table, start, end []byte) (int, int) {
left := sort.Search(len(tbls), func(i int) bool {
return y.CompareKeys(start, tbls[i].Biggest()) <= 0
})
right := sort.Search(len(s.tables), func(i int) bool {
return y.CompareKeys(kr.right, s.tables[i].Smallest()) < 0
right := sort.Search(len(tbls), func(i int) bool {
return y.CompareKeys(end, tbls[i].Smallest()) < 0
})
return left, right
}
163 changes: 153 additions & 10 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ package badger

import (
"fmt"
"github.com/coocood/badger/options"
"golang.org/x/time/rate"
"math"
"math/rand"
"os"
"sort"
"sync"
"time"

"github.com/coocood/badger/options"
"golang.org/x/time/rate"

"github.com/coocood/badger/protos"
"github.com/coocood/badger/table"
"github.com/coocood/badger/y"
Expand Down Expand Up @@ -291,7 +293,7 @@ func (ds *DiscardStats) collect(vs y.ValueStruct) {
}

// compactBuildTables merge topTables and botTables to form a list of new tables.
func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter *rate.Limiter) ([]*table.Table, func() error, error) {
func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter *rate.Limiter, start, end []byte) ([]*table.Table, error) {
topTables := cd.top
botTables := cd.bot

Expand Down Expand Up @@ -329,7 +331,10 @@ func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter
var firstErr error
var builder *table.Builder

for it.Valid() {
if start != nil {
it.Seek(start)
}
for ; it.Valid() && (end == nil || y.CompareKeys(it.Key(), end) < 0); {
timeStart := time.Now()
fileID := s.reserveFileID()
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), false)
Expand All @@ -344,7 +349,7 @@ func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter
}

var numKeys uint64
for ; it.Valid(); it.Next() {
for ; it.Valid() && (end == nil || y.CompareKeys(it.Key(), end) < 0); it.Next() {
// See if we need to skip this key.
if len(skipKey) > 0 {
if y.SameKey(it.Key(), skipKey) {
Expand Down Expand Up @@ -428,15 +433,15 @@ func (s *levelsController) compactBuildTables(level int, cd compactDef, limiter
tbl.DecrRef()
}
errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
return nil, nil, errorReturn
return nil, errorReturn
}

sort.Slice(newTables, func(i, j int) bool {
return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
})
s.kv.vlog.updateGCStats(discardStats.discardSpaces)
log.Infof("Discard stats: %v", discardStats)
return newTables, func() error { return decrRefs(newTables) }, nil
return newTables, nil
}

func buildChangeSet(cd *compactDef, newTables []*table.Table) protos.ManifestChangeSet {
Expand Down Expand Up @@ -476,6 +481,49 @@ func (cd *compactDef) unlockLevels() {
cd.thisLevel.RUnlock()
}

type rangeWithSize struct {
start []byte
end []byte
sz int
}

func (cd *compactDef) getInputBounds() []rangeWithSize {
bounds := make([][]byte, 0, len(cd.bot)+1)
for _, tbl := range cd.bot {
smallest := y.KeyWithTs(y.ParseKey(tbl.Smallest()), math.MaxUint64)
bounds = append(bounds, smallest)
}
biggest := y.KeyWithTs(y.ParseKey(cd.bot[len(cd.bot)-1].Biggest()), 0)
bounds = append(bounds, biggest)

ranges := make([]rangeWithSize, 0, len(bounds))
for i := 0; i < len(bounds)-1; i++ {
start, end := bounds[i], bounds[i+1]
sz := cd.sizeInRange(cd.top, cd.thisLevel.level, start, end)
if len(cd.bot) != 0 {
sz += cd.sizeInRange(cd.bot, cd.nextLevel.level, start, end)
}
ranges = append(ranges, rangeWithSize{start: start, end: end, sz: sz})
}

ranges[0].start = nil
ranges[len(ranges)-1].end = nil

return ranges
}

func (cd *compactDef) sizeInRange(tbls []*table.Table, level int, start, end []byte) int {
var sz int
left, right := 0, len(tbls)
if level != 0 {
left, right = getTablesInRange(tbls, start, end)
}
for _, tbl := range tbls[left:right] {
sz += tbl.ApproximateSizeInRange(start, end)
}
return sz
}

func (s *levelsController) fillTablesL0(cd *compactDef) bool {
cd.lockLevels()
defer cd.unlockLevels()
Expand Down Expand Up @@ -560,7 +608,96 @@ func (s *levelsController) fillTables(cd *compactDef) bool {
return false
}

func (s *levelsController) runCompactDef(l int, cd compactDef, limiter *rate.Limiter) (err error) {
func (s *levelsController) determineSubCompactPlan(bounds []rangeWithSize) (int, int) {
bobotu marked this conversation as resolved.
Show resolved Hide resolved
n := s.kv.opt.MaxSubCompaction
if len(bounds) < n {
n = len(bounds)
}

var size int
for _, bound := range bounds {
size += bound.sz
}
maxOutPutFiles := int(float32(size) / (4.0 / 5.0) / float32(s.kv.opt.MaxTableSize))
if maxOutPutFiles < n {
n = maxOutPutFiles
}

if n == 0 {
return 1, size
}
return n, size / n
}

func (s *levelsController) runSubCompacts(l int, cd compactDef, limiter *rate.Limiter) ([]*table.Table, error) {
type jobResult struct {
tbls []*table.Table
err error
}

inputBounds := cd.getInputBounds()
numSubCompact, avgSize := s.determineSubCompactPlan(inputBounds)
if numSubCompact == 1 {
return s.compactBuildTables(l, cd, limiter, nil, nil)
}

results := make([]jobResult, numSubCompact)
var wg sync.WaitGroup
var currSize, begin, jobNo int

for i := range inputBounds {
currSize += inputBounds[i].sz
if currSize >= avgSize || i == len(inputBounds)-1 {
start, end := inputBounds[begin].start, inputBounds[i].end

wg.Add(1)
go func(job int) {
newTables, err := s.compactBuildTables(l, cd, limiter, start, end)
results[job].tbls = newTables
results[job].err = err
wg.Done()
}(jobNo)

currSize = 0
begin = i + 1
jobNo++
}
}

log.Infof("Started %d SubCompaction Jobs", jobNo)
wg.Wait()

var numTables int
for _, result := range results {
if result.err != nil {
return nil, result.err
}
numTables += len(result.tbls)
}

newTables := make([]*table.Table, 0, numTables)
for _, result := range results {
newTables = append(newTables, result.tbls...)
}

return newTables, nil
}

func (s *levelsController) shouldStartSubCompaction(cd compactDef) bool {
if s.kv.opt.MaxSubCompaction <= 1 || len(cd.bot) == 0 {
return false
}
if cd.thisLevel.level == 0 {
return true
}
if cd.thisLevel.level == 1 {
// Only speed up large L1 compaction.
return len(cd.bot)+len(cd.top) >= 10
}
return false
}

func (s *levelsController) runCompactDef(l int, cd compactDef, limiter *rate.Limiter) error {
timeStart := time.Now()

thisLevel := cd.thisLevel
Expand All @@ -569,13 +706,19 @@ func (s *levelsController) runCompactDef(l int, cd compactDef, limiter *rate.Lim
// Table should never be moved directly between levels, always be rewritten to allow discarding
// invalid versions.

newTables, decr, err := s.compactBuildTables(l, cd, limiter)
var newTables []*table.Table
var err error
if s.shouldStartSubCompaction(cd) {
newTables, err = s.runSubCompacts(l, cd, limiter)
} else {
newTables, err = s.compactBuildTables(l, cd, limiter, nil, nil)
}
if err != nil {
return err
}
defer func() {
// Only assign to err, if it's not already nil.
if decErr := decr(); err == nil {
if decErr := decrRefs(newTables); err == nil {
err = decErr
}
}()
Expand Down
4 changes: 4 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Options struct {
// Number of compaction workers to run concurrently.
NumCompactors int

// Max number of sub compaction, set 1 or 0 to disable sub compaction.
MaxSubCompaction int

// Transaction start and commit timestamps are manaVgedTxns by end-user. This
// is a private option used by ManagedDB.
managedTxns bool
Expand Down Expand Up @@ -121,6 +124,7 @@ var DefaultOptions = Options{
MaxLevels: 7,
MaxTableSize: 64 << 20,
NumCompactors: 3,
MaxSubCompaction: 3,
NumLevelZeroTables: 5,
NumLevelZeroTablesStall: 10,
NumMemtables: 5,
Expand Down
16 changes: 10 additions & 6 deletions table/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,8 @@ func (itr *Iterator) seekFromOffset(blockIdx int, offset int, key []byte) {
itr.bi.seek(key)
}

// seekFrom brings us to a key that is >= input key.
func (itr *Iterator) seekFrom(key []byte) {
itr.err = nil
itr.reset()

idx := sort.Search(len(itr.t.blockEndOffsets), func(idx int) bool {
func (itr *Iterator) seekBlock(key []byte) int {
return sort.Search(len(itr.t.blockEndOffsets), func(idx int) bool {
baseKeyStartOff := 0
if idx > 0 {
baseKeyStartOff = int(itr.t.baseKeysEndOffs[idx-1])
Expand All @@ -242,6 +238,14 @@ func (itr *Iterator) seekFrom(key []byte) {
baseKey := itr.t.baseKeys[baseKeyStartOff:baseKeyEndOff]
return y.CompareKeys(baseKey, key) > 0
})
}

// seekFrom brings us to a key that is >= input key.
func (itr *Iterator) seekFrom(key []byte) {
itr.err = nil
itr.reset()

idx := itr.seekBlock(key)
if idx == 0 {
// The smallest key in our table is already strictly > key. We can return that.
// This is like a SeekToFirst.
Expand Down
19 changes: 19 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,25 @@ func (t *Table) block(idx int) (block, error) {
return blk, err
}

func (t *Table) ApproximateSizeInRange(start, end []byte) int {
it := t.NewIteratorNoRef(false)
startOff, endOff := t.approximateOffset(it, start), t.approximateOffset(it, end)
return endOff - startOff
}

func (t *Table) approximateOffset(it *Iterator, key []byte) int {
if y.CompareKeys(t.Biggest(), key) < 0 {
return int(t.blockEndOffsets[len(t.blockEndOffsets)-1])
} else if y.CompareKeys(t.Smallest(), key) > 0 {
return 0
}
blk := it.seekBlock(key)
if blk != 0 {
return int(t.blockEndOffsets[blk-1])
}
return 0
}

// Size is its file size in bytes
func (t *Table) Size() int64 { return int64(t.tableSize) }

Expand Down