diff --git a/adapters/repos/db/helper_for_test.go b/adapters/repos/db/helper_for_test.go
index fa57e6aeb1..80ab0f1498 100644
--- a/adapters/repos/db/helper_for_test.go
+++ b/adapters/repos/db/helper_for_test.go
@@ -258,6 +258,7 @@ func testShardWithSettings(t *testing.T, ctx context.Context, class *models.Clas
 			RootPath:            tmpDir,
 			ClassName:           schema.ClassName(class.Class),
 			QueryMaximumResults: maxResults,
+			ReplicationFactor:   NewAtomicInt64(1),
 		},
 		invertedIndexConfig:   iic,
 		vectorIndexUserConfig: vic,
diff --git a/adapters/repos/db/index.go b/adapters/repos/db/index.go
index aeefb1f14d..3ee7831d93 100644
--- a/adapters/repos/db/index.go
+++ b/adapters/repos/db/index.go
@@ -557,7 +557,7 @@ type IndexConfig struct {
 	MemtablesMaxActiveSeconds int
 	MaxSegmentSize            int64
 	HNSWMaxLogSize            int64
-	ReplicationFactor         int64
+	ReplicationFactor         *atomic.Int64
 	AvoidMMap                 bool
 	DisableLazyLoadShards     bool

@@ -688,7 +688,7 @@ func (i *Index) IncomingPutObject(ctx context.Context, shardName string,
 }

 func (i *Index) replicationEnabled() bool {
-	return i.Config.ReplicationFactor > 1
+	return i.Config.ReplicationFactor.Load() > 1
 }

 // parseDateFieldsInProps checks the schema for the current class for which
diff --git a/adapters/repos/db/index_integration_test.go b/adapters/repos/db/index_integration_test.go
index 89a0dcbda1..1e07438ce6 100644
--- a/adapters/repos/db/index_integration_test.go
+++ b/adapters/repos/db/index_integration_test.go
@@ -103,8 +103,9 @@ func TestIndex_DropWithDataAndRecreateWithDataIndex(t *testing.T) {
 	// create index with data
 	shardState := singleShardState()
 	index, err := NewIndex(testCtx(), IndexConfig{
-		RootPath:  dirName,
-		ClassName: schema.ClassName(class.Class),
+		RootPath:          dirName,
+		ClassName:         schema.ClassName(class.Class),
+		ReplicationFactor: NewAtomicInt64(1),
 	}, shardState, inverted.ConfigFromModel(class.InvertedIndexConfig),
 		hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
 			schema: fakeSchema, shardState: shardState,
@@ -163,8 +164,9 @@ func TestIndex_DropWithDataAndRecreateWithDataIndex(t *testing.T) {
 	// recreate the index
 	index, err = NewIndex(testCtx(), IndexConfig{
-		RootPath:  dirName,
-		ClassName: schema.ClassName(class.Class),
+		RootPath:          dirName,
+		ClassName:         schema.ClassName(class.Class),
+		ReplicationFactor: NewAtomicInt64(1),
 	}, shardState, inverted.ConfigFromModel(class.InvertedIndexConfig),
 		hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
 			schema: fakeSchema,
@@ -273,8 +275,9 @@ func TestIndex_DropReadOnlyIndexWithData(t *testing.T) {
 	shardState := singleShardState()
 	index, err := NewIndex(ctx, IndexConfig{
-		RootPath:  dirName,
-		ClassName: schema.ClassName(class.Class),
+		RootPath:          dirName,
+		ClassName:         schema.ClassName(class.Class),
+		ReplicationFactor: NewAtomicInt64(1),
 	}, shardState, inverted.ConfigFromModel(class.InvertedIndexConfig),
 		hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
 			schema: fakeSchema, shardState: shardState,
@@ -332,6 +335,7 @@ func emptyIdx(t *testing.T, rootDir string, class *models.Class) *Index {
 		RootPath:              rootDir,
 		ClassName:             schema.ClassName(class.Class),
 		DisableLazyLoadShards: true,
+		ReplicationFactor:     NewAtomicInt64(1),
 	}, shardState, inverted.ConfigFromModel(invertedConfig()),
 		hnsw.NewDefaultUserConfig(), nil, &fakeSchemaGetter{
 			shardState: shardState,
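The type change above turns `IndexConfig.ReplicationFactor` from a plain `int64` into a `*atomic.Int64`, so every construction site, including the test fixtures touched here, must supply an initialized pointer or `replicationEnabled()` panics on a nil dereference. A minimal sketch of the resulting access pattern; the `indexConfig` struct below is a simplified stand-in, not the real `IndexConfig`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// indexConfig is a simplified stand-in for IndexConfig,
// keeping only the field relevant here.
type indexConfig struct {
	ReplicationFactor *atomic.Int64
}

func main() {
	cfg := indexConfig{ReplicationFactor: &atomic.Int64{}}
	cfg.ReplicationFactor.Store(1)

	// Request path: a lock-free read, safe against concurrent Store calls.
	fmt.Println("replication enabled:", cfg.ReplicationFactor.Load() > 1)

	// Schema-update path: flips the factor without recreating the index.
	cfg.ReplicationFactor.Store(3)
	fmt.Println("replication enabled:", cfg.ReplicationFactor.Load() > 1)
}
```

The point of the atomic is that the request-path read stays lock-free while a concurrent schema update flips the factor.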
"github.com/weaviate/weaviate/entities/errors" @@ -94,7 +95,7 @@ func (db *DB) init(ctx context.Context) error { TrackVectorDimensions: db.config.TrackVectorDimensions, AvoidMMap: db.config.AvoidMMap, DisableLazyLoadShards: db.config.DisableLazyLoadShards, - ReplicationFactor: class.ReplicationConfig.Factor, + ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor), }, db.schemaGetter.CopyShardingState(class.Class), inverted.ConfigFromModel(invertedConfig), convertToVectorIndexConfig(class.VectorIndexConfig), @@ -168,3 +169,9 @@ func fileExists(file string) (bool, error) { } return true, nil } + +func NewAtomicInt64(val int64) *atomic.Int64 { + aval := &atomic.Int64{} + aval.Store(val) + return aval +} diff --git a/adapters/repos/db/migrator.go b/adapters/repos/db/migrator.go index a156ee545f..b4d3a5a1bd 100644 --- a/adapters/repos/db/migrator.go +++ b/adapters/repos/db/migrator.go @@ -68,7 +68,7 @@ func (m *Migrator) AddClass(ctx context.Context, class *models.Class, TrackVectorDimensions: m.db.config.TrackVectorDimensions, AvoidMMap: m.db.config.AvoidMMap, DisableLazyLoadShards: m.db.config.DisableLazyLoadShards, - ReplicationFactor: class.ReplicationConfig.Factor, + ReplicationFactor: NewAtomicInt64(class.ReplicationConfig.Factor), }, shardState, // no backward-compatibility check required, since newly added classes will @@ -456,6 +456,16 @@ func (m *Migrator) UpdateInvertedIndexConfig(ctx context.Context, className stri return idx.updateInvertedIndexConfig(ctx, conf) } +func (m *Migrator) UpdateReplicationFactor(ctx context.Context, className string, factor int64) error { + idx := m.db.GetIndex(schema.ClassName(className)) + if idx == nil { + return errors.Errorf("cannot update replication factor of non-existing index for %s", className) + } + + idx.Config.ReplicationFactor.Store(factor) + return nil +} + func (m *Migrator) RecalculateVectorDimensions(ctx context.Context) error { count := 0 m.logger. 
diff --git a/adapters/repos/db/node_wide_metrics_test.go b/adapters/repos/db/node_wide_metrics_test.go
index 1ec6be79cc..0c88699e0b 100644
--- a/adapters/repos/db/node_wide_metrics_test.go
+++ b/adapters/repos/db/node_wide_metrics_test.go
@@ -28,14 +28,16 @@ func TestShardActivity(t *testing.T) {
 		indices: map[string]*Index{
 			"Col1": {
 				Config: IndexConfig{
-					ClassName: "Col1",
+					ClassName:         "Col1",
+					ReplicationFactor: NewAtomicInt64(1),
 				},
 				partitioningEnabled: true,
 				shards:              shardMap{},
 			},
 			"NonMT": {
 				Config: IndexConfig{
-					ClassName: "NonMT",
+					ClassName:         "NonMT",
+					ReplicationFactor: NewAtomicInt64(1),
 				},
 				partitioningEnabled: false,
 				shards:              shardMap{},
diff --git a/adapters/repos/db/replication.go b/adapters/repos/db/replication.go
index b287e2368c..17a170a614 100644
--- a/adapters/repos/db/replication.go
+++ b/adapters/repos/db/replication.go
@@ -20,6 +20,7 @@ import (
 	"path/filepath"

 	"github.com/go-openapi/strfmt"
+	"github.com/pkg/errors"
 	"github.com/weaviate/weaviate/entities/additional"
 	"github.com/weaviate/weaviate/entities/multi"
 	"github.com/weaviate/weaviate/entities/schema"
@@ -253,12 +254,34 @@ func (i *Index) IncomingCreateShard(ctx context.Context, className string, shard
 func (i *Index) IncomingReinitShard(ctx context.Context, shardName string,
 ) error {
-	shard, err := i.getOrInitLocalShard(ctx, shardName)
-	if err != nil {
-		return fmt.Errorf("shard %q does not exist locally", shardName)
+	shard := func() ShardLike {
+		i.shardInUseLocks.Lock(shardName)
+		defer i.shardInUseLocks.Unlock(shardName)
+
+		return i.shards.Load(shardName)
+	}()
+
+	if shard != nil {
+		err := func() error {
+			i.shardCreateLocks.Lock(shardName)
+			defer i.shardCreateLocks.Unlock(shardName)
+
+			i.shards.LoadAndDelete(shardName)
+
+			if err := shard.Shutdown(ctx); err != nil {
+				if !errors.Is(err, errAlreadyShutdown) {
+					return err
+				}
+			}
+			return nil
+		}()
+		if err != nil {
+			return err
+		}
 	}

-	return shard.reinit(ctx)
+	_, err := i.getOrInitLocalShard(ctx, shardName)
+	return err
 }

 func (s *Shard) filePutter(ctx context.Context,
@@ -280,31 +303,6 @@ func (s *Shard) filePutter(ctx context.Context,
 	return f, nil
 }

-func (s *Shard) reinit(ctx context.Context) error {
-	if err := s.Shutdown(ctx); err != nil {
-		return fmt.Errorf("shutdown shard: %w", err)
-	}
-
-	if err := s.initNonVector(ctx, nil); err != nil {
-		return fmt.Errorf("reinit non-vector: %w", err)
-	}
-
-	if s.hasTargetVectors() {
-		if err := s.initTargetVectors(ctx); err != nil {
-			return fmt.Errorf("reinit vector: %w", err)
-		}
-	} else {
-		if err := s.initLegacyVector(ctx); err != nil {
-			return fmt.Errorf("reinit vector: %w", err)
-		}
-	}
-	s.activate()
-	s.initCycleCallbacks()
-	s.initDimensionTracking()
-
-	return nil
-}
-
 // OverwriteObjects if their state didn't change in the meantime
 // It returns nil if all object have been successfully overwritten
 // and otherwise a list of failed operations.
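`IncomingReinitShard` now drops the shard and lets `getOrInitLocalShard` rebuild it, rather than re-initializing a live shard in place; tolerating `errAlreadyShutdown` keeps the call idempotent, and the bespoke `reinit`/`activate` plumbing becomes removable (see the deletions above and in the next files). A simplified, self-contained sketch of the drop-and-recreate pattern, with one plain mutex standing in for the per-shard `shardInUseLocks`/`shardCreateLocks` of the real code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errAlreadyShutdown = errors.New("already shut down")

type shard struct{ name string }

func (s *shard) Shutdown(context.Context) error { return nil }

type index struct {
	mu     sync.Mutex
	shards map[string]*shard
}

// reinitShard removes the current instance and recreates it, instead of
// mutating a live shard back into an "active" state.
func (i *index) reinitShard(ctx context.Context, name string) error {
	i.mu.Lock()
	old := i.shards[name]
	delete(i.shards, name)
	i.mu.Unlock()

	if old != nil {
		// An already-shut-down shard is fine: the call stays idempotent.
		if err := old.Shutdown(ctx); err != nil && !errors.Is(err, errAlreadyShutdown) {
			return err
		}
	}

	// Fresh initialization takes the same path as a first-time load.
	i.mu.Lock()
	i.shards[name] = &shard{name: name}
	i.mu.Unlock()
	return nil
}

func main() {
	idx := &index{shards: map[string]*shard{"s1": {name: "s1"}}}
	fmt.Println(idx.reinitShard(context.Background(), "s1"))
}
```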
diff --git a/adapters/repos/db/shard.go b/adapters/repos/db/shard.go
index 4dd2298cb0..5e89ea7557 100644
--- a/adapters/repos/db/shard.go
+++ b/adapters/repos/db/shard.go
@@ -110,7 +110,6 @@ type ShardLike interface {
 	Queues() map[string]*IndexQueue
 	Shutdown(context.Context) error // Shutdown the shard
 	preventShutdown() (release func(), err error)
-	activate() error

 	// TODO tests only
 	ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor,
@@ -133,7 +132,6 @@ type ShardLike interface {
 	commitReplication(context.Context, string, *backupMutex) interface{}
 	abortReplication(context.Context, string) replica.SimpleResponse
-	reinit(context.Context) error
 	filePutter(context.Context, string) (io.WriteCloser, error)

 	// TODO tests only
@@ -1055,14 +1053,6 @@ func (s *Shard) Shutdown(ctx context.Context) (err error) {
 	return nil
 }

-// activate makes sure the shut flag is false
-func (s *Shard) activate() error {
-	s.shutdownLock.Lock()
-	defer s.shutdownLock.Unlock()
-	s.shut = false
-	return nil
-}
-
 func (s *Shard) preventShutdown() (release func(), err error) {
 	s.shutdownLock.RLock()
 	defer s.shutdownLock.RUnlock()
diff --git a/adapters/repos/db/shard_lazyloader.go b/adapters/repos/db/shard_lazyloader.go
index 478c9ae47f..62128e7a1a 100644
--- a/adapters/repos/db/shard_lazyloader.go
+++ b/adapters/repos/db/shard_lazyloader.go
@@ -418,14 +418,6 @@ func (l *LazyLoadShard) Shutdown(ctx context.Context) error {
 	return l.shard.Shutdown(ctx)
 }

-func (l *LazyLoadShard) activate() error {
-	if err := l.Load(context.Background()); err != nil {
-		return fmt.Errorf("LazyLoadShard::activate: %w", err)
-	}
-
-	return l.shard.activate()
-}
-
 func (l *LazyLoadShard) preventShutdown() (release func(), err error) {
 	if err := l.Load(context.Background()); err != nil {
 		return nil, fmt.Errorf("LazyLoadShard::preventShutdown: %w", err)
@@ -512,13 +504,6 @@ func (l *LazyLoadShard) abortReplication(ctx context.Context, shardID string) re
 	return l.shard.abortReplication(ctx, shardID)
 }

-func (l *LazyLoadShard) reinit(ctx context.Context) error {
-	if err := l.Load(ctx); err != nil {
-		return err
-	}
-	return l.shard.reinit(ctx)
-}
-
 func (l *LazyLoadShard) filePutter(ctx context.Context, shardID string) (io.WriteCloser, error) {
 	if err := l.Load(ctx); err != nil {
 		return nil, err
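With `activate` and `reinit` gone, every method remaining on `LazyLoadShard` follows a single delegation shape: load the underlying shard on first use, then forward the call. A stripped-down sketch of that wrapper pattern; the types are reduced to a minimum and are not the real `LazyLoadShard`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type realShard struct{}

func (s *realShard) Shutdown(context.Context) error { return nil }

// lazyShard defers construction of the underlying shard until first use,
// in the spirit of LazyLoadShard.
type lazyShard struct {
	once  sync.Once
	shard *realShard
}

func (l *lazyShard) load(context.Context) error {
	l.once.Do(func() { l.shard = &realShard{} })
	return nil
}

// Every exported method loads, then delegates.
func (l *lazyShard) Shutdown(ctx context.Context) error {
	if err := l.load(ctx); err != nil {
		return fmt.Errorf("lazyShard::Shutdown: %w", err)
	}
	return l.shard.Shutdown(ctx)
}

func main() {
	var l lazyShard
	fmt.Println(l.Shutdown(context.Background()))
}
```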
diff --git a/test/acceptance/replication/crud_test.go b/test/acceptance/replication/crud_test.go
index dbad693e5b..64479bd5bd 100644
--- a/test/acceptance/replication/crud_test.go
+++ b/test/acceptance/replication/crud_test.go
@@ -29,7 +29,6 @@ import (
 	"github.com/weaviate/weaviate/test/helper"
 	"github.com/weaviate/weaviate/test/helper/sample-schema/articles"
 	"github.com/weaviate/weaviate/usecases/replica"
-	"golang.org/x/sync/errgroup"
 )

 var (
@@ -359,10 +358,12 @@ func eventualReplicaCRUD(t *testing.T) {
 	})

 	t.Run("assert all previous data replicated to node 2", func(t *testing.T) {
-		resp := gqlGet(t, compose.GetWeaviateNode2().URI(), "Article", replica.Quorum)
-		assert.Len(t, resp, len(articleIDs))
-		resp = gqlGet(t, compose.GetWeaviateNode2().URI(), "Paragraph", replica.Quorum)
-		assert.Len(t, resp, len(paragraphIDs))
+		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+			resp := gqlGet(t, compose.GetWeaviateNode2().URI(), "Article", replica.One)
+			assert.Len(collect, resp, len(articleIDs))
+			resp = gqlGet(t, compose.GetWeaviateNode2().URI(), "Paragraph", replica.One)
+			assert.Len(collect, resp, len(paragraphIDs))
+		}, 5*time.Second, 100*time.Millisecond)
 	})

 	t.Run("RestartNode-3", func(t *testing.T) {
@@ -370,14 +371,12 @@ func eventualReplicaCRUD(t *testing.T) {
 	})

 	t.Run("assert all previous data replicated to node 3", func(t *testing.T) {
-		resp := gqlGet(t, compose.GetWeaviateNode3().URI(), "Article", replica.All)
-		assert.Len(t, resp, len(articleIDs))
-		resp = gqlGet(t, compose.GetWeaviateNode3().URI(), "Paragraph", replica.All)
-		assert.Len(t, resp, len(paragraphIDs))
-	})
-
-	t.Run("RestartCluster", func(t *testing.T) {
-		restartCluster(ctx, t, compose)
+		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+			resp := gqlGet(t, compose.GetWeaviateNode3().URI(), "Article", replica.All)
+			assert.Len(collect, resp, len(articleIDs))
+			resp = gqlGet(t, compose.GetWeaviateNode3().URI(), "Paragraph", replica.All)
+			assert.Len(collect, resp, len(paragraphIDs))
+		}, 5*time.Second, 100*time.Millisecond)
 	})

 	t.Run("assert any future writes are replicated", func(t *testing.T) {
@@ -395,30 +394,32 @@ func eventualReplicaCRUD(t *testing.T) {
 			patchObject(t, compose.GetWeaviateNode2().URI(), patch)
 		})

-		t.Run("StopNode-2", func(t *testing.T) {
-			stopNodeAt(ctx, t, compose, 2)
-		})
-
 		t.Run("PatchedOnNode-1", func(t *testing.T) {
 			after, err := getObjectFromNode(t, compose.GetWeaviate().URI(), "Article", articleIDs[0], "node1")
 			require.Nil(t, err)

-			newVal, ok := after.Properties.(map[string]interface{})["title"]
-			require.True(t, ok)
-			assert.Equal(t, newTitle, newVal)
+			require.Contains(t, after.Properties.(map[string]interface{}), "title")
+			assert.Equal(t, newTitle, after.Properties.(map[string]interface{})["title"])
 		})

-		t.Run("RestartNode-2", func(t *testing.T) {
-			startNodeAt(ctx, t, compose, 2)
+		t.Run("PatchedOnNode-2", func(t *testing.T) {
+			assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+				after, err := getObjectFromNode(t, compose.GetWeaviateNode2().URI(), "Article", articleIDs[0], "node2")
+				require.Nil(collect, err)
+
+				require.Contains(collect, after.Properties.(map[string]interface{}), "title")
+				assert.Equal(collect, newTitle, after.Properties.(map[string]interface{})["title"])
+			}, 5*time.Second, 100*time.Millisecond)
 		})

 		t.Run("PatchedOnNode-3", func(t *testing.T) {
-			after, err := getObjectFromNode(t, compose.GetWeaviateNode3().URI(), "Article", articleIDs[0], "node3")
-			require.Nil(t, err)
+			assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+				after, err := getObjectFromNode(t, compose.GetWeaviate().URI(), "Article", articleIDs[0], "node3")
+				require.Nil(collect, err)

-			newVal, ok := after.Properties.(map[string]interface{})["title"]
-			require.True(t, ok)
-			assert.Equal(t, newTitle, newVal)
+				require.Contains(collect, after.Properties.(map[string]interface{}), "title")
+				assert.Equal(collect, newTitle, after.Properties.(map[string]interface{})["title"])
+			}, 5*time.Second, 100*time.Millisecond)
 		})
 	})
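These test rewrites replace the stop-node/assert/restart-node choreography with polling: `assert.EventuallyWithT` re-runs the closure until every assertion recorded on the `*assert.CollectT` passes or the deadline expires, which matches eventual-consistency semantics better than asserting once at an arbitrary instant. A self-contained example of the pattern; `countReplicatedObjects` is a hypothetical stand-in for a real query such as `gqlGet`:

```go
package replication_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// countReplicatedObjects is a placeholder for a real query against a node.
func countReplicatedObjects() int { return 10 }

func TestEventuallyReplicated(t *testing.T) {
	assert.EventuallyWithT(t, func(collect *assert.CollectT) {
		// Failures recorded on collect trigger a retry instead of
		// failing the test outright; only the final attempt counts.
		assert.Equal(collect, 10, countReplicatedObjects())
	}, 5*time.Second, 100*time.Millisecond)
}
```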
@@ -427,37 +428,25 @@ func eventualReplicaCRUD(t *testing.T) {
 			deleteObject(t, compose.GetWeaviateNode2().URI(), "Article", articleIDs[0])
 		})

-		t.Run("StopNode-2", func(t *testing.T) {
-			stopNodeAt(ctx, t, compose, 2)
-		})
-
 		t.Run("OnNode-1", func(t *testing.T) {
-			_, err := getObjectFromNode(t, compose.GetWeaviate().URI(), "Article", articleIDs[0], "node1")
-			assert.Equal(t, &objects.ObjectsClassGetNotFound{}, err)
-		})
-
-		t.Run("RestartNode-2", func(t *testing.T) {
-			startNodeAt(ctx, t, compose, 2)
+			assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+				_, err := getObjectFromNode(t, compose.GetWeaviate().URI(), "Article", articleIDs[0], "node1")
+				assert.Equal(collect, &objects.ObjectsClassGetNotFound{}, err)
+			}, 5*time.Second, 100*time.Millisecond)
 		})
 	})

-	t.Run("BatchAllObjects", func(t *testing.T) {
+	t.Run("BatchDeleteAllObjects", func(t *testing.T) {
 		t.Run("OnNode-2", func(t *testing.T) {
 			deleteObjects(t, compose.GetWeaviateNode2().URI(), "Article", []string{"title"}, "Article#*")
 		})

-		t.Run("StopNode-2", func(t *testing.T) {
-			stopNodeAt(ctx, t, compose, 2)
-		})
-
 		t.Run("OnNode-1", func(t *testing.T) {
-			resp := gqlGet(t, compose.GetWeaviate().URI(), "Article", replica.One)
-			assert.Empty(t, resp)
-		})
-
-		t.Run("RestartNode-2", func(t *testing.T) {
-			startNodeAt(ctx, t, compose, 2)
+			assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+				resp := gqlGet(t, compose.GetWeaviate().URI(), "Article", replica.One)
+				assert.Empty(collect, resp)
+			}, 5*time.Second, 100*time.Millisecond)
 		})
 	})

@@ -480,31 +469,6 @@ func eventualReplicaCRUD(t *testing.T) {
 	})
 }

-func restartCluster(ctx context.Context, t *testing.T, compose *docker.DockerCompose) {
-	// since node1 is the gossip "leader", node 2 and 3 must be stopped and restarted
-	// after node1 to re-facilitate internode communication
-	eg := errgroup.Group{}
-	eg.Go(func() error {
-		require.Nil(t, compose.StartAt(ctx, 1))
-		return nil
-	})
-	eg.Go(func() error { // restart node 2
-		time.Sleep(3 * time.Second) // wait for member list initialization
-		stopNodeAt(ctx, t, compose, 2)
-		require.Nil(t, compose.StartAt(ctx, 2))
-		return nil
-	})
-	eg.Go(func() error { // restart node 3
-		time.Sleep(3 * time.Second) // wait for member list initialization
-		stopNodeAt(ctx, t, compose, 3)
-		require.Nil(t, compose.StartAt(ctx, 3))
-		return nil
-	})
-
-	eg.Wait()
-	<-time.After(3 * time.Second) // wait for initialization
-}
-
 func stopNodeAt(ctx context.Context, t *testing.T, compose *docker.DockerCompose, index int) {
 	<-time.After(1 * time.Second)
 	require.Nil(t, compose.StopAt(ctx, index, nil))
diff --git a/test/acceptance/replication/scale_test.go b/test/acceptance/replication/scale_test.go
index 231060aa1f..acca6b6511 100644
--- a/test/acceptance/replication/scale_test.go
+++ b/test/acceptance/replication/scale_test.go
@@ -98,17 +98,20 @@ func multiShardScaleOut(t *testing.T) {
 	})

 	t.Run("assert paragraphs were scaled out", func(t *testing.T) {
-		n := getNodes(t, compose.GetWeaviate().URI())
-		var shardsFound int
-		for _, node := range n.Nodes {
-			for _, shard := range node.Shards {
-				if shard.Class == paragraphClass.Class {
-					assert.EqualValues(t, 10, shard.ObjectCount)
-					shardsFound++
+		// shard.ObjectCount is eventually consistent, see Bucket::CountAsync()
+		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+			n := getNodes(t, compose.GetWeaviate().URI())
+			var shardsFound int
+			for _, node := range n.Nodes {
+				for _, shard := range node.Shards {
+					if shard.Class == paragraphClass.Class {
+						assert.EqualValues(collect, int64(10), shard.ObjectCount)
+						shardsFound++
+					}
 				}
 			}
-		}
-		assert.Equal(t, 2, shardsFound)
+			assert.Equal(collect, 2, shardsFound)
+		}, 10*time.Second, 100*time.Millisecond)
 	})

 	t.Run("scale out articles", func(t *testing.T) {
@@ -118,17 +121,20 @@ func multiShardScaleOut(t *testing.T) {
 	})

 	t.Run("assert articles were scaled out", func(t *testing.T) {
-		n := getNodes(t, compose.GetWeaviate().URI())
-		var shardsFound int
-		for _, node := range n.Nodes {
-			for _, shard := range node.Shards {
-				if shard.Class == articleClass.Class {
-					assert.EqualValues(t, 10, shard.ObjectCount)
-					shardsFound++
+		// shard.ObjectCount is eventually consistent, see Bucket::CountAsync()
+		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+			n := getNodes(t, compose.GetWeaviate().URI())
+			var shardsFound int
+			for _, node := range n.Nodes {
+				for _, shard := range node.Shards {
+					if shard.Class == articleClass.Class {
+						assert.EqualValues(collect, int64(10), shard.ObjectCount)
+						shardsFound++
+					}
 				}
 			}
-		}
-		assert.Equal(t, 2, shardsFound)
+			assert.Equal(collect, 2, shardsFound)
+		}, 10*time.Second, 100*time.Millisecond)
 	})

 	t.Run("kill a node and check contents of remaining node", func(t *testing.T) {
diff --git a/usecases/schema/executor.go b/usecases/schema/executor.go
index eaeca60636..9c6a6b3924 100644
--- a/usecases/schema/executor.go
+++ b/usecases/schema/executor.go
@@ -98,6 +98,11 @@ func (e *executor) UpdateClass(req api.UpdateClassRequest) error {
 		req.Class.InvertedIndexConfig); err != nil {
 		return errors.Wrap(err, "inverted index config")
 	}
+
+	if err := e.migrator.UpdateReplicationFactor(ctx, className, req.Class.ReplicationConfig.Factor); err != nil {
+		return fmt.Errorf("replication index update: %w", err)
+	}
+
 	return nil
 }
diff --git a/usecases/schema/executor_test.go b/usecases/schema/executor_test.go
index 10e6ce3f8f..844dedef17 100644
--- a/usecases/schema/executor_test.go
+++ b/usecases/schema/executor_test.go
@@ -43,6 +43,9 @@ func TestExecutor(t *testing.T) {
 	cls := &models.Class{
 		Class:             "A",
 		VectorIndexConfig: flat.NewDefaultUserConfig(),
+		ReplicationConfig: &models.ReplicationConfig{
+			Factor: 1,
+		},
 	}
 	store.On("ReadOnlySchema").Return(models.Schema{})
 	store.On("ReadOnlyClass", "A", mock.Anything).Return(cls)
diff --git a/usecases/schema/helpers_test.go b/usecases/schema/helpers_test.go
index 6dbdeac646..7c2c14a746 100644
--- a/usecases/schema/helpers_test.go
+++ b/usecases/schema/helpers_test.go
@@ -323,6 +323,10 @@ func (f *fakeMigrator) UpdateInvertedIndexConfig(ctx context.Context, className
 	return args.Error(0)
 }

+func (f *fakeMigrator) UpdateReplicationFactor(ctx context.Context, className string, factor int64) error {
+	return nil
+}
+
 func (f *fakeMigrator) WaitForStartup(ctx context.Context) error {
 	args := f.Called(ctx)
 	return args.Error(0)
 }
diff --git a/usecases/schema/migrator.go b/usecases/schema/migrator.go
index 54cf3b655c..9f5cca740a 100644
--- a/usecases/schema/migrator.go
+++ b/usecases/schema/migrator.go
@@ -59,6 +59,7 @@ type Migrator interface {
 	ValidateInvertedIndexConfigUpdate(old, updated *models.InvertedIndexConfig) error
 	UpdateInvertedIndexConfig(ctx context.Context, className string,
 		updated *models.InvertedIndexConfig) error
+	UpdateReplicationFactor(ctx context.Context, className string, factor int64) error
 	WaitForStartup(context.Context) error
 	Shutdown(context.Context) error
 }
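Taken together, the changes form a short runtime path: `executor.UpdateClass` calls `Migrator.UpdateReplicationFactor`, which performs an atomic `Store` on the live index, and the next `replicationEnabled()` check observes the new factor without a restart. A condensed sketch of that flow, with the interfaces trimmed to the single method added here and all names simplified:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// migrator mirrors the one method added to the Migrator interface.
type migrator interface {
	UpdateReplicationFactor(ctx context.Context, className string, factor int64) error
}

type index struct{ replicationFactor *atomic.Int64 }

func (i *index) replicationEnabled() bool { return i.replicationFactor.Load() > 1 }

type db struct{ indices map[string]*index }

func (d *db) UpdateReplicationFactor(_ context.Context, className string, factor int64) error {
	idx, ok := d.indices[className]
	if !ok {
		return fmt.Errorf("cannot update replication factor of non-existing index for %s", className)
	}
	idx.replicationFactor.Store(factor) // observed by the next replicationEnabled() call
	return nil
}

func main() {
	idx := &index{replicationFactor: &atomic.Int64{}}
	idx.replicationFactor.Store(1)
	d := &db{indices: map[string]*index{"Article": idx}}

	var m migrator = d // the schema executor only sees the Migrator interface
	_ = m.UpdateReplicationFactor(context.Background(), "Article", 3)
	fmt.Println("replication enabled:", idx.replicationEnabled()) // true
}
```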