Improve VTOrc config handling to support dynamic variables #17218

Merged
merged 23 commits into from
Dec 3, 2024
Commits
b3e4aec
feat: remove deprecated flag
GuptaManan100 Nov 12, 2024
85913d1
feat: remove config file flag and start using viper instead for insta…
GuptaManan100 Nov 12, 2024
0a27ba8
feat: move prevent cross cell promotion as well to viper
GuptaManan100 Nov 12, 2024
c6c89fb
feat: add sqldatafile, snapshot time and reasonable replication log t…
GuptaManan100 Nov 12, 2024
7b86cb3
feat: add audit file location to viper
GuptaManan100 Nov 12, 2024
1ab5cc3
feat: move remaining audit flags to viper
GuptaManan100 Nov 12, 2024
ec2ef87
feat: move remaining configs to viper
GuptaManan100 Nov 12, 2024
2121e16
feat: add api to read config and add a test for dynamic configuration
GuptaManan100 Nov 12, 2024
0420ce5
feat: add summary changes
GuptaManan100 Nov 12, 2024
897ab10
feat: fix vtorc config in local example
GuptaManan100 Nov 13, 2024
d20979b
feat: change way we use config in the e2e tests
GuptaManan100 Nov 21, 2024
d5923cd
feat: add two more fields to viper
GuptaManan100 Nov 21, 2024
7ef3b3b
Merge remote-tracking branch 'upstream/main' into vtorc-config-reload…
GuptaManan100 Nov 21, 2024
6822752
feat: fix flag in config
GuptaManan100 Nov 22, 2024
8b1e52b
test: start adding dynamic config tests in e2e fashion
GuptaManan100 Nov 24, 2024
3a422fa
test: add all the remaining configs to the test
GuptaManan100 Nov 24, 2024
ed96f72
feat: change the keys of the viper configs to match the flag names
GuptaManan100 Nov 25, 2024
9f14d8b
feat: add logging for configuration changes
GuptaManan100 Nov 25, 2024
b63d4f8
feat: fix config in examples and update summary
GuptaManan100 Nov 26, 2024
a939288
Merge remote-tracking branch 'upstream/main' into vtorc-config-reload…
GuptaManan100 Nov 27, 2024
cb16b34
Merge remote-tracking branch 'upstream/main' into vtorc-config-reload…
GuptaManan100 Dec 3, 2024
a9761f5
feat: address review comments
GuptaManan100 Dec 3, 2024
85fbc91
feat: don't use deleted flag in vtorc
GuptaManan100 Dec 3, 2024
feat: remove config file flag and start using viper instead for instance poll seconds

Signed-off-by: Manan Gupta <manan@planetscale.com>
GuptaManan100 committed Nov 12, 2024
commit 85913d118bc231e208e59eee7e9e52a9a21df62b
10 changes: 1 addition & 9 deletions go/cmd/vtorc/cli/cli.go
@@ -29,8 +29,7 @@ import (
)

var (
configFile string
Main = &cobra.Command{
Main = &cobra.Command{
Use: "vtorc",
Short: "VTOrc is the automated fault detection and repair tool in Vitess.",
Example: `vtorc \
@@ -55,11 +54,6 @@ func run(cmd *cobra.Command, args []string) {
inst.RegisterStats()

log.Info("starting vtorc")
if len(configFile) > 0 {
config.ForceRead(configFile)
} else {
config.Read("/etc/vtorc.conf.json", "conf/vtorc.conf.json", "vtorc.conf.json")
}
if config.Config.AuditToSyslog {
inst.EnableAuditSyslog()
}
@@ -96,7 +90,5 @@ func init() {
servenv.MoveFlagsToCobraCommand(Main)

logic.RegisterFlags(Main.Flags())
config.RegisterFlags(Main.Flags())
acl.RegisterFlags(Main.Flags())
Main.Flags().StringVar(&configFile, "config", "", "config file name")
}
1 change: 0 additions & 1 deletion go/flags/endtoend/vtorc.txt
@@ -25,7 +25,6 @@ Flags:
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--config string config file name
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
--config-name string Name of the config file (without extension) to search for. (default "vtconfig")
11 changes: 5 additions & 6 deletions go/test/endtoend/cluster/vtorc_process.go
@@ -49,8 +49,7 @@ type VTOrcProcess struct {
}

type VTOrcConfiguration struct {
InstancePollSeconds int `json:",omitempty"`
TopologyRefreshSeconds int `json:",omitempty"`
InstancePollTime string `json:",omitempty"`
PreventCrossDataCenterPrimaryFailover bool `json:",omitempty"`
LockShardTimeoutSeconds int `json:",omitempty"`
ReplicationLagQuery string `json:",omitempty"`
@@ -64,7 +63,7 @@ func (config *VTOrcConfiguration) ToJSONString() string {
}

func (config *VTOrcConfiguration) AddDefaults(webPort int) {
config.InstancePollSeconds = 1
config.InstancePollTime = "10h"
}

// Setup starts orc process with required arguments
@@ -105,10 +104,10 @@ func (orc *VTOrcProcess) Setup() (err error) {
"--topo_implementation", orc.TopoImplementation,
"--topo_global_server_address", orc.TopoGlobalAddress,
"--topo_global_root", orc.TopoGlobalRoot,
"--config", orc.ConfigPath,
"--config-file", orc.ConfigPath,
"--port", fmt.Sprintf("%d", orc.Port),
// This parameter is overridden from the config file, added here to just verify that we indeed use the config file parameter over the flag
"--instance-poll-time", "10h",
// This parameter is overridden from the config file. This verifies that we indeed use the flag value over the config file.
"--instance-poll-time", "1s",
// Faster topo information refresh speeds up the tests. This doesn't add any significant load either.
"--topo-information-refresh-duration", "3s",
"--bind-address", "127.0.0.1",
Expand Down
3 changes: 1 addition & 2 deletions go/test/endtoend/vtorc/readtopologyinstance/main_test.go
@@ -55,8 +55,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
"--topo_global_root", clusterInfo.ClusterInstance.VtctlProcess.TopoGlobalRoot,
}
servenv.ParseFlags("vtorc")
config.Config.RecoveryPeriodBlockSeconds = 1
config.Config.InstancePollSeconds = 1
config.SetInstancePollTime(1 * time.Second)
config.MarkConfigurationLoaded()
server.StartVTOrcDiscovery()

4 changes: 4 additions & 0 deletions go/viperutil/debug/debug.go
@@ -25,3 +25,7 @@ import (
func Debug() {
registry.Combined().Debug()
}

func WriteConfigAs(filename string) error {
return registry.Combined().WriteConfigAs(filename)
}
70 changes: 39 additions & 31 deletions go/vt/vtorc/config/config.go
@@ -24,7 +24,9 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/viperutil"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
)

var configurationLoaded = make(chan bool)
@@ -41,9 +43,19 @@ const (
UnseenInstanceForgetHours = 240 // Number of hours after which an unseen instance is forgotten
)

var (
instancePollTime = viperutil.Configure(
"InstancePollTime",
viperutil.Options[time.Duration]{
FlagName: "instance-poll-time",
Default: 5 * time.Second,
Dynamic: true,
Member Author

@GuptaManan100 GuptaManan100 Nov 14, 2024

This makes the field dynamic

},
)
)

var (
sqliteDataFile = "file::memory:?mode=memory&cache=shared"
instancePollTime = 5 * time.Second
snapshotTopologyInterval = 0 * time.Hour
reasonableReplicationLag = 10 * time.Second
auditFileLocation = ""
@@ -59,10 +71,14 @@ var (
convertTabletsWithErrantGTIDs = false
)

// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
func init() {
servenv.OnParseFor("vtorc", registerFlags)
}

// registerFlags registers the flags required by VTOrc
func registerFlags(fs *pflag.FlagSet) {
fs.StringVar(&sqliteDataFile, "sqlite-data-file", sqliteDataFile, "SQLite Datafile to use as VTOrc's database")
fs.DurationVar(&instancePollTime, "instance-poll-time", instancePollTime, "Timer duration on which VTOrc refreshes MySQL information")
fs.Duration("instance-poll-time", instancePollTime.Default(), "Timer duration on which VTOrc refreshes MySQL information")
fs.DurationVar(&snapshotTopologyInterval, "snapshot-topology-interval", snapshotTopologyInterval, "Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours")
fs.DurationVar(&reasonableReplicationLag, "reasonable-replication-lag", reasonableReplicationLag, "Maximum replication lag on replicas which is deemed to be acceptable")
fs.StringVar(&auditFileLocation, "audit-file-location", auditFileLocation, "File location where the audit logs are to be stored")
@@ -76,6 +92,10 @@ func RegisterFlags(fs *pflag.FlagSet) {
fs.DurationVar(&recoveryPollDuration, "recovery-poll-duration", recoveryPollDuration, "Timer duration on which VTOrc polls its database to run a recovery")
fs.BoolVar(&ersEnabled, "allow-emergency-reparent", ersEnabled, "Whether VTOrc should be allowed to run emergency reparent operation when it detects a dead primary")
fs.BoolVar(&convertTabletsWithErrantGTIDs, "change-tablets-with-errant-gtid-to-drained", convertTabletsWithErrantGTIDs, "Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED")

viperutil.BindFlags(fs,
instancePollTime,
)
}

// Configuration makes for vtorc configuration input, which can be provided by user via JSON formatted file.
@@ -84,7 +104,6 @@ func RegisterFlags(fs *pflag.FlagSet) {
// TODO(sougou): change this to yaml parsing, and possible merge with tabletenv.
type Configuration struct {
SQLite3DataFile string // full path to sqlite3 datafile
InstancePollSeconds uint // Number of seconds between instance reads
SnapshotTopologiesIntervalHours uint // Interval in hour between snapshot-topologies invocation. Default: 0 (disabled)
ReasonableReplicationLagSeconds int // Above this value is considered a problem
AuditLogFile string // Name of log file for audit operations. Disabled when empty.
@@ -106,13 +125,26 @@ func (config *Configuration) ToJSONString() string {

// Config is *the* configuration instance, used globally to get configuration data
var Config = newConfiguration()
var readFileNames []string

// GetInstancePollTime is a getter function.
func GetInstancePollTime() time.Duration {
return instancePollTime.Get()
}

// SetInstancePollTime is a setter function.
func SetInstancePollTime(v time.Duration) {
instancePollTime.Set(v)
}

// GetInstancePollSeconds gets the instance poll time but in seconds.
func GetInstancePollSeconds() uint {
return uint(instancePollTime.Get() / time.Second)
}

// UpdateConfigValuesFromFlags is used to update the config values from the flags defined.
// This is done before we read any configuration files from the user. So the config files take precedence.
func UpdateConfigValuesFromFlags() {
Config.SQLite3DataFile = sqliteDataFile
Config.InstancePollSeconds = uint(instancePollTime / time.Second)
Config.SnapshotTopologiesIntervalHours = uint(snapshotTopologyInterval / time.Hour)
Config.ReasonableReplicationLagSeconds = int(reasonableReplicationLag / time.Second)
Config.AuditLogFile = auditFileLocation
@@ -155,7 +187,6 @@ func LogConfigValues() {
func newConfiguration() *Configuration {
return &Configuration{
SQLite3DataFile: "file::memory:?mode=memory&cache=shared",
InstancePollSeconds: 5,
SnapshotTopologiesIntervalHours: 0,
ReasonableReplicationLagSeconds: 10,
AuditLogFile: "",
@@ -200,31 +231,8 @@ func read(fileName string) (*Configuration, error) {
return Config, err
}

// Read reads configuration from zero, either, some or all given files, in order of input.
// A file can override configuration provided in previous file.
func Read(fileNames ...string) *Configuration {
for _, fileName := range fileNames {
_, _ = read(fileName)
}
readFileNames = fileNames
return Config
}

// ForceRead reads configuration from given file name or bails out if it fails
func ForceRead(fileName string) *Configuration {
_, err := read(fileName)
if err != nil {
log.Fatal("Cannot read config file:", fileName, err)
}
readFileNames = []string{fileName}
return Config
}

// Reload re-reads configuration from last used files
func Reload(extraFileNames ...string) *Configuration {
for _, fileName := range readFileNames {
_, _ = read(fileName)
}
for _, fileName := range extraFileNames {
_, _ = read(fileName)
}
15 changes: 0 additions & 15 deletions go/vt/vtorc/config/config_test.go
@@ -67,21 +67,6 @@ func TestUpdateConfigValuesFromFlags(t *testing.T) {
require.Equal(t, testConfig, Config)
})

t.Run("override instancePollTime", func(t *testing.T) {
oldInstancePollTime := instancePollTime
instancePollTime = 7 * time.Second
// Restore the changes we make
defer func() {
Config = newConfiguration()
instancePollTime = oldInstancePollTime
}()

testConfig := newConfiguration()
testConfig.InstancePollSeconds = 7
UpdateConfigValuesFromFlags()
require.Equal(t, testConfig, Config)
})

t.Run("override snapshotTopologyInterval", func(t *testing.T) {
oldSnapshotTopologyInterval := snapshotTopologyInterval
snapshotTopologyInterval = 1 * time.Hour
2 changes: 1 addition & 1 deletion go/vt/vtorc/discovery/queue.go
@@ -153,7 +153,7 @@ func (q *Queue) Consume() string {

// alarm if have been waiting for too long
timeOnQueue := time.Since(q.queuedKeys[key])
if timeOnQueue > time.Duration(config.Config.InstancePollSeconds)*time.Second {
if timeOnQueue > config.GetInstancePollTime() {
log.Warningf("key %v spent %.4fs waiting on a discoveryQueue", key, timeOnQueue.Seconds())
}

2 changes: 1 addition & 1 deletion go/vt/vtorc/inst/analysis.go
@@ -144,5 +144,5 @@ func (replicationAnalysis *ReplicationAnalysis) MarshalJSON() ([]byte, error) {
// ValidSecondsFromSeenToLastAttemptedCheck returns the maximum allowed elapsed time
// between last_attempted_check to last_checked before we consider the instance as invalid.
func ValidSecondsFromSeenToLastAttemptedCheck() uint {
return config.Config.InstancePollSeconds + 1
return config.GetInstancePollSeconds()
}
10 changes: 5 additions & 5 deletions go/vt/vtorc/inst/instance_dao.go
@@ -80,7 +80,7 @@ func init() {

func initializeInstanceDao() {
config.WaitForConfigurationToBeLoaded()
forgetAliases = cache.New(time.Duration(config.Config.InstancePollSeconds*3)*time.Second, time.Second)
forgetAliases = cache.New(config.GetInstancePollTime()*3, time.Second)
cacheInitializationCompleted.Store(true)
}

@@ -544,8 +544,8 @@ func readInstanceRow(m sqlutils.RowMap) *Instance {
instance.ReplicationDepth = m.GetUint("replication_depth")
instance.IsCoPrimary = m.GetBool("is_co_primary")
instance.HasReplicationCredentials = m.GetBool("has_replication_credentials")
instance.IsUpToDate = (m.GetUint("seconds_since_last_checked") <= config.Config.InstancePollSeconds)
instance.IsRecentlyChecked = (m.GetUint("seconds_since_last_checked") <= config.Config.InstancePollSeconds*5)
instance.IsUpToDate = m.GetUint("seconds_since_last_checked") <= config.GetInstancePollSeconds()
instance.IsRecentlyChecked = m.GetUint("seconds_since_last_checked") <= config.GetInstancePollSeconds()*5
instance.LastSeenTimestamp = m.GetString("last_seen")
instance.IsLastCheckValid = m.GetBool("is_last_check_valid")
instance.SecondsSinceLastSeen = m.GetNullInt64("seconds_since_last_seen")
@@ -646,7 +646,7 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error)
)
`

args := sqlutils.Args(keyspace, keyspace, shard, shard, config.Config.InstancePollSeconds*5, config.Config.ReasonableReplicationLagSeconds, config.Config.ReasonableReplicationLagSeconds)
args := sqlutils.Args(keyspace, keyspace, shard, shard, config.GetInstancePollSeconds()*5, config.Config.ReasonableReplicationLagSeconds, config.Config.ReasonableReplicationLagSeconds)
return readInstancesByCondition(condition, args, "")
}

@@ -716,7 +716,7 @@ func ReadOutdatedInstanceKeys() ([]string, error) {
WHERE
database_instance.alias IS NULL
`
args := sqlutils.Args(config.Config.InstancePollSeconds, 2*config.Config.InstancePollSeconds)
args := sqlutils.Args(config.GetInstancePollSeconds(), 2*config.GetInstancePollSeconds())

err := db.QueryVTOrc(query, args, func(m sqlutils.RowMap) error {
tabletAlias := m.GetString("alias")
18 changes: 9 additions & 9 deletions go/vt/vtorc/inst/instance_dao_test.go
@@ -241,11 +241,11 @@ func TestReadProblemInstances(t *testing.T) {

// We need to set InstancePollSeconds to a large value otherwise all the instances are reported as having problems since their last_checked is very old.
// Setting this value to a hundred years, we ensure that this test doesn't fail with this issue for the next hundred years.
oldVal := config.Config.InstancePollSeconds
oldVal := config.GetInstancePollTime()
defer func() {
config.Config.InstancePollSeconds = oldVal
config.SetInstancePollTime(oldVal)
}()
config.Config.InstancePollSeconds = 60 * 60 * 24 * 365 * 100
config.SetInstancePollTime(60 * 60 * 24 * 365 * 100 * time.Second)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -325,11 +325,11 @@ func TestReadInstancesWithErrantGTIds(t *testing.T) {

// We need to set InstancePollSeconds to a large value otherwise all the instances are reported as having problems since their last_checked is very old.
// Setting this value to a hundred years, we ensure that this test doesn't fail with this issue for the next hundred years.
oldVal := config.Config.InstancePollSeconds
oldVal := config.GetInstancePollTime()
defer func() {
config.Config.InstancePollSeconds = oldVal
config.SetInstancePollTime(oldVal)
}()
config.Config.InstancePollSeconds = 60 * 60 * 24 * 365 * 100
config.SetInstancePollTime(60 * 60 * 24 * 365 * 100 * time.Second)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -459,13 +459,13 @@ func TestReadOutdatedInstanceKeys(t *testing.T) {
waitForCacheInitialization()

// We are setting InstancePollSeconds to 25 minutes, just for the test.
oldVal := config.Config.InstancePollSeconds
oldVal := config.GetInstancePollTime()
oldCache := forgetAliases
defer func() {
forgetAliases = oldCache
config.Config.InstancePollSeconds = oldVal
config.SetInstancePollTime(oldVal)
}()
config.Config.InstancePollSeconds = 60 * 25
config.SetInstancePollTime(60 * 25 * time.Second)
forgetAliases = cache.New(time.Minute, time.Minute)

for _, tt := range tests {
11 changes: 3 additions & 8 deletions go/vt/vtorc/logic/vtorc.go
@@ -73,11 +73,6 @@ func init() {
})
}

// used in several places
func instancePollSecondsDuration() time.Duration {
return time.Duration(config.Config.InstancePollSeconds) * time.Second
}

// acceptSighupSignal registers for SIGHUP signal from the OS to reload the configuration files.
func acceptSighupSignal() {
Contributor

@GuptaManan100 double-checking: the 3rd party library handles SIGHUP reload?

Member Author

SIGHUP is not even required anymore. Viper has a watcher on the file, and if it changes, it automatically reloads the configurations.

c := make(chan os.Signal, 1)
@@ -161,7 +156,7 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) {
defer func() {
latency.Stop("total")
discoveryTime := latency.Elapsed("total")
if discoveryTime > instancePollSecondsDuration() {
if discoveryTime > config.GetInstancePollTime() {
instancePollSecondsExceededCounter.Add(1)
log.Warningf("discoverInstance exceeded InstancePollSeconds for %+v, took %.4fs", tabletAlias, discoveryTime.Seconds())
if metric != nil {
@@ -177,7 +172,7 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) {
// Calculate the expiry period each time as InstancePollSeconds
// _may_ change during the run of the process (via SIGHUP) and
// it is not possible to change the cache's default expiry..
if existsInCacheError := recentDiscoveryOperationKeys.Add(tabletAlias, true, instancePollSecondsDuration()); existsInCacheError != nil && !forceDiscovery {
if existsInCacheError := recentDiscoveryOperationKeys.Add(tabletAlias, true, config.GetInstancePollTime()); existsInCacheError != nil && !forceDiscovery {
// Just recently attempted
return
}
@@ -271,7 +266,7 @@ func onHealthTick() {
// nolint SA1015: using time.Tick leaks the underlying ticker
func ContinuousDiscovery() {
log.Infof("continuous discovery: setting up")
recentDiscoveryOperationKeys = cache.New(instancePollSecondsDuration(), time.Second)
recentDiscoveryOperationKeys = cache.New(config.GetInstancePollTime(), time.Second)

go handleDiscoveryRequests()
