Skip to content

Commit

Permalink
Merge pull request weaviate#4913 from weaviate/support-raft-rf-scale_…
Browse files Browse the repository at this point in the history
…with_refactor

Support raft rf scale (part 2)
  • Loading branch information
aliszka authored May 14, 2024
2 parents c014f64 + d625d85 commit d9f1fd6
Show file tree
Hide file tree
Showing 15 changed files with 137 additions and 157 deletions.
1 change: 1 addition & 0 deletions adapters/repos/db/helper_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func testShardWithSettings(t *testing.T, ctx context.Context, class *models.Clas
RootPath: tmpDir,
ClassName: schema.ClassName(class.Class),
QueryMaximumResults: maxResults,
ReplicationFactor: NewAtomicInt64(1),
},
invertedIndexConfig: iic,
vectorIndexUserConfig: vic,
Expand Down
4 changes: 2 additions & 2 deletions adapters/repos/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ type IndexConfig struct {
MemtablesMaxActiveSeconds int
MaxSegmentSize int64
HNSWMaxLogSize int64
ReplicationFactor int64
ReplicationFactor *atomic.Int64
AvoidMMap bool
DisableLazyLoadShards bool

Expand Down Expand Up @@ -688,7 +688,7 @@ func (i *Index) IncomingPutObject(ctx context.Context, shardName string,
}

func (i *Index) replicationEnabled() bool {
return i.Config.ReplicationFactor > 1
return i.Config.ReplicationFactor.Load() > 1
}

// parseDateFieldsInProps checks the schema for the current class for which
Expand Down
16 changes: 10 additions & 6 deletions adapters/repos/db/index_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func TestIndex_DropWithDataAndRecreateWithDataIndex(t *testing.T) {
// create index with data
shardState := singleShardState()
index, err := NewIndex(testCtx(), IndexConfig{
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
ReplicationFactor: NewAtomicInt64(1),
}, shardState, inverted.ConfigFromModel(class.InvertedIndexConfig),
hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
schema: fakeSchema, shardState: shardState,
Expand Down Expand Up @@ -163,8 +164,9 @@ func TestIndex_DropWithDataAndRecreateWithDataIndex(t *testing.T) {

// recreate the index
index, err = NewIndex(testCtx(), IndexConfig{
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
ReplicationFactor: NewAtomicInt64(1),
}, shardState, inverted.ConfigFromModel(class.InvertedIndexConfig),
hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
schema: fakeSchema,
Expand Down Expand Up @@ -273,8 +275,9 @@ func TestIndex_DropReadOnlyIndexWithData(t *testing.T) {

shardState := singleShardState()
index, err := NewIndex(ctx, IndexConfig{
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
RootPath: dirName,
ClassName: schema.ClassName(class.Class),
ReplicationFactor: NewAtomicInt64(1),
}, shardState, inverted.ConfigFromModel(class.InvertedIndexConfig),
hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
schema: fakeSchema, shardState: shardState,
Expand Down Expand Up @@ -332,6 +335,7 @@ func emptyIdx(t *testing.T, rootDir string, class *models.Class) *Index {
RootPath: rootDir,
ClassName: schema.ClassName(class.Class),
DisableLazyLoadShards: true,
ReplicationFactor: NewAtomicInt64(1),
}, shardState, inverted.ConfigFromModel(invertedConfig()),
hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
shardState: shardState,
Expand Down
9 changes: 8 additions & 1 deletion adapters/repos/db/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"os"
"path"
"sync/atomic"
"time"

enterrors "github.com/weaviate/weaviate/entities/errors"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (db *DB) init(ctx context.Context) error {
TrackVectorDimensions: db.config.TrackVectorDimensions,
AvoidMMap: db.config.AvoidMMap,
DisableLazyLoadShards: db.config.DisableLazyLoadShards,
ReplicationFactor: class.ReplicationConfig.Factor,
ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor),
}, db.schemaGetter.CopyShardingState(class.Class),
inverted.ConfigFromModel(invertedConfig),
convertToVectorIndexConfig(class.VectorIndexConfig),
Expand Down Expand Up @@ -168,3 +169,9 @@ func fileExists(file string) (bool, error) {
}
return true, nil
}

func NewAtomicInt64(val int64) *atomic.Int64 {
aval := &atomic.Int64{}
aval.Store(val)
return aval
}
12 changes: 11 additions & 1 deletion adapters/repos/db/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (m *Migrator) AddClass(ctx context.Context, class *models.Class,
TrackVectorDimensions: m.db.config.TrackVectorDimensions,
AvoidMMap: m.db.config.AvoidMMap,
DisableLazyLoadShards: m.db.config.DisableLazyLoadShards,
ReplicationFactor: class.ReplicationConfig.Factor,
ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor),
},
shardState,
// no backward-compatibility check required, since newly added classes will
Expand Down Expand Up @@ -456,6 +456,16 @@ func (m *Migrator) UpdateInvertedIndexConfig(ctx context.Context, className stri
return idx.updateInvertedIndexConfig(ctx, conf)
}

func (m *Migrator) UpdateReplicationFactor(ctx context.Context, className string, factor int64) error {
idx := m.db.GetIndex(schema.ClassName(className))
if idx == nil {
return errors.Errorf("cannot update replication factor of non-existing index for %s", className)
}

idx.Config.ReplicationFactor.Store(factor)
return nil
}

func (m *Migrator) RecalculateVectorDimensions(ctx context.Context) error {
count := 0
m.logger.
Expand Down
6 changes: 4 additions & 2 deletions adapters/repos/db/node_wide_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ func TestShardActivity(t *testing.T) {
indices: map[string]*Index{
"Col1": {
Config: IndexConfig{
ClassName: "Col1",
ClassName: "Col1",
ReplicationFactor: NewAtomicInt64(1),
},
partitioningEnabled: true,
shards: shardMap{},
},
"NonMT": {
Config: IndexConfig{
ClassName: "NonMT",
ClassName: "NonMT",
ReplicationFactor: NewAtomicInt64(1),
},
partitioningEnabled: false,
shards: shardMap{},
Expand Down
56 changes: 27 additions & 29 deletions adapters/repos/db/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"path/filepath"

"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/multi"
"github.com/weaviate/weaviate/entities/schema"
Expand Down Expand Up @@ -253,12 +254,34 @@ func (i *Index) IncomingCreateShard(ctx context.Context, className string, shard
func (i *Index) IncomingReinitShard(ctx context.Context,
shardName string,
) error {
shard, err := i.getOrInitLocalShard(ctx, shardName)
if err != nil {
return fmt.Errorf("shard %q does not exist locally", shardName)
shard := func() ShardLike {
i.shardInUseLocks.Lock(shardName)
defer i.shardInUseLocks.Unlock(shardName)

return i.shards.Load(shardName)
}()

if shard != nil {
err := func() error {
i.shardCreateLocks.Lock(shardName)
defer i.shardCreateLocks.Unlock(shardName)

i.shards.LoadAndDelete(shardName)

if err := shard.Shutdown(ctx); err != nil {
if !errors.Is(err, errAlreadyShutdown) {
return err
}
}
return nil
}()
if err != nil {
return err
}
}

return shard.reinit(ctx)
_, err := i.getOrInitLocalShard(ctx, shardName)
return err
}

func (s *Shard) filePutter(ctx context.Context,
Expand All @@ -280,31 +303,6 @@ func (s *Shard) filePutter(ctx context.Context,
return f, nil
}

func (s *Shard) reinit(ctx context.Context) error {
if err := s.Shutdown(ctx); err != nil {
return fmt.Errorf("shutdown shard: %w", err)
}

if err := s.initNonVector(ctx, nil); err != nil {
return fmt.Errorf("reinit non-vector: %w", err)
}

if s.hasTargetVectors() {
if err := s.initTargetVectors(ctx); err != nil {
return fmt.Errorf("reinit vector: %w", err)
}
} else {
if err := s.initLegacyVector(ctx); err != nil {
return fmt.Errorf("reinit vector: %w", err)
}
}
s.activate()
s.initCycleCallbacks()
s.initDimensionTracking()

return nil
}

// OverwriteObjects if their state didn't change in the meantime
// It returns nil if all object have been successfully overwritten
// and otherwise a list of failed operations.
Expand Down
10 changes: 0 additions & 10 deletions adapters/repos/db/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type ShardLike interface {
Queues() map[string]*IndexQueue
Shutdown(context.Context) error // Shutdown the shard
preventShutdown() (release func(), err error)
activate() error

// TODO tests only
ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor,
Expand All @@ -133,7 +132,6 @@ type ShardLike interface {

commitReplication(context.Context, string, *backupMutex) interface{}
abortReplication(context.Context, string) replica.SimpleResponse
reinit(context.Context) error
filePutter(context.Context, string) (io.WriteCloser, error)

// TODO tests only
Expand Down Expand Up @@ -1055,14 +1053,6 @@ func (s *Shard) Shutdown(ctx context.Context) (err error) {
return nil
}

// activate makes sure the shut flag is false
func (s *Shard) activate() error {
s.shutdownLock.Lock()
defer s.shutdownLock.Unlock()
s.shut = false
return nil
}

func (s *Shard) preventShutdown() (release func(), err error) {
s.shutdownLock.RLock()
defer s.shutdownLock.RUnlock()
Expand Down
15 changes: 0 additions & 15 deletions adapters/repos/db/shard_lazyloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,14 +418,6 @@ func (l *LazyLoadShard) Shutdown(ctx context.Context) error {
return l.shard.Shutdown(ctx)
}

func (l *LazyLoadShard) activate() error {
if err := l.Load(context.Background()); err != nil {
return fmt.Errorf("LazyLoadShard::activate: %w", err)
}

return l.shard.activate()
}

func (l *LazyLoadShard) preventShutdown() (release func(), err error) {
if err := l.Load(context.Background()); err != nil {
return nil, fmt.Errorf("LazyLoadShard::preventShutdown: %w", err)
Expand Down Expand Up @@ -512,13 +504,6 @@ func (l *LazyLoadShard) abortReplication(ctx context.Context, shardID string) re
return l.shard.abortReplication(ctx, shardID)
}

func (l *LazyLoadShard) reinit(ctx context.Context) error {
if err := l.Load(ctx); err != nil {
return err
}
return l.shard.reinit(ctx)
}

func (l *LazyLoadShard) filePutter(ctx context.Context, shardID string) (io.WriteCloser, error) {
if err := l.Load(ctx); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit d9f1fd6

Please sign in to comment.