Skip to content

Commit

Permalink
reimplements Shard::reinit, updates replication factor
Browse files Browse the repository at this point in the history
  • Loading branch information
aliszka committed May 14, 2024
1 parent c014f64 commit f1cc22b
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 58 deletions.
4 changes: 2 additions & 2 deletions adapters/repos/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ type IndexConfig struct {
MemtablesMaxActiveSeconds int
MaxSegmentSize int64
HNSWMaxLogSize int64
ReplicationFactor int64
ReplicationFactor atomic.Int64
AvoidMMap bool
DisableLazyLoadShards bool

Expand Down Expand Up @@ -688,7 +688,7 @@ func (i *Index) IncomingPutObject(ctx context.Context, shardName string,
}

func (i *Index) replicationEnabled() bool {
return i.Config.ReplicationFactor > 1
return i.Config.ReplicationFactor.Load() > 1
}

// parseDateFieldsInProps checks the schema for the current class for which
Expand Down
8 changes: 7 additions & 1 deletion adapters/repos/db/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"os"
"path"
"sync/atomic"
"time"

enterrors "github.com/weaviate/weaviate/entities/errors"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (db *DB) init(ctx context.Context) error {
TrackVectorDimensions: db.config.TrackVectorDimensions,
AvoidMMap: db.config.AvoidMMap,
DisableLazyLoadShards: db.config.DisableLazyLoadShards,
ReplicationFactor: class.ReplicationConfig.Factor,
ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor),
}, db.schemaGetter.CopyShardingState(class.Class),
inverted.ConfigFromModel(invertedConfig),
convertToVectorIndexConfig(class.VectorIndexConfig),
Expand Down Expand Up @@ -168,3 +169,8 @@ func fileExists(file string) (bool, error) {
}
return true, nil
}

func NewAtomicInt64(val int64) (aval atomic.Int64) {
aval.Store(val)
return
}
12 changes: 11 additions & 1 deletion adapters/repos/db/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (m *Migrator) AddClass(ctx context.Context, class *models.Class,
TrackVectorDimensions: m.db.config.TrackVectorDimensions,
AvoidMMap: m.db.config.AvoidMMap,
DisableLazyLoadShards: m.db.config.DisableLazyLoadShards,
ReplicationFactor: class.ReplicationConfig.Factor,
ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor),
},
shardState,
// no backward-compatibility check required, since newly added classes will
Expand Down Expand Up @@ -456,6 +456,16 @@ func (m *Migrator) UpdateInvertedIndexConfig(ctx context.Context, className stri
return idx.updateInvertedIndexConfig(ctx, conf)
}

func (m *Migrator) UpdateReplicationFactor(ctx context.Context, className string, factor int64) error {
idx := m.db.GetIndex(schema.ClassName(className))
if idx == nil {
return errors.Errorf("cannot update replication factor of non-existing index for %s", className)
}

idx.Config.ReplicationFactor.Store(factor)
return nil
}

func (m *Migrator) RecalculateVectorDimensions(ctx context.Context) error {
count := 0
m.logger.
Expand Down
56 changes: 27 additions & 29 deletions adapters/repos/db/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"path/filepath"

"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/multi"
"github.com/weaviate/weaviate/entities/schema"
Expand Down Expand Up @@ -253,12 +254,34 @@ func (i *Index) IncomingCreateShard(ctx context.Context, className string, shard
func (i *Index) IncomingReinitShard(ctx context.Context,
shardName string,
) error {
shard, err := i.getOrInitLocalShard(ctx, shardName)
if err != nil {
return fmt.Errorf("shard %q does not exist locally", shardName)
shard := func() ShardLike {
i.shardInUseLocks.Lock(shardName)
defer i.shardInUseLocks.Unlock(shardName)

return i.shards.Load(shardName)
}()

if shard != nil {
err := func() error {
i.shardCreateLocks.Lock(shardName)
defer i.shardCreateLocks.Unlock(shardName)

i.shards.LoadAndDelete(shardName)

if err := shard.Shutdown(ctx); err != nil {
if !errors.Is(err, errAlreadyShutdown) {
return err
}
}
return nil
}()
if err != nil {
return err
}
}

return shard.reinit(ctx)
_, err := i.getOrInitLocalShard(ctx, shardName)
return err
}

func (s *Shard) filePutter(ctx context.Context,
Expand All @@ -280,31 +303,6 @@ func (s *Shard) filePutter(ctx context.Context,
return f, nil
}

func (s *Shard) reinit(ctx context.Context) error {
if err := s.Shutdown(ctx); err != nil {
return fmt.Errorf("shutdown shard: %w", err)
}

if err := s.initNonVector(ctx, nil); err != nil {
return fmt.Errorf("reinit non-vector: %w", err)
}

if s.hasTargetVectors() {
if err := s.initTargetVectors(ctx); err != nil {
return fmt.Errorf("reinit vector: %w", err)
}
} else {
if err := s.initLegacyVector(ctx); err != nil {
return fmt.Errorf("reinit vector: %w", err)
}
}
s.activate()
s.initCycleCallbacks()
s.initDimensionTracking()

return nil
}

// OverwriteObjects if their state didn't change in the meantime
// It returns nil if all object have been successfully overwritten
// and otherwise a list of failed operations.
Expand Down
10 changes: 0 additions & 10 deletions adapters/repos/db/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type ShardLike interface {
Queues() map[string]*IndexQueue
Shutdown(context.Context) error // Shutdown the shard
preventShutdown() (release func(), err error)
activate() error

// TODO tests only
ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor,
Expand All @@ -133,7 +132,6 @@ type ShardLike interface {

commitReplication(context.Context, string, *backupMutex) interface{}
abortReplication(context.Context, string) replica.SimpleResponse
reinit(context.Context) error
filePutter(context.Context, string) (io.WriteCloser, error)

// TODO tests only
Expand Down Expand Up @@ -1055,14 +1053,6 @@ func (s *Shard) Shutdown(ctx context.Context) (err error) {
return nil
}

// activate makes sure the shut flag is false
func (s *Shard) activate() error {
s.shutdownLock.Lock()
defer s.shutdownLock.Unlock()
s.shut = false
return nil
}

func (s *Shard) preventShutdown() (release func(), err error) {
s.shutdownLock.RLock()
defer s.shutdownLock.RUnlock()
Expand Down
15 changes: 0 additions & 15 deletions adapters/repos/db/shard_lazyloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,14 +418,6 @@ func (l *LazyLoadShard) Shutdown(ctx context.Context) error {
return l.shard.Shutdown(ctx)
}

func (l *LazyLoadShard) activate() error {
if err := l.Load(context.Background()); err != nil {
return fmt.Errorf("LazyLoadShard::activate: %w", err)
}

return l.shard.activate()
}

func (l *LazyLoadShard) preventShutdown() (release func(), err error) {
if err := l.Load(context.Background()); err != nil {
return nil, fmt.Errorf("LazyLoadShard::preventShutdown: %w", err)
Expand Down Expand Up @@ -512,13 +504,6 @@ func (l *LazyLoadShard) abortReplication(ctx context.Context, shardID string) re
return l.shard.abortReplication(ctx, shardID)
}

func (l *LazyLoadShard) reinit(ctx context.Context) error {
if err := l.Load(ctx); err != nil {
return err
}
return l.shard.reinit(ctx)
}

func (l *LazyLoadShard) filePutter(ctx context.Context, shardID string) (io.WriteCloser, error) {
if err := l.Load(ctx); err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions usecases/schema/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (e *executor) UpdateClass(req api.UpdateClassRequest) error {
req.Class.InvertedIndexConfig); err != nil {
return errors.Wrap(err, "inverted index config")
}

if err := e.migrator.UpdateReplicationFactor(ctx, className, req.Class.ReplicationConfig.Factor); err != nil {
return fmt.Errorf("replication index update: %w", err)
}

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions usecases/schema/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func TestExecutor(t *testing.T) {
cls := &models.Class{
Class: "A",
VectorIndexConfig: flat.NewDefaultUserConfig(),
ReplicationConfig: &models.ReplicationConfig{
Factor: 1,
},
}
store.On("ReadOnlySchema").Return(models.Schema{})
store.On("ReadOnlyClass", "A", mock.Anything).Return(cls)
Expand Down
4 changes: 4 additions & 0 deletions usecases/schema/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ func (f *fakeMigrator) UpdateInvertedIndexConfig(ctx context.Context, className
return args.Error(0)
}

func (f *fakeMigrator) UpdateReplicationFactor(ctx context.Context, className string, factor int64) error {
return nil
}

func (f *fakeMigrator) WaitForStartup(ctx context.Context) error {
args := f.Called(ctx)
return args.Error(0)
Expand Down
1 change: 1 addition & 0 deletions usecases/schema/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Migrator interface {
ValidateInvertedIndexConfigUpdate(old, updated *models.InvertedIndexConfig) error
UpdateInvertedIndexConfig(ctx context.Context, className string,
updated *models.InvertedIndexConfig) error
UpdateReplicationFactor(ctx context.Context, className string, factor int64) error
WaitForStartup(context.Context) error
Shutdown(context.Context) error
}

0 comments on commit f1cc22b

Please sign in to comment.