Skip to content

Commit

Permalink
executor: show more information about cop tasks in slow log (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Apr 23, 2019
1 parent 7d9e25d commit 9e28c79
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 4 deletions.
5 changes: 3 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,14 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
indexIDs = strings.Replace(fmt.Sprintf("%v", a.Ctx.GetSessionVars().StmtCtx.IndexIDs), " ", ",", -1)
}
execDetail := sessVars.StmtCtx.GetExecDetails()
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := a.getStatsInfo()
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, sql))
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, sql))
} else {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, sql))
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
Expand Down
48 changes: 48 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package stmtctx

import (
"math"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -74,6 +75,7 @@ type StatementContext struct {
warnings []SQLWarn
histogramsNotLoad bool
execDetails execdetails.ExecDetails
allExecDetails []*execdetails.ExecDetails
}
// PrevAffectedRows is the affected-rows value(DDL is 0, DML is the number of affected rows).
PrevAffectedRows int64
Expand Down Expand Up @@ -273,6 +275,8 @@ func (sc *StatementContext) ResetForRetry() {
sc.mu.affectedRows = 0
sc.mu.foundRows = 0
sc.mu.warnings = nil
sc.mu.execDetails = execdetails.ExecDetails{}
sc.mu.allExecDetails = make([]*execdetails.ExecDetails, 0, 4)
sc.mu.Unlock()
sc.TableIDs = sc.TableIDs[:0]
sc.IndexIDs = sc.IndexIDs[:0]
Expand All @@ -289,6 +293,7 @@ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, c
sc.mu.execDetails.RequestCount++
sc.mu.execDetails.TotalKeys += details.TotalKeys
sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys
sc.mu.allExecDetails = append(sc.mu.allExecDetails, details)
}
sc.mu.execDetails.CommitDetail = commitDetails
sc.mu.Unlock()
Expand Down Expand Up @@ -320,3 +325,46 @@ func (sc *StatementContext) ShouldIgnoreOverflowError() bool {
}
return false
}

// CopTasksDetails returns some useful information of cop-tasks during execution.
func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
sc.mu.Lock()
defer sc.mu.Unlock()
n := len(sc.mu.allExecDetails)
d := &CopTasksDetails{NumCopTasks: n}
if n == 0 {
return d
}
d.AvgProcessTime = sc.mu.execDetails.ProcessTime / time.Duration(n)
d.AvgWaitTime = sc.mu.execDetails.WaitTime / time.Duration(n)

sort.Slice(sc.mu.allExecDetails, func(i, j int) bool {
return sc.mu.allExecDetails[i].ProcessTime < sc.mu.allExecDetails[j].ProcessTime
})
d.P90ProcessTime = sc.mu.allExecDetails[n*9/10].ProcessTime
d.MaxProcessTime = sc.mu.allExecDetails[n-1].ProcessTime
d.MaxProcessAddress = sc.mu.allExecDetails[n-1].CalleeAddress

sort.Slice(sc.mu.allExecDetails, func(i, j int) bool {
return sc.mu.allExecDetails[i].WaitTime < sc.mu.allExecDetails[j].WaitTime
})
d.P90WaitTime = sc.mu.allExecDetails[n*9/10].WaitTime
d.MaxWaitTime = sc.mu.allExecDetails[n-1].WaitTime
d.MaxWaitAddress = sc.mu.allExecDetails[n-1].CalleeAddress
return d
}

//CopTasksDetails collects some useful information of cop-tasks during execution.
type CopTasksDetails struct {
NumCopTasks int

AvgProcessTime time.Duration
P90ProcessTime time.Duration
MaxProcessAddress string
MaxProcessTime time.Duration

AvgWaitTime time.Duration
P90WaitTime time.Duration
MaxWaitAddress string
MaxWaitTime time.Duration
}
46 changes: 46 additions & 0 deletions sessionctx/stmtctx/stmtctx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package stmtctx

import (
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/util/execdetails"
)

func TestCopTasksDetails(t *testing.T) {
ctx := new(StatementContext)
for i := 0; i < 100; i++ {
d := &execdetails.ExecDetails{
CalleeAddress: fmt.Sprintf("%v", i+1),
ProcessTime: time.Second * time.Duration(i+1),
WaitTime: time.Millisecond * time.Duration(i+1),
}
ctx.MergeExecDetails(d, nil)
}
c := ctx.CopTasksDetails()
if c.NumCopTasks != 100 ||
c.AvgProcessTime != time.Second*101/2 ||
c.P90ProcessTime != time.Second*91 ||
c.MaxProcessTime != time.Second*100 ||
c.MaxProcessAddress != "100" ||
c.AvgWaitTime != time.Millisecond*101/2 ||
c.P90WaitTime != time.Millisecond*91 ||
c.MaxWaitTime != time.Millisecond*100 ||
c.MaxWaitAddress != "100" {
t.Fatal(c)
}
}
19 changes: 18 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,12 @@ const (
SlowLogQuerySQLStr = "Query" // use for slow log table, slow log will not print this field name but print sql directly.
// SlowLogStatsInfoStr is plan stats info.
SlowLogStatsInfoStr = "Stats"
// SlowLogNumCopTasksStr is the number of cop-tasks.
SlowLogNumCopTasksStr = "Num_cop_tasks"
// SlowLogCopProcessStr includes some useful information about cop-tasks' process time.
SlowLogCopProcessStr = "Cop_process"
// SlowLogCopWaitStr includes some useful information about cop-tasks' wait time.
SlowLogCopWaitStr = "Cop_wait"
)

// SlowLogFormat uses for formatting slow log.
Expand All @@ -799,8 +805,10 @@ const (
// # Is_internal: false
// # Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
// # Stats: t1:1,t2:2
// # Cop_tasks:
// select * from t_slim;
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string, statsInfos map[string]uint64, sql string) string {
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string,
statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, sql string) string {
var buf bytes.Buffer
execDetailStr := execDetail.String()
buf.WriteString(SlowLogPrefixStr + SlowLogTxnStartTSStr + SlowLogSpaceMarkStr + strconv.FormatUint(txnTS, 10) + "\n")
Expand Down Expand Up @@ -844,6 +852,15 @@ func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDe
}
buf.WriteString("\n")
}
if copTasks != nil {
buf.WriteString(SlowLogRowPrefixStr + SlowLogNumCopTasksStr + SlowLogSpaceMarkStr + strconv.FormatInt(int64(copTasks.NumCopTasks), 10) + "\n")
buf.WriteString(SlowLogRowPrefixStr + SlowLogCopProcessStr + SlowLogSpaceMarkStr +
fmt.Sprintf("Avg_time: %v P90_time: %v Max_time: %v Max_addr: %v", copTasks.AvgProcessTime,
copTasks.P90ProcessTime, copTasks.MaxProcessTime, copTasks.MaxProcessAddress) + "\n")
buf.WriteString(SlowLogRowPrefixStr + SlowLogCopWaitStr + SlowLogSpaceMarkStr +
fmt.Sprintf("Avg_time: %v P90_time: %v Max_time: %v Max_Addr: %v", copTasks.AvgWaitTime,
copTasks.P90WaitTime, copTasks.MaxWaitTime, copTasks.MaxWaitAddress) + "\n")
}
if len(sql) == 0 {
sql = ";"
}
Expand Down
17 changes: 16 additions & 1 deletion sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
)
Expand Down Expand Up @@ -78,6 +79,17 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
}
statsInfos := make(map[string]uint64)
statsInfos["t1"] = 0
copTasks := &stmtctx.CopTasksDetails{
NumCopTasks: 10,
AvgProcessTime: time.Second,
P90ProcessTime: time.Second * 2,
MaxProcessAddress: "10.6.131.78",
MaxProcessTime: time.Second * 3,
AvgWaitTime: time.Millisecond * 10,
P90WaitTime: time.Millisecond * 20,
MaxWaitTime: time.Millisecond * 30,
MaxWaitAddress: "10.6.131.79",
}
resultString := `# Txn_start_ts: 406649736972468225
# User: root@192.168.0.1
# Conn_ID: 1
Expand All @@ -88,9 +100,12 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# Is_internal: true
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Stats: t1:pseudo
# Num_cop_tasks: 10
# Cop_process: Avg_time: 1s P90_time: 2s Max_time: 3s Max_addr: 10.6.131.78
# Cop_wait: Avg_time: 10ms P90_time: 20ms Max_time: 30ms Max_Addr: 10.6.131.79
select * from t;`
sql := "select * from t"
digest := parser.DigestHash(sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, sql)
c.Assert(logString, Equals, resultString)
}

0 comments on commit 9e28c79

Please sign in to comment.