Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: record more rpc runtime information in cop runt ... (#18916) #19264

Merged
merged 13 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
)
Expand Down Expand Up @@ -422,11 +421,6 @@ func (r *mockResultSubset) GetData() []byte { return r.data }
// GetStartKey implements kv.ResultSubset interface.
func (r *mockResultSubset) GetStartKey() kv.Key { return nil }

// GetExecDetails implements kv.ResultSubset interface.
func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
return &execdetails.ExecDetails{}
}

// MemSize implements kv.ResultSubset interface.
func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }

Expand Down
142 changes: 134 additions & 8 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
package distsql

import (
"bytes"
"context"
"fmt"
"sort"
"strconv"
"sync/atomic"
"time"

Expand All @@ -25,6 +29,8 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -82,6 +88,8 @@ type selectResult struct {
fetchDuration time.Duration
durationReported bool
memTracker *memory.Tracker

stats *selectResultRuntimeStats
}

func (r *selectResult) Fetch(ctx context.Context) {
Expand Down Expand Up @@ -129,14 +137,18 @@ func (r *selectResult) fetchResp(ctx context.Context) error {
for _, warning := range r.selectResp.Warnings {
sc.AppendWarning(terror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg))
}
resultDetail := resultSubset.GetExecDetails()
r.updateCopRuntimeStats(ctx, resultDetail, resultSubset.RespTime())
r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
if resultDetail != nil {
resultDetail.CopTime = duration

hasStats, ok := resultSubset.(CopRuntimeStats)
if ok {
copStats := hasStats.GetCopRuntimeStats()
if copStats != nil {
r.updateCopRuntimeStats(ctx, copStats, resultSubset.RespTime())
copStats.CopTime = duration
sc.MergeExecDetails(&copStats.ExecDetails, nil)
}
}
sc.MergeExecDetails(resultDetail, nil)
if len(r.selectResp.Chunks) != 0 {
break
}
Expand Down Expand Up @@ -232,8 +244,8 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro
return nil
}

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail *execdetails.ExecDetails, respTime time.Duration) {
callee := detail.CalleeAddress
func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) {
callee := copStats.CalleeAddress
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
}
Expand All @@ -244,8 +256,19 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail *execde

return
}
if r.stats == nil {
stmtCtx := r.ctx.GetSessionVars().StmtCtx
id := r.rootPlanID
originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id)
r.stats = &selectResultRuntimeStats{
RuntimeStats: originRuntimeStats,
backoffSleep: make(map[string]time.Duration),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats)
}
r.stats.mergeCopRuntimeStats(copStats, respTime)

r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID, respTime, detail)
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
Expand Down Expand Up @@ -289,3 +312,106 @@ func (r *selectResult) Close() error {
}
return r.resp.Close()
}

// CopRuntimeStats is an interface used to check whether a result carries cop runtime stats.
// Result subsets are type-asserted against it, so implementing it is optional.
type CopRuntimeStats interface {
// GetCopRuntimeStats gets the cop runtime stats information.
// Callers in this package check the returned pointer for nil before using it.
GetCopRuntimeStats() *tikv.CopRuntimeStats
}

// selectResultRuntimeStats wraps the root runtime stats of a reader and adds
// the statistics accumulated from the coprocessor responses it has merged.
type selectResultRuntimeStats struct {
// RuntimeStats is the wrapped runtime stats; String prints it first when it is non-nil.
execdetails.RuntimeStats
// copRespTime holds one response time per merged cop response.
copRespTime []time.Duration
// procKeys holds one processed-keys count per merged cop response.
procKeys []int64
// backoffSleep sums the backoff sleep durations of all merged responses, keyed by the
// same string keys as the merged stats' BackoffSleep map. It must be non-nil before merging.
backoffSleep map[string]time.Duration
// totalProcessTime is the sum of ProcessTime over all merged responses.
totalProcessTime time.Duration
// totalWaitTime is the sum of WaitTime over all merged responses.
totalWaitTime time.Duration
// rpcStat accumulates the region request runtime stats of all merged responses.
rpcStat tikv.RegionRequestRuntimeStats
}

// mergeCopRuntimeStats folds the runtime statistics of one coprocessor
// response into s. respTime is the response time of that request.
// copStats must be non-nil and s.backoffSleep must already be initialized.
func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) {
	// Keep one sample per response; String derives max/min/avg/p95 from them.
	s.copRespTime = append(s.copRespTime, respTime)
	s.procKeys = append(s.procKeys, copStats.ProcessedKeys)

	// Sum the backoff sleep time per key.
	for name, slept := range copStats.BackoffSleep {
		s.backoffSleep[name] = s.backoffSleep[name] + slept
	}

	// Running totals across all responses.
	s.totalProcessTime = s.totalProcessTime + copStats.ProcessTime
	s.totalWaitTime = s.totalWaitTime + copStats.WaitTime

	s.rpcStat.Merge(copStats.RegionRequestRuntimeStats)
}

// String renders the collected statistics on one line, in this order: the
// wrapped RuntimeStats (if any), a "cop_task: {...}" section summarizing the
// merged cop responses, the remaining RPC stats, and a "backoff{...}" section.
//
// Calling String repeatedly yields the same text: the Cop RPC entry is only
// hidden from rpcStat while rpcStat is being rendered and is put back
// afterwards. Sections are separated by ", " only when something precedes
// them, the cop_task brace is closed only if it was opened, and backoff
// entries are printed in sorted key order.
//
// Note that the per-response sample slices are sorted in place.
func (s *selectResultRuntimeStats) String() string {
	buf := bytes.NewBuffer(nil)
	// writeSep separates sections without producing a leading ", " when
	// nothing has been written yet (e.g. RuntimeStats is nil).
	writeSep := func() {
		if buf.Len() > 0 {
			buf.WriteString(", ")
		}
	}
	if s.RuntimeStats != nil {
		buf.WriteString(s.RuntimeStats.String())
	}

	copRPC := s.rpcStat.Stats[tikvrpc.CmdCop]
	// copRPCShown records that the Cop RPC numbers were printed inside the
	// cop_task section and therefore must not be repeated by rpcStat.String().
	copRPCShown := false
	if size := len(s.copRespTime); size > 0 {
		writeSep()
		if size == 1 {
			fmt.Fprintf(buf, "cop_task: {num: 1, max:%v, proc_keys: %v", s.copRespTime[0], s.procKeys[0])
		} else {
			sort.Slice(s.copRespTime, func(i, j int) bool {
				return s.copRespTime[i] < s.copRespTime[j]
			})
			vMax, vMin := s.copRespTime[size-1], s.copRespTime[0]
			vP95 := s.copRespTime[size*19/20]
			sum := 0.0
			for _, t := range s.copRespTime {
				sum += float64(t)
			}
			vAvg := time.Duration(sum / float64(size))

			sort.Slice(s.procKeys, func(i, j int) bool {
				return s.procKeys[i] < s.procKeys[j]
			})
			keyMax := s.procKeys[size-1]
			keyP95 := s.procKeys[size*19/20]
			fmt.Fprintf(buf, "cop_task: {num: %v, max: %v, min: %v, avg: %v, p95: %v", size, vMax, vMin, vAvg, vP95)
			if keyMax > 0 {
				buf.WriteString(", max_proc_keys: ")
				buf.WriteString(strconv.FormatInt(keyMax, 10))
				buf.WriteString(", p95_proc_keys: ")
				buf.WriteString(strconv.FormatInt(keyP95, 10))
			}
			if s.totalProcessTime > 0 {
				buf.WriteString(", tot_proc: ")
				buf.WriteString(s.totalProcessTime.String())
				if s.totalWaitTime > 0 {
					buf.WriteString(", tot_wait: ")
					buf.WriteString(s.totalWaitTime.String())
				}
			}
		}
		if copRPC != nil && copRPC.Count > 0 {
			copRPCShown = true
			buf.WriteString(", rpc_num: ")
			buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
			buf.WriteString(", rpc_time: ")
			buf.WriteString(time.Duration(copRPC.Consume).String())
		}
		// Close the brace only here, where it is guaranteed to have been opened.
		buf.WriteString("}")
	}

	// Hide the Cop entry only for the duration of rendering, then restore it
	// so that String does not permanently drop rpc_num/rpc_time.
	if copRPCShown {
		delete(s.rpcStat.Stats, tikvrpc.CmdCop)
	}
	rpcStatsStr := s.rpcStat.String()
	if copRPCShown {
		s.rpcStat.Stats[tikvrpc.CmdCop] = copRPC
	}
	if len(rpcStatsStr) > 0 {
		writeSep()
		buf.WriteString(rpcStatsStr)
	}

	if len(s.backoffSleep) > 0 {
		// Map iteration order is random; sort the keys for stable output.
		names := make([]string, 0, len(s.backoffSleep))
		for name := range s.backoffSleep {
			names = append(names, name)
		}
		sort.Strings(names)

		writeSep()
		buf.WriteString("backoff{")
		for i, name := range names {
			if i > 0 {
				buf.WriteString(", ")
			}
			fmt.Fprintf(buf, "%s: %s", name, s.backoffSleep[name].String())
		}
		buf.WriteString("}")
	}
	return buf.String()
}
7 changes: 4 additions & 3 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tipb/go-tipb"
Expand All @@ -29,7 +30,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
sr := selectResult{ctx: ctx}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "a"}, 0)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
t := uint64(1)
Expand All @@ -39,12 +40,12 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
},
}
c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue)
sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234), IsFalse)

sr.copPlanIDs = []int{sr.rootPlanID}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats(1234).String(), Equals, "time:1ns, loops:1")
}
12 changes: 8 additions & 4 deletions distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,15 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons
}
r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts)
r.partialCount++
resultDetail := resultSubset.GetExecDetails()
if resultDetail != nil {
resultDetail.CopTime = duration

hasStats, ok := resultSubset.(CopRuntimeStats)
if ok {
copStats := hasStats.GetCopRuntimeStats()
if copStats != nil {
copStats.CopTime = duration
r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil)
}
}
r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(resultDetail, nil)
return false, nil
}

Expand Down
3 changes: 0 additions & 3 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
)

Expand Down Expand Up @@ -341,8 +340,6 @@ type ResultSubset interface {
GetData() []byte
// GetStartKey gets the start key.
GetStartKey() Key
// GetExecDetails gets the detail information.
GetExecDetails() *execdetails.ExecDetails
// MemSize returns how many bytes of memory this result use for tracing memory usage.
MemSize() int64
// RespTime returns the response time for the request.
Expand Down
6 changes: 0 additions & 6 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,12 +1030,6 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan) (actRows, analyzeInfo, memor
analyzeInfo = "time:0ns, loops:0"
actRows = "0"
}
switch p.(type) {
case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader:
if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 {
analyzeInfo += ", " + s.String()
}
}

memoryInfo = "N/A"
memTracker := ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID())
Expand Down
12 changes: 4 additions & 8 deletions store/tikv/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
Expand All @@ -42,7 +41,7 @@ type batchCopTask struct {

type batchCopResponse struct {
pbResp *coprocessor.BatchResponse
detail *execdetails.ExecDetails
detail *CopRuntimeStats

// batch Cop Response is yet to return startKey. So batchCop cannot retry partially.
startKey kv.Key
Expand All @@ -63,7 +62,7 @@ func (rs *batchCopResponse) GetStartKey() kv.Key {

// GetExecDetails is unavailable currently, because TiFlash has not collected exec details for batch cop.
// TODO: Will fix in near future.
func (rs *batchCopResponse) GetExecDetails() *execdetails.ExecDetails {
func (rs *batchCopResponse) GetCopRuntimeStats() *CopRuntimeStats {
return rs.detail
}

Expand All @@ -77,9 +76,6 @@ func (rs *batchCopResponse) MemSize() int64 {
rs.respSize += int64(cap(rs.startKey))
if rs.detail != nil {
rs.respSize += int64(sizeofExecDetails)
if rs.detail.CommitDetail != nil {
rs.respSize += int64(sizeofCommitDetails)
}
}
if rs.pbResp != nil {
// Using a approximate size since it's hard to get a accurate value.
Expand Down Expand Up @@ -304,7 +300,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *
for idx := 0; idx < len(tasks); idx++ {
ret, err := b.handleTaskOnce(ctx, bo, tasks[idx])
if err != nil {
resp := &batchCopResponse{err: errors.Trace(err), detail: new(execdetails.ExecDetails)}
resp := &batchCopResponse{err: errors.Trace(err), detail: new(CopRuntimeStats)}
b.sendToRespCh(resp)
break
}
Expand Down Expand Up @@ -415,7 +411,7 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro

resp := batchCopResponse{
pbResp: response,
detail: new(execdetails.ExecDetails),
detail: new(CopRuntimeStats),
}

resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
Expand Down
Loading