-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support MultiGet operation #80
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -366,8 +366,7 @@ func TestGetMore(t *testing.T) { | |
got := string(getItemValue(t, item)) | ||
if expectedValue != got { | ||
|
||
vs, err := db.get(y.KeyWithTs(k, math.MaxUint64)) | ||
require.NoError(t, err) | ||
vs := db.get(y.KeyWithTs(k, math.MaxUint64)) | ||
fmt.Printf("wanted=%q Item: %s\n", k, item) | ||
fmt.Printf("on re-run, got version: %+v\n", vs) | ||
|
||
|
@@ -387,6 +386,24 @@ func TestGetMore(t *testing.T) { | |
txn.Discard() | ||
} | ||
|
||
// MultiGet | ||
var multiGetKeys [][]byte | ||
var expectedValues []string | ||
for i := 0; i < n; i += 100 { | ||
multiGetKeys = append(multiGetKeys, data(i)) | ||
// Set a long value to make sure we have enough sst tables. | ||
expectedValues = append(expectedValues, fmt.Sprintf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz%9d", i)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test needs the value to be large enough, so there would be multiple levels in DB. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add comments. |
||
} | ||
txn1 := db.NewTransaction(false) | ||
items, err := txn1.MultiGet(multiGetKeys) | ||
require.NoError(t, err) | ||
for i, item := range items { | ||
val, err1 := item.Value() | ||
require.NoError(t, err1) | ||
require.Equal(t, expectedValues[i], string(val)) | ||
} | ||
txn1.Discard() | ||
|
||
// "Delete" key. | ||
for i := 0; i < n; i += m { | ||
if (i % 10000) == 0 { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -174,13 +174,23 @@ func (s *levelHandler) replaceTables(newTables []*table.Table) error { | |
|
||
func decrRefs(tables []*table.Table) error { | ||
for _, table := range tables { | ||
if table == nil { | ||
continue | ||
} | ||
if err := table.DecrRef(); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func forceDecrRefs(tables []*table.Table) { | ||
err := decrRefs(tables) | ||
if err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
func newLevelHandler(db *DB, level int) *levelHandler { | ||
return &levelHandler{ | ||
level: level, | ||
|
@@ -224,23 +234,50 @@ func (s *levelHandler) close() error { | |
return errors.Wrap(err, "levelHandler.close") | ||
} | ||
|
||
// getTableForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers. | ||
func (s *levelHandler) getTableForKey(key []byte) []*table.Table { | ||
// refTablesForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/tableHandlers/table |
||
func (s *levelHandler) refTablesForKey(key []byte) []*table.Table { | ||
s.RLock() | ||
defer s.RUnlock() | ||
|
||
if s.level == 0 { | ||
// For level 0, we need to check every table. Remember to make a copy as s.tables may change | ||
// once we exit this function, and we don't want to lock s.tables while seeking in tables. | ||
// CAUTION: Reverse the tables. | ||
out := make([]*table.Table, 0, len(s.tables)) | ||
for i := len(s.tables) - 1; i >= 0; i-- { | ||
out = append(out, s.tables[i]) | ||
s.tables[i].IncrRef() | ||
} | ||
return s.refLevel0Tables() | ||
} | ||
tbl := s.refLevelNTable(key) | ||
if tbl == nil { | ||
return nil | ||
} | ||
return []*table.Table{tbl} | ||
} | ||
|
||
// refTablesForKeys return tables for pairs. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/return/returns/ |
||
// level0 returns all tables. | ||
// level1+ returns tables for every key. | ||
func (s *levelHandler) refTablesForKeys(pairs []keyValuePair) []*table.Table { | ||
s.RLock() | ||
defer s.RUnlock() | ||
if s.level == 0 { | ||
return s.refLevel0Tables() | ||
} | ||
out := make([]*table.Table, len(pairs)) | ||
for i, pair := range pairs { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are some redundant tables, these tables may cause unnecessary search. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can do more optimization in further PR. |
||
out[i] = s.refLevelNTable(pair.key) | ||
} | ||
return out | ||
} | ||
|
||
return out | ||
func (s *levelHandler) refLevel0Tables() []*table.Table { | ||
// For level 0, we need to check every table. Remember to make a copy as s.tables may change | ||
// once we exit this function, and we don't want to lock s.tables while seeking in tables. | ||
// CAUTION: Reverse the tables. | ||
out := make([]*table.Table, 0, len(s.tables)) | ||
for i := len(s.tables) - 1; i >= 0; i-- { | ||
out = append(out, s.tables[i]) | ||
s.tables[i].IncrRef() | ||
} | ||
return out | ||
} | ||
|
||
func (s *levelHandler) refLevelNTable(key []byte) *table.Table { | ||
// For level >= 1, we can do a binary search as key range does not overlap. | ||
idx := sort.Search(len(s.tables), func(i int) bool { | ||
return y.CompareKeys(s.tables[i].Biggest(), key) >= 0 | ||
|
@@ -251,57 +288,91 @@ func (s *levelHandler) getTableForKey(key []byte) []*table.Table { | |
} | ||
tbl := s.tables[idx] | ||
tbl.IncrRef() | ||
return []*table.Table{tbl} | ||
return tbl | ||
} | ||
|
||
// get returns value for a given key or the key after that. If not found, return nil. | ||
func (s *levelHandler) get(key []byte) (y.ValueStruct, error) { | ||
tables := s.getTableForKey(key) | ||
defer func() { | ||
for _, t := range tables { | ||
if err := t.DecrRef(); err != nil { | ||
panic(err) | ||
} | ||
} | ||
}() | ||
|
||
keyNoTs := y.ParseKey(key) | ||
func (s *levelHandler) get(key []byte) y.ValueStruct { | ||
tables := s.refTablesForKey(key) | ||
defer forceDecrRefs(tables) | ||
return s.getInTables(key, tables) | ||
} | ||
|
||
var maxVs y.ValueStruct | ||
for _, th := range tables { | ||
var ( | ||
resultKey []byte | ||
resultVs y.ValueStruct | ||
ok bool | ||
) | ||
func (s *levelHandler) getInTables(key []byte, tables []*table.Table) y.ValueStruct { | ||
for _, table := range tables { | ||
result := s.getInTable(key, table) | ||
if result.Valid() { | ||
return result | ||
} | ||
} | ||
return y.ValueStruct{} | ||
} | ||
|
||
if th.DoesNotHave(keyNoTs) { | ||
continue | ||
func (s *levelHandler) getInTable(key []byte, table *table.Table) (result y.ValueStruct) { | ||
if table.DoesNotHave(y.ParseKey(key)) { | ||
return | ||
} | ||
resultKey, resultVs, ok := table.PointGet(key) | ||
if !ok { | ||
it := table.NewIteratorNoRef(false) | ||
it.Seek(key) | ||
if !it.Valid() { | ||
return | ||
} | ||
if !y.SameKey(key, it.Key()) { | ||
return | ||
} | ||
resultKey, resultVs = it.Key(), it.Value() | ||
} else if resultKey == nil { | ||
return | ||
} | ||
result = resultVs | ||
result.Version = y.ParseTs(resultKey) | ||
return | ||
} | ||
|
||
func (s *levelHandler) multiGet(pairs []keyValuePair) { | ||
tables := s.refTablesForKeys(pairs) | ||
defer forceDecrRefs(tables) | ||
if s.level == 0 { | ||
s.multiGetLevel0(pairs, tables) | ||
} else { | ||
s.multiGetLevelN(pairs, tables) | ||
} | ||
} | ||
|
||
resultKey, resultVs, ok = th.PointGet(key) | ||
if !ok { | ||
it := th.NewIteratorNoRef(false) | ||
it.Seek(key) | ||
if !it.Valid() { | ||
func (s *levelHandler) multiGetLevel0(pairs []keyValuePair, tables []*table.Table) { | ||
for _, table := range tables { | ||
for i := range pairs { | ||
pair := &pairs[i] | ||
if pair.found { | ||
continue | ||
} | ||
if !y.SameKey(key, it.Key()) { | ||
continue | ||
val := s.getInTable(pair.key, table) | ||
if val.Valid() { | ||
pair.val = val | ||
pair.found = true | ||
} | ||
resultKey, resultVs = it.Key(), it.Value() | ||
} else if resultKey == nil { | ||
continue | ||
} | ||
} | ||
} | ||
|
||
if version := y.ParseTs(resultKey); maxVs.Version < version { | ||
maxVs = resultVs | ||
maxVs.Version = version | ||
break | ||
func (s *levelHandler) multiGetLevelN(pairs []keyValuePair, tables []*table.Table) { | ||
for i := range pairs { | ||
pair := &pairs[i] | ||
if pair.found { | ||
continue | ||
} | ||
table := tables[i] | ||
if table == nil { | ||
continue | ||
} | ||
val := s.getInTable(pair.key, table) | ||
if val.Valid() { | ||
pair.val = val | ||
pair.found = true | ||
} | ||
} | ||
maxVs.Value = y.SafeCopy(nil, maxVs.Value) | ||
return maxVs, nil | ||
} | ||
|
||
// appendIterators appends iterators to an array of iterators, for merging. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for _, pair := range pairs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This way, we cannot modify the pair in the slice because the
pair
is a value, not a pointer.