Skip to content

Commit

Permalink
Merge pull request weaviate#4952 from weaviate/stable/v1.23
Browse files Browse the repository at this point in the history
Merge stable/v1.23 into stable/v1.24
  • Loading branch information
antas-marcin authored May 16, 2024
2 parents 37017bd + 24f7d7a commit 96fbd7f
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 1 deletion.
41 changes: 41 additions & 0 deletions adapters/repos/db/lsmkv/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,47 @@ func (b *Bucket) Get(key []byte) ([]byte, error) {
return b.disk.get(key)
}

// GetErrDeleted looks up key and, unlike a lookup that hides tombstones,
// reports a deletion to the caller as an error matching lsmkv.Deleted.
//
// The lookup order is: the active memtable, then the flushing memtable (if one
// exists), then the disk segments via b.disk.getErrDeleted. The first layer
// that either holds a value or holds a tombstone for key decides the result.
// The flush lock is held for reading for the duration of the call.
//
// It returns the stored value on a hit, (nil, err) with err matching
// lsmkv.Deleted when a memtable holds a tombstone, and otherwise whatever the
// disk lookup returns. Any memtable error other than lsmkv.Deleted or
// lsmkv.NotFound is considered a broken invariant and panics.
func (b *Bucket) GetErrDeleted(key []byte) ([]byte, error) {
	b.flushLock.RLock()
	defer b.flushLock.RUnlock()

	v, err := b.active.get(key)
	if err == nil {
		// item found and no error, return and stop searching, since the strategy
		// is replace
		return v, nil
	}
	if errors.Is(err, lsmkv.Deleted) {
		// deleted in the mem-table (which is always the latest) means we don't
		// have to check the disk segments, surface the tombstone now
		return nil, err
	}

	// errors.Is (rather than !=) keeps this check consistent with the Deleted
	// check above and tolerates a wrapped sentinel.
	if !errors.Is(err, lsmkv.NotFound) {
		panic(fmt.Sprintf("unsupported error in bucket.GetErrDeleted: %v\n", err))
	}

	if b.flushing != nil {
		v, err := b.flushing.get(key)
		if err == nil {
			// item found and no error, return and stop searching, since the strategy
			// is replace
			return v, nil
		}
		if errors.Is(err, lsmkv.Deleted) {
			// deleted in the now most recent memtable means we don't have to check
			// the disk segments, surface the tombstone now
			return nil, err
		}

		if !errors.Is(err, lsmkv.NotFound) {
			// include the error so the panic is actually diagnosable
			panic(fmt.Sprintf("unsupported error in bucket.GetErrDeleted: %v\n", err))
		}
	}

	return b.disk.getErrDeleted(key)
}

// GetBySecondary retrieves an object using one of its secondary keys. A bucket
// can have an infinite number of secondary keys. Specify the secondary key
// position as the first argument.
Expand Down
32 changes: 32 additions & 0 deletions adapters/repos/db/lsmkv/segment_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,38 @@ func (sg *SegmentGroup) getWithUpperSegmentBoundary(key []byte, topMostSegment i
return nil, nil
}

// getErrDeleted searches all segments of the group for key while holding the
// maintenance lock for reading. The search starts at the last segment in
// sg.segments and is delegated to getWithUpperSegmentBoundaryErrDeleted, whose
// result is returned unchanged.
func (sg *SegmentGroup) getErrDeleted(key []byte) ([]byte, error) {
	sg.maintenanceLock.RLock()
	defer sg.maintenanceLock.RUnlock()

	// index of the last segment; -1 for an empty group, in which case the
	// delegate's loop body never runs
	lastIdx := len(sg.segments) - 1

	return sg.getWithUpperSegmentBoundaryErrDeleted(key, lastIdx)
}

// getWithUpperSegmentBoundaryErrDeleted looks up key in the segments with
// index topMostSegment down to 0 and stops at the first segment that knows
// about the key. It assumes the "replace" strategy.
//
// Results:
//   - (value, nil) when a segment holds a value for key
//   - (nil, err) with err matching lsmkv.Deleted when a segment holds a
//     tombstone for key
//   - (nil, nil) when no inspected segment contains key
//
// Any other error from a segment lookup is unexpected and panics. The caller
// is expected to hold the maintenance lock (getErrDeleted does).
func (sg *SegmentGroup) getWithUpperSegmentBoundaryErrDeleted(key []byte, topMostSegment int) ([]byte, error) {
	// walk from the latest segment backwards so that the most recent entry
	// takes precedence
	for idx := topMostSegment; idx >= 0; idx-- {
		val, lookupErr := sg.segments[idx].get(key)

		switch {
		case lookupErr == nil:
			return val, nil
		case errors.Is(lookupErr, lsmkv.NotFound):
			// this segment does not know the key, try the next older one
			continue
		case errors.Is(lookupErr, lsmkv.Deleted):
			return nil, lookupErr
		default:
			panic(fmt.Sprintf("unsupported error in segmentGroup.get(): %v", lookupErr))
		}
	}

	return nil, nil
}

func (sg *SegmentGroup) getBySecondaryIntoMemory(pos int, key []byte, buffer []byte) ([]byte, []byte, error) {
sg.maintenanceLock.RLock()
defer sg.maintenanceLock.RUnlock()
Expand Down
7 changes: 6 additions & 1 deletion adapters/repos/db/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"path/filepath"

"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/lsmkv"
"github.com/weaviate/weaviate/entities/multi"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/storobj"
Expand Down Expand Up @@ -333,7 +335,10 @@ func (i *Index) overwriteObjects(ctx context.Context,
continue
}
// valid update
found, err := s.ObjectByID(ctx, data.ID, nil, additional.Properties{})
found, err := s.ObjectByIDErrDeleted(ctx, data.ID, nil, additional.Properties{})
if err != nil && errors.Is(err, lsmkv.Deleted) {
continue
}
var curUpdateTime int64 // 0 means object doesn't exist on this node
if found != nil {
curUpdateTime = found.LastUpdateTimeUnix()
Expand Down
1 change: 1 addition & 0 deletions adapters/repos/db/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type ShardLike interface {
PutObject(context.Context, *storobj.Object) error
PutObjectBatch(context.Context, []*storobj.Object) []error
ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)
ObjectByIDErrDeleted(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)
Exists(ctx context.Context, id strfmt.UUID) (bool, error)
ObjectSearch(ctx context.Context, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties) ([]*storobj.Object, []float32, error)
ObjectVectorSearch(ctx context.Context, searchVector []float32, targetVector string, targetDist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties) ([]*storobj.Object, []float32, error)
Expand Down
7 changes: 7 additions & 0 deletions adapters/repos/db/shard_lazyloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ func (l *LazyLoadShard) ObjectByID(ctx context.Context, id strfmt.UUID, props se
return l.shard.ObjectByID(ctx, id, props, additional)
}

// ObjectByIDErrDeleted calls l.Load first and, if that succeeds, forwards the
// call with unchanged arguments to the wrapped shard's ObjectByIDErrDeleted.
// A failure from Load is returned as-is together with a nil object.
func (l *LazyLoadShard) ObjectByIDErrDeleted(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) {
	loadErr := l.Load(ctx)
	if loadErr != nil {
		return nil, loadErr
	}

	return l.shard.ObjectByIDErrDeleted(ctx, id, props, additional)
}

func (l *LazyLoadShard) Exists(ctx context.Context, id strfmt.UUID) (bool, error) {
if err := l.Load(ctx); err != nil {
return false, err
Expand Down
23 changes: 23 additions & 0 deletions adapters/repos/db/shard_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,29 @@ import (
"github.com/weaviate/weaviate/entities/storobj"
)

// ObjectByIDErrDeleted fetches the object stored under id from the objects
// bucket using Bucket.GetErrDeleted.
//
// Returns:
//   - (obj, nil) when the bucket holds data for id and it unmarshals cleanly
//   - (nil, nil) when the bucket returns no data and no error
//   - (nil, err) when the id cannot be parsed, when the bucket lookup fails
//     (the bucket error is passed through unwrapped so callers can match it,
//     e.g. against lsmkv.Deleted), or when unmarshalling fails
//
// ctx, props and additional are accepted to satisfy the ShardLike signature
// but are not used by this implementation.
func (s *Shard) ObjectByIDErrDeleted(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) {
	// Parse instead of MustParse: a malformed id must surface as an error to
	// the caller rather than panic.
	parsed, err := uuid.Parse(id.String())
	if err != nil {
		return nil, errors.Wrap(err, "parse object id")
	}

	idBytes, err := parsed.MarshalBinary()
	if err != nil {
		return nil, err
	}

	bytes, err := s.store.Bucket(helpers.ObjectsBucketLSM).GetErrDeleted(idBytes)
	if err != nil {
		// deliberately not wrapped, see doc comment
		return nil, err
	}

	if bytes == nil {
		return nil, nil
	}

	obj, err := storobj.FromBinary(bytes)
	if err != nil {
		return nil, errors.Wrap(err, "unmarshal object")
	}

	return obj, nil
}

func (s *Shard) ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) {
idBytes, err := uuid.MustParse(id.String()).MarshalBinary()
if err != nil {
Expand Down

0 comments on commit 96fbd7f

Please sign in to comment.