Skip to content

Commit

Permalink
router, config: fix deadlock during closing the router (#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Feb 2, 2023
1 parent 4a70be3 commit 32e96e6
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 52 deletions.
38 changes: 38 additions & 0 deletions lib/config/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ type BackendNamespace struct {
//HealthCheck HealthCheck `yaml:"health-check" json:"health-check" toml:"health-check"`
}

// Default health check settings. They are used both by
// NewDefaultHealthCheckConfig and by (*HealthCheck).Check to fill in
// any field the user left unset (zero) in the configuration.
const (
healthCheckInterval = 3 * time.Second // time between two health check rounds
healthCheckMaxRetries = 3 // probe attempts before a backend is considered down
healthCheckRetryInterval = 1 * time.Second // pause between consecutive retry probes
healthCheckTimeout = 2 * time.Second // dial timeout for a single probe
tombstoneThreshold = 5 * time.Minute // how long a down backend is kept before removal
)

// HealthCheck contains some configurations for health check.
// Some general configurations of them may be exposed to users in the future.
// We can use shorter durations to speed up unit tests.
Expand All @@ -52,6 +60,36 @@ type HealthCheck struct {
TombstoneThreshold time.Duration `yaml:"tombstone-threshold" json:"tombstone-threshold" toml:"tombstone-threshold"`
}

// NewDefaultHealthCheckConfig returns a HealthCheck with health checking
// enabled and every tunable set to the package-level default.
func NewDefaultHealthCheckConfig() *HealthCheck {
	hc := &HealthCheck{Enable: true}
	hc.Interval = healthCheckInterval
	hc.MaxRetries = healthCheckMaxRetries
	hc.RetryInterval = healthCheckRetryInterval
	hc.DialTimeout = healthCheckTimeout
	hc.TombstoneThreshold = tombstoneThreshold
	return hc
}

// Check normalizes the configuration in place: any duration or retry field
// that was left unset (zero) is replaced with its package default.
// Enable is intentionally not touched — false is a valid user choice
// (see TestDisableHealthCheck), so only the tunables get defaults.
func (hc *HealthCheck) Check() {
if hc.Interval == 0 {
hc.Interval = healthCheckInterval
}
if hc.MaxRetries == 0 {
hc.MaxRetries = healthCheckMaxRetries
}
if hc.RetryInterval == 0 {
hc.RetryInterval = healthCheckRetryInterval
}
if hc.DialTimeout == 0 {
hc.DialTimeout = healthCheckTimeout
}
if hc.TombstoneThreshold == 0 {
hc.TombstoneThreshold = tombstoneThreshold
}
}

func NewNamespace(data []byte) (*Namespace, error) {
var cfg Namespace
if err := toml.Unmarshal(data, &cfg); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace,

var fetcher router.BackendFetcher
if mgr.client != nil {
fetcher = router.NewPDFetcher(mgr.client, logger.Named("be_fetcher"), router.NewDefaultHealthCheckConfig())
fetcher = router.NewPDFetcher(mgr.client, logger.Named("be_fetcher"), config.NewDefaultHealthCheckConfig())
} else {
fetcher = router.NewStaticFetcher(cfg.Backend.Instances)
}
rt, err := router.NewScoreBasedRouter(logger.Named("router"), mgr.httpCli, fetcher, router.NewDefaultHealthCheckConfig())
rt, err := router.NewScoreBasedRouter(logger.Named("router"), mgr.httpCli, fetcher, config.NewDefaultHealthCheckConfig())
if err != nil {
return nil, errors.Errorf("build router error: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/manager/router/backend_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type PDFetcher struct {
}

func NewPDFetcher(client *clientv3.Client, logger *zap.Logger, config *config.HealthCheck) *PDFetcher {
config.Check()
return &PDFetcher{
backendInfo: make(map[string]*pdBackendInfo),
client: client,
Expand Down
23 changes: 3 additions & 20 deletions pkg/manager/router/backend_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,11 @@ var statusScores = map[BackendStatus]int{
}

const (
healthCheckInterval = 3 * time.Second
healthCheckMaxRetries = 3
healthCheckRetryInterval = 1 * time.Second
healthCheckTimeout = 2 * time.Second
tombstoneThreshold = 5 * time.Minute
ttlPathSuffix = "/ttl"
infoPathSuffix = "/info"
statusPathSuffix = "/status"
ttlPathSuffix = "/ttl"
infoPathSuffix = "/info"
statusPathSuffix = "/status"
)

// NewDefaultHealthCheckConfig creates a default HealthCheck.
func NewDefaultHealthCheckConfig() *config.HealthCheck {
return &config.HealthCheck{
Enable: true,
Interval: healthCheckInterval,
MaxRetries: healthCheckMaxRetries,
RetryInterval: healthCheckRetryInterval,
DialTimeout: healthCheckTimeout,
TombstoneThreshold: tombstoneThreshold,
}
}

// BackendEventReceiver receives the event of backend status change.
type BackendEventReceiver interface {
// OnBackendChanged is called when the backend list changes.
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/router/backend_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func newHealthCheckConfigForTest() *config.HealthCheck {
return &config.HealthCheck{
Enable: true,
Interval: 500 * time.Millisecond,
MaxRetries: healthCheckMaxRetries,
MaxRetries: 3,
RetryInterval: 100 * time.Millisecond,
DialTimeout: 100 * time.Millisecond,
TombstoneThreshold: tombstoneThreshold,
TombstoneThreshold: 5 * time.Minute,
}
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/manager/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ func NewScoreBasedRouter(logger *zap.Logger, httpCli *http.Client, fetcher Backe
logger: logger,
backends: list.New(),
}
router.Lock()
defer router.Unlock()
cfg.Check()
observer, err := StartBackendObserver(logger.Named("observer"), router, httpCli, cfg, fetcher)
if err != nil {
return nil, err
Expand Down Expand Up @@ -393,8 +392,6 @@ func (router *ScoreBasedRouter) ConnCount() int {

// Close implements Router.Close interface.
func (router *ScoreBasedRouter) Close() {
router.Lock()
defer router.Unlock()
if router.cancelFunc != nil {
router.cancelFunc()
router.cancelFunc = nil
Expand Down
66 changes: 42 additions & 24 deletions pkg/manager/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/errors"
"github.com/pingcap/TiProxy/lib/util/logger"
"github.com/pingcap/TiProxy/lib/util/waitgroup"
Expand Down Expand Up @@ -694,7 +695,7 @@ func TestRefresh(t *testing.T) {
logger: lg,
backends: list.New(),
}
cfg := NewDefaultHealthCheckConfig()
cfg := config.NewDefaultHealthCheckConfig()
cfg.Interval = time.Minute
observer, err := StartBackendObserver(lg, rt, nil, cfg, fetcher)
require.NoError(t, err)
Expand All @@ -714,11 +715,11 @@ func TestRefresh(t *testing.T) {
m.Unlock()
defer server.close()
// The backends are refreshed very soon.
finally(t, func() bool {
require.Eventually(t, func() bool {
addr, err = selector.Next()
require.NoError(t, err)
return len(addr) > 0
}, 100*time.Millisecond, 3*time.Second)
}, 3*time.Second, 100*time.Millisecond)
}

func TestObserveError(t *testing.T) {
Expand Down Expand Up @@ -754,43 +755,60 @@ func TestObserveError(t *testing.T) {
m.Unlock()
defer server.close()
// The backends are refreshed very soon.
finally(t, func() bool {
require.Eventually(t, func() bool {
selector.Reset()
addr, err = selector.Next()
require.NoError(t, err)
return len(addr) > 0
}, 100*time.Millisecond, 3*time.Second)
}, 3*time.Second, 100*time.Millisecond)
// Mock an observe error.
m.Lock()
observeError = errors.New("mock observe error")
m.Unlock()
finally(t, func() bool {
require.Eventually(t, func() bool {
selector.Reset()
addr, err = selector.Next()
return len(addr) == 0 && err != nil
}, 100*time.Millisecond, 3*time.Second)
}, 3*time.Second, 100*time.Millisecond)
// Clear the observe error.
m.Lock()
observeError = nil
m.Unlock()
finally(t, func() bool {
require.Eventually(t, func() bool {
selector.Reset()
addr, err = selector.Next()
return len(addr) > 0 && err == nil
}, 100*time.Millisecond, 3*time.Second)
}

func finally(t *testing.T, checker func() bool, checkInterval, timeout time.Duration) {
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-timer.C:
t.Fatal("timeout")
case <-time.After(checkInterval):
if checker() {
return
}
}
}
}, 3*time.Second, 100*time.Millisecond)
}

// TestDisableHealthCheck verifies that with health checking disabled
// (HealthCheck.Enable == false) the router still refreshes its backend list
// from the fetcher and routes to the reported backends without probing them.
func TestDisableHealthCheck(t *testing.T) {
backends := []string{"127.0.0.1:4000"}
var m sync.Mutex
// The fetcher serves the mutex-guarded backends slice; the test mutates
// the slice below to simulate a membership change.
fetcher := NewExternalFetcher(func() ([]string, error) {
m.Lock()
defer m.Unlock()
return backends, nil
})
// Create a router with health checking disabled.
lg := logger.CreateLoggerForTest(t)
rt, err := NewScoreBasedRouter(lg, nil, fetcher, &config.HealthCheck{Enable: false})
require.NoError(t, err)
defer rt.Close()
// No backends and no error.
selector := rt.GetBackendSelector()
// The backends are refreshed very soon.
require.Eventually(t, func() bool {
addr, err := selector.Next()
require.NoError(t, err)
return addr == "127.0.0.1:4000"
}, 3*time.Second, 100*time.Millisecond)
// Replace the backend; the fetcher result changes even though no health
// probe runs, so the router should pick up the new address.
m.Lock()
backends[0] = "127.0.0.1:5000"
m.Unlock()
require.Eventually(t, func() bool {
addr, err := selector.Next()
require.NoError(t, err)
return addr == "127.0.0.1:5000"
}, 3*time.Second, 100*time.Millisecond)
}

0 comments on commit 32e96e6

Please sign in to comment.