Skip to content

Commit

Permalink
backend, router: fix errors when closing and redirecting concurrently (
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Sep 2, 2022
1 parent a4b095d commit 8d8a09e
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 119 deletions.
2 changes: 1 addition & 1 deletion pkg/manager/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewNamespaceManager() *NamespaceManager {
}
func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace, client *clientv3.Client) (*Namespace, error) {
logger := mgr.logger.With(zap.String("namespace", cfg.Namespace))
rt, err := router.NewRandomRouter(&cfg.Backend, client)
rt, err := router.NewScoreBasedRouter(&cfg.Backend, client)
if err != nil {
return nil, errors.Errorf("build router error: %w", err)
}
Expand Down
139 changes: 84 additions & 55 deletions pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.uber.org/zap"
)

// Router routes client connections to backends.
type Router interface {
Route(RedirectableConn) (string, error)
RedirectConnections() error
Expand All @@ -50,45 +51,55 @@ const (
rebalanceMaxScoreRatio = 1.1
)

// ConnEventReceiver receives connection events.
type ConnEventReceiver interface {
OnRedirectSucceed(from, to string, conn RedirectableConn)
OnRedirectFail(from, to string, conn RedirectableConn)
OnConnClosed(addr string, conn RedirectableConn)
}

// RedirectableConn indicates a redirect-able connection.
type RedirectableConn interface {
SetEventReceiver(receiver ConnEventReceiver)
Redirect(addr string)
GetRedirectingAddr() string
ConnectionID() uint64
}

type BackendWrapper struct {
// backendWrapper contains the connections on the backend.
type backendWrapper struct {
status BackendStatus
addr string
// A list of *ConnWrapper and is ordered by the connecting or redirecting time.
// A list of *connWrapper and is ordered by the connecting or redirecting time.
// connList and connMap include moving out connections but not moving in connections.
connList *list.List
connMap map[uint64]*list.Element
}

func (b *BackendWrapper) score() int {
// score calculates the score of the backend. Larger score indicates higher load.
func (b *backendWrapper) score() int {
return b.status.ToScore() + b.connList.Len()
}

type ConnWrapper struct {
// connWrapper wraps RedirectableConn.
type connWrapper struct {
RedirectableConn
phase int
}

type RandomRouter struct {
// ScoreBasedRouter is an implementation of Router interface.
// It routes a connection based on score.
type ScoreBasedRouter struct {
sync.Mutex
observer *BackendObserver
cancelFunc context.CancelFunc
// A list of *BackendWrapper and ordered by the score of the backends.
// A list of *backendWrapper. The backends are in descending order of scores.
backends *list.List
}

func NewRandomRouter(cfg *config.BackendNamespace, client *clientv3.Client) (*RandomRouter, error) {
router := &RandomRouter{
// NewScoreBasedRouter creates a ScoreBasedRouter.
func NewScoreBasedRouter(cfg *config.BackendNamespace, client *clientv3.Client) (*ScoreBasedRouter, error) {
router := &ScoreBasedRouter{
backends: list.New(),
}
router.Lock()
Expand All @@ -104,19 +115,20 @@ func NewRandomRouter(cfg *config.BackendNamespace, client *clientv3.Client) (*Ra
return router, err
}

func (router *RandomRouter) Route(conn RedirectableConn) (string, error) {
// Route implements Router.Route interface.
func (router *ScoreBasedRouter) Route(conn RedirectableConn) (string, error) {
router.Lock()
defer router.Unlock()
be := router.backends.Back()
if be == nil {
return "", ErrNoInstanceToSelect
}
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
switch backend.status {
case StatusCannotConnect, StatusSchemaOutdated:
return "", ErrNoInstanceToSelect
}
connWrapper := &ConnWrapper{
connWrapper := &connWrapper{
RedirectableConn: conn,
phase: phaseNotRedirected,
}
Expand All @@ -125,27 +137,28 @@ func (router *RandomRouter) Route(conn RedirectableConn) (string, error) {
return backend.addr, nil
}

func (router *RandomRouter) removeConn(be *list.Element, ce *list.Element) {
backend := be.Value.(*BackendWrapper)
conn := ce.Value.(*ConnWrapper)
func (router *ScoreBasedRouter) removeConn(be *list.Element, ce *list.Element) {
backend := be.Value.(*backendWrapper)
conn := ce.Value.(*connWrapper)
backend.connList.Remove(ce)
delete(backend.connMap, conn.ConnectionID())
router.adjustBackendList(be)
}

func (router *RandomRouter) addConn(be *list.Element, conn *ConnWrapper) {
backend := be.Value.(*BackendWrapper)
func (router *ScoreBasedRouter) addConn(be *list.Element, conn *connWrapper) {
backend := be.Value.(*backendWrapper)
ce := backend.connList.PushBack(conn)
backend.connMap[conn.ConnectionID()] = ce
router.adjustBackendList(be)
}

func (router *RandomRouter) adjustBackendList(be *list.Element) {
backend := be.Value.(*BackendWrapper)
// adjustBackendList moves `be` after the score of `be` changes to keep the list ordered.
func (router *ScoreBasedRouter) adjustBackendList(be *list.Element) {
backend := be.Value.(*backendWrapper)
curScore := backend.score()
var mark *list.Element
for ele := be.Prev(); ele != nil; ele = ele.Prev() {
b := ele.Value.(*BackendWrapper)
b := ele.Value.(*backendWrapper)
if b.score() >= curScore {
break
}
Expand All @@ -156,7 +169,7 @@ func (router *RandomRouter) adjustBackendList(be *list.Element) {
return
}
for ele := be.Next(); ele != nil; ele = ele.Next() {
b := ele.Value.(*BackendWrapper)
b := ele.Value.(*backendWrapper)
if b.score() <= curScore {
break
}
Expand All @@ -167,14 +180,16 @@ func (router *RandomRouter) adjustBackendList(be *list.Element) {
}
}

func (router *RandomRouter) RedirectConnections() error {
// RedirectConnections implements Router.RedirectConnections interface.
// It redirects all connections compulsively. It's only used for testing.
func (router *ScoreBasedRouter) RedirectConnections() error {
router.Lock()
defer router.Unlock()
for be := router.backends.Front(); be != nil; be = be.Next() {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
for ce := backend.connList.Front(); ce != nil; ce = ce.Next() {
// This is only for test, so we allow it to reconnect to the same backend.
connWrapper := ce.Value.(*ConnWrapper)
connWrapper := ce.Value.(*connWrapper)
if connWrapper.phase != phaseRedirectNotify {
connWrapper.phase = phaseRedirectNotify
connWrapper.Redirect(backend.addr)
Expand All @@ -184,17 +199,18 @@ func (router *RandomRouter) RedirectConnections() error {
return nil
}

func (router *RandomRouter) lookupBackend(addr string, forward bool) *list.Element {
// forward is a hint to speed up searching.
func (router *ScoreBasedRouter) lookupBackend(addr string, forward bool) *list.Element {
if forward {
for be := router.backends.Front(); be != nil; be = be.Next() {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
if backend.addr == addr {
return be
}
}
} else {
for be := router.backends.Back(); be != nil; be = be.Prev() {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
if backend.addr == addr {
return be
}
Expand All @@ -203,36 +219,40 @@ func (router *RandomRouter) lookupBackend(addr string, forward bool) *list.Eleme
return nil
}

func (router *RandomRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) {
// OnRedirectSucceed implements ConnEventReceiver.OnRedirectSucceed interface.
func (router *ScoreBasedRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
if be == nil {
// impossible here
logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to))
return
}
toBackend := be.Value.(*BackendWrapper)
toBackend := be.Value.(*backendWrapper)
e, ok := toBackend.connMap[conn.ConnectionID()]
if !ok {
// impossible here
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to),
zap.Uint64("conn", conn.ConnectionID()))
return
}
connWrapper := e.Value.(*ConnWrapper)
connWrapper := e.Value.(*connWrapper)
connWrapper.phase = phaseRedirectEnd
}

func (router *RandomRouter) OnRedirectFail(from, to string, conn RedirectableConn) {
// OnRedirectFail implements ConnEventReceiver.OnRedirectFail interface.
func (router *ScoreBasedRouter) OnRedirectFail(from, to string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
if be == nil {
// impossible here
logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to))
return
}
toBackend := be.Value.(*BackendWrapper)
toBackend := be.Value.(*backendWrapper)
ce, ok := toBackend.connMap[conn.ConnectionID()]
if !ok {
// impossible here
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to),
zap.Uint64("conn", conn.ConnectionID()))
return
}
router.removeConn(be, ce)
Expand All @@ -242,46 +262,54 @@ func (router *RandomRouter) OnRedirectFail(from, to string, conn RedirectableCon
if be == nil {
return
}
connWrapper := ce.Value.(*ConnWrapper)
connWrapper := ce.Value.(*connWrapper)
connWrapper.phase = phaseRedirectFail
router.addConn(be, connWrapper)
}

func (router *RandomRouter) OnConnClosed(addr string, conn RedirectableConn) {
connID := conn.ConnectionID()
// OnConnClosed implements ConnEventReceiver.OnConnClosed interface.
func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
// Get the redirecting address in the lock, rather than letting the connection pass it in.
// While the connection closes, the router may also send a new redirection signal concurrently
// and move it to another backendWrapper.
if toAddr := conn.GetRedirectingAddr(); len(toAddr) > 0 {
addr = toAddr
}
be := router.lookupBackend(addr, true)
if be != nil {
// impossible here
if be == nil {
logutil.BgLogger().Error("backend not found in the router", zap.String("addr", addr))
return
}
backend := be.Value.(*BackendWrapper)
ce, ok := backend.connMap[connID]
backend := be.Value.(*backendWrapper)
ce, ok := backend.connMap[conn.ConnectionID()]
if !ok {
// impossible here
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", addr),
zap.Uint64("conn", conn.ConnectionID()))
return
}
router.removeConn(be, ce)
router.removeBackendIfEmpty(be)
}

func (router *RandomRouter) OnBackendChanged(backends map[string]BackendStatus) {
// OnBackendChanged implements BackendEventReceiver.OnBackendChanged interface.
func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStatus) {
router.Lock()
defer router.Unlock()
for addr, status := range backends {
be := router.lookupBackend(addr, true)
if be == nil {
logutil.BgLogger().Info("find new backend", zap.String("url", addr),
zap.String("status", status.String()))
be = router.backends.PushBack(&BackendWrapper{
be = router.backends.PushBack(&backendWrapper{
status: status,
addr: addr,
connList: list.New(),
connMap: make(map[uint64]*list.Element),
})
} else {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
logutil.BgLogger().Info("update backend", zap.String("url", addr),
zap.String("prev_status", backend.status.String()), zap.String("cur_status", status.String()))
backend.status = status
Expand All @@ -291,7 +319,7 @@ func (router *RandomRouter) OnBackendChanged(backends map[string]BackendStatus)
}
}

func (router *RandomRouter) rebalanceLoop(ctx context.Context) {
func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) {
for {
router.rebalance(rebalanceConnsPerLoop)
select {
Expand All @@ -302,13 +330,13 @@ func (router *RandomRouter) rebalanceLoop(ctx context.Context) {
}
}

func (router *RandomRouter) rebalance(maxNum int) {
func (router *ScoreBasedRouter) rebalance(maxNum int) {
router.Lock()
defer router.Unlock()
for i := 0; i < maxNum; i++ {
var busiestEle *list.Element
for be := router.backends.Front(); be != nil; be = be.Next() {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
if backend.connList.Len() > 0 {
busiestEle = be
break
Expand All @@ -317,29 +345,30 @@ func (router *RandomRouter) rebalance(maxNum int) {
if busiestEle == nil {
break
}
busiestBackend := busiestEle.Value.(*BackendWrapper)
busiestBackend := busiestEle.Value.(*backendWrapper)
idlestEle := router.backends.Back()
idlestBackend := idlestEle.Value.(*BackendWrapper)
idlestBackend := idlestEle.Value.(*backendWrapper)
if float64(busiestBackend.score())/float64(idlestBackend.score()+1) <= rebalanceMaxScoreRatio {
break
}
ce := busiestBackend.connList.Front()
router.removeConn(busiestEle, ce)
conn := ce.Value.(*ConnWrapper)
conn := ce.Value.(*connWrapper)
conn.phase = phaseRedirectNotify
router.addConn(idlestEle, conn)
conn.Redirect(idlestBackend.addr)
}
}

func (router *RandomRouter) removeBackendIfEmpty(be *list.Element) {
backend := be.Value.(*BackendWrapper)
func (router *ScoreBasedRouter) removeBackendIfEmpty(be *list.Element) {
backend := be.Value.(*backendWrapper)
if backend.status == StatusCannotConnect && backend.connList.Len() == 0 {
router.backends.Remove(be)
}
}

func (router *RandomRouter) Close() {
// Close implements Router.Close interface.
func (router *ScoreBasedRouter) Close() {
router.Lock()
defer router.Unlock()
if router.cancelFunc != nil {
Expand Down
Loading

0 comments on commit 8d8a09e

Please sign in to comment.