Skip to content

Commit

Permalink
use flag for consistency wait timeout and rename it
Browse files Browse the repository at this point in the history
Signed-off-by: Loic Reyreaud <loic@weaviate.io>
  • Loading branch information
reyreaud-l committed May 7, 2024
1 parent 2d5ab07 commit 0aebbb8
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 78 deletions.
52 changes: 25 additions & 27 deletions adapters/handlers/rest/configure_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,6 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
remoteIndexClient, appState.Logger, appState.ServerConfig.Config.Persistence.DataPath)
appState.Scaler = scaler

/// TODO-RAFT START
//
server2port, err := parseNode2Port(appState)
if len(server2port) == 0 || err != nil {
appState.Logger.
Expand All @@ -268,31 +266,31 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
dataPath := appState.ServerConfig.Config.Persistence.DataPath

rConfig := rStore.Config{
WorkDir: filepath.Join(dataPath, "raft"),
NodeID: nodeName,
Host: addrs[0],
RaftPort: appState.ServerConfig.Config.Raft.Port,
RPCPort: appState.ServerConfig.Config.Raft.InternalRPCPort,
RaftRPCMessageMaxSize: appState.ServerConfig.Config.Raft.RPCMessageMaxSize,
ServerName2PortMap: server2port,
BootstrapTimeout: appState.ServerConfig.Config.Raft.BootstrapTimeout,
BootstrapExpect: appState.ServerConfig.Config.Raft.BootstrapExpect,
HeartbeatTimeout: appState.ServerConfig.Config.Raft.HeartbeatTimeout,
RecoveryTimeout: appState.ServerConfig.Config.Raft.RecoveryTimeout,
ElectionTimeout: appState.ServerConfig.Config.Raft.ElectionTimeout,
SnapshotInterval: appState.ServerConfig.Config.Raft.SnapshotInterval,
SnapshotThreshold: appState.ServerConfig.Config.Raft.SnapshotThreshold,
UpdateWaitTimeout: time.Second * 10, // TODO-RAFT read from the flag
MetadataOnlyVoters: appState.ServerConfig.Config.Raft.MetadataOnlyVoters,
DB: nil,
Parser: schema.NewParser(appState.Cluster, vectorIndex.ParseAndValidateConfig, migrator),
AddrResolver: appState.Cluster,
Logger: appState.Logger,
LogLevel: logLevel(),
LogJSONFormat: !logTextFormat(),
IsLocalHost: appState.ServerConfig.Config.Cluster.Localhost,
LoadLegacySchema: schemaRepo.LoadLegacySchema,
SaveLegacySchema: schemaRepo.SaveLegacySchema,
WorkDir: filepath.Join(dataPath, "raft"),
NodeID: nodeName,
Host: addrs[0],
RaftPort: appState.ServerConfig.Config.Raft.Port,
RPCPort: appState.ServerConfig.Config.Raft.InternalRPCPort,
RaftRPCMessageMaxSize: appState.ServerConfig.Config.Raft.RPCMessageMaxSize,
ServerName2PortMap: server2port,
BootstrapTimeout: appState.ServerConfig.Config.Raft.BootstrapTimeout,
BootstrapExpect: appState.ServerConfig.Config.Raft.BootstrapExpect,
HeartbeatTimeout: appState.ServerConfig.Config.Raft.HeartbeatTimeout,
RecoveryTimeout: appState.ServerConfig.Config.Raft.RecoveryTimeout,
ElectionTimeout: appState.ServerConfig.Config.Raft.ElectionTimeout,
SnapshotInterval: appState.ServerConfig.Config.Raft.SnapshotInterval,
SnapshotThreshold: appState.ServerConfig.Config.Raft.SnapshotThreshold,
ConsistencyWaitTimeout: appState.ServerConfig.Config.Raft.ConsistencyWaitTimeout,
MetadataOnlyVoters: appState.ServerConfig.Config.Raft.MetadataOnlyVoters,
DB: nil,
Parser: schema.NewParser(appState.Cluster, vectorIndex.ParseAndValidateConfig, migrator),
AddrResolver: appState.Cluster,
Logger: appState.Logger,
LogLevel: logLevel(),
LogJSONFormat: !logTextFormat(),
IsLocalHost: appState.ServerConfig.Config.Cluster.Localhost,
LoadLegacySchema: schemaRepo.LoadLegacySchema,
SaveLegacySchema: schemaRepo.SaveLegacySchema,
}
for _, name := range appState.ServerConfig.Config.Raft.Join[:rConfig.BootstrapExpect] {
if strings.Contains(name, rConfig.NodeID) {
Expand Down
51 changes: 25 additions & 26 deletions cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ type Config struct {
RecoveryTimeout time.Duration
SnapshotInterval time.Duration
BootstrapTimeout time.Duration
// UpdateWaitTimeout Timeout duration for waiting for the update to be propagated to this follower node.
UpdateWaitTimeout time.Duration
SnapshotThreshold uint64
// ConsistencyWaitTimeout is the duration we will wait for a schema version to land on that node
ConsistencyWaitTimeout time.Duration
SnapshotThreshold uint64

DB Indexer
Parser Parser
Expand Down Expand Up @@ -149,9 +149,8 @@ type Store struct {

// applyTimeout timeout limit the amount of time raft waits for a command to be started
applyTimeout time.Duration

// UpdateWaitTimeout Timeout duration for waiting for the update to be propagated to this follower node.
updateWaitTimeout time.Duration
// consistencyWaitTimeout is the duration we will wait for a schema version to land on that node
consistencyWaitTimeout time.Duration

nodeID string
host string
Expand Down Expand Up @@ -191,25 +190,25 @@ type Store struct {

func New(cfg Config) Store {
return Store{
raftDir: cfg.WorkDir,
raftPort: cfg.RaftPort,
voter: cfg.Voter,
bootstrapExpect: cfg.BootstrapExpect,
candidates: make(map[string]string, cfg.BootstrapExpect),
recoveryTimeout: cfg.RecoveryTimeout,
heartbeatTimeout: cfg.HeartbeatTimeout,
electionTimeout: cfg.ElectionTimeout,
snapshotInterval: cfg.SnapshotInterval,
snapshotThreshold: cfg.SnapshotThreshold,
updateWaitTimeout: cfg.UpdateWaitTimeout,
applyTimeout: time.Second * 20,
nodeID: cfg.NodeID,
host: cfg.Host,
addResolver: newAddrResolver(&cfg),
db: &localDB{NewSchema(cfg.NodeID, cfg.DB), cfg.DB, cfg.Parser, cfg.Logger},
log: cfg.Logger,
logLevel: cfg.LogLevel,
logJsonFormat: cfg.LogJSONFormat,
raftDir: cfg.WorkDir,
raftPort: cfg.RaftPort,
voter: cfg.Voter,
bootstrapExpect: cfg.BootstrapExpect,
candidates: make(map[string]string, cfg.BootstrapExpect),
recoveryTimeout: cfg.RecoveryTimeout,
heartbeatTimeout: cfg.HeartbeatTimeout,
electionTimeout: cfg.ElectionTimeout,
snapshotInterval: cfg.SnapshotInterval,
snapshotThreshold: cfg.SnapshotThreshold,
consistencyWaitTimeout: cfg.ConsistencyWaitTimeout,
applyTimeout: time.Second * 20,
nodeID: cfg.NodeID,
host: cfg.Host,
addResolver: newAddrResolver(&cfg),
db: &localDB{NewSchema(cfg.NodeID, cfg.DB), cfg.DB, cfg.Parser, cfg.Logger},
log: cfg.Logger,
logLevel: cfg.LogLevel,
logJsonFormat: cfg.LogJSONFormat,

// if true voters will only serve schema
metadataOnlyVoters: cfg.MetadataOnlyVoters,
Expand Down Expand Up @@ -448,7 +447,7 @@ func (st *Store) WaitForAppliedIndex(ctx context.Context, period time.Duration,
if idx := st.lastAppliedIndex.Load(); idx >= version {
return nil
}
ctx, cancel := context.WithTimeout(ctx, st.updateWaitTimeout)
ctx, cancel := context.WithTimeout(ctx, st.consistencyWaitTimeout)
defer cancel()
ticker := time.NewTicker(period)
defer ticker.Stop()
Expand Down
32 changes: 16 additions & 16 deletions cluster/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,22 +852,22 @@ func NewMockStore(t *testing.T, nodeID string, raftPort int) MockStore {
logger: logger,

cfg: Config{
WorkDir: t.TempDir(),
NodeID: nodeID,
Host: "localhost",
RaftPort: raftPort,
Voter: true,
BootstrapExpect: 1,
HeartbeatTimeout: 1 * time.Second,
ElectionTimeout: 1 * time.Second,
RecoveryTimeout: 500 * time.Millisecond,
SnapshotInterval: 2 * time.Second,
SnapshotThreshold: 125,
DB: indexer,
Parser: parser,
AddrResolver: &MockAddressResolver{},
Logger: logger.Logger,
UpdateWaitTimeout: time.Millisecond * 50,
WorkDir: t.TempDir(),
NodeID: nodeID,
Host: "localhost",
RaftPort: raftPort,
Voter: true,
BootstrapExpect: 1,
HeartbeatTimeout: 1 * time.Second,
ElectionTimeout: 1 * time.Second,
RecoveryTimeout: 500 * time.Millisecond,
SnapshotInterval: 2 * time.Second,
SnapshotThreshold: 125,
DB: indexer,
Parser: parser,
AddrResolver: &MockAddressResolver{},
Logger: logger.Logger,
ConsistencyWaitTimeout: time.Millisecond * 50,
},
}
s := New(ms.cfg)
Expand Down
31 changes: 22 additions & 9 deletions usecases/config/config_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,16 @@ func (r ResourceUsage) Validate() error {
}

type Raft struct {
Port int
InternalRPCPort int
RPCMessageMaxSize int
Join []string
SnapshotThreshold uint64
HeartbeatTimeout time.Duration
RecoveryTimeout time.Duration
ElectionTimeout time.Duration
SnapshotInterval time.Duration
Port int
InternalRPCPort int
RPCMessageMaxSize int
Join []string
SnapshotThreshold uint64
HeartbeatTimeout time.Duration
RecoveryTimeout time.Duration
ElectionTimeout time.Duration
SnapshotInterval time.Duration
ConsistencyWaitTimeout time.Duration

BootstrapTimeout time.Duration
BootstrapExpect int
Expand Down Expand Up @@ -351,6 +352,18 @@ func (r *Raft) Validate() error {
if r.BootstrapExpect > len(r.Join) {
return fmt.Errorf("raft.bootstrap.expect must be less than or equal to the length of raft.join")
}

if r.SnapshotInterval <= 0 {
return fmt.Errorf("raft.bootstrap.snapshot_interval must be more than 0")
}

if r.SnapshotThreshold <= 0 {
return fmt.Errorf("raft.bootstrap.snapshot_threshold must be more than 0")
}

if r.ConsistencyWaitTimeout <= 0 {
return fmt.Errorf("raft.bootstrap.consistency_wait_timeout must be more than 0")
}
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions usecases/config/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,14 @@ func parseRAFTConfig(hostname string) (Raft, error) {
return cfg, err
}

if err := parsePositiveInt(
"RAFT_CONSISTENCY_WAIT_TIMEOUT",
func(val int) { cfg.ConsistencyWaitTimeout = time.Second * time.Duration(val) },
10,
); err != nil {
return cfg, err
}

return cfg, nil
}

Expand Down

0 comments on commit 0aebbb8

Please sign in to comment.