Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: show more information about cop tasks in slow log #10165

Merged
merged 8 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,14 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
indexIDs = strings.Replace(fmt.Sprintf("%v", a.Ctx.GetSessionVars().StmtCtx.IndexIDs), " ", ",", -1)
}
execDetail := sessVars.StmtCtx.GetExecDetails()
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := a.getStatsInfo()
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, sql))
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, sql))
} else {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, sql))
logutil.SlowQueryLogger.Warn(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
Expand Down
56 changes: 56 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package stmtctx

import (
"fmt"
"math"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -95,6 +97,7 @@ type StatementContext struct {
warnings []SQLWarn
histogramsNotLoad bool
execDetails execdetails.ExecDetails
allExecDetails []*execdetails.ExecDetails
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
}
// PrevAffectedRows is the affected-rows value(DDL is 0, DML is the number of affected rows).
PrevAffectedRows int64
Expand Down Expand Up @@ -392,6 +395,7 @@ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, c
sc.mu.execDetails.RequestCount++
sc.mu.execDetails.TotalKeys += details.TotalKeys
sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys
sc.mu.allExecDetails = append(sc.mu.allExecDetails, details)
}
sc.mu.execDetails.CommitDetail = commitDetails
sc.mu.Unlock()
Expand Down Expand Up @@ -423,3 +427,55 @@ func (sc *StatementContext) ShouldIgnoreOverflowError() bool {
}
return false
}

// CopTasksDetails returns some useful information of cop-tasks during execution.
func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
sc.mu.Lock()
defer sc.mu.Unlock()
n := len(sc.mu.allExecDetails)
d := &CopTasksDetails{NumCopTasks: n}
if n == 0 {
return d
}
d.AvgProcessTime = sc.mu.execDetails.ProcessTime / time.Duration(n)
d.AvgWaitTime = sc.mu.execDetails.WaitTime / time.Duration(n)

sort.Slice(sc.mu.allExecDetails, func(i, j int) bool {
return sc.mu.allExecDetails[i].ProcessTime < sc.mu.allExecDetails[j].ProcessTime
})
d.P90ProcessTime = sc.mu.allExecDetails[n*9/10].ProcessTime
d.MaxProcessTime = sc.mu.allExecDetails[n-1].ProcessTime
d.MaxProcessAddress = sc.mu.allExecDetails[n-1].CalleeAddress

sort.Slice(sc.mu.allExecDetails, func(i, j int) bool {
return sc.mu.allExecDetails[i].WaitTime < sc.mu.allExecDetails[j].WaitTime
})
d.P90WaitTime = sc.mu.allExecDetails[n*9/10].WaitTime
d.MaxWaitTime = sc.mu.allExecDetails[n-1].WaitTime
d.MaxWaitAddress = sc.mu.allExecDetails[n-1].CalleeAddress
return d
}

//CopTasksDetails collects some useful information of cop-tasks during execution.
type CopTasksDetails struct {
NumCopTasks int

AvgProcessTime time.Duration
P90ProcessTime time.Duration
MaxProcessAddress string
MaxProcessTime time.Duration

AvgWaitTime time.Duration
P90WaitTime time.Duration
MaxWaitAddress string
MaxWaitTime time.Duration
}

// String implements the fmt.Stringer interface.
func (d CopTasksDetails) String() string {
// formatted like slow log
return fmt.Sprintf("Num_tasks: %d Avg_process_time %v P90_process_time %v Max_process_time %v"+
"Max_process_address %s Avg_wait_time %v P90_wait_time %v Max_wait_time %v Max_wait_address %s",
d.NumCopTasks, d.AvgProcessTime, d.P90ProcessTime, d.MaxProcessTime, d.MaxProcessAddress,
d.AvgWaitTime, d.P90WaitTime, d.MaxWaitTime, d.MaxWaitAddress)
}
46 changes: 46 additions & 0 deletions sessionctx/stmtctx/stmtctx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package stmtctx

import (
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/util/execdetails"
)

func TestCopTasksDetails(t *testing.T) {
ctx := new(StatementContext)
for i := 0; i < 100; i++ {
d := &execdetails.ExecDetails{
CalleeAddress: fmt.Sprintf("%v", i+1),
ProcessTime: time.Second * time.Duration(i+1),
WaitTime: time.Millisecond * time.Duration(i+1),
}
ctx.MergeExecDetails(d, nil)
}
c := ctx.CopTasksDetails()
if c.NumCopTasks != 100 ||
c.AvgProcessTime != time.Second*101/2 ||
c.P90ProcessTime != time.Second*91 ||
c.MaxProcessTime != time.Second*100 ||
c.MaxProcessAddress != "100" ||
c.AvgWaitTime != time.Millisecond*101/2 ||
c.P90WaitTime != time.Millisecond*91 ||
c.MaxWaitTime != time.Millisecond*100 ||
c.MaxWaitAddress != "100" {
t.Fatal(c)
}
}
9 changes: 8 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,8 @@ const (
SlowLogQuerySQLStr = "Query" // use for slow log table, slow log will not print this field name but print sql directly.
// SlowLogStatsInfoStr is plan stats info.
SlowLogStatsInfoStr = "Stats"
// SlowLogCopTasks includes some useful information about cop-tasks.
SlowLogCopTasks = "Cop_tasks"
)

// SlowLogFormat uses for formatting slow log.
Expand All @@ -912,8 +914,10 @@ const (
// # Is_internal: false
// # Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
// # Stats: t1:1,t2:2
// # Cop_tasks:
// select * from t_slim;
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string, statsInfos map[string]uint64, sql string) string {
func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDetail execdetails.ExecDetails, indexIDs string, digest string,
statsInfos map[string]uint64, copTasks *stmtctx.CopTasksDetails, sql string) string {
var buf bytes.Buffer
execDetailStr := execDetail.String()
buf.WriteString(SlowLogRowPrefixStr + SlowLogTxnStartTSStr + SlowLogSpaceMarkStr + strconv.FormatUint(txnTS, 10) + "\n")
Expand Down Expand Up @@ -957,6 +961,9 @@ func (s *SessionVars) SlowLogFormat(txnTS uint64, costTime time.Duration, execDe
}
buf.WriteString("\n")
}
if copTasks != nil {
buf.WriteString(SlowLogRowPrefixStr + SlowLogCopTasks + SlowLogSpaceMarkStr + copTasks.String() + "\n")
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
}
if len(sql) == 0 {
sql = ";"
}
Expand Down
15 changes: 14 additions & 1 deletion sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
)
Expand Down Expand Up @@ -106,6 +107,17 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
}
statsInfos := make(map[string]uint64)
statsInfos["t1"] = 0
copTasks := &stmtctx.CopTasksDetails{
NumCopTasks: 10,
AvgProcessTime: time.Second,
P90ProcessTime: time.Second * 2,
MaxProcessAddress: "10.6.131.78",
MaxProcessTime: time.Second * 3,
AvgWaitTime: time.Millisecond * 10,
P90WaitTime: time.Millisecond * 20,
MaxWaitTime: time.Millisecond * 30,
MaxWaitAddress: "10.6.131.79",
}
resultString := `# Txn_start_ts: 406649736972468225
# User: root@192.168.0.1
# Conn_ID: 1
Expand All @@ -116,9 +128,10 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
# Is_internal: true
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
# Stats: t1:pseudo
# Cop_tasks: Num_tasks: 10 Avg_process_time 1s P90_process_time 2s Max_process_time 3sMax_process_address 10.6.131.78 Avg_wait_time 10ms P90_wait_time 20ms Max_wait_time 30ms Max_wait_address 10.6.131.79
select * from t;`
sql := "select * from t"
digest := parser.DigestHash(sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, sql)
logString := seVar.SlowLogFormat(txnTS, costTime, execDetail, "[1,2]", digest, statsInfos, copTasks, sql)
c.Assert(logString, Equals, resultString)
}