Skip to content

Commit

Permalink
fix: distinguish not found vs deleted object
Browse files Browse the repository at this point in the history
Signed-off-by: Jeronimo <jeronimo.irazabal@gmail.com>
  • Loading branch information
jeroiraz committed May 16, 2024
1 parent 5ce7986 commit 4b0d5c3
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 5 deletions.
41 changes: 41 additions & 0 deletions adapters/repos/db/lsmkv/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,47 @@ func (b *Bucket) Get(key []byte) ([]byte, error) {
return b.disk.get(key)
}

func (b *Bucket) GetErrNotFound(key []byte) ([]byte, error) {
b.flushLock.RLock()
defer b.flushLock.RUnlock()

v, err := b.active.get(key)
if err == nil {
// item found and no error, return and stop searching, since the strategy
// is replace
return v, nil
}
if errors.Is(err, lsmkv.Deleted) {
// deleted in the mem-table (which is always the latest) means we don't
// have to check the disk segments, return nil now
return nil, err
}

if err != lsmkv.NotFound {
panic(fmt.Sprintf("unsupported error in bucket.Get: %v\n", err))
}

if b.flushing != nil {
v, err := b.flushing.get(key)
if err == nil {
// item found and no error, return and stop searching, since the strategy
// is replace
return v, nil
}
if errors.Is(err, lsmkv.Deleted) {
// deleted in the now most recent memtable means we don't have to check
// the disk segments, return nil now
return nil, err
}

if err != lsmkv.NotFound {
panic("unsupported error in bucket.Get")
}
}

return b.disk.getErrNotFound(key)
}

// GetBySecondary retrieves an object using one of its secondary keys. A bucket
// can have an infinite number of secondary keys. Specify the secondary key
// position as the first argument.
Expand Down
32 changes: 32 additions & 0 deletions adapters/repos/db/lsmkv/segment_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,38 @@ func (sg *SegmentGroup) getWithUpperSegmentBoundary(key []byte, topMostSegment i
return nil, nil
}

func (sg *SegmentGroup) getErrNotFound(key []byte) ([]byte, error) {
sg.maintenanceLock.RLock()
defer sg.maintenanceLock.RUnlock()

return sg.getWithUpperSegmentBoundaryErrNotFound(key, len(sg.segments)-1)
}

func (sg *SegmentGroup) getWithUpperSegmentBoundaryErrNotFound(key []byte, topMostSegment int) ([]byte, error) {
// assumes "replace" strategy

// start with latest and exit as soon as something is found, thus making sure
// the latest takes presence
for i := topMostSegment; i >= 0; i-- {
v, err := sg.segments[i].get(key)
if err != nil {
if errors.Is(err, lsmkv.NotFound) {
continue
}

if errors.Is(err, lsmkv.Deleted) {
return nil, err
}

panic(fmt.Sprintf("unsupported error in segmentGroup.get(): %v", err))
}

return v, nil
}

return nil, nil
}

func (sg *SegmentGroup) getBySecondaryIntoMemory(pos int, key []byte, buffer []byte) ([]byte, []byte, error) {
sg.maintenanceLock.RLock()
defer sg.maintenanceLock.RUnlock()
Expand Down
9 changes: 4 additions & 5 deletions adapters/repos/db/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"path/filepath"

"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/lsmkv"
"github.com/weaviate/weaviate/entities/multi"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/storobj"
Expand Down Expand Up @@ -326,11 +328,8 @@ func (i *Index) overwriteObjects(ctx context.Context,
continue
}
// valid update
found, err := s.ObjectByID(ctx, data.ID, nil, additional.Properties{})
if err == nil && found == nil {
// TODO: current case for locally deleted objects
// explicit Deleted error may be preferred but changing ObjectByID
// implies a global impact
found, err := s.ObjectByIDErrNotFound(ctx, data.ID, nil, additional.Properties{})
if err != nil && errors.Is(err, lsmkv.Deleted) {
continue
}
var curUpdateTime int64 // 0 means object doesn't exist on this node
Expand Down
1 change: 1 addition & 0 deletions adapters/repos/db/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type ShardLike interface {
PutObject(context.Context, *storobj.Object) error
PutObjectBatch(context.Context, []*storobj.Object) []error
ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)
ObjectByIDErrNotFound(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)
Exists(ctx context.Context, id strfmt.UUID) (bool, error)
ObjectSearch(ctx context.Context, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties) ([]*storobj.Object, []float32, error)
ObjectVectorSearch(ctx context.Context, searchVector []float32, targetDist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties) ([]*storobj.Object, []float32, error)
Expand Down
7 changes: 7 additions & 0 deletions adapters/repos/db/shard_lazyloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ func (l *LazyLoadShard) ObjectByID(ctx context.Context, id strfmt.UUID, props se
return l.shard.ObjectByID(ctx, id, props, additional)
}

func (l *LazyLoadShard) ObjectByIDErrNotFound(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) {
if err := l.Load(ctx); err != nil {
return nil, err
}
return l.shard.ObjectByIDErrNotFound(ctx, id, props, additional)
}

func (l *LazyLoadShard) Exists(ctx context.Context, id strfmt.UUID) (bool, error) {
if err := l.Load(ctx); err != nil {
return false, err
Expand Down
23 changes: 23 additions & 0 deletions adapters/repos/db/shard_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,29 @@ import (
"github.com/weaviate/weaviate/entities/storobj"
)

func (s *Shard) ObjectByIDErrNotFound(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) {
idBytes, err := uuid.MustParse(id.String()).MarshalBinary()
if err != nil {
return nil, err
}

bytes, err := s.store.Bucket(helpers.ObjectsBucketLSM).GetErrNotFound(idBytes)
if err != nil {
return nil, err
}

if bytes == nil {
return nil, nil
}

obj, err := storobj.FromBinary(bytes)
if err != nil {
return nil, errors.Wrap(err, "unmarshal object")
}

return obj, nil
}

func (s *Shard) ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) {
idBytes, err := uuid.MustParse(id.String()).MarshalBinary()
if err != nil {
Expand Down

0 comments on commit 4b0d5c3

Please sign in to comment.