From 155e3ea5d4baf063c9c7f696193b8a1951b16d1d Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 17 Feb 2023 16:51:04 +0800 Subject: [PATCH] *: prevent cursor read from being cancelled by GC (#39950) (#39990) close pingcap/tidb#39447 --- ddl/db_test.go | 26 +++++---- domain/infosync/info.go | 16 +----- server/BUILD.bazel | 1 + server/conn_stmt.go | 10 +++- server/conn_stmt_test.go | 89 +++++++++++++++++++++++++++++ server/driver_tidb.go | 40 +++++++++++++ server/driver_tidb_test.go | 25 ++++++++ server/server.go | 34 +++++++++++ session/session.go | 1 + sessionctx/variable/session.go | 53 +++++++++++++++++ sessionctx/variable/session_test.go | 54 +++++++++++++++++ testkit/mocksessionmanager.go | 24 +++++++- util/processinfo.go | 28 +++++++++ 13 files changed, 375 insertions(+), 26 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index d36de3a425025..edc891ad16ccf 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -1585,13 +1585,11 @@ func TestLogAndShowSlowLog(t *testing.T) { } func TestReportingMinStartTimestamp(t *testing.T) { - _, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) + store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) + tk := testkit.NewTestKit(t, store) + se := tk.Session() infoSyncer := dom.InfoSyncer() - sm := &testkit.MockSessionManager{ - PS: make([]*util.ProcessInfo, 0), - } - infoSyncer.SetSessionManager(sm) beforeTS := oracle.GoTimeToTS(time.Now()) infoSyncer.ReportMinStartTS(dom.Store()) afterTS := oracle.GoTimeToTS(time.Now()) @@ -1600,13 +1598,21 @@ func TestReportingMinStartTimestamp(t *testing.T) { now := time.Now() validTS := oracle.GoTimeToLowerLimitStartTS(now.Add(time.Minute), tikv.MaxTxnTimeUse) lowerLimit := oracle.GoTimeToLowerLimitStartTS(now, tikv.MaxTxnTimeUse) + sm := se.GetSessionManager().(*testkit.MockSessionManager) sm.PS = []*util.ProcessInfo{ - {CurTxnStartTS: 0}, - {CurTxnStartTS: math.MaxUint64}, - {CurTxnStartTS: lowerLimit}, - {CurTxnStartTS: validTS}, + {CurTxnStartTS: 0, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, + {CurTxnStartTS: math.MaxUint64, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, + {CurTxnStartTS: lowerLimit, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, + {CurTxnStartTS: validTS, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, } - infoSyncer.SetSessionManager(sm) + infoSyncer.ReportMinStartTS(dom.Store()) + require.Equal(t, validTS, infoSyncer.GetMinStartTS()) + + unhold := se.GetSessionVars().ProtectedTSList.HoldTS(validTS - 1) + infoSyncer.ReportMinStartTS(dom.Store()) + require.Equal(t, validTS-1, infoSyncer.GetMinStartTS()) + + unhold() infoSyncer.ReportMinStartTS(dom.Store()) require.Equal(t, validTS, infoSyncer.GetMinStartTS()) } diff --git a/domain/infosync/info.go b/domain/infosync/info.go index c501d7f16d695..3d45ce691e252 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -689,8 +689,6 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { if sm == nil { return } - pl := sm.ShowProcessList() - innerSessionStartTSList := sm.GetInternalSessionStartTSList() // Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC. currentVer, err := store.CurrentVersion(kv.GlobalTxnScope) @@ -704,18 +702,8 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { minStartTS := oracle.GoTimeToTS(now) logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("initial minStartTS", minStartTS), zap.Uint64("StartTSLowerLimit", startTSLowerLimit)) - for _, info := range pl { - if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS { - minStartTS = info.CurTxnStartTS - } - } - - for _, innerTS := range innerSessionStartTSList { - logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("Internal Session Transaction StartTS", innerTS)) - kv.PrintLongTimeInternalTxn(now, innerTS, false) - if innerTS > startTSLowerLimit && innerTS < minStartTS { - minStartTS = innerTS - } + if ts := sm.GetMinStartTS(startTSLowerLimit); ts > startTSLowerLimit && ts < minStartTS { + minStartTS = ts } is.minStartTS = kv.GetMinInnerTxnStartTS(now, startTSLowerLimit, minStartTS) diff --git a/server/BUILD.bazel b/server/BUILD.bazel index 56b3741259f81..54bc5ffcd1325 100644 --- a/server/BUILD.bazel +++ b/server/BUILD.bazel @@ -194,6 +194,7 @@ go_test( "//util/plancodec", "//util/resourcegrouptag", "//util/rowcodec", + "//util/sqlexec", "//util/topsql", "//util/topsql/collector", "//util/topsql/collector/mock", diff --git a/server/conn_stmt.go b/server/conn_stmt.go index 255cb046674ec..cf2f9f2aa6e86 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -276,7 +276,9 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm if rs == nil { return false, cc.writeOK(ctx) } - if result, ok := rs.(*tidbResultSet); ok { + // since there are multiple implementations of ResultSet (the rs might be wrapped), we have to unwrap the rs before + // casting it to *tidbResultSet. + if result, ok := unwrapResultSet(rs).(*tidbResultSet); ok { if planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt); ok { result.preparedStmt = planCacheStmt } @@ -288,6 +290,12 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm if useCursor { cc.initResultEncoder(ctx) defer cc.rsEncoder.clean() + // fix https://github.com/pingcap/tidb/issues/39447. we need to hold the start-ts here because the process info + // will be set to sleep after fetch returned. + if pi := cc.ctx.ShowProcess(); pi != nil && pi.ProtectedTSList != nil && pi.CurTxnStartTS > 0 { + unhold := pi.HoldTS(pi.CurTxnStartTS) + rs = &rsWithHooks{ResultSet: rs, onClosed: unhold} + } stmt.StoreResultSet(rs) if err = cc.writeColumnInfo(rs.Columns()); err != nil { return false, err diff --git a/server/conn_stmt_test.go b/server/conn_stmt_test.go index 1b8ea55e61c35..2e60fc1085332 100644 --- a/server/conn_stmt_test.go +++ b/server/conn_stmt_test.go @@ -15,12 +15,15 @@ package server import ( + "context" + "encoding/binary" "testing" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" "github.com/stretchr/testify/require" ) @@ -251,3 +254,89 @@ func TestParseStmtFetchCmd(t *testing.T) { require.Equal(t, tc.err, err) } } + +func TestCursorReadHoldTS(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + srv := CreateMockServer(t, store) + srv.SetDomain(dom) + defer srv.Close() + + appendUint32 := binary.LittleEndian.AppendUint32 + ctx := context.Background() + c := CreateMockConn(t, srv) + tk := testkit.NewTestKitWithSession(t, store, c.Context().Session) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int primary key)") + tk.MustExec("insert into t values (1), (2), (3), (4), (5), (6), (7), (8)") + tk.MustQuery("select count(*) from t").Check(testkit.Rows("8")) + + stmt, _, _, err := c.Context().Prepare("select * from t") + require.NoError(t, err) + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + + // should hold ts after executing stmt with cursor + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), + mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, + ))) + ts := tk.Session().ShowProcess().GetMinStartTS(0) + require.Positive(t, ts) + // should unhold ts when result set exhausted + require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5))) + require.Equal(t, ts, tk.Session().ShowProcess().GetMinStartTS(0)) + require.Equal(t, ts, srv.GetMinStartTS(0)) + require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5))) + require.Equal(t, ts, tk.Session().ShowProcess().GetMinStartTS(0)) + require.Equal(t, ts, srv.GetMinStartTS(0)) + require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5))) + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + + // should hold ts after executing stmt with cursor + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), + mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, + ))) + require.Positive(t, tk.Session().ShowProcess().GetMinStartTS(0)) + // should unhold ts when stmt reset + require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtReset}, uint32(stmt.ID())))) + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + + // should hold ts after executing stmt with cursor + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), + mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, + ))) + require.Positive(t, tk.Session().ShowProcess().GetMinStartTS(0)) + // should unhold ts when stmt closed + require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtClose}, uint32(stmt.ID())))) + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + + // create another 2 stmts and execute them + stmt1, _, _, err := c.Context().Prepare("select * from t") + require.NoError(t, err) + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt1.ID())), + mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, + ))) + ts1 := tk.Session().ShowProcess().GetMinStartTS(0) + require.Positive(t, ts1) + stmt2, _, _, err := c.Context().Prepare("select * from t") + require.NoError(t, err) + require.NoError(t, c.Dispatch(ctx, append( + appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt2.ID())), + mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, + ))) + ts2 := tk.Session().ShowProcess().GetMinStartTS(ts1) + require.Positive(t, ts2) + + require.Less(t, ts1, ts2) + require.Equal(t, ts1, srv.GetMinStartTS(0)) + require.Equal(t, ts2, srv.GetMinStartTS(ts1)) + require.Zero(t, srv.GetMinStartTS(ts2)) + + // should unhold all when session closed + c.Close() + require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) + require.Zero(t, srv.GetMinStartTS(0)) +} diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 0c329826d0683..7b25a998d618b 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -480,6 +480,46 @@ func (trs *tidbResultSet) Columns() []*ColumnInfo { return trs.columns } +// rsWithHooks wraps a ResultSet with some hooks (currently only onClosed). +type rsWithHooks struct { + ResultSet + onClosed func() +} + +// Close implements ResultSet#Close +func (rs *rsWithHooks) Close() error { + closed := rs.IsClosed() + err := rs.ResultSet.Close() + if !closed && rs.onClosed != nil { + rs.onClosed() + } + return err +} + +// OnFetchReturned implements fetchNotifier#OnFetchReturned +func (rs *rsWithHooks) OnFetchReturned() { + if impl, ok := rs.ResultSet.(fetchNotifier); ok { + impl.OnFetchReturned() + } +} + +// Unwrap returns the underlying result set +func (rs *rsWithHooks) Unwrap() ResultSet { + return rs.ResultSet +} + +// unwrapResultSet likes errors.Cause but for ResultSet +func unwrapResultSet(rs ResultSet) ResultSet { + var unRS ResultSet + if u, ok := rs.(interface{ Unwrap() ResultSet }); ok { + unRS = u.Unwrap() + } + if unRS == nil { + return rs + } + return unwrapResultSet(unRS) +} + func convertColumnInfo(fld *ast.ResultField) (ci *ColumnInfo) { ci = &ColumnInfo{ Name: fld.ColumnAsName.O, diff --git a/server/driver_tidb_test.go b/server/driver_tidb_test.go index b5f7e670aded0..b56632937e078 100644 --- a/server/driver_tidb_test.go +++ b/server/driver_tidb_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" "github.com/stretchr/testify/require" ) @@ -95,3 +96,27 @@ func TestConvertColumnInfo(t *testing.T) { colInfo = convertColumnInfo(&resultField) require.Equal(t, uint32(4), colInfo.ColumnLength) } + +func TestRSWithHooks(t *testing.T) { + closeCount := 0 + rs := &rsWithHooks{ + ResultSet: &tidbResultSet{recordSet: new(sqlexec.SimpleRecordSet)}, + onClosed: func() { closeCount++ }, + } + require.Equal(t, 0, closeCount) + rs.Close() + require.Equal(t, 1, closeCount) + rs.Close() + require.Equal(t, 1, closeCount) +} + +func TestUnwrapRS(t *testing.T) { + var nilRS ResultSet + require.Nil(t, unwrapResultSet(nilRS)) + rs0 := new(tidbResultSet) + rs1 := &rsWithHooks{ResultSet: rs0} + rs2 := &rsWithHooks{ResultSet: rs1} + for _, rs := range []ResultSet{rs0, rs1, rs2} { + require.Equal(t, rs0, unwrapResultSet(rs)) + } +} diff --git a/server/server.go b/server/server.go index 1fad475964152..42031fce9ffa1 100644 --- a/server/server.go +++ b/server/server.go @@ -981,3 +981,37 @@ func (s *Server) KillNonFlashbackClusterConn() { s.Kill(id, false) } } + +// GetMinStartTS implements SessionManager interface. +func (s *Server) GetMinStartTS(lowerBound uint64) (ts uint64) { + // sys processes + if s.dom != nil { + for _, pi := range s.dom.SysProcTracker().GetSysProcessList() { + if thisTS := pi.GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + } + } + // user sessions + func() { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + for _, client := range s.clients { + if thisTS := client.ctx.ShowProcess().GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + } + }() + // internal sessions + func() { + s.sessionMapMutex.Lock() + defer s.sessionMapMutex.Unlock() + analyzeProcID := util.GetAutoAnalyzeProcID(s.ServerID) + for se := range s.internalSessions { + if thisTS, processInfoID := session.GetStartTSFromSession(se); processInfoID != analyzeProcID && thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + } + }() + return +} diff --git a/session/session.go b/session/session.go index ca12271364b9b..c713f5bcce764 100644 --- a/session/session.go +++ b/session/session.go @@ -1602,6 +1602,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(), MaxExecutionTime: maxExecutionTime, RedactSQL: s.sessionVars.EnableRedactLog, + ProtectedTSList: &s.sessionVars.ProtectedTSList, } oldPi := s.ShowProcess() if p == nil { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 1c1fa91edbc6a..f75a302510f17 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1322,6 +1322,9 @@ type SessionVars struct { // StoreBatchSize indicates the batch size limit of store batch, set this field to 0 to disable store batch. StoreBatchSize int + + // ProtectedTSList holds a list of timestamps that should delay GC. + ProtectedTSList protectedTSList } // GetNewChunkWithCapacity Attempt to request memory from the chunk pool @@ -3141,3 +3144,53 @@ func (s *SessionVars) GetRelatedTableForMDL() *sync.Map { func (s *SessionVars) EnableForceInlineCTE() bool { return s.enableForceInlineCTE } + +// protectedTSList implements util/processinfo#ProtectedTSList +type protectedTSList struct { + sync.Mutex + items map[uint64]int +} + +// HoldTS holds the timestamp to prevent its data from being GCed. +func (lst *protectedTSList) HoldTS(ts uint64) (unhold func()) { + lst.Lock() + if lst.items == nil { + lst.items = map[uint64]int{} + } + lst.items[ts] += 1 + lst.Unlock() + var once sync.Once + return func() { + once.Do(func() { + lst.Lock() + if lst.items != nil { + if lst.items[ts] > 1 { + lst.items[ts] -= 1 + } else { + delete(lst.items, ts) + } + } + lst.Unlock() + }) + } +} + +// GetMinProtectedTS returns the minimum protected timestamp that greater than `lowerBound` (0 if no such one). +func (lst *protectedTSList) GetMinProtectedTS(lowerBound uint64) (ts uint64) { + lst.Lock() + for k, v := range lst.items { + if v > 0 && k > lowerBound && (k < ts || ts == 0) { + ts = k + } + } + lst.Unlock() + return +} + +// Size returns the number of protected timestamps (exported for test). +func (lst *protectedTSList) Size() (size int) { + lst.Lock() + size = len(lst.items) + lst.Unlock() + return +} diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 732ce4ad606cf..edeb756a1f707 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -486,3 +486,57 @@ func TestGetReuseChunk(t *testing.T) { require.NotEqual(t, allocpool.Alloc, alloc) require.Nil(t, sessVars.ChunkPool.Alloc) } + +func TestPretectedTSList(t *testing.T) { + lst := &variable.NewSessionVars(nil).ProtectedTSList + + // empty set + require.Equal(t, uint64(0), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) + require.Equal(t, 0, lst.Size()) + + // hold 1 + unhold1 := lst.HoldTS(1) + require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) + + // hold 2 twice + unhold2a := lst.HoldTS(2) + unhold2b := lst.HoldTS(2) + require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) + require.Equal(t, 2, lst.Size()) + + // unhold 2a + unhold2a() + require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) + require.Equal(t, 2, lst.Size()) + // unhold 2a again + unhold2a() + require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) + require.Equal(t, 2, lst.Size()) + + // unhold 1 + unhold1() + require.Equal(t, uint64(2), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) + require.Equal(t, 1, lst.Size()) + + // unhold 2b + unhold2b() + require.Equal(t, uint64(0), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) + require.Equal(t, 0, lst.Size()) + + // unhold 2b again + unhold2b() + require.Equal(t, uint64(0), lst.GetMinProtectedTS(0)) + require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) + require.Equal(t, 0, lst.Size()) +} diff --git a/testkit/mocksessionmanager.go b/testkit/mocksessionmanager.go index 67280bc2e4cbe..550ff69132d91 100644 --- a/testkit/mocksessionmanager.go +++ b/testkit/mocksessionmanager.go @@ -145,7 +145,7 @@ func (msm *MockSessionManager) KillNonFlashbackClusterConn() { } } -// CheckOldRunningTxn is to get all startTS of every transactions running in the current internal sessions +// CheckOldRunningTxn implement SessionManager interface. func (msm *MockSessionManager) CheckOldRunningTxn(job2ver map[int64]int64, job2ids map[int64]string) { msm.mu.Lock() for _, se := range msm.conn { @@ -153,3 +153,25 @@ func (msm *MockSessionManager) CheckOldRunningTxn(job2ver map[int64]int64, job2i } msm.mu.Unlock() } + +// GetMinStartTS implements SessionManager interface. +func (msm *MockSessionManager) GetMinStartTS(lowerBound uint64) (ts uint64) { + msm.PSMu.RLock() + defer msm.PSMu.RUnlock() + if len(msm.PS) > 0 { + for _, pi := range msm.PS { + if thisTS := pi.GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + } + return + } + msm.mu.Lock() + defer msm.mu.Unlock() + for _, s := range msm.conn { + if thisTS := s.ShowProcess().GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + } + return +} diff --git a/util/processinfo.go b/util/processinfo.go index 77f35ef94a5ee..dee4f4ea30a53 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -31,6 +31,14 @@ import ( "github.com/tikv/client-go/v2/oracle" ) +// ProtectedTSList holds a list of timestamps that should delay GC. +type ProtectedTSList interface { + // HoldTS holds the timestamp to prevent its data from being GCed. + HoldTS(ts uint64) (unhold func()) + // GetMinProtectedTS returns the minimum protected timestamp that greater than `lowerBound` (0 if no such one). + GetMinProtectedTS(lowerBound uint64) (ts uint64) +} + // OOMAlarmVariablesInfo is a struct for OOM alarm variables. type OOMAlarmVariablesInfo struct { SessionAnalyzeVersion int @@ -40,6 +48,7 @@ type OOMAlarmVariablesInfo struct { // ProcessInfo is a struct used for show processlist statement. type ProcessInfo struct { + ProtectedTSList Time time.Time ExpensiveLogTime time.Time Plan interface{} @@ -129,6 +138,23 @@ func (pi *ProcessInfo) ToRow(tz *time.Location) []interface{} { return append(pi.ToRowForShow(true), pi.Digest, bytesConsumed, diskConsumed, pi.txnStartTs(tz)) } +// GetMinStartTS returns the minimum start-ts (used to delay GC) that greater than `lowerBound` (0 if no such one). +func (pi *ProcessInfo) GetMinStartTS(lowerBound uint64) (ts uint64) { + if pi == nil { + return + } + if thisTS := pi.CurTxnStartTS; thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + if pi.ProtectedTSList == nil { + return + } + if thisTS := pi.GetMinProtectedTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { + ts = thisTS + } + return +} + // ascServerStatus is a slice of all defined server status in ascending order. var ascServerStatus = []uint16{ mysql.ServerStatusInTrans, @@ -197,6 +223,8 @@ type SessionManager interface { CheckOldRunningTxn(job2ver map[int64]int64, job2ids map[int64]string) // KillNonFlashbackClusterConn kill all non flashback cluster connections. KillNonFlashbackClusterConn() + // GetMinStartTS returns the minimum start-ts (used to delay GC) that greater than `lowerBound` (0 if no such one). + GetMinStartTS(lowerBound uint64) uint64 } // GlobalConnID is the global connection ID, providing UNIQUE connection IDs across the whole TiDB cluster.