diff --git a/kv/index.go b/kv/index.go index ce2d0d899e2..33e28d35be3 100644 --- a/kv/index.go +++ b/kv/index.go @@ -128,7 +128,9 @@ func (i *Index) sourceBucket(tx Tx) (Bucket, error) { return tx.Bucket(i.SourceBucket()) } -func indexKey(foreignKey, primaryKey []byte) (newKey []byte) { +// IndexKey returns a value suitable for use as the key component +// when storing values in the index. +func IndexKey(foreignKey, primaryKey []byte) (newKey []byte) { newKey = make([]byte, len(primaryKey)+len(foreignKey)+1) copy(newKey, foreignKey) newKey[len(foreignKey)] = '/' @@ -157,7 +159,7 @@ func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error { return err } - return bkt.Put(indexKey(foreignKey, primaryKey), primaryKey) + return bkt.Put(IndexKey(foreignKey, primaryKey), primaryKey) } // Delete removes the foreignKey and primaryKey mapping from the underlying index. @@ -167,7 +169,7 @@ func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error { return err } - return bkt.Delete(indexKey(foreignKey, primaryKey)) + return bkt.Delete(IndexKey(foreignKey, primaryKey)) } // Walk walks the source bucket using keys found in the index using the provided foreign key @@ -195,16 +197,20 @@ func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn Visi return err } - return indexWalk(ctx, cursor, sourceBucket, visitFn) + return indexWalk(ctx, foreignKey, cursor, sourceBucket, visitFn) } // indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their // associated primaryKey's value in the provided source bucket. // When an item is located in the source, the provided visit function is called with primary key and associated value. -func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) { +func indexWalk(ctx context.Context, foreignKey []byte, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) { var keys [][]byte for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() { - keys = append(keys, pk) + if fk, _, err := indexKeyParts(ik); err != nil { + return err + } else if string(fk) == string(foreignKey) { + keys = append(keys, pk) + } } if err := indexCursor.Err(); err != nil { diff --git a/kv/index_migration.go b/kv/index_migration.go index b6ea2dfdd65..7b33f336712 100644 --- a/kv/index_migration.go +++ b/kv/index_migration.go @@ -128,7 +128,7 @@ func (i *IndexMigration) Populate(ctx context.Context, store Store) (n int, err for fk, fkm := range diff.MissingFromIndex { for pk := range fkm { - batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)}) + batch = append(batch, [2][]byte{IndexKey([]byte(fk), []byte(pk)), []byte(pk)}) if len(batch) >= i.operationBatchSize { if err := flush(batch); err != nil { @@ -183,7 +183,7 @@ func (i *IndexMigration) remove(ctx context.Context, store Store, mappings map[s for fk, fkm := range mappings { for pk := range fkm { - batch = append(batch, indexKey([]byte(fk), []byte(pk))) + batch = append(batch, IndexKey([]byte(fk), []byte(pk))) if len(batch) >= i.operationBatchSize { if err := flush(batch); err != nil { diff --git a/kv/index_test.go b/kv/index_test.go index 5b4726eaf25..090280935a6 100644 --- a/kv/index_test.go +++ b/kv/index_test.go @@ -7,9 +7,14 @@ import ( "os" "testing" + "github.com/golang/mock/gomock" "github.com/influxdata/influxdb/v2/bolt" "github.com/influxdata/influxdb/v2/inmem" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/kv/mock" influxdbtesting "github.com/influxdata/influxdb/v2/testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -33,6 +38,102 @@ func Test_Bolt_Index(t *testing.T) { influxdbtesting.TestIndex(t, s) } +func TestIndex_Walk(t *testing.T) { + t.Run("only selects exact keys", func(t *testing.T) { + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + type keyValue struct{ key, val []byte } + makeIndexKV := func(fk, pk string) keyValue { + return keyValue{ + key: kv.IndexKey([]byte(fk), []byte(pk)), + val: []byte(pk), + } + } + + makeKV := func(key, val string) keyValue { + return keyValue{[]byte(key), []byte(val)} + } + + var ( + sourceBucket = []byte("source") + indexBucket = []byte("index") + foreignKey = []byte("jenkins") + idxkeyvals = []keyValue{ + makeIndexKV("jenkins-aws", "pk1"), + makeIndexKV("jenkins-aws", "pk2"), + makeIndexKV("jenkins-aws", "pk3"), + makeIndexKV("jenkins", "pk4"), + makeIndexKV("jenkins", "pk5"), + } + srckeyvals = []struct{ key, val []byte }{ + makeKV("pk4", "val4"), + makeKV("pk5", "val5"), + } + ) + + mapping := kv.NewIndexMapping(sourceBucket, indexBucket, func(data []byte) ([]byte, error) { + return nil, nil + }) + + tx := mock.NewMockTx(ctrl) + + src := mock.NewMockBucket(ctrl) + src.EXPECT(). + GetBatch(srckeyvals[0].key, srckeyvals[1].key). + Return([][]byte{srckeyvals[0].val, srckeyvals[1].val}, nil) + + tx.EXPECT(). + Bucket(sourceBucket). + Return(src, nil) + + idx := mock.NewMockBucket(ctrl) + tx.EXPECT(). + Bucket(indexBucket). + Return(idx, nil) + + cur := mock.NewMockForwardCursor(ctrl) + + i := 0 + cur.EXPECT(). + Next(). + DoAndReturn(func() ([]byte, []byte) { + var k, v []byte + if i < len(idxkeyvals) { + elem := idxkeyvals[i] + i++ + k, v = elem.key, elem.val + } + + return k, v + }). + Times(len(idxkeyvals) + 1) + cur.EXPECT(). + Err(). + Return(nil) + cur.EXPECT(). + Close(). + Return(nil) + idx.EXPECT(). + ForwardCursor(foreignKey, gomock.Any()). + Return(cur, nil) + + ctx := context.Background() + index := kv.NewIndex(mapping, kv.WithIndexReadPathEnabled) + + j := 0 + err := index.Walk(ctx, tx, foreignKey, func(k, v []byte) (bool, error) { + require.Less(t, j, len(srckeyvals)) + assert.Equal(t, srckeyvals[j].key, k) + assert.Equal(t, srckeyvals[j].val, v) + j++ + return true, nil + }) + + assert.NoError(t, err) + }) +} + func Benchmark_Inmem_Index_Walk(b *testing.B) { influxdbtesting.BenchmarkIndexWalk(b, inmem.NewKVStore(), 1000, 200) } diff --git a/kv/mock/bucket.go b/kv/mock/bucket.go new file mode 100644 index 00000000000..7a9fd8b8262 --- /dev/null +++ b/kv/mock/bucket.go @@ -0,0 +1,135 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/influxdata/influxdb/v2/kv (interfaces: Bucket) + +// Package mock is a generated GoMock package. +package mock + +import ( + gomock "github.com/golang/mock/gomock" + kv "github.com/influxdata/influxdb/v2/kv" + reflect "reflect" +) + +// MockBucket is a mock of Bucket interface +type MockBucket struct { + ctrl *gomock.Controller + recorder *MockBucketMockRecorder +} + +// MockBucketMockRecorder is the mock recorder for MockBucket +type MockBucketMockRecorder struct { + mock *MockBucket +} + +// NewMockBucket creates a new mock instance +func NewMockBucket(ctrl *gomock.Controller) *MockBucket { + mock := &MockBucket{ctrl: ctrl} + mock.recorder = &MockBucketMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockBucket) EXPECT() *MockBucketMockRecorder { + return m.recorder +} + +// Cursor mocks base method +func (m *MockBucket) Cursor(arg0 ...kv.CursorHint) (kv.Cursor, error) { + m.ctrl.T.Helper() + varargs := []interface{}{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Cursor", varargs...) + ret0, _ := ret[0].(kv.Cursor) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Cursor indicates an expected call of Cursor +func (mr *MockBucketMockRecorder) Cursor(arg0 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cursor", reflect.TypeOf((*MockBucket)(nil).Cursor), arg0...) +} + +// Delete mocks base method +func (m *MockBucket) Delete(arg0 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete +func (mr *MockBucketMockRecorder) Delete(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockBucket)(nil).Delete), arg0) +} + +// ForwardCursor mocks base method +func (m *MockBucket) ForwardCursor(arg0 []byte, arg1 ...kv.CursorOption) (kv.ForwardCursor, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ForwardCursor", varargs...) + ret0, _ := ret[0].(kv.ForwardCursor) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ForwardCursor indicates an expected call of ForwardCursor +func (mr *MockBucketMockRecorder) ForwardCursor(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardCursor", reflect.TypeOf((*MockBucket)(nil).ForwardCursor), varargs...) +} + +// Get mocks base method +func (m *MockBucket) Get(arg0 []byte) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get +func (mr *MockBucketMockRecorder) Get(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockBucket)(nil).Get), arg0) +} + +// GetBatch mocks base method +func (m *MockBucket) GetBatch(arg0 ...[]byte) ([][]byte, error) { + m.ctrl.T.Helper() + varargs := []interface{}{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetBatch", varargs...) + ret0, _ := ret[0].([][]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetBatch indicates an expected call of GetBatch +func (mr *MockBucketMockRecorder) GetBatch(arg0 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBatch", reflect.TypeOf((*MockBucket)(nil).GetBatch), arg0...) +} + +// Put mocks base method +func (m *MockBucket) Put(arg0, arg1 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Put", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Put indicates an expected call of Put +func (mr *MockBucketMockRecorder) Put(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockBucket)(nil).Put), arg0, arg1) +} diff --git a/kv/mock/forward_cursor.go b/kv/mock/forward_cursor.go new file mode 100644 index 00000000000..cc066398065 --- /dev/null +++ b/kv/mock/forward_cursor.go @@ -0,0 +1,76 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/influxdata/influxdb/v2/kv (interfaces: ForwardCursor) + +// Package mock is a generated GoMock package. +package mock + +import ( + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockForwardCursor is a mock of ForwardCursor interface +type MockForwardCursor struct { + ctrl *gomock.Controller + recorder *MockForwardCursorMockRecorder +} + +// MockForwardCursorMockRecorder is the mock recorder for MockForwardCursor +type MockForwardCursorMockRecorder struct { + mock *MockForwardCursor +} + +// NewMockForwardCursor creates a new mock instance +func NewMockForwardCursor(ctrl *gomock.Controller) *MockForwardCursor { + mock := &MockForwardCursor{ctrl: ctrl} + mock.recorder = &MockForwardCursorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockForwardCursor) EXPECT() *MockForwardCursorMockRecorder { + return m.recorder +} + +// Close mocks base method +func (m *MockForwardCursor) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockForwardCursorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockForwardCursor)(nil).Close)) +} + +// Err mocks base method +func (m *MockForwardCursor) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") + ret0, _ := ret[0].(error) + return ret0 +} + +// Err indicates an expected call of Err +func (mr *MockForwardCursorMockRecorder) Err() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockForwardCursor)(nil).Err)) +} + +// Next mocks base method +func (m *MockForwardCursor) Next() ([]byte, []byte) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next") + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].([]byte) + return ret0, ret1 +} + +// Next indicates an expected call of Next +func (mr *MockForwardCursorMockRecorder) Next() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockForwardCursor)(nil).Next)) +} diff --git a/kv/mock/tx.go b/kv/mock/tx.go new file mode 100644 index 00000000000..3a326fa12d6 --- /dev/null +++ b/kv/mock/tx.go @@ -0,0 +1,76 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/influxdata/influxdb/v2/kv (interfaces: Tx) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + gomock "github.com/golang/mock/gomock" + kv "github.com/influxdata/influxdb/v2/kv" + reflect "reflect" +) + +// MockTx is a mock of Tx interface +type MockTx struct { + ctrl *gomock.Controller + recorder *MockTxMockRecorder +} + +// MockTxMockRecorder is the mock recorder for MockTx +type MockTxMockRecorder struct { + mock *MockTx +} + +// NewMockTx creates a new mock instance +func NewMockTx(ctrl *gomock.Controller) *MockTx { + mock := &MockTx{ctrl: ctrl} + mock.recorder = &MockTxMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockTx) EXPECT() *MockTxMockRecorder { + return m.recorder +} + +// Bucket mocks base method +func (m *MockTx) Bucket(arg0 []byte) (kv.Bucket, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Bucket", arg0) + ret0, _ := ret[0].(kv.Bucket) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Bucket indicates an expected call of Bucket +func (mr *MockTxMockRecorder) Bucket(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bucket", reflect.TypeOf((*MockTx)(nil).Bucket), arg0) +} + +// Context mocks base method +func (m *MockTx) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context +func (mr *MockTxMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockTx)(nil).Context)) +} + +// WithContext mocks base method +func (m *MockTx) WithContext(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "WithContext", arg0) +} + +// WithContext indicates an expected call of WithContext +func (mr *MockTxMockRecorder) WithContext(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithContext", reflect.TypeOf((*MockTx)(nil).WithContext), arg0) +}