From 4b0d5c323517ff09cae4a8bc1bf52d4ad3bbfa05 Mon Sep 17 00:00:00 2001 From: Jeronimo Date: Thu, 16 May 2024 13:28:00 -0300 Subject: [PATCH] fix: distinguish not found vs deleted object Signed-off-by: Jeronimo --- adapters/repos/db/lsmkv/bucket.go | 41 ++++++++++++++++++++++++ adapters/repos/db/lsmkv/segment_group.go | 32 ++++++++++++++++++ adapters/repos/db/replication.go | 9 +++--- adapters/repos/db/shard.go | 1 + adapters/repos/db/shard_lazyloader.go | 7 ++++ adapters/repos/db/shard_read.go | 23 +++++++++++++ 6 files changed, 108 insertions(+), 5 deletions(-) diff --git a/adapters/repos/db/lsmkv/bucket.go b/adapters/repos/db/lsmkv/bucket.go index 62097ed7f2..0d37501c7c 100644 --- a/adapters/repos/db/lsmkv/bucket.go +++ b/adapters/repos/db/lsmkv/bucket.go @@ -338,6 +338,47 @@ func (b *Bucket) Get(key []byte) ([]byte, error) { return b.disk.get(key) } +func (b *Bucket) GetErrNotFound(key []byte) ([]byte, error) { + b.flushLock.RLock() + defer b.flushLock.RUnlock() + + v, err := b.active.get(key) + if err == nil { + // item found and no error, return and stop searching, since the strategy + // is replace + return v, nil + } + if errors.Is(err, lsmkv.Deleted) { + // deleted in the mem-table (which is always the latest) means we don't + // have to check the disk segments, return nil now + return nil, err + } + + if err != lsmkv.NotFound { + panic(fmt.Sprintf("unsupported error in bucket.Get: %v\n", err)) + } + + if b.flushing != nil { + v, err := b.flushing.get(key) + if err == nil { + // item found and no error, return and stop searching, since the strategy + // is replace + return v, nil + } + if errors.Is(err, lsmkv.Deleted) { + // deleted in the now most recent memtable means we don't have to check + // the disk segments, return nil now + return nil, err + } + + if err != lsmkv.NotFound { + panic("unsupported error in bucket.Get") + } + } + + return b.disk.getErrNotFound(key) +} + // GetBySecondary retrieves an object using one of its secondary keys. A bucket // can have an infinite number of secondary keys. Specify the secondary key // position as the first argument. diff --git a/adapters/repos/db/lsmkv/segment_group.go b/adapters/repos/db/lsmkv/segment_group.go index cb85aefee0..cdffd5f3be 100644 --- a/adapters/repos/db/lsmkv/segment_group.go +++ b/adapters/repos/db/lsmkv/segment_group.go @@ -228,6 +228,38 @@ func (sg *SegmentGroup) getWithUpperSegmentBoundary(key []byte, topMostSegment i return nil, nil } +func (sg *SegmentGroup) getErrNotFound(key []byte) ([]byte, error) { + sg.maintenanceLock.RLock() + defer sg.maintenanceLock.RUnlock() + + return sg.getWithUpperSegmentBoundaryErrNotFound(key, len(sg.segments)-1) +} + +func (sg *SegmentGroup) getWithUpperSegmentBoundaryErrNotFound(key []byte, topMostSegment int) ([]byte, error) { + // assumes "replace" strategy + + // start with latest and exit as soon as something is found, thus making sure + // the latest takes presence + for i := topMostSegment; i >= 0; i-- { + v, err := sg.segments[i].get(key) + if err != nil { + if errors.Is(err, lsmkv.NotFound) { + continue + } + + if errors.Is(err, lsmkv.Deleted) { + return nil, err + } + + panic(fmt.Sprintf("unsupported error in segmentGroup.get(): %v", err)) + } + + return v, nil + } + + return nil, nil +} + func (sg *SegmentGroup) getBySecondaryIntoMemory(pos int, key []byte, buffer []byte) ([]byte, []byte, error) { sg.maintenanceLock.RLock() defer sg.maintenanceLock.RUnlock() diff --git a/adapters/repos/db/replication.go b/adapters/repos/db/replication.go index 343b27bbd2..d5dbdc5645 100644 --- a/adapters/repos/db/replication.go +++ b/adapters/repos/db/replication.go @@ -20,7 +20,9 @@ import ( "path/filepath" "github.com/go-openapi/strfmt" + "github.com/pkg/errors" "github.com/weaviate/weaviate/entities/additional" + "github.com/weaviate/weaviate/entities/lsmkv" "github.com/weaviate/weaviate/entities/multi" "github.com/weaviate/weaviate/entities/schema" "github.com/weaviate/weaviate/entities/storobj" @@ -326,11 +328,8 @@ func (i *Index) overwriteObjects(ctx context.Context, continue } // valid update - found, err := s.ObjectByID(ctx, data.ID, nil, additional.Properties{}) - if err == nil && found == nil { - // TODO: current case for locally deleted objects - // explicit Deleted error may be preferred but changing ObjectByID - // implies a global impact + found, err := s.ObjectByIDErrNotFound(ctx, data.ID, nil, additional.Properties{}) + if err != nil && errors.Is(err, lsmkv.Deleted) { continue } var curUpdateTime int64 // 0 means object doesn't exist on this node diff --git a/adapters/repos/db/shard.go b/adapters/repos/db/shard.go index be157d45cc..257f94f943 100644 --- a/adapters/repos/db/shard.go +++ b/adapters/repos/db/shard.go @@ -74,6 +74,7 @@ type ShardLike interface { PutObject(context.Context, *storobj.Object) error PutObjectBatch(context.Context, []*storobj.Object) []error ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) + ObjectByIDErrNotFound(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) Exists(ctx context.Context, id strfmt.UUID) (bool, error) ObjectSearch(ctx context.Context, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties) ([]*storobj.Object, []float32, error) ObjectVectorSearch(ctx context.Context, searchVector []float32, targetDist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties) ([]*storobj.Object, []float32, error) diff --git a/adapters/repos/db/shard_lazyloader.go b/adapters/repos/db/shard_lazyloader.go index 9cff807055..75196be45b 100644 --- a/adapters/repos/db/shard_lazyloader.go +++ b/adapters/repos/db/shard_lazyloader.go @@ -190,6 +190,13 @@ func (l *LazyLoadShard) ObjectByID(ctx context.Context, id strfmt.UUID, props se return l.shard.ObjectByID(ctx, id, props, additional) } +func (l *LazyLoadShard) ObjectByIDErrNotFound(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) { + if err := l.Load(ctx); err != nil { + return nil, err + } + return l.shard.ObjectByIDErrNotFound(ctx, id, props, additional) +} + func (l *LazyLoadShard) Exists(ctx context.Context, id strfmt.UUID) (bool, error) { if err := l.Load(ctx); err != nil { return false, err diff --git a/adapters/repos/db/shard_read.go b/adapters/repos/db/shard_read.go index 2134833592..467d978170 100644 --- a/adapters/repos/db/shard_read.go +++ b/adapters/repos/db/shard_read.go @@ -34,6 +34,29 @@ import ( "github.com/weaviate/weaviate/entities/storobj" ) +func (s *Shard) ObjectByIDErrNotFound(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) { + idBytes, err := uuid.MustParse(id.String()).MarshalBinary() + if err != nil { + return nil, err + } + + bytes, err := s.store.Bucket(helpers.ObjectsBucketLSM).GetErrNotFound(idBytes) + if err != nil { + return nil, err + } + + if bytes == nil { + return nil, nil + } + + obj, err := storobj.FromBinary(bytes) + if err != nil { + return nil, errors.Wrap(err, "unmarshal object") + } + + return obj, nil +} + func (s *Shard) ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) { idBytes, err := uuid.MustParse(id.String()).MarshalBinary() if err != nil {