From f1cc22bc4014c9587bbe1befbf043d910012ec13 Mon Sep 17 00:00:00 2001 From: Andrzej Liszka Date: Tue, 14 May 2024 11:39:22 +0200 Subject: [PATCH] reimplements Shard::reinit, updates replication factor --- adapters/repos/db/index.go | 4 +- adapters/repos/db/init.go | 8 +++- adapters/repos/db/migrator.go | 12 +++++- adapters/repos/db/replication.go | 56 +++++++++++++-------------- adapters/repos/db/shard.go | 10 ----- adapters/repos/db/shard_lazyloader.go | 15 ------- usecases/schema/executor.go | 5 +++ usecases/schema/executor_test.go | 3 ++ usecases/schema/helpers_test.go | 4 ++ usecases/schema/migrator.go | 1 + 10 files changed, 60 insertions(+), 58 deletions(-) diff --git a/adapters/repos/db/index.go b/adapters/repos/db/index.go index aeefb1f14d..b905c38e34 100644 --- a/adapters/repos/db/index.go +++ b/adapters/repos/db/index.go @@ -557,7 +557,7 @@ type IndexConfig struct { MemtablesMaxActiveSeconds int MaxSegmentSize int64 HNSWMaxLogSize int64 - ReplicationFactor int64 + ReplicationFactor atomic.Int64 AvoidMMap bool DisableLazyLoadShards bool @@ -688,7 +688,7 @@ func (i *Index) IncomingPutObject(ctx context.Context, shardName string, } func (i *Index) replicationEnabled() bool { - return i.Config.ReplicationFactor > 1 + return i.Config.ReplicationFactor.Load() > 1 } // parseDateFieldsInProps checks the schema for the current class for which diff --git a/adapters/repos/db/init.go b/adapters/repos/db/init.go index 1412e77352..3b91262d90 100644 --- a/adapters/repos/db/init.go +++ b/adapters/repos/db/init.go @@ -16,6 +16,7 @@ import ( "fmt" "os" "path" + "sync/atomic" "time" enterrors "github.com/weaviate/weaviate/entities/errors" @@ -94,7 +95,7 @@ func (db *DB) init(ctx context.Context) error { TrackVectorDimensions: db.config.TrackVectorDimensions, AvoidMMap: db.config.AvoidMMap, DisableLazyLoadShards: db.config.DisableLazyLoadShards, - ReplicationFactor: class.ReplicationConfig.Factor, + ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor), }, db.schemaGetter.CopyShardingState(class.Class), inverted.ConfigFromModel(invertedConfig), convertToVectorIndexConfig(class.VectorIndexConfig), @@ -168,3 +169,8 @@ func fileExists(file string) (bool, error) { } return true, nil } + +func NewAtomicInt64(val int64) (aval atomic.Int64) { + aval.Store(val) + return +} diff --git a/adapters/repos/db/migrator.go b/adapters/repos/db/migrator.go index a156ee545f..b4d3a5a1bd 100644 --- a/adapters/repos/db/migrator.go +++ b/adapters/repos/db/migrator.go @@ -68,7 +68,7 @@ func (m *Migrator) AddClass(ctx context.Context, class *models.Class, TrackVectorDimensions: m.db.config.TrackVectorDimensions, AvoidMMap: m.db.config.AvoidMMap, DisableLazyLoadShards: m.db.config.DisableLazyLoadShards, - ReplicationFactor: class.ReplicationConfig.Factor, + ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor), }, shardState, // no backward-compatibility check required, since newly added classes will @@ -456,6 +456,16 @@ func (m *Migrator) UpdateInvertedIndexConfig(ctx context.Context, className stri return idx.updateInvertedIndexConfig(ctx, conf) } +func (m *Migrator) UpdateReplicationFactor(ctx context.Context, className string, factor int64) error { + idx := m.db.GetIndex(schema.ClassName(className)) + if idx == nil { + return errors.Errorf("cannot update replication factor of non-existing index for %s", className) + } + + idx.Config.ReplicationFactor.Store(factor) + return nil +} + func (m *Migrator) RecalculateVectorDimensions(ctx context.Context) error { count := 0 m.logger. diff --git a/adapters/repos/db/replication.go b/adapters/repos/db/replication.go index b287e2368c..17a170a614 100644 --- a/adapters/repos/db/replication.go +++ b/adapters/repos/db/replication.go @@ -20,6 +20,7 @@ import ( "path/filepath" "github.com/go-openapi/strfmt" + "github.com/pkg/errors" "github.com/weaviate/weaviate/entities/additional" "github.com/weaviate/weaviate/entities/multi" "github.com/weaviate/weaviate/entities/schema" @@ -253,12 +254,34 @@ func (i *Index) IncomingCreateShard(ctx context.Context, className string, shard func (i *Index) IncomingReinitShard(ctx context.Context, shardName string, ) error { - shard, err := i.getOrInitLocalShard(ctx, shardName) - if err != nil { - return fmt.Errorf("shard %q does not exist locally", shardName) + shard := func() ShardLike { + i.shardInUseLocks.Lock(shardName) + defer i.shardInUseLocks.Unlock(shardName) + + return i.shards.Load(shardName) + }() + + if shard != nil { + err := func() error { + i.shardCreateLocks.Lock(shardName) + defer i.shardCreateLocks.Unlock(shardName) + + i.shards.LoadAndDelete(shardName) + + if err := shard.Shutdown(ctx); err != nil { + if !errors.Is(err, errAlreadyShutdown) { + return err + } + } + return nil + }() + if err != nil { + return err + } } - return shard.reinit(ctx) + _, err := i.getOrInitLocalShard(ctx, shardName) + return err } func (s *Shard) filePutter(ctx context.Context, @@ -280,31 +303,6 @@ func (s *Shard) filePutter(ctx context.Context, return f, nil } -func (s *Shard) reinit(ctx context.Context) error { - if err := s.Shutdown(ctx); err != nil { - return fmt.Errorf("shutdown shard: %w", err) - } - - if err := s.initNonVector(ctx, nil); err != nil { - return fmt.Errorf("reinit non-vector: %w", err) - } - - if s.hasTargetVectors() { - if err := s.initTargetVectors(ctx); err != nil { - return fmt.Errorf("reinit vector: %w", err) - } - } else { - if err := s.initLegacyVector(ctx); err != nil { - return fmt.Errorf("reinit vector: %w", err) - } - } - s.activate() - s.initCycleCallbacks() - s.initDimensionTracking() - - return nil -} - // OverwriteObjects if their state didn't change in the meantime // It returns nil if all object have been successfully overwritten // and otherwise a list of failed operations. diff --git a/adapters/repos/db/shard.go b/adapters/repos/db/shard.go index 4dd2298cb0..5e89ea7557 100644 --- a/adapters/repos/db/shard.go +++ b/adapters/repos/db/shard.go @@ -110,7 +110,6 @@ type ShardLike interface { Queues() map[string]*IndexQueue Shutdown(context.Context) error // Shutdown the shard preventShutdown() (release func(), err error) - activate() error // TODO tests only ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor, @@ -133,7 +132,6 @@ type ShardLike interface { commitReplication(context.Context, string, *backupMutex) interface{} abortReplication(context.Context, string) replica.SimpleResponse - reinit(context.Context) error filePutter(context.Context, string) (io.WriteCloser, error) // TODO tests only @@ -1055,14 +1053,6 @@ func (s *Shard) Shutdown(ctx context.Context) (err error) { return nil } -// activate makes sure the shut flag is false -func (s *Shard) activate() error { - s.shutdownLock.Lock() - defer s.shutdownLock.Unlock() - s.shut = false - return nil -} - func (s *Shard) preventShutdown() (release func(), err error) { s.shutdownLock.RLock() defer s.shutdownLock.RUnlock() diff --git a/adapters/repos/db/shard_lazyloader.go b/adapters/repos/db/shard_lazyloader.go index 478c9ae47f..62128e7a1a 100644 --- a/adapters/repos/db/shard_lazyloader.go +++ b/adapters/repos/db/shard_lazyloader.go @@ -418,14 +418,6 @@ func (l *LazyLoadShard) Shutdown(ctx context.Context) error { return l.shard.Shutdown(ctx) } -func (l *LazyLoadShard) activate() error { - if err := l.Load(context.Background()); err != nil { - return fmt.Errorf("LazyLoadShard::activate: %w", err) - } - - return l.shard.activate() -} - func (l *LazyLoadShard) preventShutdown() (release func(), err error) { if err := l.Load(context.Background()); err != nil { return nil, fmt.Errorf("LazyLoadShard::preventShutdown: %w", err) @@ -512,13 +504,6 @@ func (l *LazyLoadShard) abortReplication(ctx context.Context, shardID string) re return l.shard.abortReplication(ctx, shardID) } -func (l *LazyLoadShard) reinit(ctx context.Context) error { - if err := l.Load(ctx); err != nil { - return err - } - return l.shard.reinit(ctx) -} - func (l *LazyLoadShard) filePutter(ctx context.Context, shardID string) (io.WriteCloser, error) { if err := l.Load(ctx); err != nil { return nil, err diff --git a/usecases/schema/executor.go b/usecases/schema/executor.go index eaeca60636..9c6a6b3924 100644 --- a/usecases/schema/executor.go +++ b/usecases/schema/executor.go @@ -98,6 +98,11 @@ func (e *executor) UpdateClass(req api.UpdateClassRequest) error { req.Class.InvertedIndexConfig); err != nil { return errors.Wrap(err, "inverted index config") } + + if err := e.migrator.UpdateReplicationFactor(ctx, className, req.Class.ReplicationConfig.Factor); err != nil { + return fmt.Errorf("replication index update: %w", err) + } + return nil } diff --git a/usecases/schema/executor_test.go b/usecases/schema/executor_test.go index 10e6ce3f8f..844dedef17 100644 --- a/usecases/schema/executor_test.go +++ b/usecases/schema/executor_test.go @@ -43,6 +43,9 @@ func TestExecutor(t *testing.T) { cls := &models.Class{ Class: "A", VectorIndexConfig: flat.NewDefaultUserConfig(), + ReplicationConfig: &models.ReplicationConfig{ + Factor: 1, + }, } store.On("ReadOnlySchema").Return(models.Schema{}) store.On("ReadOnlyClass", "A", mock.Anything).Return(cls) diff --git a/usecases/schema/helpers_test.go b/usecases/schema/helpers_test.go index 6dbdeac646..7c2c14a746 100644 --- a/usecases/schema/helpers_test.go +++ b/usecases/schema/helpers_test.go @@ -323,6 +323,10 @@ func (f *fakeMigrator) UpdateInvertedIndexConfig(ctx context.Context, className return args.Error(0) } +func (f *fakeMigrator) UpdateReplicationFactor(ctx context.Context, className string, factor int64) error { + return nil +} + func (f *fakeMigrator) WaitForStartup(ctx context.Context) error { args := f.Called(ctx) return args.Error(0) diff --git a/usecases/schema/migrator.go b/usecases/schema/migrator.go index 54cf3b655c..9f5cca740a 100644 --- a/usecases/schema/migrator.go +++ b/usecases/schema/migrator.go @@ -59,6 +59,7 @@ type Migrator interface { ValidateInvertedIndexConfigUpdate(old, updated *models.InvertedIndexConfig) error UpdateInvertedIndexConfig(ctx context.Context, className string, updated *models.InvertedIndexConfig) error + UpdateReplicationFactor(ctx context.Context, className string, factor int64) error WaitForStartup(context.Context) error Shutdown(context.Context) error }