From 48b8cb84f72b25a01e34587448418c5506f5f4e3 Mon Sep 17 00:00:00 2001 From: George Date: Thu, 19 Dec 2019 17:30:05 +0100 Subject: [PATCH] feat(kv): define forward cursor interface (#16212) * feat(kv): define forward cursor interface * feat(kv): implement ForwardCursor on bolt and inmem buckets * feat(kv): update tests to capture forward cursor * fix(kv): typo in docs * feat(kv): add Err method to ForwardCursor interface * feat(inmem): batch pair channel sends in forward cursor * fix(kv): remove Err field from kv.Pair * feat(kv): add Close to kv.ForwardCursor interface --- CHANGELOG.md | 1 + bolt/kv.go | 82 ++++++++++++++++- inmem/kv.go | 146 ++++++++++++++++++++++++++++++ kv/store.go | 61 +++++++++++++ mock/kv.go | 14 ++- testing/kv.go | 246 ++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 542 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a182c4a6df1..9772f3c95be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ 4. [16262](https://github.com/influxdata/influxdb/pull/16262): add support for check resource dry run functionality 5. [16275](https://github.com/influxdata/influxdb/pull/16275): add support for check resource apply functionality 6. [16283](https://github.com/influxdata/influxdb/pull/16283): add support for check resource export functionality +1. [16212](https://github.com/influxdata/influxdb/pull/16212): Add new kv.ForwardCursor interface ### Bug Fixes diff --git a/bolt/kv.go b/bolt/kv.go index edeb35de597..35b1218ab24 100644 --- a/bolt/kv.go +++ b/bolt/kv.go @@ -13,6 +13,9 @@ import ( "go.uber.org/zap" ) +// check that *KVStore implement kv.Store interface. +var _ (kv.Store) = (*KVStore)(nil) + // KVStore is a kv.Store backed by boltdb. type KVStore struct { path string @@ -191,6 +194,22 @@ func (b *Bucket) Delete(key []byte) error { return err } +// ForwardCursor retrieves a cursor for iterating through the entries +// in the key value store in a given direction (ascending / descending). +func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) { + var ( + cursor = b.bucket.Cursor() + key, value = cursor.Seek(seek) + ) + + return &Cursor{ + cursor: cursor, + key: key, + value: value, + config: kv.NewCursorConfig(opts...), + }, nil +} + // Cursor retrieves a cursor for iterating through the entries // in the key value store. func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) { @@ -203,10 +222,26 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) { // in the key value store. type Cursor struct { cursor *bolt.Cursor + + // previously seeked key/value + key, value []byte + + config kv.CursorConfig + closed bool +} + +// Close sets the closed to closed +func (c *Cursor) Close() error { + c.closed = true + + return nil } // Seek seeks for the first key that matches the prefix provided. func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) { + if c.closed { + return nil, nil + } k, v := c.cursor.Seek(prefix) if len(k) == 0 && len(v) == 0 { return nil, nil @@ -216,6 +251,9 @@ func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) { // First retrieves the first key value pair in the bucket. func (c *Cursor) First() ([]byte, []byte) { + if c.closed { + return nil, nil + } k, v := c.cursor.First() if len(k) == 0 && len(v) == 0 { return nil, nil @@ -225,6 +263,9 @@ func (c *Cursor) First() ([]byte, []byte) { // Last retrieves the last key value pair in the bucket. func (c *Cursor) Last() ([]byte, []byte) { + if c.closed { + return nil, nil + } k, v := c.cursor.Last() if len(k) == 0 && len(v) == 0 { return nil, nil @@ -233,8 +274,22 @@ func (c *Cursor) Last() ([]byte, []byte) { } // Next retrieves the next key in the bucket. -func (c *Cursor) Next() ([]byte, []byte) { - k, v := c.cursor.Next() +func (c *Cursor) Next() (k []byte, v []byte) { + if c.closed { + return nil, nil + } + // get and unset previously seeked values if they exist + k, v, c.key, c.value = c.key, c.value, nil, nil + if len(k) > 0 && len(v) > 0 { + return + } + + next := c.cursor.Next + if c.config.Direction == kv.CursorDescending { + next = c.cursor.Prev + } + + k, v = next() if len(k) == 0 && len(v) == 0 { return nil, nil } @@ -242,10 +297,29 @@ func (c *Cursor) Next() ([]byte, []byte) { } // Prev retrieves the previous key in the bucket. -func (c *Cursor) Prev() ([]byte, []byte) { - k, v := c.cursor.Prev() +func (c *Cursor) Prev() (k []byte, v []byte) { + if c.closed { + return nil, nil + } + // get and unset previously seeked values if they exist + k, v, c.key, c.value = c.key, c.value, nil, nil + if len(k) > 0 && len(v) > 0 { + return + } + + prev := c.cursor.Prev + if c.config.Direction == kv.CursorDescending { + prev = c.cursor.Next + } + + k, v = prev() if len(k) == 0 && len(v) == 0 { return nil, nil } return k, v } + +// Err always returns nil as nothing can go wrong™ during iteration +func (c *Cursor) Err() error { + return nil +} diff --git a/inmem/kv.go b/inmem/kv.go index 287f7450377..17ead78c337 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -10,6 +10,13 @@ import ( "github.com/influxdata/influxdb/kv" ) +// ensure *KVStore implement kv.Store interface +var _ kv.Store = (*KVStore)(nil) + +// cursorBatchSize is the size of a batch sent by a forward cursors +// tree iterator +const cursorBatchSize = 1000 + // KVStore is an in memory btree backed kv.Store. type KVStore struct { mu sync.RWMutex @@ -225,3 +232,142 @@ func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) { return pairs, nil } + +type pair struct { + kv.Pair + err error +} + +// ForwardCursor returns a directional cursor which starts at the provided seeked key +func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) { + var ( + pairs = make(chan []pair) + stop = make(chan struct{}) + send = func(batch []pair) bool { + if len(batch) == 0 { + return true + } + + select { + case pairs <- batch: + return true + case <-stop: + return false + } + } + ) + + go func() { + defer close(pairs) + + var ( + batch []pair + config = kv.NewCursorConfig(opts...) + fn = config.Hints.PredicateFn + iterate = func(it btree.ItemIterator) { + b.btree.AscendGreaterOrEqual(&item{key: seek}, it) + } + ) + + if config.Direction == kv.CursorDescending { + iterate = func(it btree.ItemIterator) { + b.btree.DescendLessOrEqual(&item{key: seek}, it) + } + } + + iterate(func(i btree.Item) bool { + select { + case <-stop: + // if signalled to stop then exit iteration + return false + default: + } + + j, ok := i.(*item) + if !ok { + batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)}) + + return false + } + + if fn == nil || fn(j.key, j.value) { + batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}}) + } + + if len(batch) < cursorBatchSize { + return true + } + + if send(batch) { + // batch flushed successfully so we can + // begin a new batch + batch = nil + + return true + } + + // we've been signalled to stop + return false + }) + + // send if any left in batch + send(batch) + }() + + return &ForwardCursor{pairs: pairs, stop: stop}, nil +} + +// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree +type ForwardCursor struct { + pairs <-chan []pair + + cur []pair + n int + + stop chan struct{} + closed bool + // error found during iteration + err error +} + +// Err returns a non-nil error when an error occurred during cursor iteration. +func (c *ForwardCursor) Err() error { + return c.err +} + +// Close releases the producing goroutines for the forward cursor. +// It blocks until the producing goroutine exits. +func (c *ForwardCursor) Close() error { + if c.closed { + return nil + } + + close(c.stop) + + c.closed = true + + return nil +} + +// Next returns the next key/value pair in the cursor +func (c *ForwardCursor) Next() ([]byte, []byte) { + if c.err != nil || c.closed { + return nil, nil + } + + if c.n >= len(c.cur) { + var ok bool + c.cur, ok = <-c.pairs + if !ok { + return nil, nil + } + + c.n = 0 + } + + pair := c.cur[c.n] + c.err = pair.err + c.n++ + + return pair.Key, pair.Value +} diff --git a/kv/store.go b/kv/store.go index 2b45e418af7..24504565a3a 100644 --- a/kv/store.go +++ b/kv/store.go @@ -94,6 +94,9 @@ type Bucket interface { Put(key, value []byte) error // Delete should error if the transaction it was called in is not writable. Delete(key []byte) error + // ForwardCursor returns a forward cursor from the seek position provided. + // Other options can be supplied to provide direction and hints. + ForwardCursor(seek []byte, opts ...CursorOption) (ForwardCursor, error) } // Cursor is an abstraction for iterating/ranging through data. A concrete implementation @@ -110,3 +113,61 @@ type Cursor interface { // Prev moves the cursor to the prev key in the bucket. Prev() (k []byte, v []byte) } + +// ForwardCursor is an abstraction for interacting/ranging through data in one direction. +type ForwardCursor interface { + // Next moves the cursor to the next key in the bucket. + Next() (k, v []byte) + // Err returns non-nil if an error occurred during cursor iteration. + // This should always be checked after Next returns a nil key/value. + Err() error + // Close is reponsible for freeing any resources created by the cursor. + Close() error +} + +// CursorDirection is an integer used to define the direction +// a request cursor operates in. +type CursorDirection int + +const ( + // CursorAscending directs a cursor to range in ascending order + CursorAscending CursorDirection = iota + // CursorAscending directs a cursor to range in descending order + CursorDescending +) + +// CursorConfig is a type used to configure a new forward cursor. +// It includes a direction and a set of hints +type CursorConfig struct { + Direction CursorDirection + Hints CursorHints +} + +// NewCursorConfig constructs and configures a CursorConfig used to configure +// a forward cursor. +func NewCursorConfig(opts ...CursorOption) CursorConfig { + conf := CursorConfig{} + for _, opt := range opts { + opt(&conf) + } + return conf +} + +// CursorOption is a functional option for configuring a forward cursor +type CursorOption func(*CursorConfig) + +// WithCursorDirection sets the cursor direction on a provided cursor config +func WithCursorDirection(direction CursorDirection) CursorOption { + return func(c *CursorConfig) { + c.Direction = direction + } +} + +// WithCursorHints configs the provided hints on the cursor config +func WithCursorHints(hints ...CursorHint) CursorOption { + return func(c *CursorConfig) { + for _, hint := range hints { + hint(&c.Hints) + } + } +} diff --git a/mock/kv.go b/mock/kv.go index d7304bcddfb..87359a9c013 100644 --- a/mock/kv.go +++ b/mock/kv.go @@ -54,10 +54,11 @@ var _ (kv.Bucket) = (*Bucket)(nil) // Bucket is the abstraction used to perform get/put/delete/get-many operations // in a key value store type Bucket struct { - GetFn func(key []byte) ([]byte, error) - CursorFn func() (kv.Cursor, error) - PutFn func(key, value []byte) error - DeleteFn func(key []byte) error + GetFn func(key []byte) ([]byte, error) + CursorFn func() (kv.Cursor, error) + PutFn func(key, value []byte) error + DeleteFn func(key []byte) error + ForwardCursorFn func([]byte, ...kv.CursorOption) kv.ForwardCursor } // Get returns a key within this bucket. Errors if key does not exist. @@ -80,6 +81,11 @@ func (b *Bucket) Delete(key []byte) error { return b.DeleteFn(key) } +// ForwardCursor returns a cursor from the seek points in the configured direction. +func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) { + return b.ForwardCursorFn(seek, opts...), nil +} + var _ (kv.Cursor) = (*Cursor)(nil) // Cursor is an abstraction for iterating/ranging through data. A concrete implementation diff --git a/testing/kv.go b/testing/kv.go index 57b5468e683..8a24203837d 100644 --- a/testing/kv.go +++ b/testing/kv.go @@ -50,6 +50,10 @@ func KVStore( name: "CursorWithHints", fn: KVCursorWithHints, }, + { + name: "ForwardCursor", + fn: KVForwardCursor, + }, { name: "View", fn: KVView, @@ -672,6 +676,248 @@ func KVCursorWithHints( } } +// KVForwardCursor tests the forward cursor contract for the key value store. +func KVForwardCursor( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, +) { + type args struct { + seek string + direction kv.CursorDirection + until string + hints []kv.CursorHint + } + + pairs := func(keys ...string) []kv.Pair { + p := make([]kv.Pair, len(keys)) + for i, k := range keys { + p[i].Key = []byte(k) + p[i].Value = []byte("val:" + k) + } + return p + } + + tests := []struct { + name string + fields KVStoreFields + args args + exp []string + expErr error + }{ + { + name: "no hints", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa", + until: "bbb/00", + }, + exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"}, + }, + { + name: "prefix hint", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa", + until: "aaa/03", + hints: []kv.CursorHint{kv.WithCursorHintPrefix("aaa/")}, + }, + exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"}, + }, + { + name: "start hint", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa", + until: "bbb/00", + hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")}, + }, + exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03", "bbb/00"}, + }, + { + name: "predicate for key", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa", + until: "aaa/03", + hints: []kv.CursorHint{ + kv.WithCursorHintPredicate(func(key, _ []byte) bool { + return len(key) < 3 || string(key[:3]) == "aaa" + })}, + }, + exp: []string{"aaa/00", "aaa/01", "aaa/02", "aaa/03"}, + }, + { + name: "predicate for value", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "", + until: "aa/01", + hints: []kv.CursorHint{ + kv.WithCursorHintPredicate(func(_, val []byte) bool { + return len(val) < 7 || string(val[:7]) == "val:aa/" + })}, + }, + exp: []string{"aa/00", "aa/01"}, + }, + { + name: "no hints - descending", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "bbb/00", + until: "aaa/00", + direction: kv.CursorDescending, + }, + exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"}, + }, + { + name: "start hint - descending", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "bbb/00", + until: "aaa/00", + direction: kv.CursorDescending, + hints: []kv.CursorHint{kv.WithCursorHintKeyStart("aaa/")}, + }, + exp: []string{"bbb/00", "aaa/03", "aaa/02", "aaa/01", "aaa/00"}, + }, + { + name: "predicate for key - descending", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aaa/03", + until: "aaa/00", + direction: kv.CursorDescending, + hints: []kv.CursorHint{ + kv.WithCursorHintPredicate(func(key, _ []byte) bool { + return len(key) < 3 || string(key[:3]) == "aaa" + })}, + }, + exp: []string{"aaa/03", "aaa/02", "aaa/01", "aaa/00"}, + }, + { + name: "predicate for value - descending", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: pairs( + "aa/00", "aa/01", + "aaa/00", "aaa/01", "aaa/02", "aaa/03", + "bbb/00", "bbb/01", "bbb/02"), + }, + args: args{ + seek: "aa/01", + until: "aa/00", + direction: kv.CursorDescending, + hints: []kv.CursorHint{ + kv.WithCursorHintPredicate(func(_, val []byte) bool { + return len(val) >= 7 && string(val[:7]) == "val:aa/" + })}, + }, + exp: []string{"aa/01", "aa/00"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, fin := init(tt.fields, t) + defer fin() + + err := s.View(context.Background(), func(tx kv.Tx) error { + b, err := tx.Bucket([]byte("bucket")) + if err != nil { + t.Errorf("unexpected error retrieving bucket: %v", err) + return err + } + + cur, err := b.ForwardCursor([]byte(tt.args.seek), + kv.WithCursorDirection(tt.args.direction), + kv.WithCursorHints(tt.args.hints...)) + if err != nil { + t.Errorf("unexpected error: %v", err) + return err + } + + var got []string + + k, _ := cur.Next() + for len(k) > 0 { + got = append(got, string(k)) + if string(k) == tt.args.until { + break + } + + k, _ = cur.Next() + } + + if exp := tt.exp; !cmp.Equal(got, exp) { + t.Errorf("unexpected cursor values: -got/+exp\n%v", cmp.Diff(got, exp)) + } + + if err := cur.Err(); !cmp.Equal(err, tt.expErr) { + t.Errorf("expected error to be %v, got %v", tt.expErr, err) + } + + if err := cur.Close(); err != nil { + t.Errorf("expected cursor to close with nil error, found %v", err) + } + + return nil + }) + + if err != nil { + t.Fatalf("error during view transaction: %v", err) + } + }) + } +} + // KVView tests the view method contract for the key value store. func KVView( init func(KVStoreFields, *testing.T) (kv.Store, func()),