executor: fix issue that some query execution stats were omitted when execution was interrupted (pingcap#51787)

close pingcap#51660

Signed-off-by: crazycs520 <crazycs520@gmail.com>
crazycs520 committed May 16, 2024
1 parent a1b81e7 commit 9882f6a
Showing 4 changed files with 81 additions and 12 deletions.
9 changes: 8 additions & 1 deletion distsql/select_result.go
@@ -345,7 +345,7 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) {
callee := copStats.CalleeAddress
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || (callee == "" && copStats.ReqStats == nil) {
return
}

@@ -452,6 +452,13 @@ func (r *selectResult) Close() error {
if respSize > 0 {
r.memConsume(-respSize)
}
if unconsumed, ok := r.resp.(copr.HasUnconsumedCopRuntimeStats); ok && unconsumed != nil {
unconsumedCopStats := unconsumed.CollectUnconsumedCopRuntimeStats()
for _, copStats := range unconsumedCopStats {
r.updateCopRuntimeStats(context.Background(), copStats, time.Duration(0))
r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil)
}
}
if r.stats != nil {
defer r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
}
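For readers skimming the hunk above: Close now type-asserts the response against the new copr.HasUnconsumedCopRuntimeStats interface and merges whatever the workers recorded but never delivered through a copResponse, which is exactly the data that used to be lost when execution was interrupted. A minimal, self-contained sketch of that drain-on-close pattern, using simplified stand-in types rather than the real distsql/copr definitions, looks roughly like this:

package main

import "fmt"

// Simplified stand-ins for the real distsql/copr types; the names below are
// illustrative, not the production definitions.
type CopRuntimeStats struct{ CalleeAddress string }

// HasUnconsumedCopRuntimeStats mirrors the interface added in coprocessor.go.
type HasUnconsumedCopRuntimeStats interface {
	CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats
}

// fakeResp stands in for the copIterator held by selectResult.resp.
type fakeResp struct{ parked []*CopRuntimeStats }

func (r *fakeResp) CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats { return r.parked }

// closeResult mirrors the shape of selectResult.Close above: before closing,
// drain any stats the workers recorded but never delivered and merge them
// into whatever collector the caller provides.
func closeResult(resp interface{}, merge func(*CopRuntimeStats)) {
	if unconsumed, ok := resp.(HasUnconsumedCopRuntimeStats); ok && unconsumed != nil {
		for _, st := range unconsumed.CollectUnconsumedCopRuntimeStats() {
			merge(st)
		}
	}
}

func main() {
	resp := &fakeResp{parked: []*CopRuntimeStats{{CalleeAddress: "store-1"}}}
	closeResult(resp, func(st *CopRuntimeStats) {
		fmt.Println("merged unconsumed stats from", st.CalleeAddress)
	})
}

Doing the drain in Close (rather than in Next) is what keeps the stats visible even when the query is killed and no further rows are ever fetched.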
12 changes: 12 additions & 0 deletions server/conn_test.go
@@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/plancodec"
"github.com/stretchr/testify/require"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/testutils"
@@ -761,6 +762,17 @@ func TestConnExecutionTimeout(t *testing.T) {

err = cc.handleQuery(context.Background(), "alter table testTable2 add index idx(age);")
require.NoError(t, err)

// Test executor stats when the execution time is exceeded.
tk.MustExec("set @@tidb_slow_log_threshold=300")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCSlowByInjestSleep", `return(150)`))
err = tk.QueryToErr("select /*+ max_execution_time(600), set_var(tikv_client_read_timeout=100) */ * from testTable2")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCSlowByInjestSleep"))
require.Error(t, err)
require.Equal(t, "[tikv:1317]Query execution was interrupted", err.Error())
planInfo, err := plancodec.DecodePlan(tk.Session().GetSessionVars().StmtCtx.GetEncodedPlan())
require.NoError(t, err)
require.Regexp(t, "TableReader.*cop_task: {num: .*.*num_rpc:.*", planInfo)
}

func TestShutDown(t *testing.T) {
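The Enable/Disable bracketing around the failpoint in this test is easy to get wrong when an assertion in between fails first. As a side note, a small hypothetical helper (not part of this change, assuming only the standard github.com/pingcap/failpoint and testify APIs) could guarantee the cleanup:

package server_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// withFailpoint enables a failpoint for the duration of fn and always
// disables it afterwards, even if an assertion inside fn fails first.
// Hypothetical helper, shown for illustration only.
func withFailpoint(t *testing.T, name, expr string, fn func()) {
	require.NoError(t, failpoint.Enable(name, expr))
	defer func() {
		require.NoError(t, failpoint.Disable(name))
	}()
	fn()
}

With such a helper, the block above would read as a single withFailpoint(t, "github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCSlowByInjestSleep", `return(150)`, func() { ... }) call wrapping the query.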
68 changes: 57 additions & 11 deletions store/copr/coprocessor.go
@@ -547,8 +547,9 @@ type copIterator struct {
resolvedLocks util.TSSet
committedLocks util.TSSet

actionOnExceed *rateLimitAction
pagingTaskIdx uint32
actionOnExceed *rateLimitAction
pagingTaskIdx uint32
unconsumedStats *unconsumedCopRuntimeStats
}

// copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
@@ -568,6 +569,7 @@ type copIteratorWorker struct {

enableCollectExecutionInfo bool
pagingTaskIdx *uint32
unconsumedStats *unconsumedCopRuntimeStats
}

// copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit.
@@ -676,6 +678,7 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableCollectExecutionInfo bool) {
taskCh := make(chan *copTask, 1)
smallTaskCh := make(chan *copTask, 1)
it.unconsumedStats = &unconsumedCopRuntimeStats{}
it.wg.Add(it.concurrency + it.smallTaskConcurrency)
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency+it.smallTaskConcurrency; i++ {
@@ -698,6 +701,7 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC
replicaReadSeed: it.replicaReadSeed,
enableCollectExecutionInfo: enableCollectExecutionInfo,
pagingTaskIdx: &it.pagingTaskIdx,
unconsumedStats: it.unconsumedStats,
}
go worker.run(ctx)
}
@@ -919,6 +923,23 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
return resp, nil
}

// HasUnconsumedCopRuntimeStats indicates whether there are unconsumed CopRuntimeStats.
type HasUnconsumedCopRuntimeStats interface {
// CollectUnconsumedCopRuntimeStats returns unconsumed CopRuntimeStats.
CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats
}

func (it *copIterator) CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats {
if it == nil || it.unconsumedStats == nil {
return nil
}
it.unconsumedStats.Lock()
stats := make([]*CopRuntimeStats, 0, len(it.unconsumedStats.stats))
stats = append(stats, it.unconsumedStats.stats...)
it.unconsumedStats.Unlock()
return stats
}

// Associate each region with an independent backoffer. In this way, when multiple regions are
// unavailable, TiDB can execute very quickly without blocking
func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer, task *copTask, worker *copIteratorWorker) *Backoffer {
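The unconsumedStats field introduced above is simply a mutex-protected slice shared by the worker goroutines (which append to it) and the iterator (which snapshots it in CollectUnconsumedCopRuntimeStats). A runnable toy model of that pattern, with string payloads standing in for *CopRuntimeStats, might look like:

package main

import (
	"fmt"
	"sync"
)

// statBucket models unconsumedCopRuntimeStats: workers append under the
// lock, the iterator snapshots under the lock. Names are illustrative.
type statBucket struct {
	sync.Mutex
	stats []string
}

func (b *statBucket) add(s string) {
	b.Lock()
	b.stats = append(b.stats, s)
	b.Unlock()
}

// snapshot copies the slice so the caller never shares backing storage with
// concurrently appending workers.
func (b *statBucket) snapshot() []string {
	b.Lock()
	defer b.Unlock()
	out := make([]string, 0, len(b.stats))
	return append(out, b.stats...)
}

func main() {
	b := &statBucket{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			b.add(fmt.Sprintf("worker-%d", id))
		}(i)
	}
	wg.Wait()
	fmt.Println(b.snapshot())
}

Copying into a fresh slice under the lock lets the caller iterate the snapshot without racing against workers that are still appending, which is why CollectUnconsumedCopRuntimeStats above does the same.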
@@ -1062,6 +1083,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
err = worker.handleTiDBSendReqErr(err, task, ch)
return nil, err
}
worker.collectUnconsumedCopRuntimeStats(bo, rpcCtx)
return nil, errors.Trace(err)
}

@@ -1451,17 +1473,24 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
if resp.detail == nil {
resp.detail = new(CopRuntimeStats)
}
resp.detail.ReqStats = worker.kvclient.Stats
worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp)
}

func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) {
copStats.ReqStats = worker.kvclient.Stats
backoffTimes := bo.GetBackoffTimes()
resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
copStats.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
copStats.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
copStats.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
copStats.BackoffTimes[backoff] = backoffTimes[backoff]
copStats.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
if rpcCtx != nil {
resp.detail.CalleeAddress = rpcCtx.Addr
copStats.CalleeAddress = rpcCtx.Addr
}
if resp == nil {
return
}
sd := &util.ScanDetail{}
td := util.TimeDetail{}
@@ -1484,8 +1513,20 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
}
}
}
resp.detail.ScanDetail = sd
resp.detail.TimeDetail = td
copStats.ScanDetail = sd
copStats.TimeDetail = td
}

func (worker *copIteratorWorker) collectUnconsumedCopRuntimeStats(bo *Backoffer, rpcCtx *tikv.RPCContext) {
if worker.kvclient.Stats == nil {
return
}
copStats := &CopRuntimeStats{}
worker.collectCopRuntimeStats(copStats, bo, rpcCtx, nil)
worker.unconsumedStats.Lock()
worker.unconsumedStats.stats = append(worker.unconsumedStats.stats, copStats)
worker.unconsumedStats.Unlock()
worker.kvclient.Stats = nil
}

// CopRuntimeStats contains execution detail information.
@@ -1496,6 +1537,11 @@ type CopRuntimeStats struct {
CoprCacheHit bool
}

type unconsumedCopRuntimeStats struct {
sync.Mutex
stats []*CopRuntimeStats
}

func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
errCode := errno.ErrUnknown
errMsg := err.Error()
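The other half of the fix is the producer side shown in this file: when handleTaskOnce fails before any copResponse is built, the new collectUnconsumedCopRuntimeStats moves the worker's accumulated client stats into the shared bucket and resets them, so selectResult.Close can still report them. A compressed sketch of that bookkeeping, with simplified stand-in types rather than the real copIteratorWorker fields, is:

package coprdemo

import "sync"

// Stand-ins for the real types (copIteratorWorker, unconsumedCopRuntimeStats,
// and the kv client's per-request stats); field names are illustrative.
type requestStats struct{ rpcCount int }

type unconsumedStats struct {
	sync.Mutex
	stats []*requestStats
}

type worker struct {
	clientStats *requestStats    // stats accumulated for the request in flight
	unconsumed  *unconsumedStats // shared with the iterator that owns the workers
}

// parkStatsOnError mirrors collectUnconsumedCopRuntimeStats: when a request
// fails before producing a response, park the stats for Close to collect
// later and reset them so the same numbers cannot be reported twice.
func (w *worker) parkStatsOnError() {
	if w.clientStats == nil {
		return
	}
	w.unconsumed.Lock()
	w.unconsumed.stats = append(w.unconsumed.stats, w.clientStats)
	w.unconsumed.Unlock()
	w.clientStats = nil
}

Resetting the client stats after parking matches the real code (worker.kvclient.Stats = nil) and prevents double counting when the same worker goes on to handle further tasks.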
4 changes: 4 additions & 0 deletions store/mockstore/unistore/rpc.go
@@ -92,6 +92,10 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
}
})
failpoint.Inject("unistoreRPCSlowByInjestSleep", func(val failpoint.Value) {
time.Sleep(time.Duration(val.(int) * int(time.Millisecond)))
failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
})

select {
case <-ctx.Done():
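For completeness: this failpoint is what the new test in server/conn_test.go above turns on. The `return(150)` payload arrives as val.(int) inside the injected callback, so every unistore RPC sleeps 150 ms and then answers with the injected "Deadline is exceeded" region error; in essence the test drives it with:

// From the test above: 150 ms injected delay per RPC, cleaned up afterwards.
failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCSlowByInjestSleep", `return(150)`)
defer failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCSlowByInjestSleep")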
