Skip to content

Commit

Permalink
feat(kv): define forward cursor interface (#16212)
Browse files Browse the repository at this point in the history
* feat(kv): define forward cursor interface

* feat(kv): implement ForwardCursor on bolt and inmem buckets

* feat(kv): update tests to capture forward cursor

* fix(kv): typo in docs

* feat(kv): add Err method to ForwardCursor interface

* feat(inmem): batch pair channel sends in forward cursor

* fix(kv): remove Err field from kv.Pair

* feat(kv): add Close to kv.ForwardCursor interface
  • Loading branch information
GeorgeMac authored and alexpaxton committed Jan 9, 2020
1 parent d7027d9 commit 19f8009
Show file tree
Hide file tree
Showing 6 changed files with 542 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
4. [16262](https://github.com/influxdata/influxdb/pull/16262): add support for check resource dry run functionality
5. [16275](https://github.com/influxdata/influxdb/pull/16275): add support for check resource apply functionality
6. [16283](https://github.com/influxdata/influxdb/pull/16283): add support for check resource export functionality
1. [16212](https://github.com/influxdata/influxdb/pull/16212): Add new kv.ForwardCursor interface

### Bug Fixes

Expand Down
82 changes: 78 additions & 4 deletions bolt/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"go.uber.org/zap"
)

// check that *KVStore implement kv.Store interface.
var _ (kv.Store) = (*KVStore)(nil)

// KVStore is a kv.Store backed by boltdb.
type KVStore struct {
path string
Expand Down Expand Up @@ -191,6 +194,22 @@ func (b *Bucket) Delete(key []byte) error {
return err
}

// ForwardCursor retrieves a cursor for iterating through the entries
// in the key value store in a given direction (ascending / descending).
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
var (
cursor = b.bucket.Cursor()
key, value = cursor.Seek(seek)
)

return &Cursor{
cursor: cursor,
key: key,
value: value,
config: kv.NewCursorConfig(opts...),
}, nil
}

// Cursor retrieves a cursor for iterating through the entries
// in the key value store.
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
Expand All @@ -203,10 +222,26 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
// in the key value store.
type Cursor struct {
cursor *bolt.Cursor

// previously seeked key/value
key, value []byte

config kv.CursorConfig
closed bool
}

// Close sets the closed to closed
func (c *Cursor) Close() error {
c.closed = true

return nil
}

// Seek seeks for the first key that matches the prefix provided.
func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.Seek(prefix)
if len(k) == 0 && len(v) == 0 {
return nil, nil
Expand All @@ -216,6 +251,9 @@ func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {

// First retrieves the first key value pair in the bucket.
func (c *Cursor) First() ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.First()
if len(k) == 0 && len(v) == 0 {
return nil, nil
Expand All @@ -225,6 +263,9 @@ func (c *Cursor) First() ([]byte, []byte) {

// Last retrieves the last key value pair in the bucket.
func (c *Cursor) Last() ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.Last()
if len(k) == 0 && len(v) == 0 {
return nil, nil
Expand All @@ -233,19 +274,52 @@ func (c *Cursor) Last() ([]byte, []byte) {
}

// Next retrieves the next key in the bucket.
func (c *Cursor) Next() ([]byte, []byte) {
k, v := c.cursor.Next()
func (c *Cursor) Next() (k []byte, v []byte) {
if c.closed {
return nil, nil
}
// get and unset previously seeked values if they exist
k, v, c.key, c.value = c.key, c.value, nil, nil
if len(k) > 0 && len(v) > 0 {
return
}

next := c.cursor.Next
if c.config.Direction == kv.CursorDescending {
next = c.cursor.Prev
}

k, v = next()
if len(k) == 0 && len(v) == 0 {
return nil, nil
}
return k, v
}

// Prev retrieves the previous key in the bucket.
func (c *Cursor) Prev() ([]byte, []byte) {
k, v := c.cursor.Prev()
func (c *Cursor) Prev() (k []byte, v []byte) {
if c.closed {
return nil, nil
}
// get and unset previously seeked values if they exist
k, v, c.key, c.value = c.key, c.value, nil, nil
if len(k) > 0 && len(v) > 0 {
return
}

prev := c.cursor.Prev
if c.config.Direction == kv.CursorDescending {
prev = c.cursor.Next
}

k, v = prev()
if len(k) == 0 && len(v) == 0 {
return nil, nil
}
return k, v
}

// Err always returns nil as nothing can go wrong™ during iteration
func (c *Cursor) Err() error {
return nil
}
146 changes: 146 additions & 0 deletions inmem/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import (
"github.com/influxdata/influxdb/kv"
)

// ensure *KVStore implement kv.Store interface
var _ kv.Store = (*KVStore)(nil)

// cursorBatchSize is the size of a batch sent by a forward cursors
// tree iterator
const cursorBatchSize = 1000

// KVStore is an in memory btree backed kv.Store.
type KVStore struct {
mu sync.RWMutex
Expand Down Expand Up @@ -225,3 +232,142 @@ func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) {

return pairs, nil
}

type pair struct {
kv.Pair
err error
}

// ForwardCursor returns a directional cursor which starts at the provided seeked key
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
var (
pairs = make(chan []pair)
stop = make(chan struct{})
send = func(batch []pair) bool {
if len(batch) == 0 {
return true
}

select {
case pairs <- batch:
return true
case <-stop:
return false
}
}
)

go func() {
defer close(pairs)

var (
batch []pair
config = kv.NewCursorConfig(opts...)
fn = config.Hints.PredicateFn
iterate = func(it btree.ItemIterator) {
b.btree.AscendGreaterOrEqual(&item{key: seek}, it)
}
)

if config.Direction == kv.CursorDescending {
iterate = func(it btree.ItemIterator) {
b.btree.DescendLessOrEqual(&item{key: seek}, it)
}
}

iterate(func(i btree.Item) bool {
select {
case <-stop:
// if signalled to stop then exit iteration
return false
default:
}

j, ok := i.(*item)
if !ok {
batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)})

return false
}

if fn == nil || fn(j.key, j.value) {
batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}})
}

if len(batch) < cursorBatchSize {
return true
}

if send(batch) {
// batch flushed successfully so we can
// begin a new batch
batch = nil

return true
}

// we've been signalled to stop
return false
})

// send if any left in batch
send(batch)
}()

return &ForwardCursor{pairs: pairs, stop: stop}, nil
}

// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree
type ForwardCursor struct {
pairs <-chan []pair

cur []pair
n int

stop chan struct{}
closed bool
// error found during iteration
err error
}

// Err returns a non-nil error when an error occurred during cursor iteration.
func (c *ForwardCursor) Err() error {
return c.err
}

// Close releases the producing goroutines for the forward cursor.
// It blocks until the producing goroutine exits.
func (c *ForwardCursor) Close() error {
if c.closed {
return nil
}

close(c.stop)

c.closed = true

return nil
}

// Next returns the next key/value pair in the cursor
func (c *ForwardCursor) Next() ([]byte, []byte) {
if c.err != nil || c.closed {
return nil, nil
}

if c.n >= len(c.cur) {
var ok bool
c.cur, ok = <-c.pairs
if !ok {
return nil, nil
}

c.n = 0
}

pair := c.cur[c.n]
c.err = pair.err
c.n++

return pair.Key, pair.Value
}
61 changes: 61 additions & 0 deletions kv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type Bucket interface {
Put(key, value []byte) error
// Delete should error if the transaction it was called in is not writable.
Delete(key []byte) error
// ForwardCursor returns a forward cursor from the seek position provided.
// Other options can be supplied to provide direction and hints.
ForwardCursor(seek []byte, opts ...CursorOption) (ForwardCursor, error)
}

// Cursor is an abstraction for iterating/ranging through data. A concrete implementation
Expand All @@ -110,3 +113,61 @@ type Cursor interface {
// Prev moves the cursor to the prev key in the bucket.
Prev() (k []byte, v []byte)
}

// ForwardCursor is an abstraction for interacting/ranging through data in one direction.
type ForwardCursor interface {
// Next moves the cursor to the next key in the bucket.
Next() (k, v []byte)
// Err returns non-nil if an error occurred during cursor iteration.
// This should always be checked after Next returns a nil key/value.
Err() error
// Close is reponsible for freeing any resources created by the cursor.
Close() error
}

// CursorDirection is an integer used to define the direction
// a request cursor operates in.
type CursorDirection int

const (
// CursorAscending directs a cursor to range in ascending order
CursorAscending CursorDirection = iota
// CursorAscending directs a cursor to range in descending order
CursorDescending
)

// CursorConfig is a type used to configure a new forward cursor.
// It includes a direction and a set of hints
type CursorConfig struct {
Direction CursorDirection
Hints CursorHints
}

// NewCursorConfig constructs and configures a CursorConfig used to configure
// a forward cursor.
func NewCursorConfig(opts ...CursorOption) CursorConfig {
conf := CursorConfig{}
for _, opt := range opts {
opt(&conf)
}
return conf
}

// CursorOption is a functional option for configuring a forward cursor
type CursorOption func(*CursorConfig)

// WithCursorDirection sets the cursor direction on a provided cursor config
func WithCursorDirection(direction CursorDirection) CursorOption {
return func(c *CursorConfig) {
c.Direction = direction
}
}

// WithCursorHints configs the provided hints on the cursor config
func WithCursorHints(hints ...CursorHint) CursorOption {
return func(c *CursorConfig) {
for _, hint := range hints {
hint(&c.Hints)
}
}
}
Loading

0 comments on commit 19f8009

Please sign in to comment.