Skip to content

Commit 0b12995

Browse files
authored
fix(store/v2): Fix PebbleDB Iteration Edge Cases (#18948)
1 parent 78ce70d commit 0b12995

File tree

3 files changed

+81
-13
lines changed

3 files changed

+81
-13
lines changed

store/storage/pebbledb/comparator.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,20 +137,25 @@ func (f mvccKeyFormatter) Format(s fmt.State, verb rune) {
137137

138138
// SplitMVCCKey accepts an MVCC key and returns the "user" key, the MVCC version,
139139
// and a boolean indicating if the provided key is an MVCC key.
140+
//
141+
// Note, internally, we must make a copy of the provided mvccKey argument, which
142+
// typically comes from the Key() method as it's not safe.
140143
func SplitMVCCKey(mvccKey []byte) (key, version []byte, ok bool) {
141144
if len(mvccKey) == 0 {
142145
return nil, nil, false
143146
}
144147

145-
n := len(mvccKey) - 1
146-
tsLen := int(mvccKey[n])
148+
mvccKeyCopy := bytes.Clone(mvccKey)
149+
150+
n := len(mvccKeyCopy) - 1
151+
tsLen := int(mvccKeyCopy[n])
147152
if n < tsLen {
148153
return nil, nil, false
149154
}
150155

151-
key = mvccKey[:n-tsLen]
156+
key = mvccKeyCopy[:n-tsLen]
152157
if tsLen > 0 {
153-
version = mvccKey[n-tsLen+1 : len(mvccKey)-1]
158+
version = mvccKeyCopy[n-tsLen+1 : len(mvccKeyCopy)-1]
154159
}
155160

156161
return key, version, true

store/storage/pebbledb/iterator.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,15 @@ func (itr *iterator) Value() []byte {
113113
}
114114

115115
func (itr *iterator) Next() {
116+
currKey, _, ok := SplitMVCCKey(itr.source.Key())
117+
if !ok {
118+
// XXX: This should not happen as that would indicate we have a malformed
119+
// MVCC key.
120+
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
121+
}
122+
116123
var next bool
117124
if itr.reverse {
118-
currKey, _, ok := SplitMVCCKey(itr.source.Key())
119-
if !ok {
120-
// XXX: This should not happen as that would indicate we have a malformed
121-
// MVCC key.
122-
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
123-
}
124-
125125
// Since PebbleDB has no PrevPrefix API, we must manually seek to the next
126126
// key that is lexicographically less than the current key.
127127
next = itr.source.SeekLT(MVCCEncode(currKey, 0))
@@ -132,7 +132,7 @@ func (itr *iterator) Next() {
132132

133133
// First move the iterator to the next prefix, which may not correspond to the
134134
// desired version for that key, e.g. if the key was written at a later version,
135-
// so we seek back to the latest desired version, s.t. the version is <= itr.version.
135+
// so we seek back to the latest desired version, s.t. the version <= itr.version.
136136
if next {
137137
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
138138
if !ok {
@@ -147,10 +147,29 @@ func (itr *iterator) Next() {
147147
return
148148
}
149149

150-
// Move the iterator to the closest version to the desired version, so we
150+
// Move the iterator to the closest version of the desired version, so we
151151
// append the current iterator key to the prefix and seek to that key.
152152
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))
153153

154+
tmpKey, _, ok := SplitMVCCKey(itr.source.Key())
155+
if !ok {
156+
// XXX: This should not happen as that would indicate we have a malformed
157+
// MVCC key.
158+
itr.valid = false
159+
return
160+
}
161+
162+
// There exists cases where the SeekLT() call moved us back to the same key
163+
// we started at, so we must move to next key, i.e. two keys forward.
164+
if bytes.Equal(tmpKey, currKey) {
165+
if itr.source.NextPrefix() {
166+
itr.Next()
167+
} else {
168+
itr.valid = false
169+
return
170+
}
171+
}
172+
154173
// The cursor might now be pointing at a key/value pair that is tombstoned.
155174
// If so, we must move the cursor.
156175
if itr.valid && itr.cursorTombstoned() {

store/storage/storage_test_suite.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,10 +391,54 @@ func (s *StorageTestSuite) TestDatabase_IteratorMultiVersion() {
391391
i = (i + 1) % 10
392392
count++
393393
}
394+
394395
s.Require().Equal(10, count)
395396
s.Require().NoError(itr.Error())
396397
}
397398

399+
func (s *StorageTestSuite) TestDatabaseIterator_SkipVersion() {
400+
db, err := s.NewDB(s.T().TempDir())
401+
s.Require().NoError(err)
402+
403+
defer db.Close()
404+
405+
cs := store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
406+
{Key: []byte("keyC"), Value: []byte("value003")},
407+
}})
408+
s.Require().NoError(db.ApplyChangeset(58827506, cs))
409+
410+
cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
411+
{Key: []byte("keyE"), Value: []byte("value000")},
412+
}})
413+
s.Require().NoError(db.ApplyChangeset(58827506, cs))
414+
415+
cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
416+
{Key: []byte("keyF"), Value: []byte("value000")},
417+
}})
418+
s.Require().NoError(db.ApplyChangeset(58827506, cs))
419+
420+
cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
421+
{Key: []byte("keyC"), Value: []byte("value004")},
422+
}})
423+
s.Require().NoError(db.ApplyChangeset(58833605, cs))
424+
425+
cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
426+
{Key: []byte("keyD"), Value: []byte("value006")},
427+
}})
428+
s.Require().NoError(db.ApplyChangeset(58833606, cs))
429+
430+
itr, err := db.Iterator(storeKey1, 58831525, []byte("key"), nil)
431+
s.Require().NoError(err)
432+
defer itr.Close()
433+
434+
count := make(map[string]struct{})
435+
for ; itr.Valid(); itr.Next() {
436+
count[string(itr.Key())] = struct{}{}
437+
}
438+
439+
s.Require().Equal(3, len(count))
440+
}
441+
398442
func (s *StorageTestSuite) TestDatabase_IteratorNoDomain() {
399443
db, err := s.NewDB(s.T().TempDir())
400444
s.Require().NoError(err)

0 commit comments

Comments
 (0)