Skip to content

Commit

Permalink
backend: add tests for BackendConnMgr (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Aug 19, 2022
1 parent 9d6726a commit ed53270
Show file tree
Hide file tree
Showing 7 changed files with 573 additions and 86 deletions.
4 changes: 2 additions & 2 deletions pkg/proxy/backend/backend_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
package backend

import (
"errors"
"net"
"time"

pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
"github.com/pingcap/TiProxy/pkg/util/errors"
)

const (
Expand All @@ -44,7 +44,7 @@ func (bc *BackendConnection) Addr() string {
func (bc *BackendConnection) Connect() error {
cn, err := net.DialTimeout("tcp", bc.address, DialTimeout)
if err != nil {
return errors.New("dial backend error")
return errors.Wrapf(err, "dial backend error")
}

pkt := pnet.NewPacketIO(cn)
Expand Down
104 changes: 63 additions & 41 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,51 @@ import (
gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/TiProxy/pkg/manager/router"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
"github.com/pingcap/errors"
"github.com/pingcap/TiProxy/pkg/util/errors"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// Statements, result column names and JSON keys used while migrating a
// session from one backend to another.
const (
	// sqlQueryState is the statement sent to the current backend to fetch the session states.
	sqlQueryState = "SHOW SESSION_STATES"
	// sqlSetState is a format string taking the serialized session states as its only argument.
	// NOTE(review): presumably sent to the new backend when restoring the session — confirm against initSessionStates.
	sqlSetState = "SET SESSION_STATES '%s'"
	// sessionStatesCol is the result column of sqlQueryState that holds the session states.
	sessionStatesCol = "Session_states"
	// sessionTokenCol is the result column of sqlQueryState that holds the session token.
	sessionTokenCol = "Session_token"
	// currentDBKey is the key of the current database inside the JSON-encoded session states.
	currentDBKey = "current-db"
)

// signalRedirect records a pending request to move the session to another backend.
type signalRedirect struct {
	// newAddr is the address of the backend that the session should be redirected to.
	newAddr string
}

// BackendConnManager migrates a session from one BackendConnection to another.
//
// The signal processing goroutine tries to migrate the session once it receives a signal.
// If the session is not ready at that time, the cmd executing goroutine will try after executing commands.
//
// If redirection fails, it doesn't retry and waits for the next signal, because:
//   - If it disconnects immediately: it's even worse than graceful shutdown.
//   - If it retries after each command: the latency will be unacceptable afterwards if it always fails.
//   - If it stops receiving signals: the previous new backend may be abnormal but the next new backend may be good.
type BackendConnManager struct {
	// connectionID is the ID returned by ConnectionID.
	connectionID  uint64
	authenticator *Authenticator
	cmdProcessor  *CmdProcessor
	// eventReceiver is notified about the result of every redirection attempt.
	eventReceiver router.ConnEventReceiver
	// backendConn is the connection to the current backend. It is replaced on a successful redirection.
	backendConn *BackendConnection
	// processLock makes redirecting and command processing exclusive.
	processLock sync.Mutex
	// signalReceived is used to notify the signal processing goroutine.
	signalReceived chan struct{}
	// signal has type *signalRedirect. It saves the last signal if there are multiple signals.
	// It will be set to nil after migration.
	signal unsafe.Pointer
	// cancelFunc is used to cancel the signal processing goroutine.
	cancelFunc context.CancelFunc
}

// NewBackendConnManager creates a BackendConnManager.
func NewBackendConnManager(connectionID uint64) *BackendConnManager {
return &BackendConnManager{
connectionID: connectionID,
Expand All @@ -64,10 +81,13 @@ func NewBackendConnManager(connectionID uint64) *BackendConnManager {
}
}

// ConnectionID implements RedirectableConn.ConnectionID interface.
// It returns the ID of the frontend connection. The ID stays the same after
// session migration because only the backend connection is replaced.
func (mgr *BackendConnManager) ConnectionID() uint64 {
	return mgr.connectionID
}

// Connect connects to the first backend and then start watching redirection signals.
func (mgr *BackendConnManager) Connect(ctx context.Context, serverAddr string, clientIO *pnet.PacketIO, serverTLSConfig, backendTLSConfig *tls.Config) error {
mgr.processLock.Lock()
defer mgr.processLock.Unlock()
Expand All @@ -85,6 +105,8 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, serverAddr string, c
return nil
}

// ExecuteCmd forwards messages between the client and the backend.
// If it finds that the session is ready for redirection, it migrates the session.
func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte, clientIO *pnet.PacketIO) error {
mgr.processLock.Lock()
defer mgr.processLock.Unlock()
Expand All @@ -105,9 +127,8 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte, c
}
// Even if it meets an MySQL error, it may have changed the status, such as when executing multi-statements.
if waitingRedirect && mgr.cmdProcessor.canRedirect() {
if err = mgr.tryRedirect(ctx); err != nil {
return err
}
_ = mgr.tryRedirect(ctx)
// Execute the held request no matter redirection succeeds or not.
if holdRequest {
_, err = mgr.cmdProcessor.executeCmd(request, clientIO, mgr.backendConn.PacketIO(), false)
}
Expand All @@ -119,6 +140,8 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte, c
return nil
}

// SetEventReceiver implements RedirectableConn.SetEventReceiver interface.
// The receiver sends redirection signals and watches redirecting events:
// it is called back with the outcome of every redirection attempt.
// NOTE(review): the field is assigned without synchronization; presumably this
// is called before any redirection can happen — confirm against the caller.
func (mgr *BackendConnManager) SetEventReceiver(receiver router.ConnEventReceiver) {
	mgr.eventReceiver = receiver
}
Expand All @@ -138,13 +161,15 @@ func (mgr *BackendConnManager) querySessionStates() (sessionStates, sessionToken
if result, _, err = mgr.cmdProcessor.query(mgr.backendConn.PacketIO(), sqlQueryState); err != nil {
return
}
if sessionStates, err = result.GetStringByName(0, "Session_states"); err != nil {
if sessionStates, err = result.GetStringByName(0, sessionStatesCol); err != nil {
return
}
sessionToken, err = result.GetStringByName(0, "Session_token")
sessionToken, err = result.GetStringByName(0, sessionTokenCol)
return
}

// processSignals runs in a goroutine to receive redirection signals.
// It will then try to migrate the session.
func (mgr *BackendConnManager) processSignals(ctx context.Context) {
for {
select {
Expand All @@ -154,68 +179,64 @@ func (mgr *BackendConnManager) processSignals(ctx context.Context) {
return
}
mgr.processLock.Lock()
if err := mgr.tryRedirect(ctx); err != nil {
logutil.Logger(ctx).Error("redirect connection failed", zap.Error(err))
}
_ = mgr.tryRedirect(ctx)
mgr.processLock.Unlock()
case <-ctx.Done():
return
}
}
}

// processLock must be held on this function.
func (mgr *BackendConnManager) tryRedirect(ctx context.Context) (err error) {
// tryRedirect tries to migrate the session if the session is redirect-able.
// NOTE: processLock should be held before calling this function.
func (mgr *BackendConnManager) tryRedirect(ctx context.Context) error {
signal := (*signalRedirect)(atomic.LoadPointer(&mgr.signal))
if signal == nil {
return nil
}

if !mgr.cmdProcessor.canRedirect() {
return
return nil
}
from := mgr.backendConn.Addr()
to := signal.newAddr

var err error
defer func() {
if err != nil {
mgr.eventReceiver.OnRedirectFail(from, to, mgr)
logutil.Logger(ctx).Error("redirect connection failed", zap.String("to", signal.newAddr), zap.Error(err))
logutil.Logger(ctx).Warn("redirect connection failed", zap.String("from", from), zap.String("to", to), zap.Error(err))
} else {
mgr.eventReceiver.OnRedirectSucceed(from, to, mgr)
logutil.Logger(ctx).Info("redirect connection succeeds", zap.String("to", signal.newAddr))
logutil.Logger(ctx).Info("redirect connection succeeds", zap.String("from", from), zap.String("to", to))
}
}()

var sessionStates, sessionToken string
if sessionStates, sessionToken, err = mgr.querySessionStates(); err != nil {
return
return err
}

newConn := NewBackendConnection(to)
if err = newConn.Connect(); err != nil {
return
return err
}
// Retrial may be needed in the future.
if err = mgr.authenticator.handshakeSecondTime(newConn.PacketIO(), sessionToken); err != nil {
if ignoredErr := newConn.Close(); ignoredErr != nil {
logutil.Logger(ctx).Warn("close new backend connection failed", zap.Error(ignoredErr))
}
return
if err = mgr.authenticator.handshakeSecondTime(newConn.PacketIO(), sessionToken); err == nil {
err = mgr.initSessionStates(newConn.PacketIO(), sessionStates)
}
if err = mgr.initSessionStates(newConn.PacketIO(), sessionStates); err != nil {
if err != nil {
if ignoredErr := newConn.Close(); ignoredErr != nil {
logutil.Logger(ctx).Warn("close new backend connection failed", zap.Error(ignoredErr))
}
return
return err
}
if ignoredErr := mgr.backendConn.Close(); ignoredErr != nil {
logutil.Logger(ctx).Warn("close previous backend connection failed", zap.Error(ignoredErr))
}
mgr.backendConn = newConn
// The `mgr` won't be notified again before it calls `OnRedirectSucceed`, so simply `StorePointer` is also fine.
atomic.CompareAndSwapPointer(&mgr.signal, unsafe.Pointer(signal), nil)
return
return nil
}

// The original db in the auth info may be dropped during the session, so we need to authenticate with the current db.
Expand All @@ -224,29 +245,30 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) (err error) {
// updateAuthInfoFromSessionStates decodes the JSON-encoded session states and
// passes the current database found in them to the authenticator.
// It returns a wrapped error when sessionStates is not a valid JSON object of strings.
func (mgr *BackendConnManager) updateAuthInfoFromSessionStates(sessionStates []byte) error {
	var statesMap map[string]string
	if err := json.Unmarshal(sessionStates, &statesMap); err != nil {
		return errors.Wrapf(err, "unmarshal session states error")
	}
	// The currentDBKey may be omitted if it's empty. In this case, we still need to update it:
	// a missing key yields the empty string, which resets the current database.
	currentDB := statesMap[currentDBKey]
	mgr.authenticator.updateCurrentDB(currentDB)
	return nil
}

// Redirect implements RedirectableConn.Redirect interface. It redirects the current session to the newAddr.
// Note that the caller requires the function to be non-blocking, so it only
// records the signal and wakes up the signal processing goroutine; the
// migration itself happens later.
func (mgr *BackendConnManager) Redirect(newAddr string) {
	// We do not use `chan signalRedirect` to avoid blocking. We cannot discard the signal when it blocks,
	// because only the latest signal matters.
	atomic.StorePointer(&mgr.signal, unsafe.Pointer(&signalRedirect{
		newAddr: newAddr,
	}))
	// Only the target address is logged here: mgr.backendConn is replaced under
	// processLock during migration, and this function must not take that lock,
	// so reading the current backend address here would be a data race.
	logutil.BgLogger().Info("received redirect command", zap.String("to", newAddr))
	// Non-blocking notification: if a notification is already pending, the
	// goroutine will pick up the latest signal anyway.
	select {
	case mgr.signalReceived <- struct{}{}:
	default:
	}
}

// Close releases all resources.
func (mgr *BackendConnManager) Close() error {
if mgr.cancelFunc != nil {
mgr.cancelFunc()
Expand Down
Loading

0 comments on commit ed53270

Please sign in to comment.