Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router, backend: retry connecting to different backends #171

Merged
merged 4 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions pkg/manager/router/backend_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package router

type BackendSelector struct {
excluded []string
cur string
routeOnce func(excluded []string) string
addConn func(addr string, conn RedirectableConn) error
}

func (bs *BackendSelector) Reset() {
xhebox marked this conversation as resolved.
Show resolved Hide resolved
bs.excluded = bs.excluded[:0]
}

func (bs *BackendSelector) Next() string {
if len(bs.cur) > 0 {
bs.excluded = append(bs.excluded, bs.cur)
}
bs.cur = bs.routeOnce(bs.excluded)
return bs.cur
}

func (bs *BackendSelector) Succeed(conn RedirectableConn) error {
return bs.addConn(bs.cur, conn)
}
2 changes: 1 addition & 1 deletion pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Router interface {
// Router will handle connection events to balance connections if possible.
ConnEventReceiver

Route(RedirectableConn) (string, error)
GetBackendSelector() BackendSelector
RedirectConnections() error
ConnCount() int
Close()
Expand Down
51 changes: 39 additions & 12 deletions pkg/manager/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,27 +61,54 @@ func NewScoreBasedRouter(logger *zap.Logger, httpCli *http.Client, fetcher Backe
return router, nil
}

// Route implements Router.Route interface.
func (router *ScoreBasedRouter) Route(conn RedirectableConn) (string, error) {
// GetBackendSelector implements Router.GetBackendSelector interface.
func (router *ScoreBasedRouter) GetBackendSelector() BackendSelector {
return BackendSelector{
routeOnce: router.routeOnce,
addConn: router.addNewConn,
}
}

func (router *ScoreBasedRouter) routeOnce(excluded []string) string {
router.Lock()
defer router.Unlock()
be := router.backends.Back()
if be == nil {
return "", ErrNoInstanceToSelect
}
backend := be.Value.(*backendWrapper)
switch backend.status {
case StatusCannotConnect, StatusSchemaOutdated:
return "", ErrNoInstanceToSelect
for be := router.backends.Back(); be != nil; be = be.Prev() {
backend := be.Value.(*backendWrapper)
// These backends may be recycled, so we should not connect to them again.
switch backend.status {
case StatusCannotConnect, StatusSchemaOutdated:
continue
}
found := false
for _, ex := range excluded {
if ex == backend.addr {
found = true
break
}
}
if !found {
return backend.addr
}
}
return ""
}

func (router *ScoreBasedRouter) addNewConn(addr string, conn RedirectableConn) error {
connWrapper := &connWrapper{
RedirectableConn: conn,
phase: phaseNotRedirected,
}
router.Lock()
be := router.lookupBackend(addr, true)
if be == nil {
router.Unlock()
return errors.WithStack(errors.Errorf("backend %s is not found in the router", addr))
}
router.addConn(be, connWrapper)
addBackendConnMetrics(backend.addr)
router.Unlock()
addBackendConnMetrics(addr)
conn.SetEventReceiver(router)
return backend.addr, nil
return nil
}

func (router *ScoreBasedRouter) removeConn(be *list.Element, ce *list.Element) {
Expand Down
29 changes: 24 additions & 5 deletions pkg/manager/router/router_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,43 @@ package router
var _ Router = &StaticRouter{}

type StaticRouter struct {
addr string
addr []string
cnt int
}

func NewStaticRouter(addr string) *StaticRouter {
func NewStaticRouter(addr []string) *StaticRouter {
return &StaticRouter{addr: addr}
}

func (r *StaticRouter) Route(c RedirectableConn) (string, error) {
return r.addr, nil
func (r *StaticRouter) GetBackendSelector() BackendSelector {
return BackendSelector{
routeOnce: func(excluded []string) string {
for _, addr := range r.addr {
found := false
for _, e := range excluded {
if e == addr {
found = true
break
}
}
if !found {
return addr
}
}
return ""
},
addConn: func(addr string, conn RedirectableConn) error {
r.cnt++
return nil
},
}
}

func (r *StaticRouter) RedirectConnections() error {
return nil
}

func (r *StaticRouter) ConnCount() int {
r.cnt++
return r.cnt
}

Expand Down
66 changes: 57 additions & 9 deletions pkg/manager/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,21 @@ func (tester *routerTester) checkBackendOrder() {
}
}

func (tester *routerTester) simpleRoute(conn RedirectableConn) string {
selector := tester.router.GetBackendSelector()
addr := selector.Next()
if len(addr) > 0 {
err := selector.Succeed(conn)
require.NoError(tester.t, err)
}
return addr
}

func (tester *routerTester) addConnections(num int) {
for i := 0; i < num; i++ {
conn := tester.createConn()
addr, err := tester.router.Route(conn)
require.NoError(tester.t, err)
addr := tester.simpleRoute(conn)
require.True(tester.t, len(addr) > 0)
conn.from = addr
tester.conns[conn.connID] = conn
}
Expand Down Expand Up @@ -355,13 +365,49 @@ func TestConnBalanced(t *testing.T) {
func TestNoBackends(t *testing.T) {
tester := newRouterTester(t)
conn := tester.createConn()
_, err := tester.router.Route(conn)
require.ErrorIs(t, err, ErrNoInstanceToSelect)
addr := tester.simpleRoute(conn)
require.True(t, len(addr) == 0)
tester.addBackends(1)
tester.addConnections(10)
tester.killBackends(1)
_, err = tester.router.Route(conn)
require.ErrorIs(t, err, ErrNoInstanceToSelect)
addr = tester.simpleRoute(conn)
require.True(t, len(addr) == 0)
}

// Test that the backends returned by the BackendSelector are complete and different.
func TestSelectorReturnOrder(t *testing.T) {
tester := newRouterTester(t)
tester.addBackends(3)
selector := tester.router.GetBackendSelector()
for i := 0; i < 3; i++ {
addrs := make(map[string]struct{}, 3)
for j := 0; j < 3; j++ {
addr := selector.Next()
addrs[addr] = struct{}{}
}
// All 3 addresses are different.
require.Equal(t, 3, len(addrs))
addr := selector.Next()
require.True(t, len(addr) == 0)
selector.Reset()
}

tester.killBackends(1)
for i := 0; i < 2; i++ {
addr := selector.Next()
require.True(t, len(addr) > 0)
}
addr := selector.Next()
require.True(t, len(addr) == 0)
selector.Reset()

tester.addBackends(1)
for i := 0; i < 3; i++ {
addr := selector.Next()
require.True(t, len(addr) > 0)
}
addr = selector.Next()
require.True(t, len(addr) == 0)
}

// Test that the backends are balanced during rolling restart.
Expand Down Expand Up @@ -578,12 +624,14 @@ func TestConcurrency(t *testing.T) {
t: t,
connID: connID,
}
addr, err := router.Route(conn)
if err != nil {
require.ErrorIs(t, err, ErrNoInstanceToSelect)
selector := router.GetBackendSelector()
addr := selector.Next()
if len(addr) == 0 {
conn = nil
continue
}
err = selector.Succeed(conn)
require.NoError(t, err)
conn.from = addr
} else if len(conn.GetRedirectingAddr()) > 0 {
// redirecting, 70% success, 20% fail, 10% close
Expand Down
5 changes: 3 additions & 2 deletions pkg/proxy/backend/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"sync"
"time"

"github.com/pingcap/TiProxy/lib/util/errors"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (auth *Authenticator) verifyBackendCaps(logger *zap.Logger, backendCapabili
return nil
}

type backendIOGetter func(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp) (*pnet.PacketIO, error)
type backendIOGetter func(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp, timeout time.Duration) (*pnet.PacketIO, error)

func (auth *Authenticator) handshakeFirstTime(logger *zap.Logger, clientIO *pnet.PacketIO, handshakeHandler HandshakeHandler,
getBackendIO backendIOGetter, frontendTLSConfig, backendTLSConfig *tls.Config) error {
Expand Down Expand Up @@ -160,7 +161,7 @@ func (auth *Authenticator) handshakeFirstTime(logger *zap.Logger, clientIO *pnet
auth.attrs = resp.Attrs

// In case of testing, backendIO is passed manually that we don't want to bother with the routing logic.
backendIO, err := getBackendIO(auth, auth, resp)
backendIO, err := getBackendIO(auth, auth, resp, 5*time.Second)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a constant, DialTimeout in backend_conn.go.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's different. The one passed to getBackendIO is the total timeout for all retries.
BTW, DialTimeout is also 5 seconds. Maybe it should be less?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both.I think that DialTimeout should be less, but backoff duration should be larger, e.g. three or four times of DialTimeout.

if err != nil {
return err
}
Expand Down
53 changes: 31 additions & 22 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,40 +153,49 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO *pnet.Packe
return nil
}

func (mgr *BackendConnManager) getBackendIO(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp) (*pnet.PacketIO, error) {
func (mgr *BackendConnManager) getBackendIO(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp, timeout time.Duration) (*pnet.PacketIO, error) {
r, err := mgr.handshakeHandler.GetRouter(auth, resp)
if err != nil {
return nil, err
}
// wait for initialize
bctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
addr, err := backoff.RetryNotifyWithData(
func() (string, error) {
addr, err := r.Route(mgr)
if !errors.Is(err, router.ErrNoInstanceToSelect) {
return addr, backoff.Permanent(err)
// Reasons to wait:
// - The TiDB instances may not be initialized yet
// - One TiDB may be just shut down and another is just started but not ready yet
bctx, cancel := context.WithTimeout(context.Background(), timeout)
selector := r.GetBackendSelector()
io, err := backoff.RetryNotifyWithData(
func() (*pnet.PacketIO, error) {
// Try to connect to all backup backends one by one.
selector.Reset()
for {
addr := selector.Next()
if len(addr) == 0 {
return nil, router.ErrNoInstanceToSelect
}
backendConn := NewBackendConnection(addr)
err := backendConn.Connect()
mgr.handshakeHandler.OnHandshake(auth, addr, err)
if err == nil {
if err = selector.Succeed(mgr); err == nil {
mgr.logger.Info("connected to backend", zap.String("addr", addr))
mgr.backendConn = backendConn
auth.serverAddr = addr
return mgr.backendConn.PacketIO(), nil
}
// Bad luck: the backend has been recycled or shut down just after the selector returns it.
if ignoredErr := backendConn.Close(); ignoredErr != nil {
mgr.logger.Error("close backend connection failed", zap.String("addr", addr), zap.Error(ignoredErr))
}
}
}
return addr, err
},
backoff.WithContext(backoff.NewConstantBackOff(200*time.Millisecond), bctx),
func(err error, d time.Duration) {
mgr.handshakeHandler.OnHandshake(auth, "", err)
},
)
cancel()
if err != nil {
return nil, err
}

mgr.logger.Info("found", zap.String("addr", addr))
mgr.backendConn = NewBackendConnection(addr)
if err := mgr.backendConn.Connect(); err != nil {
mgr.handshakeHandler.OnHandshake(auth, addr, err)
return nil, err
}

auth.serverAddr = addr
return mgr.backendConn.PacketIO(), nil
return io, err
}

// ExecuteCmd forwards messages between the client and the backend.
Expand Down
Loading