Skip to content

Commit

Permalink
executor: fix issue that some query execution stats was omitted when …
Browse files Browse the repository at this point in the history
…execution was interrupted (#51787)

close #51660
  • Loading branch information
crazycs520 authored Mar 18, 2024
1 parent 5a2b8e8 commit 7056209
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 11 deletions.
9 changes: 8 additions & 1 deletion pkg/distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func recordExecutionSummariesForTiFlashTasks(sctx *stmtctx.StatementContext, exe

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) (err error) {
callee := copStats.CalleeAddress
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || (callee == "" && len(copStats.Stats) == 0) {
return
}

Expand Down Expand Up @@ -603,6 +603,13 @@ func (r *selectResult) Close() error {
if respSize > 0 {
r.memConsume(-respSize)
}
if unconsumed, ok := r.resp.(copr.HasUnconsumedCopRuntimeStats); ok && unconsumed != nil {
unconsumedCopStats := unconsumed.CollectUnconsumedCopRuntimeStats()
for _, copStats := range unconsumedCopStats {
_ = r.updateCopRuntimeStats(context.Background(), copStats, time.Duration(0))
r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil)
}
}
if r.stats != nil {
defer func() {
if ci, ok := r.resp.(copr.CopInfo); ok {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ go_test(
"//pkg/util/chunk",
"//pkg/util/context",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/plancodec",
"//pkg/util/replayer",
"//pkg/util/sqlkiller",
"//pkg/util/syncutil",
Expand Down
11 changes: 11 additions & 0 deletions pkg/server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/pingcap/tidb/pkg/util/arena"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/plancodec"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/stretchr/testify/require"
tikverr "github.com/tikv/client-go/v2/error"
Expand Down Expand Up @@ -720,6 +721,16 @@ func TestConnExecutionTimeout(t *testing.T) {
tk.MustQuery("select SLEEP(1);").Check(testkit.Rows("0"))
err := tk.QueryToErr("select * FROM testTable2 WHERE SLEEP(1);")
require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
// Test executor stats when execution time exceeded.
tk.MustExec("set @@tidb_slow_log_threshold=300")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep", `return(150)`))
err = tk.QueryToErr("select /*+ max_execution_time(600), set_var(tikv_client_read_timeout=100) */ * from testTable2")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep"))
require.Error(t, err)
require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
planInfo, err := plancodec.DecodePlan(tk.Session().GetSessionVars().StmtCtx.GetEncodedPlan())
require.NoError(t, err)
require.Regexp(t, "TableReader.*cop_task: {num: .*, rpc_num: .*, rpc_time: .*", planInfo)

// Killed because of max execution time, reset Killed to 0.
tk.Session().GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.MaxExecTimeExceeded)
Expand Down
66 changes: 56 additions & 10 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,8 @@ type copIterator struct {
storeBatchedNum atomic.Uint64
storeBatchedFallbackNum atomic.Uint64

runawayChecker *resourcegroup.RunawayChecker
runawayChecker *resourcegroup.RunawayChecker
unconsumedStats *unconsumedCopRuntimeStats
}

// copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
Expand All @@ -723,6 +724,7 @@ type copIteratorWorker struct {

storeBatchedNum *atomic.Uint64
storeBatchedFallbackNum *atomic.Uint64
unconsumedStats *unconsumedCopRuntimeStats
}

// copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit.
Expand Down Expand Up @@ -833,6 +835,7 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableCollectExecutionInfo bool) {
taskCh := make(chan *copTask, 1)
smallTaskCh := make(chan *copTask, 1)
it.unconsumedStats = &unconsumedCopRuntimeStats{}
it.wg.Add(it.concurrency + it.smallTaskConcurrency)
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency+it.smallTaskConcurrency; i++ {
Expand All @@ -857,6 +860,7 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC
pagingTaskIdx: &it.pagingTaskIdx,
storeBatchedNum: &it.storeBatchedNum,
storeBatchedFallbackNum: &it.storeBatchedFallbackNum,
unconsumedStats: it.unconsumedStats,
}
go worker.run(ctx)
}
Expand Down Expand Up @@ -1096,6 +1100,23 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
return resp, nil
}

// HasUnconsumedCopRuntimeStats indicate whether has unconsumed CopRuntimeStats.
type HasUnconsumedCopRuntimeStats interface {
// CollectUnconsumedCopRuntimeStats returns unconsumed CopRuntimeStats.
CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats
}

func (it *copIterator) CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats {
if it == nil || it.unconsumedStats == nil {
return nil
}
it.unconsumedStats.Lock()
stats := make([]*CopRuntimeStats, 0, len(it.unconsumedStats.stats))
stats = append(stats, it.unconsumedStats.stats...)
it.unconsumedStats.Unlock()
return stats
}

// Associate each region with an independent backoffer. In this way, when multiple regions are
// unavailable, TiDB can execute very quickly without blocking
func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer, task *copTask, worker *copIteratorWorker) *Backoffer {
Expand Down Expand Up @@ -1261,6 +1282,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
err = worker.handleTiDBSendReqErr(err, task, ch)
return nil, err
}
worker.collectUnconsumedCopRuntimeStats(bo, rpcCtx)
return nil, errors.Trace(err)
}

Expand Down Expand Up @@ -1758,17 +1780,24 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
if resp.detail == nil {
resp.detail = new(CopRuntimeStats)
}
resp.detail.Stats = worker.kvclient.Stats
worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp)
}

func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) {
copStats.Stats = worker.kvclient.Stats
backoffTimes := bo.GetBackoffTimes()
resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
copStats.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
copStats.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
copStats.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
copStats.BackoffTimes[backoff] = backoffTimes[backoff]
copStats.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
if rpcCtx != nil {
resp.detail.CalleeAddress = rpcCtx.Addr
copStats.CalleeAddress = rpcCtx.Addr
}
if resp == nil {
return
}
sd := &util.ScanDetail{}
td := util.TimeDetail{}
Expand All @@ -1791,8 +1820,20 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
}
}
}
resp.detail.ScanDetail = sd
resp.detail.TimeDetail = td
copStats.ScanDetail = sd
copStats.TimeDetail = td
}

func (worker *copIteratorWorker) collectUnconsumedCopRuntimeStats(bo *Backoffer, rpcCtx *tikv.RPCContext) {
if worker.kvclient.Stats == nil {
return
}
copStats := &CopRuntimeStats{}
worker.collectCopRuntimeStats(copStats, bo, rpcCtx, nil)
worker.unconsumedStats.Lock()
worker.unconsumedStats.stats = append(worker.unconsumedStats.stats, copStats)
worker.unconsumedStats.Unlock()
worker.kvclient.Stats = nil
}

// CopRuntimeStats contains execution detail information.
Expand All @@ -1803,6 +1844,11 @@ type CopRuntimeStats struct {
CoprCacheHit bool
}

type unconsumedCopRuntimeStats struct {
sync.Mutex
stats []*CopRuntimeStats
}

func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
errCode := errno.ErrUnknown
errMsg := err.Error()
Expand Down
4 changes: 4 additions & 0 deletions pkg/store/mockstore/unistore/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
}
})
failpoint.Inject("unistoreRPCSlowByInjestSleep", func(val failpoint.Value) {
time.Sleep(time.Duration(val.(int) * int(time.Millisecond)))
failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
})

select {
case <-ctx.Done():
Expand Down

0 comments on commit 7056209

Please sign in to comment.