-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
index.go
628 lines (518 loc) · 16.2 KB
/
index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
package kv
import (
"bytes"
"context"
"errors"
"fmt"
)
const (
defaultPopulateBatchSize = 100
)
// Index is used to define and manage an index for a source bucket.
//
// When using the index you must provide it with an IndexMapping.
// The IndexMapping provides the index with the contract it needs to populate
// the entire index and traverse a populated index correctly.
// The IndexMapping provides a way to retrieve the key on which to index with
// when provided with the value from the source.
// It also provides the way to access the source bucket.
//
// The following is an illustration of its use:
//
// byUserID := func(v []byte) ([]byte, error) {
// auth := &influxdb.Authorization{}
//
// if err := json.Unmarshal(v, auth); err != nil {
// return err
// }
//
// return auth.UserID.Encode()
// }
//
// // configure a write only index
// indexByUser := NewIndex(NewSource([]byte(`authorizationsbyuserv1/), byUserID))
//
// indexByUser.Insert(tx, someUserID, someAuthID)
//
// indexByUser.Delete(tx, someUserID, someAuthID)
//
// indexByUser.Walk(tx, someUserID, func(k, v []byte) error {
// auth := &influxdb.Authorization{}
// if err := json.Unmarshal(v, auth); err != nil {
// return err
// }
//
// // do something with auth
//
// return nil
// })
//
// // populate entire index from source
// indexedCount, err := indexByUser.Populate(ctx, store)
//
// // verify the current index against the source and return the differences
// // found in each
// diff, err := indexByUser.Verify(ctx, tx)
type Index struct {
IndexMapping
// populateBatchSize configures the size of the batch used for insertion
populateBatchSize int
// canRead configures whether or not Walk accesses the index at all
// or skips the index altogether and returns nothing.
// This is used when you want to integrate only the write path before
// releasing the read path.
canRead bool
}
// IndexOption is a function which configures an index
type IndexOption func(*Index)
// WithIndexReadPathEnabled enables the read paths of the index (Walk)
// This should be enabled once the index has been fully populated and
// the Insert and Delete paths are correctly integrated.
func WithIndexReadPathEnabled(i *Index) {
i.canRead = true
}
// WithIndexPopulateBatchSize configures the size of each batch
// used when fully populating an index. (number of puts per tx)
func WithIndexPopulateBatchSize(n int) IndexOption {
return func(i *Index) {
i.populateBatchSize = n
}
}
// IndexMapping is a type which configures and Index to map items
// from a source bucket to an index bucket via a mapping known as
// IndexSourceOn. This function is called on the values in the source
// to derive the foreign key on which to index each item.
type IndexMapping interface {
SourceBucket() []byte
IndexBucket() []byte
IndexSourceOn(value []byte) (foreignKey []byte, err error)
}
// IndexSourceOnFunc is a function which can be used to derive the foreign key
// of a value in a source bucket.
type IndexSourceOnFunc func([]byte) ([]byte, error)
type indexMapping struct {
source []byte
index []byte
fn IndexSourceOnFunc
}
func (i indexMapping) SourceBucket() []byte { return i.source }
func (i indexMapping) IndexBucket() []byte { return i.index }
func (i indexMapping) IndexSourceOn(v []byte) ([]byte, error) {
return i.fn(v)
}
// NewIndexMapping creates an implementation of IndexMapping for the provided source bucket
// to a destination index bucket.
func NewIndexMapping(sourceBucket, indexBucket []byte, fn IndexSourceOnFunc) IndexMapping {
return indexMapping{
source: sourceBucket,
index: indexBucket,
fn: fn,
}
}
// NewIndex configures and returns a new *Index for a given index mapping.
// By default the read path (Walk) is disabled. This is because the index needs to
// be fully populated before depending upon the read path.
// The read path can be enabled using WithIndexReadPathEnabled option.
func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index {
index := &Index{
IndexMapping: mapping,
populateBatchSize: defaultPopulateBatchSize,
}
for _, opt := range opts {
opt(index)
}
return index
}
func (i *Index) initialize(ctx context.Context, store Store) error {
return store.Update(ctx, func(tx Tx) error {
// create bucket if not exist
_, err := tx.Bucket(i.IndexBucket())
return err
})
}
func (i *Index) indexBucket(tx Tx) (Bucket, error) {
return tx.Bucket(i.IndexBucket())
}
func (i *Index) sourceBucket(tx Tx) (Bucket, error) {
return tx.Bucket(i.SourceBucket())
}
func indexKey(foreignKey, primaryKey []byte) (newKey []byte) {
newKey = make([]byte, len(primaryKey)+len(foreignKey)+1)
copy(newKey, foreignKey)
newKey[len(foreignKey)] = '/'
copy(newKey[len(foreignKey)+1:], primaryKey)
return
}
func indexKeyParts(indexKey []byte) (fk, pk []byte, err error) {
// this function is called with items missing in index
parts := bytes.SplitN(indexKey, []byte("/"), 2)
if len(parts) < 2 {
return nil, nil, errors.New("malformed index key")
}
// parts are fk/pk
fk, pk = parts[0], parts[1]
return
}
// ensure IndexMigration implements MigrationSpec
var _ MigrationSpec = (*IndexMigration)(nil)
// IndexMigration is a migration for adding and removing an index.
// These are constructed via the Index.Migration function.
type IndexMigration struct {
*Index
opts []PopulateOption
}
// Name returns a readable name for the index migration.
func (i *IndexMigration) MigrationName() string {
return fmt.Sprintf("add index %q", string(i.IndexBucket()))
}
// Up initializes the index bucket and populates the index.
func (i *IndexMigration) Up(ctx context.Context, store Store) (err error) {
wrapErr := func(err error) error {
if err == nil {
return nil
}
return fmt.Errorf("migration (up) %s: %w", i.MigrationName(), err)
}
if err = i.initialize(ctx, store); err != nil {
return wrapErr(err)
}
_, err = i.Populate(ctx, store, i.opts...)
return wrapErr(err)
}
// Down deletes all entries from the index.
func (i *IndexMigration) Down(ctx context.Context, store Store) error {
if err := i.DeleteAll(ctx, store); err != nil {
return fmt.Errorf("migration (down) %s: %w", i.MigrationName(), err)
}
return nil
}
// Migration creates an IndexMigration for the underlying Index.
func (i *Index) Migration(opts ...PopulateOption) *IndexMigration {
return &IndexMigration{
Index: i,
opts: opts,
}
}
// Insert creates a single index entry for the provided primary key on the foreign key.
func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
bkt, err := i.indexBucket(tx)
if err != nil {
return err
}
return bkt.Put(indexKey(foreignKey, primaryKey), primaryKey)
}
// Delete removes the foreignKey and primaryKey mapping from the underlying index.
func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error {
bkt, err := i.indexBucket(tx)
if err != nil {
return err
}
return bkt.Delete(indexKey(foreignKey, primaryKey))
}
// Walk walks the source bucket using keys found in the index using the provided foreign key
// given the index has been fully populated.
func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn VisitFunc) error {
// skip walking if configured to do so as the index
// is currently being used purely to write the index
if !i.canRead {
return nil
}
sourceBucket, err := i.sourceBucket(tx)
if err != nil {
return err
}
indexBucket, err := i.indexBucket(tx)
if err != nil {
return err
}
cursor, err := indexBucket.ForwardCursor(foreignKey,
WithCursorPrefix(foreignKey))
if err != nil {
return err
}
return indexWalk(ctx, cursor, sourceBucket, visitFn)
}
// PopulateConfig configures a call to Populate
type PopulateConfig struct {
RemoveDanglingForeignKeys bool
}
// PopulateOption is a functional option for the Populate call
type PopulateOption func(*PopulateConfig)
// WithPopulateRemoveDanglingForeignKeys removes index entries which point to
// missing items in the source bucket.
func WithPopulateRemoveDanglingForeignKeys(c *PopulateConfig) {
c.RemoveDanglingForeignKeys = true
}
// Populate does a full population of the index using the IndexSourceOn IndexMapping function.
// Once completed it marks the index as ready for use.
// It return a nil error on success and the count of inserted items.
func (i *Index) Populate(ctx context.Context, store Store, opts ...PopulateOption) (n int, err error) {
var config PopulateConfig
for _, opt := range opts {
opt(&config)
}
// verify the index to derive missing index
// we can skip missing source lookup as we're
// only interested in populating the missing index
diff, err := i.verify(ctx, store, config.RemoveDanglingForeignKeys)
if err != nil {
return 0, fmt.Errorf("looking up missing indexes: %w", err)
}
flush := func(batch kvSlice) error {
if len(batch) == 0 {
return nil
}
if err := store.Update(ctx, func(tx Tx) error {
indexBucket, err := i.indexBucket(tx)
if err != nil {
return err
}
for _, pair := range batch {
// insert missing item into index
if err := indexBucket.Put(pair[0], pair[1]); err != nil {
return err
}
n++
}
return nil
}); err != nil {
return fmt.Errorf("updating index: %w", err)
}
return nil
}
var batch kvSlice
for fk, fkm := range diff.MissingFromIndex {
for pk := range fkm {
batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)})
if len(batch) >= i.populateBatchSize {
if err := flush(batch); err != nil {
return n, err
}
batch = batch[:0]
}
}
}
if err := flush(batch); err != nil {
return n, err
}
if config.RemoveDanglingForeignKeys {
return n, i.remove(ctx, store, diff.MissingFromSource)
}
return n, nil
}
// DeleteAll removes the entire index in batches
func (i *Index) DeleteAll(ctx context.Context, store Store) error {
diff, err := i.verify(ctx, store, true)
if err != nil {
return err
}
for k, v := range diff.MissingFromSource {
if fkm, ok := diff.PresentInIndex[k]; ok {
for pk := range v {
fkm[pk] = struct{}{}
}
continue
}
diff.PresentInIndex[k] = v
}
return i.remove(ctx, store, diff.PresentInIndex)
}
func (i *Index) remove(ctx context.Context, store Store, mappings map[string]map[string]struct{}) error {
var (
batch [][]byte
flush = func(batch [][]byte) error {
if len(batch) == 0 {
return nil
}
if err := store.Update(ctx, func(tx Tx) error {
indexBucket, err := i.indexBucket(tx)
if err != nil {
return err
}
for _, indexKey := range batch {
// delete dangling foreign key
if err := indexBucket.Delete(indexKey); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("removing dangling foreign keys: %w", err)
}
return nil
}
)
for fk, fkm := range mappings {
for pk := range fkm {
batch = append(batch, indexKey([]byte(fk), []byte(pk)))
if len(batch) >= i.populateBatchSize {
if err := flush(batch); err != nil {
return err
}
batch = batch[:0]
}
}
}
return flush(batch)
}
// IndexDiff contains a set of items present in the source not in index,
// along with a set of things in the index which are not in the source.
type IndexDiff struct {
// PresentInIndex is a map of foreign key to primary keys
// present in the index.
PresentInIndex map[string]map[string]struct{}
// MissingFromIndex is a map of foreign key to associated primary keys
// missing from the index given the source bucket.
// These items could be due to the fact an index populate migration has
// not yet occured, the index populate code is incorrect or the write path
// for your resource type does not yet insert into the index as well (Create actions).
MissingFromIndex map[string]map[string]struct{}
// MissingFromSource is a map of foreign key to associated primary keys
// missing from the source but accounted for in the index.
// This happens when index items are not properly removed from the index
// when an item is removed from the source (Delete actions).
MissingFromSource map[string]map[string]struct{}
}
func (i *IndexDiff) addMissingSource(fk, pk []byte) {
if i.MissingFromSource == nil {
i.MissingFromSource = map[string]map[string]struct{}{}
}
if _, ok := i.MissingFromSource[string(fk)]; !ok {
i.MissingFromSource[string(fk)] = map[string]struct{}{}
}
i.MissingFromSource[string(fk)][string(pk)] = struct{}{}
}
func (i *IndexDiff) addMissingIndex(fk, pk []byte) {
if i.MissingFromIndex == nil {
i.MissingFromIndex = map[string]map[string]struct{}{}
}
if _, ok := i.MissingFromIndex[string(fk)]; !ok {
i.MissingFromIndex[string(fk)] = map[string]struct{}{}
}
i.MissingFromIndex[string(fk)][string(pk)] = struct{}{}
}
// Corrupt returns a list of foreign keys which have corrupted indexes (partial)
// These are foreign keys which map to a subset of the primary keys which they should
// be associated with.
func (i *IndexDiff) Corrupt() (corrupt []string) {
for fk := range i.MissingFromIndex {
if _, ok := i.PresentInIndex[fk]; ok {
corrupt = append(corrupt, fk)
}
}
return
}
// Verify returns the difference between a source and its index
// The difference contains items in the source that are not in the index
// and vice-versa.
func (i *Index) Verify(ctx context.Context, store Store) (diff IndexDiff, err error) {
return i.verify(ctx, store, true)
}
func (i *Index) verify(ctx context.Context, store Store, includeMissingSource bool) (diff IndexDiff, err error) {
diff.PresentInIndex, err = i.readEntireIndex(ctx, store)
if err != nil {
return diff, err
}
sourceKVs, err := consumeBucket(ctx, store, i.sourceBucket)
if err != nil {
return diff, err
}
// pks is a map of primary keys in source
pks := map[string]struct{}{}
// look for items missing from index
for _, kv := range sourceKVs {
pk, v := kv[0], kv[1]
if includeMissingSource {
// this is only useful for missing source
pks[string(pk)] = struct{}{}
}
fk, err := i.IndexSourceOn(v)
if err != nil {
return diff, err
}
fkm, ok := diff.PresentInIndex[string(fk)]
if ok {
_, ok = fkm[string(pk)]
}
if !ok {
diff.addMissingIndex(fk, pk)
}
}
if includeMissingSource {
// look for items missing from source
for fk, fkm := range diff.PresentInIndex {
for pk := range fkm {
if _, ok := pks[pk]; !ok {
diff.addMissingSource([]byte(fk), []byte(pk))
}
}
}
}
return
}
// indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their
// associated primaryKey's value in the provided source bucket.
// When an item is located in the source, the provided visit function is called with primary key and associated value.
func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
var keys [][]byte
for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() {
keys = append(keys, pk)
}
if err := indexCursor.Err(); err != nil {
return err
}
if err := indexCursor.Close(); err != nil {
return err
}
values, err := sourceBucket.GetBatch(keys...)
if err != nil {
return err
}
for i, value := range values {
if value != nil {
if err := visit(keys[i], value); err != nil {
return err
}
}
}
return nil
}
// readEntireIndex returns the entire current state of the index
func (i *Index) readEntireIndex(ctx context.Context, store Store) (map[string]map[string]struct{}, error) {
kvs, err := consumeBucket(ctx, store, i.indexBucket)
if err != nil {
return nil, err
}
index := map[string]map[string]struct{}{}
for _, kv := range kvs {
fk, pk, err := indexKeyParts(kv[0])
if err != nil {
return nil, err
}
if fkm, ok := index[string(fk)]; ok {
fkm[string(pk)] = struct{}{}
continue
}
index[string(fk)] = map[string]struct{}{string(pk): {}}
}
return index, nil
}
type kvSlice [][2][]byte
// consumeBucket consumes the entire k/v space for the provided bucket function
// applied to the provided store
func consumeBucket(ctx context.Context, store Store, fn func(tx Tx) (Bucket, error)) (kvs kvSlice, err error) {
return kvs, store.View(ctx, func(tx Tx) error {
bkt, err := fn(tx)
if err != nil {
return err
}
cursor, err := bkt.ForwardCursor(nil)
if err != nil {
return err
}
return WalkCursor(ctx, cursor, func(k, v []byte) error {
kvs = append(kvs, [2][]byte{k, v})
return nil
})
})
}