Skip to content

Commit

Permalink
feat(kv): implement ForwardCursor on bolt and inmem buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgeMac committed Dec 12, 2019
1 parent 9016a48 commit d0ac4c8
Showing 4 changed files with 101 additions and 6 deletions.
27 changes: 25 additions & 2 deletions bolt/kv.go
Original file line number Diff line number Diff line change
@@ -191,6 +191,17 @@ func (b *Bucket) Delete(key []byte) error {
return err
}

// ForwardCursor retrieves a cursor for iterating through the entries
// in the key value store in a given direction (ascending / descending).
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
cursor := b.bucket.Cursor()
cursor.Seek(seek)
return &Cursor{
cursor: cursor,
config: kv.NewCursorConfig(opts...),
}, nil
}

// Cursor retrieves a cursor for iterating through the entries
// in the key value store.
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
@@ -203,6 +214,8 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
// in the key value store.
type Cursor struct {
cursor *bolt.Cursor

config kv.CursorConfig
}

// Seek seeks for the first key that matches the prefix provided.
@@ -234,7 +247,12 @@ func (c *Cursor) Last() ([]byte, []byte) {

// Next retrieves the next key in the bucket.
func (c *Cursor) Next() ([]byte, []byte) {
k, v := c.cursor.Next()
next := c.cursor.Next
if c.config.Direction == kv.CursorDescending {
next = c.cursor.Prev
}

k, v := next()
if len(k) == 0 && len(v) == 0 {
return nil, nil
}
@@ -243,7 +261,12 @@ func (c *Cursor) Next() ([]byte, []byte) {

// Prev retrieves the previous key in the bucket.
func (c *Cursor) Prev() ([]byte, []byte) {
k, v := c.cursor.Prev()
prev := c.cursor.Prev
if c.config.Direction == kv.CursorDescending {
prev = c.cursor.Next
}

k, v := prev()
if len(k) == 0 && len(v) == 0 {
return nil, nil
}
56 changes: 56 additions & 0 deletions inmem/kv.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,9 @@ import (
"github.com/influxdata/influxdb/kv"
)

// ensure *KVStore implement kv.Store interface
var _ kv.Store = (*KVStore)(nil)

// KVStore is an in memory btree backed kv.Store.
type KVStore struct {
mu sync.RWMutex
@@ -232,3 +235,56 @@ func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) {

return pairs, nil
}

// ForwardCursor returns a directional cursor which starts at the provided seeked key
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
pairs := make(chan kv.Pair)
go func() {
defer close(pairs)

var (
config = kv.NewCursorConfig(opts...)
fn = config.Hints.PredicateFn
iterate = func(it btree.ItemIterator) {
b.btree.AscendGreaterOrEqual(&item{key: seek}, it)
}
)

if config.Direction == kv.CursorDescending {
iterate = func(it btree.ItemIterator) {
b.btree.DescendLessOrEqual(&item{key: seek}, it)
}
}

iterate(func(i btree.Item) bool {
j, ok := i.(*item)
if !ok {
// this shouldn't happen
return false
}

if fn == nil || fn(j.key, j.value) {
pairs <- kv.Pair{Key: j.key, Value: j.value}
}

return true
})
}()

return &ForwardCursor{pairs}, nil
}

// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree
type ForwardCursor struct {
pairs <-chan kv.Pair
}

// Next returns the next key/value pair in the cursor
func (c *ForwardCursor) Next() ([]byte, []byte) {
pair, ok := <-c.pairs
if !ok {
return nil, nil
}

return pair.Key, pair.Value
}
10 changes: 10 additions & 0 deletions kv/store.go
Original file line number Diff line number Diff line change
@@ -137,6 +137,16 @@ type CursorConfig struct {
Hints CursorHints
}

// NewCursorConfig constructs and configures a CursorConfig used to configure
// a forward cursor.
func NewCursorConfig(opts ...CursorOption) CursorConfig {
conf := CursorConfig{}
for _, opt := range opts {
opt(&conf)
}
return conf
}

// CursorOption is a functional option for configuring a forward cursor
type CursorOption func(*CursorConfig)

14 changes: 10 additions & 4 deletions mock/kv.go
Original file line number Diff line number Diff line change
@@ -54,10 +54,11 @@ var _ (kv.Bucket) = (*Bucket)(nil)
// Bucket is the abstraction used to perform get/put/delete/get-many operations
// in a key value store
type Bucket struct {
GetFn func(key []byte) ([]byte, error)
CursorFn func() (kv.Cursor, error)
PutFn func(key, value []byte) error
DeleteFn func(key []byte) error
GetFn func(key []byte) ([]byte, error)
CursorFn func() (kv.Cursor, error)
PutFn func(key, value []byte) error
DeleteFn func(key []byte) error
ForwardCursorFn func([]byte, ...kv.CursorOption) kv.ForwardCursor
}

// Get returns a key within this bucket. Errors if key does not exist.
@@ -80,6 +81,11 @@ func (b *Bucket) Delete(key []byte) error {
return b.DeleteFn(key)
}

// ForwardCursor returns a cursor from the seek points in the configured direction.
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
return b.ForwardCursorFn(seek, opts...), nil
}

var _ (kv.Cursor) = (*Cursor)(nil)

// Cursor is an abstraction for iterating/ranging through data. A concrete implementation

0 comments on commit d0ac4c8

Please sign in to comment.