Skip to content

Commit

Permalink
Clean up set index refs when referenced values are deleted. Fixes #80
Browse files Browse the repository at this point in the history
Also fix related integrity check and improve errors messages
  • Loading branch information
plorenz committed Jul 22, 2024
1 parent ff2b2dc commit 3ff4ab9
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 2 deletions.
52 changes: 52 additions & 0 deletions boltz/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ type SetReadIndex interface {

type Constraint interface {
Checkable
Label() string
ProcessBeforeUpdate(ctx *IndexingContext)
ProcessAfterUpdate(ctx *IndexingContext)
ProcessBeforeDelete(ctx *IndexingContext)
Expand All @@ -281,6 +282,10 @@ type uniqueIndex struct {
indexPath []string
}

func (index *uniqueIndex) Label() string {
return fmt.Sprintf("unique index on %s.%s", index.symbol.GetStore().GetEntityType(), index.symbol.GetName())
}

func (index *uniqueIndex) Read(tx *bbolt.Tx, val []byte) []byte {
indexBucket := index.getIndexBucket(tx)
if indexBucket.Err != nil {
Expand Down Expand Up @@ -428,6 +433,10 @@ type setIndex struct {
listeners []SetChangeListener
}

func (index *setIndex) Label() string {
return fmt.Sprintf("set index on %s.%s", index.symbol.GetStore().GetEntityType(), index.symbol.GetName())
}

func (index *setIndex) AddListener(listener SetChangeListener) {
index.listeners = append(index.listeners, listener)
}
Expand Down Expand Up @@ -560,6 +569,9 @@ func (index *setIndex) ProcessBeforeDelete(ctx *IndexingContext) {
for _, val := range values {
indexBucket := index.getIndexBucket(ctx.Tx(), val.Value)
ctx.ErrHolder.SetError(indexBucket.DeleteListEntry(TypeString, ctx.RowId).Err)
if k, _ := indexBucket.Cursor().First(); k == nil {
ctx.ErrHolder.SetError(index.deleteIndexKey(ctx.Tx(), val.Value))
}
}
}
}
Expand Down Expand Up @@ -590,18 +602,22 @@ func (index *setIndex) deleteIndexKey(tx *bbolt.Tx, key []byte) error {
func (index *setIndex) CheckIntegrity(ctx MutateContext, fix bool, errorSink func(error, bool)) error {
tx := ctx.Tx()
if indexBaseBucket := Path(tx, index.indexPath...); indexBaseBucket != nil {
var toDelete []string
cursor := indexBaseBucket.Cursor()
for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
if indexBucket := indexBaseBucket.Bucket.Bucket(key); indexBucket != nil {
idsCursor := indexBucket.Cursor()
referenceCount := 0
for val, _ := idsCursor.First(); val != nil; val, _ = idsCursor.Next() {
referenceCount++
_, id := GetTypeAndValue(val)
if !index.symbol.GetStore().IsEntityPresent(tx, string(id)) {
// entry has been deleted, remove
if fix {
if err := idsCursor.Delete(); err != nil {
return err
}
referenceCount--
}
errorSink(errors.Errorf("for index on %v.%v, val %v references id %v, which doesn't exist",
index.symbol.GetStore().GetEntityType(), index.GetSymbol().GetName(),
Expand All @@ -621,20 +637,34 @@ func (index *setIndex) CheckIntegrity(ctx MutateContext, fix bool, errorSink fun
if err := idsCursor.Delete(); err != nil {
return err
}
referenceCount--
}
errorSink(errors.Errorf("for index on %v.%v, val %v references id %v, which doesn't contain the value",
index.symbol.GetStore().GetEntityType(), index.GetSymbol().GetName(),
string(key), string(id)), fix)
}
}
}
if referenceCount == 0 {
if fix {
toDelete = append(toDelete, string(key))
}
errorSink(errors.Errorf("for index on %s.%s, index value %s has no referenced values",
index.symbol.GetStore().GetEntityType(), index.GetSymbol().GetName(), string(key)), fix)
}
} else {
// If key has no values, delete the key
if err := cursor.Delete(); err != nil {
return err
}
}
}

for _, deleteKey := range toDelete {
if err := indexBaseBucket.DeleteBucket([]byte(deleteKey)); err != nil {
return errors.Wrapf(err, "error deleting unused key %s from index %s", deleteKey, index.Label())
}
}
}

for idsCursor := index.symbol.GetStore().IterateValidIds(tx, ast.BoolNodeTrue); idsCursor.IsValid(); idsCursor.Next() {
Expand Down Expand Up @@ -670,6 +700,12 @@ type fkIndex struct {
nullable bool
}

func (index *fkIndex) Label() string {
return fmt.Sprintf("fk index %s.%s -> %s.%s",
index.symbol.GetStore().GetEntityType(), index.symbol.GetName(),
index.fkSymbol.GetStore().GetEntityType(), index.fkSymbol.GetName())
}

func (index *fkIndex) ProcessBeforeUpdate(ctx *IndexingContext) {
if !ctx.ErrHolder.HasError() {
_, fieldValue := index.symbol.Eval(ctx.Tx(), ctx.RowId)
Expand Down Expand Up @@ -828,6 +864,12 @@ type fkDeleteConstraint struct {
fkSymbol EntitySymbol
}

func (index *fkDeleteConstraint) Label() string {
return fmt.Sprintf("fk delete contraint %s.%s -> %s.%s",
index.symbol.GetStore().GetEntityType(), index.symbol.GetName(),
index.fkSymbol.GetStore().GetEntityType(), index.fkSymbol.GetName())
}

func (index *fkDeleteConstraint) ProcessBeforeUpdate(_ *IndexingContext) {
}

Expand Down Expand Up @@ -870,6 +912,11 @@ type fkConstraint struct {
nullable bool
}

func (index *fkConstraint) Label() string {
return fmt.Sprintf("fk constraint %s.%s",
index.symbol.GetStore().GetEntityType(), index.symbol.GetName())
}

func (index *fkConstraint) ProcessBeforeUpdate(ctx *IndexingContext) {
if !ctx.ErrHolder.HasError() {
_, fieldValue := index.symbol.Eval(ctx.Tx(), ctx.RowId)
Expand Down Expand Up @@ -944,6 +991,11 @@ type fkDeleteCascadeConstraint struct {
cascadeType CascadeType
}

func (index *fkDeleteCascadeConstraint) Label() string {
return fmt.Sprintf("fk delete cascade index %s.%s",
index.symbol.GetStore().GetEntityType(), index.symbol.GetName())
}

func (index *fkDeleteCascadeConstraint) ProcessBeforeUpdate(*IndexingContext) {
}

Expand Down
7 changes: 6 additions & 1 deletion boltz/store_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package boltz

import (
"github.com/google/uuid"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/foundation/v2/stringz"
"github.com/openziti/storage/ast"
Expand Down Expand Up @@ -545,12 +546,16 @@ func (*BaseStore[E]) IteratorMatchingAnyOf(readIndex SetReadIndex, values []stri
func (store *BaseStore[E]) CheckIntegrity(ctx MutateContext, fix bool, errorSink func(err error, fixed bool)) error {
for _, linkCollection := range store.links {
if err := linkCollection.CheckIntegrity(ctx, fix, errorSink); err != nil {
pfxlog.Logger().WithError(err).Infof("error checking link collection %s.%s -> %s.%s",
linkCollection.GetFieldSymbol().GetStore().GetEntityType(), linkCollection.GetFieldSymbol().GetName(),
linkCollection.GetLinkedSymbol().GetStore().GetEntityType(), linkCollection.GetLinkedSymbol().GetName())
return err
}
}
for _, constraint := range store.Indexer.constraints {
if err := constraint.CheckIntegrity(ctx, fix, errorSink); err != nil {
return err
pfxlog.Logger().WithError(err).Infof("error checking link constraint: %s", constraint.Label())
return errors.Wrapf(err, "error checking constraint: %s", constraint.Label())
}
}
return nil
Expand Down
6 changes: 6 additions & 0 deletions boltz/system_entity_constraint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package boltz

import (
"fmt"
"github.com/openziti/foundation/v2/errorz"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
Expand All @@ -17,6 +18,11 @@ type systemEntityConstraint struct {
systemFlagSymbol EntitySymbol
}

func (index *systemEntityConstraint) Label() string {
return fmt.Sprintf("system entity constraint %s.%s",
index.systemFlagSymbol.GetStore().GetEntityType(), index.systemFlagSymbol.GetName())
}

func (self *systemEntityConstraint) checkOperation(operation string, ctx *IndexingContext) error {
t, val := self.systemFlagSymbol.Eval(ctx.Tx(), ctx.RowId)
isSystem := FieldToBool(t, val)
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2
0.3

0 comments on commit 3ff4ab9

Please sign in to comment.