Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KeySplits checks tables and memtables when number of splits is small. #1544

Merged
merged 14 commits into from
Oct 1, 2020
72 changes: 71 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ var (
lfDiscardStatsKey = []byte("!badger!discard") // For storing lfDiscardStats
)

const (
maxNumSplits = 128
)

type closers struct {
updateSize *z.Closer
compactors *z.Closer
Expand Down Expand Up @@ -1450,15 +1454,81 @@ func (db *DB) Tables(withKeysCount bool) []TableInfo {
// the DB.
func (db *DB) KeySplits(prefix []byte) []string {
var splits []string
tables := db.Tables(false)

// We just want table ranges here and not keys count.
for _, ti := range db.Tables(false) {
for _, ti := range tables {
// We don't use ti.Left, because that has a tendency to store !badger
// keys.
if bytes.HasPrefix(ti.Right, prefix) {
splits = append(splits, string(ti.Right))
}
}

// If the number of splits is low, look at the offsets inside the
// tables to generate more splits.
if len(splits) < 32 {
numTables := len(tables)
if numTables == 0 {
numTables = 1
}
numPerTable := 32 / numTables
if numPerTable == 0 {
numPerTable = 1
}
splits = db.lc.keySplits(numPerTable, prefix)
}

// If the number of splits is still < 32, then look at the memtables.
if len(splits) < 32 {
maxPerSplit := 10000
mtSplits := func(mt *skl.Skiplist) {
count := 0
iter := mt.NewIterator()
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
if count%maxPerSplit == 0 {
// Add a split every maxPerSplit keys.
if bytes.HasPrefix(iter.Key(), prefix) {
splits = append(splits, string(iter.Key()))
}
}
count += 1
}
_ = iter.Close()
}

db.Lock()
defer db.Unlock()
memtables := make([]*skl.Skiplist, 0)
memtables = append(memtables, db.imm...)
for _, mt := range memtables {
mtSplits(mt)
}
mtSplits(db.mt)
}

sort.Strings(splits)

// Limit the maximum number of splits returned by this function. We check against
// maxNumberSplits * 2 so that the jump variable has a value of at least two.
// Otherwise, the entire list would be returned without any reduction in size.
if len(splits) > maxNumSplits*2 {
newSplits := make([]string, 0)
jump := len(splits) / maxNumSplits
if jump < 2 {
jump = 2
}

for i := 0; i < len(splits); i += jump {
if i >= len(splits) {
i = len(splits) - 1
}
newSplits = append(newSplits, splits[i])
}

splits = newSplits
}

return splits
}

Expand Down
16 changes: 16 additions & 0 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,3 +1199,19 @@ func (s *levelsController) verifyChecksum() error {

return nil
}

// Returns the sorted list of splits for all the levels and tables based
// on the block offsets.
func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
splits := make([]string, 0)
for _, l := range s.levels {
l.RLock()
for _, t := range l.tables {
tableSplits := t.KeySplits(numPerTable, prefix)
splits = append(splits, tableSplits...)
}
l.RUnlock()
}
sort.Strings(splits)
return splits
}
85 changes: 85 additions & 0 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package badger

import (
"fmt"
"math"
"testing"
"time"
Expand Down Expand Up @@ -923,3 +924,87 @@ func TestLevelGet(t *testing.T) {
})
}
}

func TestKeyVersions(t *testing.T) {
inMemoryOpt := DefaultOptions("").
WithSyncWrites(false).
WithInMemory(true).
WithLogRotatesToFlush(math.MaxInt32).
WithMaxTableSize(4 << 20)

t.Run("disk", func(t *testing.T) {
t.Run("small table", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
l0 := make([]keyValVersion, 0)
for i := 0; i < 10; i++ {
l0 = append(l0, keyValVersion{fmt.Sprintf("%05d", i), "foo", 1, 0})
}
createAndOpen(db, l0, 0)
require.Equal(t, 1, len(db.KeySplits(nil)))
})
})
t.Run("medium table", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
l0 := make([]keyValVersion, 0)
for i := 0; i < 1000; i++ {
l0 = append(l0, keyValVersion{fmt.Sprintf("%05d", i), "foo", 1, 0})
}
createAndOpen(db, l0, 0)
require.Equal(t, 7, len(db.KeySplits(nil)))
})
})
t.Run("large table", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
l0 := make([]keyValVersion, 0)
for i := 0; i < 10000; i++ {
l0 = append(l0, keyValVersion{fmt.Sprintf("%05d", i), "foo", 1, 0})
}
createAndOpen(db, l0, 0)
require.Equal(t, 61, len(db.KeySplits(nil)))
})
})
t.Run("prefix", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
l0 := make([]keyValVersion, 0)
for i := 0; i < 1000; i++ {
l0 = append(l0, keyValVersion{fmt.Sprintf("%05d", i), "foo", 1, 0})
}
createAndOpen(db, l0, 0)
require.Equal(t, 0, len(db.KeySplits([]byte("a"))))
})
})
})

t.Run("in-memory", func(t *testing.T) {
t.Run("small table", func(t *testing.T) {
runBadgerTest(t, &inMemoryOpt, func(t *testing.T, db *DB) {
writer := db.newWriteBatch(false)
for i := 0; i < 10; i++ {
writer.Set([]byte(fmt.Sprintf("%05d", i)), []byte("foo"))
}
require.NoError(t, writer.Flush())
require.Equal(t, 1, len(db.KeySplits(nil)))
})
})
t.Run("large table", func(t *testing.T) {
runBadgerTest(t, &inMemoryOpt, func(t *testing.T, db *DB) {
writer := db.newWriteBatch(false)
for i := 0; i < 100000; i++ {
writer.Set([]byte(fmt.Sprintf("%05d", i)), []byte("foo"))
}
require.NoError(t, writer.Flush())
require.Equal(t, 11, len(db.KeySplits(nil)))
})
})
t.Run("prefix", func(t *testing.T) {
runBadgerTest(t, &inMemoryOpt, func(t *testing.T, db *DB) {
writer := db.newWriteBatch(false)
for i := 0; i < 10000; i++ {
writer.Set([]byte(fmt.Sprintf("%05d", i)), []byte("foo"))
}
require.NoError(t, writer.Flush())
require.Equal(t, 0, len(db.KeySplits([]byte("a"))))
})
})
})
}
25 changes: 25 additions & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package table

import (
"bytes"
"crypto/aes"
"encoding/binary"
"fmt"
Expand Down Expand Up @@ -479,6 +480,30 @@ func (t *Table) initIndex() (*pb.BlockOffset, error) {
return index.Offsets[0], nil
}

// KeySplits splits the table into at least n ranges based on the block offsets.
func (t *Table) KeySplits(n int, prefix []byte) []string {
if n == 0 {
return nil
}

var res []string
offsets := t.blockOffsets()
jump := len(offsets) / n
if jump == 0 {
jump = 1
}

for i := 0; i < len(offsets); i += jump {
if i >= len(offsets) {
i = len(offsets) - 1
}
if bytes.HasPrefix(offsets[i].Key, prefix) {
res = append(res, string(offsets[i].Key))
}
}
return res
}

// blockOffsets returns block offsets of this table.
func (t *Table) blockOffsets() []*pb.BlockOffset {
if t.opt.IndexCache == nil {
Expand Down