Skip to content

Commit

Permalink
router: refresh backends when the backends are empty (pingcap#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored and xhebox committed Mar 7, 2023
1 parent 6bf840b commit c4ec5e9
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 1 deletion.
12 changes: 12 additions & 0 deletions pkg/manager/router/backend_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type BackendObserver struct {
eventReceiver BackendEventReceiver
wg waitgroup.WaitGroup
cancelFunc context.CancelFunc
refreshChan chan struct{}
}

// StartBackendObserver creates a BackendObserver and starts watching.
Expand Down Expand Up @@ -152,6 +153,7 @@ func NewBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver,
httpCli: httpCli,
httpTLS: httpTLS,
eventReceiver: eventReceiver,
refreshChan: make(chan struct{}),
}
bo.fetcher = backendFetcher
return bo, nil
Expand All @@ -166,6 +168,15 @@ func (bo *BackendObserver) Start() {
})
}

// Refresh indicates the observer to refresh immediately.
func (bo *BackendObserver) Refresh() {
// If the observer happens to be refreshing, skip this round.
select {
case bo.refreshChan <- struct{}{}:
default:
}
}

func (bo *BackendObserver) observe(ctx context.Context) {
for ctx.Err() == nil {
backendInfo := bo.fetcher.GetBackendList(ctx)
Expand All @@ -176,6 +187,7 @@ func (bo *BackendObserver) observe(ctx context.Context) {
bo.notifyIfChanged(backendStatus)
select {
case <-time.After(bo.config.healthCheckInterval):
case <-bo.refreshChan:
case <-ctx.Done():
return
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/manager/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func (router *ScoreBasedRouter) routeOnce(excluded []string) string {
return backend.addr
}
}
// No available backends, maybe the health check result is outdated during rolling restart.
// Refresh the backends asynchronously in this case.
if router.observer != nil {
router.observer.Refresh()
}
return ""
}

Expand Down
49 changes: 49 additions & 0 deletions pkg/manager/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,3 +668,52 @@ func TestConcurrency(t *testing.T) {
cancel()
router.Close()
}

// Test that the backends are refreshed immediately after it's empty.
func TestRefresh(t *testing.T) {
backends := make([]string, 0)
var m sync.Mutex
fetcher := NewExternalFetcher(func() []string {
m.Lock()
defer m.Unlock()
return backends
})
// Create a router with a very long health check interval.
lg := logger.CreateLoggerForTest(t)
rt := &ScoreBasedRouter{
logger: lg,
backends: list.New(),
}
cfg := NewDefaultHealthCheckConfig()
cfg.healthCheckInterval = time.Minute
observer, err := StartBackendObserver(lg, rt, nil, cfg, fetcher)
require.NoError(t, err)
rt.Lock()
rt.observer = observer
rt.Unlock()
defer rt.Close()
// The initial backends are empty.
selector := rt.GetBackendSelector()
addr := selector.Next()
require.True(t, len(addr) == 0)
// Create a new backend and add to the list.
server := newBackendServer(t)
m.Lock()
backends = append(backends, server.sqlAddr)
m.Unlock()
defer server.close()
// The backends are refreshed very soon.
timer := time.NewTimer(3 * time.Second)
defer timer.Stop()
for {
select {
case <-timer.C:
t.Fatal("timeout")
case <-time.After(100 * time.Millisecond):
addr = selector.Next()
if len(addr) > 0 {
return
}
}
}
}
2 changes: 1 addition & 1 deletion pkg/proxy/backend/backend_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

const (
DialTimeout = 5 * time.Second
DialTimeout = 2 * time.Second
)

type BackendConnection struct {
Expand Down

0 comments on commit c4ec5e9

Please sign in to comment.