diff --git a/bolt/kv.go b/bolt/kv.go index edeb35de597..6d31d5833f7 100644 --- a/bolt/kv.go +++ b/bolt/kv.go @@ -191,6 +191,17 @@ func (b *Bucket) Delete(key []byte) error { return err } +// ForwardCursor retrieves a cursor for iterating through the entries +// in the key value store in a given direction (ascending / descending). +func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) { + cursor := b.bucket.Cursor() + cursor.Seek(seek) + return &Cursor{ + cursor: cursor, + config: kv.NewCursorConfig(opts...), + }, nil +} + // Cursor retrieves a cursor for iterating through the entries // in the key value store. func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) { @@ -203,6 +214,8 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) { // in the key value store. type Cursor struct { cursor *bolt.Cursor + + config kv.CursorConfig } // Seek seeks for the first key that matches the prefix provided. @@ -234,7 +247,12 @@ func (c *Cursor) Last() ([]byte, []byte) { // Next retrieves the next key in the bucket. func (c *Cursor) Next() ([]byte, []byte) { - k, v := c.cursor.Next() + next := c.cursor.Next + if c.config.Direction == kv.CursorDescending { + next = c.cursor.Prev + } + + k, v := next() if len(k) == 0 && len(v) == 0 { return nil, nil } @@ -243,7 +261,12 @@ func (c *Cursor) Next() ([]byte, []byte) { // Prev retrieves the previous key in the bucket. func (c *Cursor) Prev() ([]byte, []byte) { - k, v := c.cursor.Prev() + prev := c.cursor.Prev + if c.config.Direction == kv.CursorDescending { + prev = c.cursor.Next + } + + k, v := prev() if len(k) == 0 && len(v) == 0 { return nil, nil } diff --git a/inmem/kv.go b/inmem/kv.go index da719fd11c0..0f34a760e41 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -10,6 +10,9 @@ import ( "github.com/influxdata/influxdb/kv" ) +// ensure *KVStore implement kv.Store interface +var _ kv.Store = (*KVStore)(nil) + // KVStore is an in memory btree backed kv.Store. type KVStore struct { mu sync.RWMutex @@ -232,3 +235,56 @@ func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) { return pairs, nil } + +// ForwardCursor returns a directional cursor which starts at the provided seeked key +func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) { + pairs := make(chan kv.Pair) + go func() { + defer close(pairs) + + var ( + config = kv.NewCursorConfig(opts...) + fn = config.Hints.PredicateFn + iterate = func(it btree.ItemIterator) { + b.btree.AscendGreaterOrEqual(&item{key: seek}, it) + } + ) + + if config.Direction == kv.CursorDescending { + iterate = func(it btree.ItemIterator) { + b.btree.DescendLessOrEqual(&item{key: seek}, it) + } + } + + iterate(func(i btree.Item) bool { + j, ok := i.(*item) + if !ok { + // this shouldn't happen + return false + } + + if fn == nil || fn(j.key, j.value) { + pairs <- kv.Pair{Key: j.key, Value: j.value} + } + + return true + }) + }() + + return &ForwardCursor{pairs}, nil +} + +// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree +type ForwardCursor struct { + pairs <-chan kv.Pair +} + +// Next returns the next key/value pair in the cursor +func (c *ForwardCursor) Next() ([]byte, []byte) { + pair, ok := <-c.pairs + if !ok { + return nil, nil + } + + return pair.Key, pair.Value +} diff --git a/kv/store.go b/kv/store.go index a20b7460868..c08fc3d45c1 100644 --- a/kv/store.go +++ b/kv/store.go @@ -137,6 +137,16 @@ type CursorConfig struct { Hints CursorHints } +// NewCursorConfig constructs and configures a CursorConfig used to configure +// a forward cursor. +func NewCursorConfig(opts ...CursorOption) CursorConfig { + conf := CursorConfig{} + for _, opt := range opts { + opt(&conf) + } + return conf +} + // CursorOption is a functional option for configuring a forward cursor type CursorOption func(*CursorConfig) diff --git a/mock/kv.go b/mock/kv.go index d7304bcddfb..87359a9c013 100644 --- a/mock/kv.go +++ b/mock/kv.go @@ -54,10 +54,11 @@ var _ (kv.Bucket) = (*Bucket)(nil) // Bucket is the abstraction used to perform get/put/delete/get-many operations // in a key value store type Bucket struct { - GetFn func(key []byte) ([]byte, error) - CursorFn func() (kv.Cursor, error) - PutFn func(key, value []byte) error - DeleteFn func(key []byte) error + GetFn func(key []byte) ([]byte, error) + CursorFn func() (kv.Cursor, error) + PutFn func(key, value []byte) error + DeleteFn func(key []byte) error + ForwardCursorFn func([]byte, ...kv.CursorOption) kv.ForwardCursor } // Get returns a key within this bucket. Errors if key does not exist. @@ -80,6 +81,11 @@ func (b *Bucket) Delete(key []byte) error { return b.DeleteFn(key) } +// ForwardCursor returns a cursor from the seek points in the configured direction. +func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) { + return b.ForwardCursorFn(seek, opts...), nil +} + var _ (kv.Cursor) = (*Cursor)(nil) // Cursor is an abstraction for iterating/ranging through data. A concrete implementation