Skip to content

Commit

Permalink
feat(kv): add Close to kv.ForwardCursor interface
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgeMac committed Dec 19, 2019
1 parent ab04e4f commit d6fcc65
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
4. [16262](https://github.com/influxdata/influxdb/pull/16262): add support for check resource dry run functionality
5. [16275](https://github.com/influxdata/influxdb/pull/16275): add support for check resource apply functionality
6. [16283](https://github.com/influxdata/influxdb/pull/16283): add support for check resource export functionality
1. [16212](https://github.com/influxdata/influxdb/pull/16212): Add new kv.ForwardCursor interface

### Bug Fixes

Expand Down
26 changes: 26 additions & 0 deletions bolt/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"go.uber.org/zap"
)

// check that *KVStore implement kv.Store interface.
var _ (kv.Store) = (*KVStore)(nil)

// KVStore is a kv.Store backed by boltdb.
type KVStore struct {
path string
Expand Down Expand Up @@ -224,10 +227,21 @@ type Cursor struct {
key, value []byte

config kv.CursorConfig
closed bool
}

// Close sets the closed to closed
func (c *Cursor) Close() error {
c.closed = true

return nil
}

// Seek seeks for the first key that matches the prefix provided.
func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.Seek(prefix)
if len(k) == 0 && len(v) == 0 {
return nil, nil
Expand All @@ -237,6 +251,9 @@ func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {

// First retrieves the first key value pair in the bucket.
func (c *Cursor) First() ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.First()
if len(k) == 0 && len(v) == 0 {
return nil, nil
Expand All @@ -246,6 +263,9 @@ func (c *Cursor) First() ([]byte, []byte) {

// Last retrieves the last key value pair in the bucket.
func (c *Cursor) Last() ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.Last()
if len(k) == 0 && len(v) == 0 {
return nil, nil
Expand All @@ -255,6 +275,9 @@ func (c *Cursor) Last() ([]byte, []byte) {

// Next retrieves the next key in the bucket.
func (c *Cursor) Next() (k []byte, v []byte) {
if c.closed {
return nil, nil
}
// get and unset previously seeked values if they exist
k, v, c.key, c.value = c.key, c.value, nil, nil
if len(k) > 0 && len(v) > 0 {
Expand All @@ -275,6 +298,9 @@ func (c *Cursor) Next() (k []byte, v []byte) {

// Prev retrieves the previous key in the bucket.
func (c *Cursor) Prev() (k []byte, v []byte) {
if c.closed {
return nil, nil
}
// get and unset previously seeked values if they exist
k, v, c.key, c.value = c.key, c.value, nil, nil
if len(k) > 0 && len(v) > 0 {
Expand Down
63 changes: 52 additions & 11 deletions inmem/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,28 @@ type pair struct {

// ForwardCursor returns a directional cursor which starts at the provided seeked key
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
pairs := make(chan []pair)
var (
pairs = make(chan []pair)
stop = make(chan struct{})
send = func(batch []pair) bool {
if len(batch) == 0 {
return true
}

select {
case pairs <- batch:
return true
case <-stop:
return false
}
}
)

go func() {
defer close(pairs)

var (
batch []pair
config = kv.NewCursorConfig(opts...)
fn = config.Hints.PredicateFn
iterate = func(it btree.ItemIterator) {
Expand All @@ -258,9 +275,14 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
}
}

var batch []pair

iterate(func(i btree.Item) bool {
select {
case <-stop:
// if signalled to stop then exit iteration
return false
default:
}

j, ok := i.(*item)
if !ok {
batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)})
Expand All @@ -276,20 +298,23 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
return true
}

pairs <- batch
if send(batch) {
// batch flushed successfully so we can
// begin a new batch
batch = nil

batch = nil
return true
}

return true
// we've been signalled to stop
return false
})

// send if any left in batch
if len(batch) > 0 {
pairs <- batch
}
send(batch)
}()

return &ForwardCursor{pairs: pairs}, nil
return &ForwardCursor{pairs: pairs, stop: stop}, nil
}

// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree
Expand All @@ -299,6 +324,8 @@ type ForwardCursor struct {
cur []pair
n int

stop chan struct{}
closed bool
// error found during iteration
err error
}
Expand All @@ -308,9 +335,23 @@ func (c *ForwardCursor) Err() error {
return c.err
}

// Close releases the producing goroutines for the forward cursor.
// It blocks until the producing goroutine exits.
func (c *ForwardCursor) Close() error {
if c.closed {
return nil
}

close(c.stop)

c.closed = true

return nil
}

// Next returns the next key/value pair in the cursor
func (c *ForwardCursor) Next() ([]byte, []byte) {
if c.err != nil {
if c.err != nil || c.closed {
return nil, nil
}

Expand Down
2 changes: 0 additions & 2 deletions kv/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ type staticCursor struct {
}

// Pair is a struct for key value pairs.
// It also includes an error which should only be non-nil
// when the Key and Value are nil.
type Pair struct {
Key []byte
Value []byte
Expand Down
6 changes: 4 additions & 2 deletions kv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ type Bucket interface {
// Cursor is an abstraction for iterating/ranging through data. A concrete implementation
// of a cursor can be found in cursor.go.
type Cursor interface {
// Next moves the cursor to the next key in the bucket.
Next() (k, v []byte)
// Seek moves the cursor forward until reaching prefix in the key name.
Seek(prefix []byte) (k []byte, v []byte)
// First moves the cursor to the first key in the bucket.
First() (k []byte, v []byte)
// Last moves the cursor to the last key in the bucket.
Last() (k []byte, v []byte)
// Next moves the cursor to the next key in the bucket.
Next() (k, v []byte)
// Prev moves the cursor to the prev key in the bucket.
Prev() (k []byte, v []byte)
}
Expand All @@ -121,6 +121,8 @@ type ForwardCursor interface {
// Err returns non-nil if an error occurred during cursor iteration.
// This should always be checked after Next returns a nil key/value.
Err() error
// Close is reponsible for freeing any resources created by the cursor.
Close() error
}

// CursorDirection is an integer used to define the direction
Expand Down
9 changes: 9 additions & 0 deletions testing/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ func KVForwardCursor(
fields KVStoreFields
args args
exp []string
expErr error
}{
{
name: "no hints",
Expand Down Expand Up @@ -899,6 +900,14 @@ func KVForwardCursor(
t.Errorf("unexpected cursor values: -got/+exp\n%v", cmp.Diff(got, exp))
}

if err := cur.Err(); !cmp.Equal(err, tt.expErr) {
t.Errorf("expected error to be %v, got %v", tt.expErr, err)
}

if err := cur.Close(); err != nil {
t.Errorf("expected cursor to close with nil error, found %v", err)
}

return nil
})

Expand Down

0 comments on commit d6fcc65

Please sign in to comment.