diff --git a/lib/config/namespace.go b/lib/config/namespace.go index 9942122f..346e0097 100644 --- a/lib/config/namespace.go +++ b/lib/config/namespace.go @@ -40,6 +40,14 @@ type BackendNamespace struct { //HealthCheck HealthCheck `yaml:"health-check" json:"health-check" toml:"health-check"` } +const ( + healthCheckInterval = 3 * time.Second + healthCheckMaxRetries = 3 + healthCheckRetryInterval = 1 * time.Second + healthCheckTimeout = 2 * time.Second + tombstoneThreshold = 5 * time.Minute +) + // HealthCheck contains some configurations for health check. // Some general configurations of them may be exposed to users in the future. // We can use shorter durations to speed up unit tests. @@ -52,6 +60,36 @@ type HealthCheck struct { TombstoneThreshold time.Duration `yaml:"tombstone-threshold" json:"tombstone-threshold" toml:"tombstone-threshold"` } +// NewDefaultHealthCheckConfig creates a default HealthCheck. +func NewDefaultHealthCheckConfig() *HealthCheck { + return &HealthCheck{ + Enable: true, + Interval: healthCheckInterval, + MaxRetries: healthCheckMaxRetries, + RetryInterval: healthCheckRetryInterval, + DialTimeout: healthCheckTimeout, + TombstoneThreshold: tombstoneThreshold, + } +} + +func (hc *HealthCheck) Check() { + if hc.Interval == 0 { + hc.Interval = healthCheckInterval + } + if hc.MaxRetries == 0 { + hc.MaxRetries = healthCheckMaxRetries + } + if hc.RetryInterval == 0 { + hc.RetryInterval = healthCheckRetryInterval + } + if hc.DialTimeout == 0 { + hc.DialTimeout = healthCheckTimeout + } + if hc.TombstoneThreshold == 0 { + hc.TombstoneThreshold = tombstoneThreshold + } +} + func NewNamespace(data []byte) (*Namespace, error) { var cfg Namespace if err := toml.Unmarshal(data, &cfg); err != nil { diff --git a/pkg/manager/namespace/manager.go b/pkg/manager/namespace/manager.go index 3f826e28..c0b0a60f 100644 --- a/pkg/manager/namespace/manager.go +++ b/pkg/manager/namespace/manager.go @@ -44,11 +44,11 @@ func (mgr *NamespaceManager) buildNamespace(cfg 
*config.Namespace) (*Namespace, var fetcher router.BackendFetcher if mgr.client != nil { - fetcher = router.NewPDFetcher(mgr.client, logger.Named("be_fetcher"), router.NewDefaultHealthCheckConfig()) + fetcher = router.NewPDFetcher(mgr.client, logger.Named("be_fetcher"), config.NewDefaultHealthCheckConfig()) } else { fetcher = router.NewStaticFetcher(cfg.Backend.Instances) } - rt, err := router.NewScoreBasedRouter(logger.Named("router"), mgr.httpCli, fetcher, router.NewDefaultHealthCheckConfig()) + rt, err := router.NewScoreBasedRouter(logger.Named("router"), mgr.httpCli, fetcher, config.NewDefaultHealthCheckConfig()) if err != nil { return nil, errors.Errorf("build router error: %w", err) } diff --git a/pkg/manager/router/backend_fetcher.go b/pkg/manager/router/backend_fetcher.go index ed8890fd..b06bad45 100644 --- a/pkg/manager/router/backend_fetcher.go +++ b/pkg/manager/router/backend_fetcher.go @@ -94,6 +94,7 @@ type PDFetcher struct { } func NewPDFetcher(client *clientv3.Client, logger *zap.Logger, config *config.HealthCheck) *PDFetcher { + config.Check() return &PDFetcher{ backendInfo: make(map[string]*pdBackendInfo), client: client, diff --git a/pkg/manager/router/backend_observer.go b/pkg/manager/router/backend_observer.go index c3596eb7..6a7f80c5 100644 --- a/pkg/manager/router/backend_observer.go +++ b/pkg/manager/router/backend_observer.go @@ -67,28 +67,11 @@ var statusScores = map[BackendStatus]int{ } const ( - healthCheckInterval = 3 * time.Second - healthCheckMaxRetries = 3 - healthCheckRetryInterval = 1 * time.Second - healthCheckTimeout = 2 * time.Second - tombstoneThreshold = 5 * time.Minute - ttlPathSuffix = "/ttl" - infoPathSuffix = "/info" - statusPathSuffix = "/status" + ttlPathSuffix = "/ttl" + infoPathSuffix = "/info" + statusPathSuffix = "/status" ) -// NewDefaultHealthCheckConfig creates a default HealthCheck. 
-func NewDefaultHealthCheckConfig() *config.HealthCheck { - return &config.HealthCheck{ - Enable: true, - Interval: healthCheckInterval, - MaxRetries: healthCheckMaxRetries, - RetryInterval: healthCheckRetryInterval, - DialTimeout: healthCheckTimeout, - TombstoneThreshold: tombstoneThreshold, - } -} - // BackendEventReceiver receives the event of backend status change. type BackendEventReceiver interface { // OnBackendChanged is called when the backend list changes. diff --git a/pkg/manager/router/backend_observer_test.go b/pkg/manager/router/backend_observer_test.go index 4b521666..51d84bba 100644 --- a/pkg/manager/router/backend_observer_test.go +++ b/pkg/manager/router/backend_observer_test.go @@ -57,10 +57,10 @@ func newHealthCheckConfigForTest() *config.HealthCheck { return &config.HealthCheck{ Enable: true, Interval: 500 * time.Millisecond, - MaxRetries: healthCheckMaxRetries, + MaxRetries: 3, RetryInterval: 100 * time.Millisecond, DialTimeout: 100 * time.Millisecond, - TombstoneThreshold: tombstoneThreshold, + TombstoneThreshold: 5 * time.Minute, } } diff --git a/pkg/manager/router/router_score.go b/pkg/manager/router/router_score.go index 1b85e56d..9b1a6b54 100644 --- a/pkg/manager/router/router_score.go +++ b/pkg/manager/router/router_score.go @@ -48,8 +48,7 @@ func NewScoreBasedRouter(logger *zap.Logger, httpCli *http.Client, fetcher Backe logger: logger, backends: list.New(), } - router.Lock() - defer router.Unlock() + cfg.Check() observer, err := StartBackendObserver(logger.Named("observer"), router, httpCli, cfg, fetcher) if err != nil { return nil, err @@ -393,8 +392,6 @@ func (router *ScoreBasedRouter) ConnCount() int { // Close implements Router.Close interface. 
func (router *ScoreBasedRouter) Close() { - router.Lock() - defer router.Unlock() if router.cancelFunc != nil { router.cancelFunc() router.cancelFunc = nil diff --git a/pkg/manager/router/router_test.go b/pkg/manager/router/router_test.go index 195c1d00..a1f0c804 100644 --- a/pkg/manager/router/router_test.go +++ b/pkg/manager/router/router_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/pingcap/TiProxy/lib/config" "github.com/pingcap/TiProxy/lib/util/errors" "github.com/pingcap/TiProxy/lib/util/logger" "github.com/pingcap/TiProxy/lib/util/waitgroup" @@ -694,7 +695,7 @@ func TestRefresh(t *testing.T) { logger: lg, backends: list.New(), } - cfg := NewDefaultHealthCheckConfig() + cfg := config.NewDefaultHealthCheckConfig() cfg.Interval = time.Minute observer, err := StartBackendObserver(lg, rt, nil, cfg, fetcher) require.NoError(t, err) @@ -714,11 +715,11 @@ func TestRefresh(t *testing.T) { m.Unlock() defer server.close() // The backends are refreshed very soon. - finally(t, func() bool { + require.Eventually(t, func() bool { addr, err = selector.Next() require.NoError(t, err) return len(addr) > 0 - }, 100*time.Millisecond, 3*time.Second) + }, 3*time.Second, 100*time.Millisecond) } func TestObserveError(t *testing.T) { @@ -754,43 +755,60 @@ func TestObserveError(t *testing.T) { m.Unlock() defer server.close() // The backends are refreshed very soon. - finally(t, func() bool { + require.Eventually(t, func() bool { selector.Reset() addr, err = selector.Next() require.NoError(t, err) return len(addr) > 0 - }, 100*time.Millisecond, 3*time.Second) + }, 3*time.Second, 100*time.Millisecond) // Mock an observe error. m.Lock() observeError = errors.New("mock observe error") m.Unlock() - finally(t, func() bool { + require.Eventually(t, func() bool { selector.Reset() addr, err = selector.Next() return len(addr) == 0 && err != nil - }, 100*time.Millisecond, 3*time.Second) + }, 3*time.Second, 100*time.Millisecond) // Clear the observe error. 
m.Lock() observeError = nil m.Unlock() - finally(t, func() bool { + require.Eventually(t, func() bool { selector.Reset() addr, err = selector.Next() return len(addr) > 0 && err == nil - }, 100*time.Millisecond, 3*time.Second) -} - -func finally(t *testing.T, checker func() bool, checkInterval, timeout time.Duration) { - timer := time.NewTimer(timeout) - defer timer.Stop() - for { - select { - case <-timer.C: - t.Fatal("timeout") - case <-time.After(checkInterval): - if checker() { - return - } - } - } + }, 3*time.Second, 100*time.Millisecond) +} + +func TestDisableHealthCheck(t *testing.T) { + backends := []string{"127.0.0.1:4000"} + var m sync.Mutex + fetcher := NewExternalFetcher(func() ([]string, error) { + m.Lock() + defer m.Unlock() + return backends, nil + }) + // Create a router with health check disabled. + lg := logger.CreateLoggerForTest(t) + rt, err := NewScoreBasedRouter(lg, nil, fetcher, &config.HealthCheck{Enable: false}) + require.NoError(t, err) + defer rt.Close() + // Get a backend selector from the router. + selector := rt.GetBackendSelector() + // The backends are refreshed very soon. + require.Eventually(t, func() bool { + addr, err := selector.Next() + require.NoError(t, err) + return addr == "127.0.0.1:4000" + }, 3*time.Second, 100*time.Millisecond) + // Replace the backend. + m.Lock() + backends[0] = "127.0.0.1:5000" + m.Unlock() + require.Eventually(t, func() bool { + addr, err := selector.Next() + require.NoError(t, err) + return addr == "127.0.0.1:5000" + }, 3*time.Second, 100*time.Millisecond) }