Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support MultiGet operation #80

Merged
merged 5 commits into from
Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,7 @@ func Open(opt Options) (db *DB, err error) {

headKey := y.KeyWithTs(head, math.MaxUint64)
// Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
vs, err := db.get(headKey)
if err != nil {
return nil, errors.Wrap(err, "Retrieving head")
}
vs := db.get(headKey)
db.orc.curRead = vs.Version
var vptr valuePointer
if len(vs.Value) > 0 {
Expand Down Expand Up @@ -483,7 +480,7 @@ func (db *DB) getMemTables() []*table.MemTable {
// tables and find the max version among them. To maintain this invariant, we also need to ensure
// that all versions of a key are always present in the same table from level 1, because compaction
// can push any table down.
func (db *DB) get(key []byte) (y.ValueStruct, error) {
func (db *DB) get(key []byte) y.ValueStruct {
tables := db.getMemTables() // Lock should be released.
defer func() {
for _, tbl := range tables {
Expand All @@ -492,16 +489,46 @@ func (db *DB) get(key []byte) (y.ValueStruct, error) {
}()

y.NumGets.Add(1)
for i := 0; i < len(tables); i++ {
vs := tables[i].Get(key)
for _, table := range tables {
vs := table.Get(key)
y.NumMemtableGets.Add(1)
if vs.Meta != 0 || vs.Value != nil {
return vs, nil
if vs.Valid() {
return vs
}
}
return db.lc.get(key)
}

func (db *DB) multiGet(pairs []keyValuePair) {
tables := db.getMemTables() // Lock should be released.
defer func() {
for _, tbl := range tables {
tbl.DecrRef()
}
}()
y.NumGets.Add(int64(len(pairs)))
var foundCount int
for _, table := range tables {
for j := range pairs {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for _, pair := range pairs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way, we cannot modify the pair in the slice because the pair is a value, not a pointer.

pair := &pairs[j]
if pair.found {
continue
}
val := table.Get(pair.key)
y.NumMemtableGets.Add(1)
if val.Valid() {
pair.val = val
pair.found = true
foundCount++
}
}
}
if foundCount == len(pairs) {
return
}
db.lc.multiGet(pairs)
}

func (db *DB) updateOffset(ptrs []valuePointer) {
var ptr valuePointer
for i := len(ptrs) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -804,10 +831,7 @@ func (db *DB) RunValueLogGC(discardRatio float64) error {
// Find head on disk
headKey := y.KeyWithTs(head, math.MaxUint64)
// Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
vs, err := db.lc.get(headKey)
if err != nil {
return errors.Wrap(err, "Retrieving head from on-disk LSM")
}
vs := db.lc.get(headKey)

var head valuePointer
if len(vs.Value) > 0 {
Expand Down
21 changes: 19 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,7 @@ func TestGetMore(t *testing.T) {
got := string(getItemValue(t, item))
if expectedValue != got {

vs, err := db.get(y.KeyWithTs(k, math.MaxUint64))
require.NoError(t, err)
vs := db.get(y.KeyWithTs(k, math.MaxUint64))
fmt.Printf("wanted=%q Item: %s\n", k, item)
fmt.Printf("on re-run, got version: %+v\n", vs)

Expand All @@ -387,6 +386,24 @@ func TestGetMore(t *testing.T) {
txn.Discard()
}

// MultiGet
var multiGetKeys [][]byte
var expectedValues []string
for i := 0; i < n; i += 100 {
multiGetKeys = append(multiGetKeys, data(i))
// Set a long value to make sure we have enough sst tables.
expectedValues = append(expectedValues, fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", i))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test needs the value to be large enough, so there would be multiple levels in DB.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comments.

}
txn1 := db.NewTransaction(false)
items, err := txn1.MultiGet(multiGetKeys)
require.NoError(t, err)
for i, item := range items {
val, err1 := item.Value()
require.NoError(t, err1)
require.Equal(t, expectedValues[i], string(val))
}
txn1.Discard()

// "Delete" key.
for i := 0; i < n; i += m {
if (i % 10000) == 0 {
Expand Down
5 changes: 1 addition & 4 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ func (item *Item) yieldItemValue(dst []byte) ([]byte, error) {
// instead.
key := y.KeyWithTs(item.Key(), item.Version())
moveKey := append(badgerMove, key...)
vs, err := item.db.get(moveKey)
if err != nil {
return nil, err
}
vs := item.db.get(moveKey)
if vs.Version != item.Version() {
return nil, nil
}
Expand Down
167 changes: 119 additions & 48 deletions level_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,23 @@ func (s *levelHandler) replaceTables(newTables []*table.Table) error {

func decrRefs(tables []*table.Table) error {
for _, table := range tables {
if table == nil {
continue
}
if err := table.DecrRef(); err != nil {
return err
}
}
return nil
}

func forceDecrRefs(tables []*table.Table) {
err := decrRefs(tables)
if err != nil {
panic(err)
}
}

func newLevelHandler(db *DB, level int) *levelHandler {
return &levelHandler{
level: level,
Expand Down Expand Up @@ -224,23 +234,50 @@ func (s *levelHandler) close() error {
return errors.Wrap(err, "levelHandler.close")
}

// getTableForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers.
func (s *levelHandler) getTableForKey(key []byte) []*table.Table {
// refTablesForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/tableHandlers/table

func (s *levelHandler) refTablesForKey(key []byte) []*table.Table {
s.RLock()
defer s.RUnlock()

if s.level == 0 {
// For level 0, we need to check every table. Remember to make a copy as s.tables may change
// once we exit this function, and we don't want to lock s.tables while seeking in tables.
// CAUTION: Reverse the tables.
out := make([]*table.Table, 0, len(s.tables))
for i := len(s.tables) - 1; i >= 0; i-- {
out = append(out, s.tables[i])
s.tables[i].IncrRef()
}
return s.refLevel0Tables()
}
tbl := s.refLevelNTable(key)
if tbl == nil {
return nil
}
return []*table.Table{tbl}
}

// refTablesForKeys return tables for pairs.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/return/returns/

// level0 returns all tables.
// level1+ returns tables for every key.
func (s *levelHandler) refTablesForKeys(pairs []keyValuePair) []*table.Table {
s.RLock()
defer s.RUnlock()
if s.level == 0 {
return s.refLevel0Tables()
}
out := make([]*table.Table, len(pairs))
for i, pair := range pairs {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some redundant tables, these tables may cause unnecessary search.
We should group by level, table, and keys. Also we do a lot of redundant work, such as recalculate key's hash when do bloom filter looking up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do more optimization in further PR.

out[i] = s.refLevelNTable(pair.key)
}
return out
}

return out
func (s *levelHandler) refLevel0Tables() []*table.Table {
// For level 0, we need to check every table. Remember to make a copy as s.tables may change
// once we exit this function, and we don't want to lock s.tables while seeking in tables.
// CAUTION: Reverse the tables.
out := make([]*table.Table, 0, len(s.tables))
for i := len(s.tables) - 1; i >= 0; i-- {
out = append(out, s.tables[i])
s.tables[i].IncrRef()
}
return out
}

func (s *levelHandler) refLevelNTable(key []byte) *table.Table {
// For level >= 1, we can do a binary search as key range does not overlap.
idx := sort.Search(len(s.tables), func(i int) bool {
return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
Expand All @@ -251,57 +288,91 @@ func (s *levelHandler) getTableForKey(key []byte) []*table.Table {
}
tbl := s.tables[idx]
tbl.IncrRef()
return []*table.Table{tbl}
return tbl
}

// get returns value for a given key or the key after that. If not found, return nil.
func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
tables := s.getTableForKey(key)
defer func() {
for _, t := range tables {
if err := t.DecrRef(); err != nil {
panic(err)
}
}
}()

keyNoTs := y.ParseKey(key)
func (s *levelHandler) get(key []byte) y.ValueStruct {
tables := s.refTablesForKey(key)
defer forceDecrRefs(tables)
return s.getInTables(key, tables)
}

var maxVs y.ValueStruct
for _, th := range tables {
var (
resultKey []byte
resultVs y.ValueStruct
ok bool
)
func (s *levelHandler) getInTables(key []byte, tables []*table.Table) y.ValueStruct {
for _, table := range tables {
result := s.getInTable(key, table)
if result.Valid() {
return result
}
}
return y.ValueStruct{}
}

if th.DoesNotHave(keyNoTs) {
continue
func (s *levelHandler) getInTable(key []byte, table *table.Table) (result y.ValueStruct) {
if table.DoesNotHave(y.ParseKey(key)) {
return
}
resultKey, resultVs, ok := table.PointGet(key)
if !ok {
it := table.NewIteratorNoRef(false)
it.Seek(key)
if !it.Valid() {
return
}
if !y.SameKey(key, it.Key()) {
return
}
resultKey, resultVs = it.Key(), it.Value()
} else if resultKey == nil {
return
}
result = resultVs
result.Version = y.ParseTs(resultKey)
return
}

func (s *levelHandler) multiGet(pairs []keyValuePair) {
tables := s.refTablesForKeys(pairs)
defer forceDecrRefs(tables)
if s.level == 0 {
s.multiGetLevel0(pairs, tables)
} else {
s.multiGetLevelN(pairs, tables)
}
}

resultKey, resultVs, ok = th.PointGet(key)
if !ok {
it := th.NewIteratorNoRef(false)
it.Seek(key)
if !it.Valid() {
func (s *levelHandler) multiGetLevel0(pairs []keyValuePair, tables []*table.Table) {
for _, table := range tables {
for i := range pairs {
pair := &pairs[i]
if pair.found {
continue
}
if !y.SameKey(key, it.Key()) {
continue
val := s.getInTable(pair.key, table)
if val.Valid() {
pair.val = val
pair.found = true
}
resultKey, resultVs = it.Key(), it.Value()
} else if resultKey == nil {
continue
}
}
}

if version := y.ParseTs(resultKey); maxVs.Version < version {
maxVs = resultVs
maxVs.Version = version
break
func (s *levelHandler) multiGetLevelN(pairs []keyValuePair, tables []*table.Table) {
for i := range pairs {
pair := &pairs[i]
if pair.found {
continue
}
table := tables[i]
if table == nil {
continue
}
val := s.getInTable(pair.key, table)
if val.Valid() {
pair.val = val
pair.found = true
}
}
maxVs.Value = y.SafeCopy(nil, maxVs.Value)
return maxVs, nil
}

// appendIterators appends iterators to an array of iterators, for merging.
Expand Down
20 changes: 11 additions & 9 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,23 +849,25 @@ func (s *levelsController) close() error {
}

// get returns the found value if any. If not found, we return nil.
func (s *levelsController) get(key []byte) (y.ValueStruct, error) {
func (s *levelsController) get(key []byte) y.ValueStruct {
// It's important that we iterate the levels from 0 on upward. The reason is, if we iterated
// in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could
// read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do
// parallelize this, we will need to call the h.RLock() function by increasing order of level
// number.)
for _, h := range s.levels {
vs, err := h.get(key) // Calls h.RLock() and h.RUnlock().
if err != nil {
return y.ValueStruct{}, errors.Wrapf(err, "get key: %q", key)
}
if vs.Value == nil && vs.Meta == 0 {
continue
vs := h.get(key) // Calls h.RLock() and h.RUnlock().
if vs.Valid() {
return vs
}
return vs, nil
}
return y.ValueStruct{}, nil
return y.ValueStruct{}
}

func (s *levelsController) multiGet(pairs []keyValuePair) {
for _, h := range s.levels {
h.multiGet(pairs)
}
}

func appendIteratorsReversed(out []y.Iterator, th []*table.Table, reversed bool) []y.Iterator {
Expand Down
Loading