Skip to content

Commit

Permalink
router: add tests for router (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Sep 5, 2022
1 parent ddf7bfb commit c4d3fa9
Show file tree
Hide file tree
Showing 4 changed files with 695 additions and 49 deletions.
124 changes: 84 additions & 40 deletions pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package router
import (
"container/list"
"context"
"errors"
"sync"
"time"

"github.com/pingcap/TiProxy/pkg/config"
"github.com/pingcap/TiProxy/pkg/util/errors"
"github.com/pingcap/TiProxy/pkg/util/waitgroup"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand All @@ -38,24 +39,38 @@ var (
ErrNoInstanceToSelect = errors.New("no instances to route")
)

type connPhase int

const (
phaseNotRedirected int = iota
// The session is never redirected.
phaseNotRedirected connPhase = iota
// The session is redirecting.
phaseRedirectNotify
// The session redirected successfully last time.
phaseRedirectEnd
// The session failed to redirect last time.
phaseRedirectFail
)

const (
rebalanceInterval = 10 * time.Millisecond
rebalanceConnsPerLoop = 10
rebalanceMaxScoreRatio = 1.1
// The interval to rebalance connections.
rebalanceInterval = 10 * time.Millisecond
// The number of connections to rebalance during each interval.
// Limit the number to avoid creating too many connections suddenly on a backend.
rebalanceConnsPerLoop = 10
// The threshold for the ratio of the highest score to the lowest score.
// If the ratio exceeds the threshold, the proxy will rebalance connections.
rebalanceMaxScoreRatio = 1.2
// After a connection fails to redirect, it may hold some state that cannot be migrated.
// Limit how often it is redirected to avoid unnecessary retries and reduce latency jitter.
redirectFailMinInterval = 3 * time.Second
)

// ConnEventReceiver receives connection events.
type ConnEventReceiver interface {
OnRedirectSucceed(from, to string, conn RedirectableConn)
OnRedirectFail(from, to string, conn RedirectableConn)
OnConnClosed(addr string, conn RedirectableConn)
OnRedirectSucceed(from, to string, conn RedirectableConn) error
OnRedirectFail(from, to string, conn RedirectableConn) error
OnConnClosed(addr string, conn RedirectableConn) error
}

// RedirectableConn indicates a redirect-able connection.
Expand Down Expand Up @@ -84,7 +99,9 @@ func (b *backendWrapper) score() int {
// connWrapper wraps RedirectableConn.
type connWrapper struct {
RedirectableConn
phase int
phase connPhase
// Last redirect start time of this connection.
lastRedirect time.Time
}

// ScoreBasedRouter is an implementation of Router interface.
Expand All @@ -93,6 +110,7 @@ type ScoreBasedRouter struct {
sync.Mutex
observer *BackendObserver
cancelFunc context.CancelFunc
wg waitgroup.WaitGroup
// A list of *backendWrapper. The backends are in descending order of scores.
backends *list.List
}
Expand All @@ -111,7 +129,9 @@ func NewScoreBasedRouter(cfg *config.BackendNamespace, client *clientv3.Client)
router.observer = observer
childCtx, cancelFunc := context.WithCancel(context.Background())
router.cancelFunc = cancelFunc
go router.rebalanceLoop(childCtx)
router.wg.Run(func() {
router.rebalanceLoop(childCtx)
})
return router, err
}

Expand Down Expand Up @@ -142,7 +162,9 @@ func (router *ScoreBasedRouter) removeConn(be *list.Element, ce *list.Element) {
conn := ce.Value.(*connWrapper)
backend.connList.Remove(ce)
delete(backend.connMap, conn.ConnectionID())
router.adjustBackendList(be)
if !router.removeBackendIfEmpty(be) {
router.adjustBackendList(be)
}
}

func (router *ScoreBasedRouter) addConn(be *list.Element, conn *connWrapper) {
Expand Down Expand Up @@ -220,55 +242,56 @@ func (router *ScoreBasedRouter) lookupBackend(addr string, forward bool) *list.E
}

// OnRedirectSucceed implements ConnEventReceiver.OnRedirectSucceed interface.
func (router *ScoreBasedRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) {
func (router *ScoreBasedRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) error {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
if be == nil {
logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to))
return
return errors.WithStack(errors.Errorf("backend %s is not found in the router", to))
}
toBackend := be.Value.(*backendWrapper)
e, ok := toBackend.connMap[conn.ConnectionID()]
if !ok {
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to),
zap.Uint64("conn", conn.ConnectionID()))
return
return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), to))
}
connWrapper := e.Value.(*connWrapper)
connWrapper.phase = phaseRedirectEnd
return nil
}

// OnRedirectFail implements ConnEventReceiver.OnRedirectFail interface.
func (router *ScoreBasedRouter) OnRedirectFail(from, to string, conn RedirectableConn) {
func (router *ScoreBasedRouter) OnRedirectFail(from, to string, conn RedirectableConn) error {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
if be == nil {
logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to))
return
return errors.WithStack(errors.Errorf("backend %s is not found in the router", to))
}
toBackend := be.Value.(*backendWrapper)
ce, ok := toBackend.connMap[conn.ConnectionID()]
if !ok {
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to),
zap.Uint64("conn", conn.ConnectionID()))
return
return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), to))
}
router.removeConn(be, ce)

be = router.lookupBackend(from, true)
// If the backend has already been removed, the connection is discarded from the router.
// The backend may have been removed because it's empty. Add it back.
if be == nil {
return
be = router.backends.PushBack(&backendWrapper{
status: StatusCannotConnect,
addr: from,
connList: list.New(),
connMap: make(map[uint64]*list.Element),
})
}
connWrapper := ce.Value.(*connWrapper)
connWrapper.phase = phaseRedirectFail
router.addConn(be, connWrapper)
return nil
}

// OnConnClosed implements ConnEventReceiver.OnConnClosed interface.
func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) {
func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) error {
router.Lock()
defer router.Unlock()
// Get the redirecting address in the lock, rather than letting the connection pass it in.
Expand All @@ -279,18 +302,15 @@ func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn)
}
be := router.lookupBackend(addr, true)
if be == nil {
logutil.BgLogger().Error("backend not found in the router", zap.String("addr", addr))
return
return errors.WithStack(errors.Errorf("backend %s is not found in the router", addr))
}
backend := be.Value.(*backendWrapper)
ce, ok := backend.connMap[conn.ConnectionID()]
if !ok {
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", addr),
zap.Uint64("conn", conn.ConnectionID()))
return
return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), addr))
}
router.removeConn(be, ce)
router.removeBackendIfEmpty(be)
return nil
}

// OnBackendChanged implements BackendEventReceiver.OnBackendChanged interface.
Expand All @@ -299,8 +319,8 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStat
defer router.Unlock()
for addr, status := range backends {
be := router.lookupBackend(addr, true)
if be == nil {
logutil.BgLogger().Info("find new backend", zap.String("url", addr),
if be == nil && status != StatusCannotConnect {
logutil.BgLogger().Info("find new backend", zap.String("addr", addr),
zap.String("status", status.String()))
be = router.backends.PushBack(&backendWrapper{
status: status,
Expand All @@ -310,12 +330,13 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStat
})
} else {
backend := be.Value.(*backendWrapper)
logutil.BgLogger().Info("update backend", zap.String("url", addr),
logutil.BgLogger().Info("update backend", zap.String("addr", addr),
zap.String("prev_status", backend.status.String()), zap.String("cur_status", status.String()))
backend.status = status
}
router.adjustBackendList(be)
router.removeBackendIfEmpty(be)
if !router.removeBackendIfEmpty(be) {
router.adjustBackendList(be)
}
}
}

Expand All @@ -331,6 +352,7 @@ func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) {
}

func (router *ScoreBasedRouter) rebalance(maxNum int) {
curTime := time.Now()
router.Lock()
defer router.Unlock()
for i := 0; i < maxNum; i++ {
Expand All @@ -348,23 +370,44 @@ func (router *ScoreBasedRouter) rebalance(maxNum int) {
busiestBackend := busiestEle.Value.(*backendWrapper)
idlestEle := router.backends.Back()
idlestBackend := idlestEle.Value.(*backendWrapper)
if float64(busiestBackend.score())/float64(idlestBackend.score()+1) <= rebalanceMaxScoreRatio {
if float64(busiestBackend.score())/float64(idlestBackend.score()+1) < rebalanceMaxScoreRatio {
break
}
var ce *list.Element
for ele := busiestBackend.connList.Front(); ele != nil; ele = ele.Next() {
conn := ele.Value.(*connWrapper)
switch conn.phase {
case phaseRedirectNotify:
// A connection cannot be redirected again when it has not finished redirecting.
continue
case phaseRedirectFail:
// If it failed recently, it will probably fail this time.
if conn.lastRedirect.Add(redirectFailMinInterval).After(curTime) {
continue
}
}
ce = ele
break
}
if ce == nil {
break
}
ce := busiestBackend.connList.Front()
router.removeConn(busiestEle, ce)
conn := ce.Value.(*connWrapper)
conn.phase = phaseRedirectNotify
conn.lastRedirect = curTime
router.addConn(idlestEle, conn)
conn.Redirect(idlestBackend.addr)
}
}

func (router *ScoreBasedRouter) removeBackendIfEmpty(be *list.Element) {
func (router *ScoreBasedRouter) removeBackendIfEmpty(be *list.Element) bool {
backend := be.Value.(*backendWrapper)
if backend.status == StatusCannotConnect && backend.connList.Len() == 0 {
router.backends.Remove(be)
return true
}
return false
}

// Close implements Router.Close interface.
Expand All @@ -379,5 +422,6 @@ func (router *ScoreBasedRouter) Close() {
router.observer.Close()
router.observer = nil
}
router.wg.Wait()
// Router only refers to RedirectableConn; it doesn't manage RedirectableConn.
}
Loading

0 comments on commit c4d3fa9

Please sign in to comment.