Skip to content

Commit

Permalink
server: enhance graceful stop by closing connections after finish the…
Browse files Browse the repository at this point in the history
… ongoing txn (#32111) (#48905) (#48989)

close #32110

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
crazycs520 and ti-chi-bot authored Nov 29, 2023
1 parent 245230a commit 2db6289
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 128 deletions.
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ go_library(
"@org_golang_google_grpc//channelz/service",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//peer",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
Expand Down
33 changes: 17 additions & 16 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func newClientConn(s *Server) *clientConn {
status: connStatusDispatching,
lastActive: time.Now(),
authPlugin: mysql.AuthNativePassword,
quit: make(chan struct{}),
ppEnabled: s.cfg.ProxyProtocol.Networks != "",
}
}
Expand Down Expand Up @@ -215,6 +216,8 @@ type clientConn struct {
sync.RWMutex
cancelFunc context.CancelFunc
}
// quit is close once clientConn quit Run().
quit chan struct{}
extensions *extension.SessionExtensions

// Proxy Protocol Enabled
Expand Down Expand Up @@ -1093,6 +1096,12 @@ func (cc *clientConn) Run(ctx context.Context) {
terror.Log(err)
metrics.PanicCounter.WithLabelValues(metrics.LabelSession).Inc()
}
if atomic.LoadInt32(&cc.status) != connStatusShutdown {
err := cc.Close()
terror.Log(err)
}

close(cc.quit)
}()

// Usually, client connection status changes between [dispatching] <=> [reading].
Expand All @@ -1101,6 +1110,13 @@ func (cc *clientConn) Run(ctx context.Context) {
// The client connection would detect the events when it fails to change status
// by CAS operation, it would then take some actions accordingly.
for {
// Close connection between txn when we are going to shutdown server.
if cc.server.inShutdownMode.Load() {
if !cc.ctx.GetSessionVars().InTxn() {
return
}
}

if !atomic.CompareAndSwapInt32(&cc.status, connStatusDispatching, connStatusReading) ||
// The judge below will not be hit by all means,
// But keep it stayed as a reminder and for the code reference for connStatusWaitShutdown.
Expand All @@ -1110,6 +1126,7 @@ func (cc *clientConn) Run(ctx context.Context) {

cc.alloc.Reset()
// close connection when idle time is more than wait_timeout
// default 28800(8h), FIXME: should not block at here when we kill the connection.
waitTimeout := cc.getSessionVarsWaitTimeout(ctx)
cc.pkt.setReadTimeout(time.Duration(waitTimeout) * time.Second)
start := time.Now()
Expand Down Expand Up @@ -1196,22 +1213,6 @@ func (cc *clientConn) Run(ctx context.Context) {
}
}

// ShutdownOrNotify will Shutdown this client connection, or do its best to notify.
func (cc *clientConn) ShutdownOrNotify() bool {
if (cc.ctx.Status() & mysql.ServerStatusInTrans) > 0 {
return false
}
// If the client connection status is reading, it's safe to shutdown it.
if atomic.CompareAndSwapInt32(&cc.status, connStatusReading, connStatusShutdown) {
return true
}
// If the client connection status is dispatching, we can't shutdown it immediately,
// so set the status to WaitShutdown as a notification, the loop in clientConn.Run
// will detect it and then exit.
atomic.StoreInt32(&cc.status, connStatusWaitShutdown)
return false
}

func errStrForLog(err error, enableRedactLog bool) string {
if enableRedactLog {
// currently, only ErrParse is considered when enableRedactLog because it may contain sensitive information like
Expand Down
25 changes: 0 additions & 25 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,31 +778,6 @@ func TestShutDown(t *testing.T) {
require.Equal(t, executor.ErrQueryInterrupted, err)
}

func TestShutdownOrNotify(t *testing.T) {
store := testkit.CreateMockStore(t)
se, err := session.CreateSession4Test(store)
require.NoError(t, err)
tc := &TiDBContext{
Session: se,
stmts: make(map[int]*TiDBStatement),
}
cc := &clientConn{
connectionID: 1,
server: &Server{
capability: defaultCapability,
},
status: connStatusWaitShutdown,
}
cc.setCtx(tc)
require.False(t, cc.ShutdownOrNotify())
cc.status = connStatusReading
require.True(t, cc.ShutdownOrNotify())
require.Equal(t, connStatusShutdown, cc.status)
cc.status = connStatusDispatching
require.False(t, cc.ShutdownOrNotify())
require.Equal(t, connStatusWaitShutdown, cc.status)
}

type snapshotCache interface {
SnapCacheHitCount() int
}
Expand Down
2 changes: 1 addition & 1 deletion server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (s *Server) handleStatus(w http.ResponseWriter, req *http.Request) {
// If the server is in the process of shutting down, return a non-200 status.
// It is important not to return status{} as acquiring the s.ConnectionCount()
// acquires a lock that may already be held by the shutdown process.
if s.inShutdownMode {
if !s.health.Load() {
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
Loading

0 comments on commit 2db6289

Please sign in to comment.