Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/release-6.5' into cherry-pick-…
Browse files Browse the repository at this point in the history
…39011-to-release-6.5
  • Loading branch information
time-and-fate committed Feb 14, 2023
2 parents 0cb59e6 + b8328d8 commit 9ba14f0
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 6 deletions.
6 changes: 6 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,12 @@ func KeepGcDisabled(g glue.Glue, store kv.Storage) (RestoreFunc, error) {
return nil, errors.Trace(err)
}

// If the oldRatio is negative, which is not normal status.
// It should set default value "1.1" after PiTR finished.
if strings.HasPrefix(oldRatio, "-") {
oldRatio = "1.1"
}

return func() error {
return utils.SetGcRatio(execCtx, oldRatio)
}, nil
Expand Down
10 changes: 10 additions & 0 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
BinaryArgs: args,
PrepStmt: prepStmt,
}

// For the combination of `ComPrepare` and `ComExecute`, the statement name is stored in the client side, and the
// TiDB only has the ID, so don't try to construct an `EXECUTE SOMETHING`. Use the original prepared statement here
// instead.
sql := ""
planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt)
if ok {
sql = planCacheStmt.StmtText
}
execStmt.SetText(charset.EncodingUTF8Impl, sql)
rs, err := (&cc.ctx).ExecuteStmt(ctx, execStmt)
if err != nil {
return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
Expand Down
32 changes: 32 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1799,3 +1799,35 @@ func TestExtensionChangeUser(t *testing.T) {
require.Equal(t, expectedConnInfo.Error, logInfo.Error)
require.Equal(t, *(expectedConnInfo.ConnectionInfo), *(logInfo.ConnectionInfo))
}

func TestProcessInfoForExecuteCommand(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
ctx := context.Background()

tk.MustExec("use test")
cc.setCtx(&TiDBContext{Session: tk.Session(), stmts: make(map[int]*TiDBStatement)})

tk.MustExec("create table t (col1 int)")

// simple prepare and execute
require.NoError(t, cc.handleStmtPrepare(ctx, "select sum(col1) from t"))
require.NoError(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}))
require.Equal(t, cc.ctx.Session.ShowProcess().Info, "select sum(col1) from t")

// prepare and execute with params
require.NoError(t, cc.handleStmtPrepare(ctx, "select sum(col1) from t where col1 < ? and col1 > 100"))
// 1 params, length of nullBitMap is 1, `0x8, 0x0` represents the type, and the following `0x10, 0x0....` is the param
// 10
require.NoError(t, cc.handleStmtExecute(ctx, []byte{0x2, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
0x1, 0x8, 0x0,
0x0A, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}))
require.Equal(t, cc.ctx.Session.ShowProcess().Info, "select sum(col1) from t where col1 < ? and col1 > 100")
}
18 changes: 12 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,17 @@ func killConn(conn *clientConn) {
}
}

// KillSysProcesses kill sys processes such as auto analyze.
func (s *Server) KillSysProcesses() {
if s.dom == nil {
return
}
sysProcTracker := s.dom.SysProcTracker()
for connID := range sysProcTracker.GetSysProcessList() {
sysProcTracker.KillSysProcess(connID)
}
}

// KillAllConnections kills all connections when server is not gracefully shutdown.
func (s *Server) KillAllConnections() {
logutil.BgLogger().Info("[server] kill all connections.")
Expand All @@ -804,12 +815,7 @@ func (s *Server) KillAllConnections() {
killConn(conn)
}

if s.dom != nil {
sysProcTracker := s.dom.SysProcTracker()
for connID := range sysProcTracker.GetSysProcessList() {
sysProcTracker.KillSysProcess(connID)
}
}
s.KillSysProcesses()
}

var gracefulCloseConnectionsTimeout = 15 * time.Second
Expand Down
3 changes: 3 additions & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,9 @@ func cleanup(svr *server.Server, storage kv.Storage, dom *domain.Domain, gracefu
if graceful {
done := make(chan struct{})
svr.GracefulDown(context.Background(), done)
// Kill sys processes such as auto analyze. Otherwise, tidb-server cannot exit until auto analyze is finished.
// See https://github.com/pingcap/tidb/issues/40038 for details.
svr.KillSysProcesses()
} else {
svr.TryGracefulDown()
}
Expand Down

0 comments on commit 9ba14f0

Please sign in to comment.