Skip to content

Commit

Permalink
util: change record analyze to explain in oom alarm (#39099) (#39100)
Browse files Browse the repository at this point in the history
close #39036, ref #39094, ref #39098
  • Loading branch information
ti-chi-bot authored Nov 13, 2022
1 parent bee9915 commit cf36a9c
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 70 deletions.
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,8 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetail
if runtimeStatsColl != nil && runtimeStatsColl.ExistsCopStats(explainID) {
copStats = runtimeStatsColl.GetCopStats(explainID)
}
memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithLock(p.ID())
diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithLock(p.ID())
memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID())
diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithoutLock(p.ID())
return
}

Expand Down
5 changes: 0 additions & 5 deletions planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,11 +881,6 @@ func (p *basePlan) SCtx() sessionctx.Context {
return p.ctx
}

// SetSCtx Context implements Plan Set Context interface.
func (p *basePlan) SetSCtx(ctx sessionctx.Context) {
p.ctx = ctx
}

// buildPlanTrace implements Plan
func (p *basePhysicalPlan) buildPlanTrace() *tracing.PlanTrace {
planTrace := &tracing.PlanTrace{ID: p.ID(), TP: p.self.TP(), ExplainInfo: p.self.ExplainInfo()}
Expand Down
19 changes: 0 additions & 19 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ import (
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -1573,18 +1572,13 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
if explain, ok := p.(*plannercore.Explain); ok && explain.Analyze && explain.TargetPlan != nil {
p = explain.TargetPlan
}
canExplainAnalyze := false
if _, ok := p.(plannercore.PhysicalPlan); ok {
canExplainAnalyze = true
}
pi := util.ProcessInfo{
ID: s.sessionVars.ConnectionID,
Port: s.sessionVars.Port,
DB: s.sessionVars.CurrentDB,
Command: command,
Plan: p,
PlanExplainRows: plannercore.GetExplainRowsForPlan(p),
CurrentAnalyzeRows: s.getCurrentAnalyzePlan,
RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl,
Time: t,
State: s.Status(),
Expand All @@ -1597,7 +1591,6 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(),
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
CanExplainAnalyze: canExplainAnalyze,
}
oldPi := s.ShowProcess()
if p == nil {
Expand All @@ -1607,7 +1600,6 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
pi.Plan = oldPi.Plan
pi.PlanExplainRows = oldPi.PlanExplainRows
pi.RuntimeStatsColl = oldPi.RuntimeStatsColl
_, pi.CanExplainAnalyze = pi.Plan.(plannercore.PhysicalPlan)
}
}
// We set process info before building plan, so we extended execution time.
Expand Down Expand Up @@ -1636,17 +1628,6 @@ func (s *session) getOomAlarmVariablesInfo() util.OOMAlarmVariablesInfo {
}
}

func (s *session) getCurrentAnalyzePlan(p interface{}, runtimeStatsColl *execdetails.RuntimeStatsColl) [][]string {
explain := &plannercore.Explain{
TargetPlan: p.(plannercore.Plan),
Format: types.ExplainFormatROW,
Analyze: false,
RuntimeStatsColl: runtimeStatsColl,
}
explain.SetSCtx(s)
return plannercore.GetExplainAnalyzeRowsForPlan(explain)
}

func (s *session) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) {
s.diskFullOpt = level
}
Expand Down
25 changes: 12 additions & 13 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ import (

// ExecDetails contains execution detail information.
type ExecDetails struct {
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
CommitDetail *util.CommitDetails
LockKeysDetail *util.LockKeysDetails
ScanDetail *util.ScanDetail
BackoffSleep map[string]time.Duration
BackoffTimes map[string]int
CalleeAddress string
TimeDetail util.TimeDetail
RequestCount int
CopTime time.Duration
BackoffTime time.Duration
LockKeysDuration time.Duration
RequestCount int
}

type stmtExecDetailKeyType struct{}
Expand Down Expand Up @@ -368,36 +368,33 @@ type CopRuntimeStats struct {
scanDetail *util.ScanDetail
// do not use kv.StoreType because it will meet cycle import error
storeType string
// count CopRuntimeStats total rows
totalRows int64
sync.Mutex
}

// RecordOneCopTask records a specific cop tasks's execution detail.
func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.ExecutorExecutionSummary) {
crs.Lock()
defer crs.Unlock()
currentRows := int64(*summary.NumProducedRows)
crs.totalRows += currentRows
crs.stats[address] = append(crs.stats[address],
&basicCopRuntimeStats{BasicRuntimeStats: BasicRuntimeStats{loop: int32(*summary.NumIterations),
consume: int64(*summary.TimeProcessedNs),
rows: currentRows},
rows: int64(*summary.NumProducedRows)},
threads: int32(summary.GetConcurrency()),
storeType: crs.storeType})
}

// GetActRows return total rows of CopRuntimeStats.
func (crs *CopRuntimeStats) GetActRows() (totalRows int64) {
crs.Lock()
defer crs.Unlock()
return crs.totalRows
for _, instanceStats := range crs.stats {
for _, stat := range instanceStats {
totalRows += stat.rows
}
}
return totalRows
}

// MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information.
func (crs *CopRuntimeStats) MergeBasicStats() (procTimes []time.Duration, totalTime time.Duration, totalTasks, totalLoops, totalThreads int32) {
crs.Lock()
defer crs.Unlock()
procTimes = make([]time.Duration, 0, 32)
for _, instanceStats := range crs.stats {
for _, stat := range instanceStats {
Expand Down Expand Up @@ -788,7 +785,9 @@ func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo {

// RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo.
type RuntimeStatsWithConcurrencyInfo struct {
// executor concurrency information
concurrency []*ConcurrencyInfo
// protect concurrency
sync.Mutex
}

Expand Down
14 changes: 0 additions & 14 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,20 +576,6 @@ func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker {
return nil
}

// SearchTrackerWithLock searches the specific tracker under this tracker with lock.
func (t *Tracker) SearchTrackerWithLock(label int) *Tracker {
t.mu.Lock()
defer t.mu.Unlock()
if t.label == label {
return t
}
children := t.mu.children[label]
if len(children) > 0 {
return children[0]
}
return nil
}

// SearchTrackerConsumedMoreThanNBytes searches the specific tracker that consumes more than NBytes.
func (t *Tracker) SearchTrackerConsumedMoreThanNBytes(limit int64) (res []*Tracker) {
t.mu.Lock()
Expand Down
1 change: 0 additions & 1 deletion util/memoryusagealarm/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go_test(
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//util",
"//util/execdetails",
"//util/memory",
"@com_github_stretchr_testify//assert",
],
Expand Down
12 changes: 6 additions & 6 deletions util/memoryusagealarm/memoryusagealarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,12 @@ func (record *memoryUsageAlarm) tryRemoveRedundantRecords() {
}
}

func getCurrentAnalyzePlan(info *util.ProcessInfo) string {
func getPlanString(info *util.ProcessInfo) string {
var buf strings.Builder
rows := info.CurrentAnalyzeRows(info.Plan, info.RuntimeStatsColl)
buf.WriteString(fmt.Sprintf("|%v|%v|%v|%v|%v|%v|%v|%v|%v|", "id", "estRows", "actRows", "task", "access object", "execution info", "operator info", "memory", "disk"))
rows := info.PlanExplainRows
buf.WriteString(fmt.Sprintf("|%v|%v|%v|%v|%v|", "id", "estRows", "task", "access object", "operator info"))
for _, row := range rows {
buf.WriteString(fmt.Sprintf("\n|%v|%v|%v|%v|%v|%v|%v|%v|%v|", row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8]))
buf.WriteString(fmt.Sprintf("\n|%v|%v|%v|%v|%v|", row[0], row[1], row[2], row[3], row[4]))
}
return buf.String()
}
Expand Down Expand Up @@ -288,7 +288,7 @@ func (record *memoryUsageAlarm) getTop10SqlInfo(cmp func(i, j *util.ProcessInfo)
fields = append(fields, zap.Int64("tidb_mem_quota_query", info.OOMAlarmVariablesInfo.SessionMemQuotaQuery))
fields = append(fields, zap.Int("tidb_analyze_version", info.OOMAlarmVariablesInfo.SessionAnalyzeVersion))
fields = append(fields, zap.Bool("tidb_enable_rate_limit_action", info.OOMAlarmVariablesInfo.SessionEnabledRateLimitAction))
fields = append(fields, zap.String("current_analyze_plan", getCurrentAnalyzePlan(info)))
fields = append(fields, zap.String("current_analyze_plan", getPlanString(info)))
for _, field := range fields {
switch field.Type {
case zapcore.StringType:
Expand Down Expand Up @@ -323,7 +323,7 @@ func (record *memoryUsageAlarm) recordSQL(sm util.SessionManager, recordDir stri
processInfo := sm.ShowProcessList()
pinfo := make([]*util.ProcessInfo, 0, len(processInfo))
for _, info := range processInfo {
if len(info.Info) != 0 && info.CanExplainAnalyze {
if len(info.Info) != 0 {
pinfo = append(pinfo, info)
}
}
Expand Down
Loading

0 comments on commit cf36a9c

Please sign in to comment.