diff --git a/pkg/manager/router/backend_observer.go b/pkg/manager/router/backend_observer.go index 5cc1fd82..24ad4bef 100644 --- a/pkg/manager/router/backend_observer.go +++ b/pkg/manager/router/backend_observer.go @@ -122,6 +122,7 @@ type BackendObserver struct { eventReceiver BackendEventReceiver wg waitgroup.WaitGroup cancelFunc context.CancelFunc + refreshChan chan struct{} } // StartBackendObserver creates a BackendObserver and starts watching. @@ -152,6 +153,7 @@ func NewBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver, httpCli: httpCli, httpTLS: httpTLS, eventReceiver: eventReceiver, + refreshChan: make(chan struct{}), } bo.fetcher = backendFetcher return bo, nil @@ -166,6 +168,15 @@ func (bo *BackendObserver) Start() { }) } +// Refresh tells the observer to refresh immediately. +func (bo *BackendObserver) Refresh() { + // If the observer happens to be refreshing, skip this round. + select { + case bo.refreshChan <- struct{}{}: + default: + } +} + func (bo *BackendObserver) observe(ctx context.Context) { for ctx.Err() == nil { backendInfo := bo.fetcher.GetBackendList(ctx) @@ -176,6 +187,7 @@ func (bo *BackendObserver) observe(ctx context.Context) { bo.notifyIfChanged(backendStatus) select { case <-time.After(bo.config.healthCheckInterval): + case <-bo.refreshChan: case <-ctx.Done(): return } diff --git a/pkg/manager/router/router_score.go b/pkg/manager/router/router_score.go index 2b6091bc..4e9ea244 100644 --- a/pkg/manager/router/router_score.go +++ b/pkg/manager/router/router_score.go @@ -90,6 +90,11 @@ func (router *ScoreBasedRouter) routeOnce(excluded []string) string { return backend.addr } } + // No backends are available; the health check result may be outdated during a rolling restart. + // Refresh the backends asynchronously in this case. 
+ if router.observer != nil { + router.observer.Refresh() + } return "" } diff --git a/pkg/manager/router/router_test.go b/pkg/manager/router/router_test.go index c3ae7ab8..0bc034bb 100644 --- a/pkg/manager/router/router_test.go +++ b/pkg/manager/router/router_test.go @@ -668,3 +668,52 @@ func TestConcurrency(t *testing.T) { cancel() router.Close() } + +// Test that the backends are refreshed immediately when the backend list is found empty. +func TestRefresh(t *testing.T) { + backends := make([]string, 0) + var m sync.Mutex + fetcher := NewExternalFetcher(func() []string { + m.Lock() + defer m.Unlock() + return backends + }) + // Create a router with a very long health check interval. + lg := logger.CreateLoggerForTest(t) + rt := &ScoreBasedRouter{ + logger: lg, + backends: list.New(), + } + cfg := NewDefaultHealthCheckConfig() + cfg.healthCheckInterval = time.Minute + observer, err := StartBackendObserver(lg, rt, nil, cfg, fetcher) + require.NoError(t, err) + rt.Lock() + rt.observer = observer + rt.Unlock() + defer rt.Close() + // The initial backends are empty. + selector := rt.GetBackendSelector() + addr := selector.Next() + require.True(t, len(addr) == 0) + // Create a new backend and add to the list. + server := newBackendServer(t) + m.Lock() + backends = append(backends, server.sqlAddr) + m.Unlock() + defer server.close() + // The backends are refreshed very soon. + timer := time.NewTimer(3 * time.Second) + defer timer.Stop() + for { + select { + case <-timer.C: + t.Fatal("timeout") + case <-time.After(100 * time.Millisecond): + addr = selector.Next() + if len(addr) > 0 { + return + } + } + } +} diff --git a/pkg/proxy/backend/backend_conn.go b/pkg/proxy/backend/backend_conn.go index 7d083320..8fdcdb10 100644 --- a/pkg/proxy/backend/backend_conn.go +++ b/pkg/proxy/backend/backend_conn.go @@ -23,7 +23,7 @@ import ( ) const ( - DialTimeout = 5 * time.Second + DialTimeout = 2 * time.Second ) type BackendConnection struct {