Skip to content

Commit

Permalink
kv: check iter.Next() errors. (pingcap#2198)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Dec 8, 2016
1 parent 450c904 commit 5f53175
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 31 deletions.
54 changes: 37 additions & 17 deletions kv/union_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

package kv

import "github.com/ngaut/log"
import (
"github.com/juju/errors"
"github.com/ngaut/log"
)

// UnionIter is the iterator on an UnionStore.
type UnionIter struct {
Expand Down Expand Up @@ -41,35 +44,39 @@ func newUnionIter(dirtyIt Iterator, snapshotIt Iterator, reverse bool) *UnionIte
}

// Go next and update valid status.
func (iter *UnionIter) dirtyNext() {
iter.dirtyIt.Next()
func (iter *UnionIter) dirtyNext() error {
err := iter.dirtyIt.Next()
iter.dirtyValid = iter.dirtyIt.Valid()
return errors.Trace(err)
}

// Go next and update valid status.
func (iter *UnionIter) snapshotNext() {
iter.snapshotIt.Next()
func (iter *UnionIter) snapshotNext() error {
err := iter.snapshotIt.Next()
iter.snapshotValid = iter.snapshotIt.Valid()
return errors.Trace(err)
}

func (iter *UnionIter) updateCur() {
func (iter *UnionIter) updateCur() error {
iter.isValid = true
for {
if !iter.dirtyValid && !iter.snapshotValid {
iter.isValid = false
return
break
}

if !iter.dirtyValid {
iter.curIsDirty = false
return
break
}

if !iter.snapshotValid {
iter.curIsDirty = true
// if delete it
if len(iter.dirtyIt.Value()) == 0 {
iter.dirtyNext()
if err := iter.dirtyNext(); err != nil {
return errors.Trace(err)
}
continue
}
break
Expand All @@ -88,12 +95,18 @@ func (iter *UnionIter) updateCur() {
if len(iter.dirtyIt.Value()) == 0 {
// snapshot has a record, but txn says we have deleted it
// just go next
iter.dirtyNext()
iter.snapshotNext()
if err := iter.dirtyNext(); err != nil {
return errors.Trace(err)
}
if err := iter.snapshotNext(); err != nil {
return errors.Trace(err)
}
continue
}
// both go next
iter.snapshotNext()
if err := iter.snapshotNext(); err != nil {
return errors.Trace(err)
}
iter.curIsDirty = true
break
} else if cmp > 0 {
Expand All @@ -105,25 +118,32 @@ func (iter *UnionIter) updateCur() {
if len(iter.dirtyIt.Value()) == 0 {
log.Warnf("[kv] delete a record not exists? k = %q", iter.dirtyIt.Key())
// jump over this deletion
iter.dirtyNext()
if err := iter.dirtyNext(); err != nil {
return errors.Trace(err)
}
continue
}
iter.curIsDirty = true
break
}
}
}
return nil
}

// Next implements the Iterator Next interface.
func (iter *UnionIter) Next() error {
var err error
if !iter.curIsDirty {
iter.snapshotNext()
err = iter.snapshotNext()
} else {
iter.dirtyNext()
err = iter.dirtyNext()
}
iter.updateCur()
return nil
if err != nil {
return errors.Trace(err)
}
err = iter.updateCur()
return errors.Trace(err)
}

// Value implements the Iterator Value interface.
Expand Down
9 changes: 0 additions & 9 deletions store/localstore/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@ func createMemStore(suffix int) kv.Storage {
return store
}

func (t *testMvccSuite) addDirtyData() {
engineDB := t.s.(*dbStore).db
b := engineDB.NewBatch()
b.Put([]byte("\xf0dirty"), []byte("testvalue"))
b.Put([]byte("\x00dirty"), []byte("testvalue"))
engineDB.Commit(b)
}

func (t *testMvccSuite) TestMvccEncode(c *C) {
encodedKey1 := MvccEncodeVersionKey([]byte("A"), kv.Version{Ver: 1})
encodedKey2 := MvccEncodeVersionKey([]byte("A"), kv.Version{Ver: 2})
Expand Down Expand Up @@ -82,7 +74,6 @@ func (t *testMvccSuite) scanRawEngine(c *C, f func([]byte, []byte)) {
func (t *testMvccSuite) SetUpTest(c *C) {
// create new store
t.s = createMemStore(time.Now().Nanosecond())
t.addDirtyData()
// insert test data
txn, err := t.s.Begin()
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Scanner) Next() error {
if s.idx >= len(s.cache) {
if s.eof {
s.Close()
return kv.ErrNotExist
return nil
}
err := s.getData(bo)
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions store/tikv/scan_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
if ch < byte('z') {
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Next(), IsNil)
}
c.Assert(scanner.Next(), NotNil)
c.Assert(scanner.Valid(), IsFalse)
}

0 comments on commit 5f53175

Please sign in to comment.