Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router: add tests for router #74

Merged
merged 4 commits into from
Sep 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 84 additions & 40 deletions pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package router
import (
"container/list"
"context"
"errors"
"sync"
"time"

"github.com/pingcap/TiProxy/pkg/config"
"github.com/pingcap/TiProxy/pkg/util/errors"
"github.com/pingcap/TiProxy/pkg/util/waitgroup"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand All @@ -38,24 +39,38 @@ var (
ErrNoInstanceToSelect = errors.New("no instances to route")
)

type connPhase int

const (
phaseNotRedirected int = iota
// The session is never redirected.
phaseNotRedirected connPhase = iota
// The session is redirecting.
phaseRedirectNotify
// The session redirected successfully last time.
phaseRedirectEnd
// The session failed to redirect last time.
phaseRedirectFail
)

const (
rebalanceInterval = 10 * time.Millisecond
rebalanceConnsPerLoop = 10
rebalanceMaxScoreRatio = 1.1
// The interval to rebalance connections.
rebalanceInterval = 10 * time.Millisecond
// The number of connections to rebalance during each interval.
// Limit the number to avoid creating too many connections suddenly on a backend.
rebalanceConnsPerLoop = 10
// The threshold of ratio of the highest score and lowest score.
// If the ratio exceeds the threshold, the proxy will rebalance connections.
rebalanceMaxScoreRatio = 1.2
// After a connection fails to redirect, it may contain some unmigratable status.
// Limit its redirection interval to avoid unnecessary retrial to reduce latency jitter.
redirectFailMinInterval = 3 * time.Second
)

// ConnEventReceiver receives connection events.
type ConnEventReceiver interface {
OnRedirectSucceed(from, to string, conn RedirectableConn)
OnRedirectFail(from, to string, conn RedirectableConn)
OnConnClosed(addr string, conn RedirectableConn)
OnRedirectSucceed(from, to string, conn RedirectableConn) error
OnRedirectFail(from, to string, conn RedirectableConn) error
OnConnClosed(addr string, conn RedirectableConn) error
}

// RedirectableConn indicates a redirect-able connection.
Expand Down Expand Up @@ -84,7 +99,9 @@ func (b *backendWrapper) score() int {
// connWrapper wraps RedirectableConn.
type connWrapper struct {
RedirectableConn
phase int
phase connPhase
// Last redirect start time of this connection.
lastRedirect time.Time
}

// ScoreBasedRouter is an implementation of Router interface.
Expand All @@ -93,6 +110,7 @@ type ScoreBasedRouter struct {
sync.Mutex
observer *BackendObserver
cancelFunc context.CancelFunc
wg waitgroup.WaitGroup
// A list of *backendWrapper. The backends are in descending order of scores.
backends *list.List
}
Expand All @@ -111,7 +129,9 @@ func NewScoreBasedRouter(cfg *config.BackendNamespace, client *clientv3.Client)
router.observer = observer
childCtx, cancelFunc := context.WithCancel(context.Background())
router.cancelFunc = cancelFunc
go router.rebalanceLoop(childCtx)
router.wg.Run(func() {
router.rebalanceLoop(childCtx)
})
return router, err
}

Expand Down Expand Up @@ -142,7 +162,9 @@ func (router *ScoreBasedRouter) removeConn(be *list.Element, ce *list.Element) {
conn := ce.Value.(*connWrapper)
backend.connList.Remove(ce)
delete(backend.connMap, conn.ConnectionID())
router.adjustBackendList(be)
if !router.removeBackendIfEmpty(be) {
router.adjustBackendList(be)
}
}

func (router *ScoreBasedRouter) addConn(be *list.Element, conn *connWrapper) {
Expand Down Expand Up @@ -220,55 +242,56 @@ func (router *ScoreBasedRouter) lookupBackend(addr string, forward bool) *list.E
}

// OnRedirectSucceed implements ConnEventReceiver.OnRedirectSucceed interface.
func (router *ScoreBasedRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) {
func (router *ScoreBasedRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) error {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
if be == nil {
logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to))
return
return errors.WithStack(errors.Errorf("backend %s is not found in the router", to))
}
toBackend := be.Value.(*backendWrapper)
e, ok := toBackend.connMap[conn.ConnectionID()]
if !ok {
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to),
zap.Uint64("conn", conn.ConnectionID()))
return
return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), to))
}
connWrapper := e.Value.(*connWrapper)
connWrapper.phase = phaseRedirectEnd
return nil
}

// OnRedirectFail implements ConnEventReceiver.OnRedirectFail interface.
func (router *ScoreBasedRouter) OnRedirectFail(from, to string, conn RedirectableConn) {
func (router *ScoreBasedRouter) OnRedirectFail(from, to string, conn RedirectableConn) error {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
if be == nil {
logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to))
return
return errors.WithStack(errors.Errorf("backend %s is not found in the router", to))
}
toBackend := be.Value.(*backendWrapper)
ce, ok := toBackend.connMap[conn.ConnectionID()]
if !ok {
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to),
zap.Uint64("conn", conn.ConnectionID()))
return
return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), to))
}
router.removeConn(be, ce)

be = router.lookupBackend(from, true)
// If the backend has already been removed, the connection is discarded from the router.
// The backend may have been removed because it's empty. Add it back.
if be == nil {
return
be = router.backends.PushBack(&backendWrapper{
status: StatusCannotConnect,
addr: from,
connList: list.New(),
connMap: make(map[uint64]*list.Element),
})
}
connWrapper := ce.Value.(*connWrapper)
connWrapper.phase = phaseRedirectFail
router.addConn(be, connWrapper)
return nil
}

// OnConnClosed implements ConnEventReceiver.OnConnClosed interface.
func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) {
func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) error {
router.Lock()
defer router.Unlock()
// Get the redirecting address in the lock, rather than letting the connection pass it in.
Expand All @@ -279,18 +302,15 @@ func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn)
}
be := router.lookupBackend(addr, true)
if be == nil {
logutil.BgLogger().Error("backend not found in the router", zap.String("addr", addr))
return
return errors.WithStack(errors.Errorf("backend %s is not found in the router", addr))
}
backend := be.Value.(*backendWrapper)
ce, ok := backend.connMap[conn.ConnectionID()]
if !ok {
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", addr),
zap.Uint64("conn", conn.ConnectionID()))
return
return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), addr))
}
router.removeConn(be, ce)
router.removeBackendIfEmpty(be)
return nil
}

// OnBackendChanged implements BackendEventReceiver.OnBackendChanged interface.
Expand All @@ -299,8 +319,8 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStat
defer router.Unlock()
for addr, status := range backends {
be := router.lookupBackend(addr, true)
if be == nil {
logutil.BgLogger().Info("find new backend", zap.String("url", addr),
if be == nil && status != StatusCannotConnect {
logutil.BgLogger().Info("find new backend", zap.String("addr", addr),
zap.String("status", status.String()))
be = router.backends.PushBack(&backendWrapper{
status: status,
Expand All @@ -310,12 +330,13 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStat
})
} else {
backend := be.Value.(*backendWrapper)
logutil.BgLogger().Info("update backend", zap.String("url", addr),
logutil.BgLogger().Info("update backend", zap.String("addr", addr),
zap.String("prev_status", backend.status.String()), zap.String("cur_status", status.String()))
backend.status = status
}
router.adjustBackendList(be)
router.removeBackendIfEmpty(be)
if !router.removeBackendIfEmpty(be) {
router.adjustBackendList(be)
}
}
}

Expand All @@ -331,6 +352,7 @@ func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) {
}

func (router *ScoreBasedRouter) rebalance(maxNum int) {
curTime := time.Now()
router.Lock()
defer router.Unlock()
for i := 0; i < maxNum; i++ {
Expand All @@ -348,23 +370,44 @@ func (router *ScoreBasedRouter) rebalance(maxNum int) {
busiestBackend := busiestEle.Value.(*backendWrapper)
idlestEle := router.backends.Back()
idlestBackend := idlestEle.Value.(*backendWrapper)
if float64(busiestBackend.score())/float64(idlestBackend.score()+1) <= rebalanceMaxScoreRatio {
if float64(busiestBackend.score())/float64(idlestBackend.score()+1) < rebalanceMaxScoreRatio {
break
}
var ce *list.Element
for ele := busiestBackend.connList.Front(); ele != nil; ele = ele.Next() {
conn := ele.Value.(*connWrapper)
switch conn.phase {
case phaseRedirectNotify:
// A connection cannot be redirected again when it has not finished redirecting.
continue
case phaseRedirectFail:
// If it failed recently, it will probably fail this time.
if conn.lastRedirect.Add(redirectFailMinInterval).After(curTime) {
continue
}
}
ce = ele
break
}
if ce == nil {
break
}
ce := busiestBackend.connList.Front()
router.removeConn(busiestEle, ce)
conn := ce.Value.(*connWrapper)
conn.phase = phaseRedirectNotify
conn.lastRedirect = curTime
router.addConn(idlestEle, conn)
conn.Redirect(idlestBackend.addr)
}
}

func (router *ScoreBasedRouter) removeBackendIfEmpty(be *list.Element) {
func (router *ScoreBasedRouter) removeBackendIfEmpty(be *list.Element) bool {
backend := be.Value.(*backendWrapper)
if backend.status == StatusCannotConnect && backend.connList.Len() == 0 {
router.backends.Remove(be)
return true
}
return false
}

// Close implements Router.Close interface.
Expand All @@ -379,5 +422,6 @@ func (router *ScoreBasedRouter) Close() {
router.observer.Close()
router.observer = nil
}
router.wg.Wait()
// Router only refers to RedirectableConn, it doesn't manage RedirectableConn.
}
Loading