Skip to content

Commit

Permalink
txn: Add txn state's view (#22908)
Browse files Browse the repository at this point in the history
  • Loading branch information
longfangsong authored May 12, 2021
1 parent 1ae648b commit b1d134d
Show file tree
Hide file tree
Showing 21 changed files with 493 additions and 42 deletions.
5 changes: 5 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -70,6 +71,10 @@ type mockSessionManager struct {
PS []*util.ProcessInfo
}

func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
panic("unimplemented!")
}

func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
ret := make(map[uint64]*util.ProcessInfo)
for _, item := range msm.PS {
Expand Down
5 changes: 5 additions & 0 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
Expand Down Expand Up @@ -241,6 +242,10 @@ type mockSessionManager struct {
PS []*util.ProcessInfo
}

func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
panic("unimplemented!")
}

func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
ret := make(map[uint64]*util.ProcessInfo)
for _, item := range msm.PS {
Expand Down
3 changes: 3 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -377,6 +378,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
if txn.Valid() {
txnStartTS = txn.StartTS()
}

return &recordSet{
executor: e,
stmt: a,
Expand Down Expand Up @@ -590,6 +592,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
// todo: Report deadlock
if ErrDeadlock.Equal(err) {
metrics.StatementDeadlockDetectDuration.Observe(time.Since(startLocking).Seconds())
}
Expand Down
4 changes: 3 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,9 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TablePlacementPolicy),
strings.ToLower(infoschema.TableClientErrorsSummaryGlobal),
strings.ToLower(infoschema.TableClientErrorsSummaryByUser),
strings.ToLower(infoschema.TableClientErrorsSummaryByHost):
strings.ToLower(infoschema.TableClientErrorsSummaryByHost),
strings.ToLower(infoschema.TableTiDBTrx),
strings.ToLower(infoschema.ClusterTableTiDBTrx):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
Expand Down
5 changes: 5 additions & 0 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
plannerutil "github.com/pingcap/tidb/planner/util"
txninfo "github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -60,6 +61,10 @@ type mockSessionManager struct {
serverID uint64
}

func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
panic("unimplemented!")
}

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
ret := make(map[uint64]*util.ProcessInfo)
Expand Down
5 changes: 5 additions & 0 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
txninfo "github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/israce"
Expand All @@ -38,6 +39,10 @@ type mockSessionManager1 struct {
PS []*util.ProcessInfo
}

func (msm *mockSessionManager1) ShowTxnList() []*txninfo.TxnInfo {
return nil
}

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager1) ShowProcessList() map[uint64]*util.ProcessInfo {
ret := make(map[uint64]*util.ProcessInfo)
Expand Down
38 changes: 38 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
infoschema.TableClientErrorsSummaryByUser,
infoschema.TableClientErrorsSummaryByHost:
err = e.setDataForClientErrorsSummary(sctx, e.table.Name.O)
case infoschema.TableTiDBTrx:
e.setDataForTiDBTrx(sctx)
case infoschema.ClusterTableTiDBTrx:
err = e.setDataForClusterTiDBTrx(sctx)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -2011,6 +2015,40 @@ func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context
return nil
}

func (e *memtableRetriever) setDataForTiDBTrx(ctx sessionctx.Context) {
sm := ctx.GetSessionManager()
if sm == nil {
return
}

loginUser := ctx.GetSessionVars().User
var hasProcessPriv bool
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
hasProcessPriv = true
}
}
infoList := sm.ShowTxnList()
for _, info := range infoList {
// If you have the PROCESS privilege, you can see all running transactions.
// Otherwise, you can see only your own transactions.
if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username {
continue
}
e.rows = append(e.rows, info.ToDatum())
}
}

func (e *memtableRetriever) setDataForClusterTiDBTrx(ctx sessionctx.Context) error {
e.setDataForTiDBTrx(ctx)
rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows)
if err != nil {
return err
}
e.rows = rows
return nil
}

type hugeMemTableRetriever struct {
dummyCloser
table *model.TableInfo
Expand Down
5 changes: 5 additions & 0 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/session"
txninfo "github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
Expand Down Expand Up @@ -728,6 +729,10 @@ type mockSessionManager struct {
serverID uint64
}

func (sm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
panic("unimplemented!")
}

func (sm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
return sm.processInfoMap
}
Expand Down
5 changes: 5 additions & 0 deletions executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/domain"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
txninfo "github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/israce"
Expand Down Expand Up @@ -135,6 +136,10 @@ type mockSessionManager2 struct {
killed bool
}

func (sm *mockSessionManager2) ShowTxnList() []*txninfo.TxnInfo {
panic("unimplemented!")
}

func (sm *mockSessionManager2) ShowProcessList() map[uint64]*util.ProcessInfo {
pl := make(map[uint64]*util.ProcessInfo)
if pi, ok := sm.GetProcessInfo(0); ok {
Expand Down
5 changes: 5 additions & 0 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
Expand Down Expand Up @@ -796,6 +797,10 @@ type mockSessionManager1 struct {
Se session.Session
}

func (msm *mockSessionManager1) ShowTxnList() []*txninfo.TxnInfo {
panic("unimplemented!")
}

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager1) ShowProcessList() map[uint64]*util.ProcessInfo {
ret := make(map[uint64]*util.ProcessInfo)
Expand Down
3 changes: 3 additions & 0 deletions infoschema/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
ClusterTableStatementsSummary = "CLUSTER_STATEMENTS_SUMMARY"
// ClusterTableStatementsSummaryHistory is the string constant of cluster statement summary history table.
ClusterTableStatementsSummaryHistory = "CLUSTER_STATEMENTS_SUMMARY_HISTORY"
// ClusterTableTiDBTrx is the string constant of cluster transaction running table.
ClusterTableTiDBTrx = "CLUSTER_TIDB_TRX"
)

// memTableToClusterTables means add memory table to cluster table.
Expand All @@ -45,6 +47,7 @@ var memTableToClusterTables = map[string]string{
TableProcesslist: ClusterTableProcesslist,
TableStatementsSummary: ClusterTableStatementsSummary,
TableStatementsSummaryHistory: ClusterTableStatementsSummaryHistory,
TableTiDBTrx: ClusterTableTiDBTrx,
}

func init() {
Expand Down
1 change: 1 addition & 0 deletions infoschema/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (*testSuite) TestInfoTables(c *C) {
"TABLESPACES",
"COLLATION_CHARACTER_SET_APPLICABILITY",
"PROCESSLIST",
"TIDB_TRX",
}
for _, t := range infoTables {
tb, err1 := is.TableByName(util.InformationSchemaName, model.NewCIStr(t))
Expand Down
38 changes: 30 additions & 8 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
Expand Down Expand Up @@ -161,6 +163,8 @@ const (
TableClientErrorsSummaryByUser = "CLIENT_ERRORS_SUMMARY_BY_USER"
// TableClientErrorsSummaryByHost is the string constant of client errors table.
TableClientErrorsSummaryByHost = "CLIENT_ERRORS_SUMMARY_BY_HOST"
// TableTiDBTrx is current running transaction status table.
TableTiDBTrx = "TIDB_TRX"
)

var tableIDMap = map[string]int64{
Expand Down Expand Up @@ -233,22 +237,25 @@ var tableIDMap = map[string]int64{
TableClientErrorsSummaryGlobal: autoid.InformationSchemaDBID + 67,
TableClientErrorsSummaryByUser: autoid.InformationSchemaDBID + 68,
TableClientErrorsSummaryByHost: autoid.InformationSchemaDBID + 69,
TableTiDBTrx: autoid.InformationSchemaDBID + 70,
ClusterTableTiDBTrx: autoid.InformationSchemaDBID + 71,
}

type columnInfo struct {
name string
tp byte
size int
decimal int
flag uint
deflt interface{}
comment string
name string
tp byte
size int
decimal int
flag uint
deflt interface{}
comment string
enumElems []string
}

func buildColumnInfo(col columnInfo) *model.ColumnInfo {
mCharset := charset.CharsetBin
mCollation := charset.CharsetBin
if col.tp == mysql.TypeVarchar || col.tp == mysql.TypeBlob || col.tp == mysql.TypeLongBlob {
if col.tp == mysql.TypeVarchar || col.tp == mysql.TypeBlob || col.tp == mysql.TypeLongBlob || col.tp == mysql.TypeEnum {
mCharset = charset.CharsetUTF8MB4
mCollation = charset.CollationUTF8MB4
}
Expand All @@ -259,6 +266,7 @@ func buildColumnInfo(col columnInfo) *model.ColumnInfo {
Flen: col.size,
Decimal: col.decimal,
Flag: col.flag,
Elems: col.enumElems,
}
return &model.ColumnInfo{
Name: model.NewCIStr(col.name),
Expand Down Expand Up @@ -1332,6 +1340,19 @@ var tableClientErrorsSummaryByHostCols = []columnInfo{
{name: "LAST_SEEN", tp: mysql.TypeTimestamp, size: 26},
}

var tableTiDBTrxCols = []columnInfo{
{name: "ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.PriKeyFlag | mysql.NotNullFlag | mysql.UnsignedFlag},
{name: "START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Start time of the transaction"},
{name: "DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the sql the transaction are currently running"},
{name: "STATE", tp: mysql.TypeEnum, enumElems: txninfo.TxnRunningStateStrs, comment: "Current running state of the transaction"},
{name: "WAITING_START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Current lock waiting's start time"},
{name: "LEN", tp: mysql.TypeLonglong, size: 64, comment: "How many entries are in MemDB"},
{name: "SIZE", tp: mysql.TypeLonglong, size: 64, comment: "MemDB used memory"},
{name: "SESSION_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag, comment: "Which session this transaction belongs to"},
{name: "USER", tp: mysql.TypeVarchar, size: 16, comment: "The user who open this session"},
{name: "DB", tp: mysql.TypeVarchar, size: 64, comment: "The schema this transaction works on"},
}

// GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
// The returned description string may be:
// - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
Expand Down Expand Up @@ -1701,6 +1722,7 @@ var tableNameToColumns = map[string][]columnInfo{
TableClientErrorsSummaryGlobal: tableClientErrorsSummaryGlobalCols,
TableClientErrorsSummaryByUser: tableClientErrorsSummaryByUserCols,
TableClientErrorsSummaryByHost: tableClientErrorsSummaryByHostCols,
TableTiDBTrx: tableTiDBTrxCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
Loading

0 comments on commit b1d134d

Please sign in to comment.