Skip to content

Commit

Permalink
tikvclient: Add endKey param to Scanner (#8178) (#8309)
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored and winkyao committed Nov 19, 2018
1 parent 4201baa commit 7ebfa28
Show file tree
Hide file tree
Showing 31 changed files with 173 additions and 128 deletions.
2 changes: 1 addition & 1 deletion ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,7 @@ func (s *testDBSuite) TestTruncateTable(c *C) {
hasOldTableData := true
for i := 0; i < waitForCleanDataRound; i++ {
err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
it, err1 := txn.Seek(tablePrefix)
it, err1 := txn.Iter(tablePrefix, nil)
if err1 != nil {
return err1
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(dr.d.store, false, func(txn kv.Transaction) error {
iter, err := txn.Seek(oldStartKey)
iter, err := txn.Iter(oldStartKey, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHa
return errors.Trace(err)
}
firstKey := t.RecordKey(seekHandle)
it, err := snap.Seek(firstKey)
it, err := snap.Iter(firstKey, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
16 changes: 8 additions & 8 deletions kv/buffer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,26 @@ func (s *BufferStore) Get(k Key) ([]byte, error) {
return val, nil
}

// Seek implements the Retriever interface.
func (s *BufferStore) Seek(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.Seek(k)
// Iter implements the Retriever interface.
func (s *BufferStore) Iter(k Key, upperBound Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.Seek(k)
retrieverIt, err := s.r.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
return NewUnionIter(bufferIt, retrieverIt, false)
}

// SeekReverse implements the Retriever interface.
func (s *BufferStore) SeekReverse(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.SeekReverse(k)
// IterReverse implements the Retriever interface.
func (s *BufferStore) IterReverse(k Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.SeekReverse(k)
retrieverIt, err := s.r.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion kv/buffer_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) {
err := bs.SaveTo(mutator)
c.Check(err, IsNil)

iter, err := mutator.Seek(nil)
iter, err := mutator.Iter(nil, nil)
c.Check(err, IsNil)
for iter.Valid() {
cmp := bytes.Compare(iter.Key(), iter.Value())
Expand Down
10 changes: 6 additions & 4 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,17 @@ type Retriever interface {
// Get gets the value for key k from kv store.
// If corresponding kv pair does not exist, it returns nil and ErrNotExist.
Get(k Key) ([]byte, error)
// Seek creates an Iterator positioned on the first entry that k <= entry's key.
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
// If such entry is not found, it returns an invalid Iterator with no error.
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
// The Iterator must be Closed after use.
Seek(k Key) (Iterator, error)
Iter(k Key, upperBound Key) (Iterator, error)

// SeekReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
SeekReverse(k Key) (Iterator, error)
// TODO: Add lower bound limit
IterReverse(k Key) (Iterator, error)
}

// Mutator is the interface wraps the basic Set and Delete methods.
Expand Down
20 changes: 10 additions & 10 deletions kv/mem_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func valToStr(c *C, iter Iterator) string {
func checkNewIterator(c *C, buffer MemBuffer) {
for i := startIndex; i < testCount; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Seek(val)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(decodeInt([]byte(valToStr(c, iter))), Equals, i*indexStep)
Expand All @@ -86,7 +86,7 @@ func checkNewIterator(c *C, buffer MemBuffer) {
// Test iterator Next()
for i := startIndex; i < testCount-1; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Seek(val)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(valToStr(c, iter), Equals, string(val))
Expand All @@ -102,15 +102,15 @@ func checkNewIterator(c *C, buffer MemBuffer) {
}

// Non exist and beyond maximum seek test
iter, err := buffer.Seek(encodeInt(testCount * indexStep))
iter, err := buffer.Iter(encodeInt(testCount*indexStep), nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

// Non exist but between existing keys seek test,
// it returns the smallest key that larger than the one we are seeking
inBetween := encodeInt((testCount-1)*indexStep - 1)
last := encodeInt((testCount - 1) * indexStep)
iter, err = buffer.Seek(inBetween)
iter, err = buffer.Iter(inBetween, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsTrue)
c.Assert([]byte(iter.Key()), Not(BytesEquals), inBetween)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *testKVSuite) TestNewIterator(c *C) {
defer testleak.AfterTest(c)()
for _, buffer := range s.bs {
// should be invalid
iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

Expand All @@ -155,7 +155,7 @@ func (s *testKVSuite) TestIterNextUntil(c *C) {
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
insertData(c, buffer)

iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)

err = NextUntil(iter, func(k Key) bool {
Expand All @@ -168,7 +168,7 @@ func (s *testKVSuite) TestIterNextUntil(c *C) {
func (s *testKVSuite) TestBasicNewIterator(c *C) {
defer testleak.AfterTest(c)()
for _, buffer := range s.bs {
it, err := buffer.Seek([]byte("2"))
it, err := buffer.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsFalse)
}
Expand All @@ -193,15 +193,15 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) {
}

cnt := 0
it, err := buffer.Seek(nil)
it, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
for it.Valid() {
cnt++
it.Next()
}
c.Assert(cnt, Equals, 6)

it, err = buffer.Seek([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"))
it, err = buffer.Iter([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"), nil)
c.Assert(err, IsNil)
c.Assert(string(it.Key()), Equals, "DATA_test_main_db_tbl_tbl_test_record__00000000000000000001")
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func benchIterator(b *testing.B, buffer MemBuffer) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
iter, err := buffer.Seek(nil)
iter, err := buffer.Iter(nil, nil)
if err != nil {
b.Error(err)
}
Expand Down
16 changes: 6 additions & 10 deletions kv/memdb_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,10 @@ func NewMemDbBuffer(cap int) MemBuffer {
}
}

// Seek creates an Iterator.
func (m *memDbBuffer) Seek(k Key) (Iterator, error) {
var i Iterator
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: false}
} else {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k)}), reverse: false}
}
// Iter creates an Iterator.
func (m *memDbBuffer) Iter(k Key, upperBound Key) (Iterator, error) {
i := &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k), Limit: []byte(upperBound)}), reverse: false}

err := i.Next()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -69,7 +65,7 @@ func (m *memDbBuffer) SetCap(cap int) {

}

func (m *memDbBuffer) SeekReverse(k Key) (Iterator, error) {
func (m *memDbBuffer) IterReverse(k Key) (Iterator, error) {
var i *memDbIter
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: true}
Expand Down Expand Up @@ -161,7 +157,7 @@ func (i *memDbIter) Close() {

// WalkMemBuffer iterates all buffered kv pairs in memBuf
func WalkMemBuffer(memBuf MemBuffer, f func(k Key, v []byte) error) error {
iter, err := memBuf.Seek(nil)
iter, err := memBuf.Iter(nil, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func (t *mockTxn) Get(k Key) ([]byte, error) {
return nil, nil
}

func (t *mockTxn) Seek(k Key) (Iterator, error) {
func (t *mockTxn) Iter(k Key, upperBound Key) (Iterator, error) {
return nil, nil
}

func (t *mockTxn) SeekReverse(k Key) (Iterator, error) {
func (t *mockTxn) IterReverse(k Key) (Iterator, error) {
return nil, nil
}

Expand Down Expand Up @@ -207,10 +207,10 @@ func (s *mockSnapshot) BatchGet(keys []Key) (map[string][]byte, error) {
return m, nil
}

func (s *mockSnapshot) Seek(k Key) (Iterator, error) {
return s.store.Seek(k)
func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
return s.store.Iter(k, upperBound)
}

func (s *mockSnapshot) SeekReverse(k Key) (Iterator, error) {
return s.store.SeekReverse(k)
func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
}
4 changes: 2 additions & 2 deletions kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (s testMockSuite) TestInterface(c *C) {
if transaction.IsReadOnly() {
transaction.Get(Key("lock"))
transaction.Set(Key("lock"), []byte{})
transaction.Seek(Key("lock"))
transaction.SeekReverse(Key("lock"))
transaction.Iter(Key("lock"), nil)
transaction.IterReverse(Key("lock"))
}
transaction.Commit(context.Background())

Expand Down
8 changes: 4 additions & 4 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ func (lmb *lazyMemBuffer) Delete(k Key) error {
return lmb.mb.Delete(k)
}

func (lmb *lazyMemBuffer) Seek(k Key) (Iterator, error) {
func (lmb *lazyMemBuffer) Iter(k Key, upperBound Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.Seek(k)
return lmb.mb.Iter(k, upperBound)
}

func (lmb *lazyMemBuffer) SeekReverse(k Key) (Iterator, error) {
func (lmb *lazyMemBuffer) IterReverse(k Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.SeekReverse(k)
return lmb.mb.IterReverse(k)
}

func (lmb *lazyMemBuffer) Size() int {
Expand Down
18 changes: 9 additions & 9 deletions kv/union_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,46 +63,46 @@ func (s *testUnionStoreSuite) TestSeek(c *C) {
s.store.Set([]byte("2"), []byte("2"))
s.store.Set([]byte("3"), []byte("3"))

iter, err := s.us.Seek(nil)
iter, err := s.us.Iter(nil, nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("1"), []byte("2"), []byte("3")}, [][]byte{[]byte("1"), []byte("2"), []byte("3")})

iter, err = s.us.Seek([]byte("2"))
iter, err = s.us.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("3")}, [][]byte{[]byte("2"), []byte("3")})

s.us.Set([]byte("4"), []byte("4"))
iter, err = s.us.Seek([]byte("2"))
iter, err = s.us.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("3"), []byte("4")}, [][]byte{[]byte("2"), []byte("3"), []byte("4")})

s.us.Delete([]byte("3"))
iter, err = s.us.Seek([]byte("2"))
iter, err = s.us.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("4")}, [][]byte{[]byte("2"), []byte("4")})
}

func (s *testUnionStoreSuite) TestSeekReverse(c *C) {
func (s *testUnionStoreSuite) TestIterReverse(c *C) {
defer testleak.AfterTest(c)()
s.store.Set([]byte("1"), []byte("1"))
s.store.Set([]byte("2"), []byte("2"))
s.store.Set([]byte("3"), []byte("3"))

iter, err := s.us.SeekReverse(nil)
iter, err := s.us.IterReverse(nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("3"), []byte("2"), []byte("1")}, [][]byte{[]byte("3"), []byte("2"), []byte("1")})

iter, err = s.us.SeekReverse([]byte("3"))
iter, err = s.us.IterReverse([]byte("3"))
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("1")}, [][]byte{[]byte("2"), []byte("1")})

s.us.Set([]byte("0"), []byte("0"))
iter, err = s.us.SeekReverse([]byte("3"))
iter, err = s.us.IterReverse([]byte("3"))
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("1"), []byte("0")}, [][]byte{[]byte("2"), []byte("1"), []byte("0")})

s.us.Delete([]byte("1"))
iter, err = s.us.SeekReverse([]byte("3"))
iter, err = s.us.IterReverse([]byte("3"))
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("0")}, [][]byte{[]byte("2"), []byte("0")})
}
Expand Down
16 changes: 8 additions & 8 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,26 +163,26 @@ func (st *TxnState) Delete(k kv.Key) error {
return st.buf.Delete(k)
}

// Seek overrides the Transaction interface.
func (st *TxnState) Seek(k kv.Key) (kv.Iterator, error) {
bufferIt, err := st.buf.Seek(k)
// Iter overrides the Transaction interface.
func (st *TxnState) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
bufferIt, err := st.buf.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := st.Transaction.Seek(k)
retrieverIt, err := st.Transaction.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
return kv.NewUnionIter(bufferIt, retrieverIt, false)
}

// SeekReverse overrides the Transaction interface.
func (st *TxnState) SeekReverse(k kv.Key) (kv.Iterator, error) {
bufferIt, err := st.buf.SeekReverse(k)
// IterReverse overrides the Transaction interface.
func (st *TxnState) IterReverse(k kv.Key) (kv.Iterator, error) {
bufferIt, err := st.buf.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := st.Transaction.SeekReverse(k)
retrieverIt, err := st.Transaction.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 7ebfa28

Please sign in to comment.