diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 3b48ab30f4e5f..666aff138b7a5 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -709,7 +709,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
 // recordIterFunc is used for low-level record iteration.
 type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)
 
-func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64,
+func iterateSnapshotRows(ctx *jobContext, store kv.Storage, priority int, t table.Table, version uint64,
 	startKey kv.Key, endKey kv.Key, fn recordIterFunc) error {
 	var firstKey kv.Key
 	if startKey == nil {
@@ -728,6 +728,9 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version
 	ver := kv.Version{Ver: version}
 	snap := store.GetSnapshot(ver)
 	snap.SetOption(kv.Priority, priority)
+	if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
+		snap.SetOption(kv.ResourceGroupTagger, tagger)
+	}
 
 	it, err := snap.Iter(firstKey, upperBound)
 	if err != nil {
diff --git a/ddl/column.go b/ddl/column.go
index 882543c34863f..462e892cdf731 100644
--- a/ddl/column.go
+++ b/ddl/column.go
@@ -1009,7 +1009,7 @@ func (w *worker) doModifyColumnTypeWithData(
 		return ver, errors.Trace(err)
 	}
 
-	reorgInfo, err := getReorgInfo(d, t, job, tbl, BuildElements(changingCol, changingIdxs))
+	reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
 	if err != nil || reorgInfo.first {
 		// If we run reorg firstly, we should update the job snapshot version
 		// and then run the reorg next time.
@@ -1148,7 +1148,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
 	if err != nil {
 		return errors.Trace(err)
 	}
-	originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
+	originalStartHandle, originalEndHandle, err := getTableRange(w.jobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -1255,7 +1255,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
 	taskDone := false
 	var lastAccessedHandle kv.Key
 	oprStartTime := startTime
-	err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
+	err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
 		func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
 			oprEndTime := time.Now()
 			logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in updateColumnWorker fetchRowColVals", 0)
@@ -1392,7 +1392,9 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
 		taskCtx.addedCount = 0
 		taskCtx.scanCount = 0
 		txn.SetOption(kv.Priority, w.priority)
-		w.ddlWorker.setResourceGroupTaggerForTopSQL(txn)
+		if tagger := w.ddlWorker.getResourceGroupTaggerForTopSQL(); tagger != nil {
+			txn.SetOption(kv.ResourceGroupTagger, tagger)
+		}
 
 		rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
 		if err != nil {
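For illustration only (not part of the patch): the new guard in iterateSnapshotRows attaches the Top SQL resource-group tagger to the snapshot used by backfill reads; previously only Priority was set there, so those scan requests carried no tag (the behaviour the new test below ties to issue #33202). Below is a minimal, self-contained sketch of that guard-then-set pattern, using stand-in names (Tagger, OptionSetter, fakeSnapshot) rather than the real TiDB/client-go types.

package main

import "fmt"

// Tagger, OptionSetter and fakeSnapshot are invented stand-ins for
// tikvrpc.ResourceGroupTagger and the kv.Snapshot / kv.Transaction SetOption surface.
type Tagger func(req string)

type OptionSetter interface {
	SetOption(key string, value interface{})
}

type fakeSnapshot struct{ opts map[string]interface{} }

func (s *fakeSnapshot) SetOption(key string, value interface{}) { s.opts[key] = value }

// applyTagger mirrors the guard used at every call site in the patch: the tagger is
// attached only when the job context can actually produce one (Top SQL enabled and a
// SQL digest cached); otherwise the snapshot or transaction is left untouched.
func applyTagger(s OptionSetter, getTagger func() Tagger) {
	if tagger := getTagger(); tagger != nil {
		s.SetOption("resource-group-tagger", tagger)
	}
}

func main() {
	snap := &fakeSnapshot{opts: map[string]interface{}{}}

	applyTagger(snap, func() Tagger { return nil }) // Top SQL disabled: no option set
	fmt.Println("options after disabled case:", len(snap.opts))

	applyTagger(snap, func() Tagger {
		return func(req string) { fmt.Println("tagging request:", req) }
	})
	fmt.Println("options after enabled case:", len(snap.opts))
}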
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 4ea8ae4765fce..e440d33591d93 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -97,11 +97,11 @@ type worker struct {
 	lockSeqNum bool
 
 	*ddlCtx
-	ddlJobCache
+	*jobContext
 }
 
-// ddlJobCache is a cache for each DDL job.
-type ddlJobCache struct {
+// jobContext is the ddl job execution context.
+type jobContext struct {
 	// below fields are cache for top sql
 	ddlJobCtx          context.Context
 	cacheSQL           string
@@ -115,7 +115,7 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan
 		tp:        tp,
 		ddlJobCh:  make(chan struct{}, 1),
 		ctx:       ctx,
-		ddlJobCache: ddlJobCache{
+		jobContext: &jobContext{
 			ddlJobCtx:          context.Background(),
 			cacheSQL:           "",
 			cacheNormalizedSQL: "",
@@ -518,12 +518,14 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta {
 	return meta.NewMeta(txn)
 }
 
-func (w *worker) setDDLLabelForTopSQL(job *model.Job) {
+func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) {
 	if !topsqlstate.TopSQLEnabled() || job == nil {
+		w.cacheDigest = nil
+		w.ddlJobCtx = context.Background()
 		return
 	}
 
-	if job.Query != w.cacheSQL {
+	if job.Query != w.cacheSQL || w.cacheDigest == nil {
 		w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query)
 		w.cacheSQL = job.Query
 	}
@@ -531,9 +533,9 @@ func (w *worker) setDDLLabelForTopSQL(job *model.Job) {
 	w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, "", nil, false)
 }
 
-func (w *worker) setResourceGroupTaggerForTopSQL(txn kv.Transaction) {
+func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
 	if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil {
-		return
+		return nil
 	}
 
 	digest := w.cacheDigest
@@ -541,7 +543,7 @@ func (w *worker) setResourceGroupTaggerForTopSQL(txn kv.Transaction) {
 		req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, nil,
 			resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req)))
 	}
-	txn.SetOption(kv.ResourceGroupTagger, tikvrpc.ResourceGroupTagger(tagger))
+	return tagger
 }
 
 // handleDDLJobQueue handles DDL jobs in DDL Job queue.
@@ -579,7 +581,9 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
 		}
 
 		w.setDDLLabelForTopSQL(job)
-		w.setResourceGroupTaggerForTopSQL(txn)
+		if tagger := w.getResourceGroupTaggerForTopSQL(); tagger != nil {
+			txn.SetOption(kv.ResourceGroupTagger, tagger)
+		}
 		if isDone, err1 := isDependencyJobDone(t, job); err1 != nil || !isDone {
 			return errors.Trace(err1)
 		}
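For illustration only (not part of the patch): the extra branches in setDDLLabelForTopSQL handle Top SQL being toggled off and back on while a worker reuses its cached digest. The old code skipped re-normalizing whenever job.Query matched cacheSQL, even if the digest had never been computed or had just been cleared. A small sketch of that invalidation rule, with invented names (digestCache, set) and sha256 standing in for parser.NormalizeDigest:

package main

import (
	"crypto/sha256"
	"fmt"
)

// digestCache is a stripped-down, illustrative stand-in for the jobContext cache above.
type digestCache struct {
	cacheSQL    string
	cacheDigest []byte
}

// set mirrors the invalidation rules in setDDLLabelForTopSQL: clear the digest while
// the feature is disabled, and recompute it when the query text changes or when the
// digest was previously cleared (the `|| cacheDigest == nil` case).
func (c *digestCache) set(enabled bool, query string) {
	if !enabled || query == "" {
		c.cacheDigest = nil
		return
	}
	if query != c.cacheSQL || c.cacheDigest == nil {
		sum := sha256.Sum256([]byte(query))
		c.cacheDigest = sum[:]
		c.cacheSQL = query
	}
}

func main() {
	c := &digestCache{}
	q := "ALTER TABLE t ADD INDEX idx(a)"

	c.set(true, q)
	fmt.Printf("cached:     %x\n", c.cacheDigest[:4])

	c.set(false, q) // feature toggled off mid-job: cache must be cleared
	fmt.Println("disabled:  ", c.cacheDigest)

	c.set(true, q) // same query text, but the digest is nil, so it is recomputed
	fmt.Printf("recomputed: %x\n", c.cacheDigest[:4])
}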
diff --git a/ddl/index.go b/ddl/index.go
index 376e4442ef89f..109418cb17926 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -558,7 +558,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
 	}
 
 	elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
-	reorgInfo, err := getReorgInfo(d, t, job, tbl, elements)
+	reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements)
 	if err != nil || reorgInfo.first {
 		// If we run reorg firstly, we should update the job snapshot version
 		// and then run the reorg next time.
@@ -1141,7 +1141,7 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac
 	// taskDone means that the reorged handle is out of taskRange.endHandle.
 	taskDone := false
 	oprStartTime := startTime
-	err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
+	err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
 		func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
 			oprEndTime := time.Now()
 			logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in baseIndexWorker fetchRowColVals", 0)
@@ -1297,7 +1297,9 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
 		taskCtx.addedCount = 0
 		taskCtx.scanCount = 0
 		txn.SetOption(kv.Priority, w.priority)
-		w.ddlWorker.setResourceGroupTaggerForTopSQL(txn)
+		if tagger := w.ddlWorker.getResourceGroupTaggerForTopSQL(); tagger != nil {
+			txn.SetOption(kv.ResourceGroupTagger, tagger)
+		}
 
 		idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
 		if err != nil {
@@ -1408,7 +1410,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
 	if err != nil {
 		return false, errors.Trace(err)
 	}
-	start, end, err := getTableRange(reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
+	start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
 	if err != nil {
 		return false, errors.Trace(err)
 	}
@@ -1509,7 +1511,9 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
 		taskCtx.addedCount = 0
 		taskCtx.scanCount = 0
 		txn.SetOption(kv.Priority, w.priority)
-		w.ddlWorker.setResourceGroupTaggerForTopSQL(txn)
+		if tagger := w.ddlWorker.getResourceGroupTaggerForTopSQL(); tagger != nil {
+			txn.SetOption(kv.ResourceGroupTagger, tagger)
+		}
 
 		idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
 		if err != nil {
@@ -1590,7 +1594,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r
 	if err != nil {
 		return false, errors.Trace(err)
 	}
-	start, end, err := getTableRange(reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
+	start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
 	if err != nil {
 		return false, errors.Trace(err)
 	}
diff --git a/ddl/partition.go b/ddl/partition.go
index 4634f021e2243..b222fab807f78 100644
--- a/ddl/partition.go
+++ b/ddl/partition.go
@@ -1124,7 +1124,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 				elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey})
 			}
 		}
-		reorgInfo, err := getReorgInfoFromPartitions(d, t, job, tbl, physicalTableIDs, elements)
+		reorgInfo, err := getReorgInfoFromPartitions(w.jobContext, d, t, job, tbl, physicalTableIDs, elements)
 
 		if err != nil || reorgInfo.first {
 			// If we run reorg firstly, we should update the job snapshot version
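For illustration only (not part of the patch): the index.go and partition.go changes above are plumbing that threads w.jobContext down to the backfill workers; the tagger those workers obtain is a closure that stamps each outgoing request with the cached digest. A toy sketch of that closure shape, with an invented request type in place of tikvrpc.Request:

package main

import "fmt"

// request is a toy stand-in for tikvrpc.Request; Cmd mimics the request type
// (Scan, Prewrite, Commit, ...). Both names are invented for this sketch.
type request struct {
	Cmd string
	Tag []byte
}

// newTagger captures the SQL digest once and returns a closure that labels every
// request passed through it, which is the role the job context's tagger plays here.
func newTagger(digest []byte) func(*request) {
	return func(req *request) { req.Tag = digest }
}

func main() {
	tagger := newTagger([]byte("digest-of-add-index"))
	for _, cmd := range []string{"Scan", "Prewrite", "Commit"} {
		req := &request{Cmd: cmd}
		tagger(req)
		fmt.Printf("%-8s tag=%q\n", req.Cmd, req.Tag)
	}
}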
diff --git a/ddl/reorg.go b/ddl/reorg.go
index e7c7de58a4ed4..315f5eac93c9f 100644
--- a/ddl/reorg.go
+++ b/ddl/reorg.go
@@ -542,9 +542,9 @@ func buildCommonHandleFromChunkRow(sctx *stmtctx.StatementContext, tblInfo *mode
 }
 
 // getTableRange gets the start and end handle of a table (or partition).
-func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
+func getTableRange(ctx *jobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
 	// Get the start handle of this partition.
-	err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, nil, nil,
+	err = iterateSnapshotRows(ctx, d.store, priority, tbl, snapshotVer, nil, nil,
 		func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (bool, error) {
 			startHandleKey = rowKey
 			return false, nil
@@ -579,7 +579,7 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
 	return ver, nil
 }
 
-func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
+func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
 	var (
 		element *meta.Element
 		start   kv.Key
@@ -616,7 +616,7 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elem
 		} else {
 			tb = tbl.(table.PhysicalTable)
 		}
-		start, end, err = getTableRange(d, tb, ver.Ver, job.Priority)
+		start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -671,7 +671,7 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elem
 	return &info, nil
 }
 
-func getReorgInfoFromPartitions(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
+func getReorgInfoFromPartitions(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
 	var (
 		element *meta.Element
 		start   kv.Key
@@ -688,7 +688,7 @@ func getReorgInfoFromPartitions(d *ddlCtx, t *meta.Meta, job *model.Job, tbl tab
 		}
 		pid = partitionIDs[0]
 		tb := tbl.(table.PartitionedTable).GetPartition(pid)
-		start, end, err = getTableRange(d, tb, ver.Ver, job.Priority)
+		start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go
index a5355627ad0c8..41f2d2772b5a3 100644
--- a/ddl/reorg_test.go
+++ b/ddl/reorg_test.go
@@ -143,7 +143,7 @@ func TestReorg(t *testing.T) {
 		require.NoError(t, err)
 		m = meta.NewMeta(txn)
 
-		info, err1 := getReorgInfo(d.ddlCtx, m, job, mockTbl, nil)
+		info, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, nil)
 		require.NoError(t, err1)
 		require.Equal(t, info.StartKey, kv.Key(handle.Encoded()))
 		require.Equal(t, info.currElement, e)
@@ -174,7 +174,7 @@ func TestReorg(t *testing.T) {
 	err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
 		m := meta.NewMeta(txn)
 		var err1 error
-		_, err1 = getReorgInfo(d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+		_, err1 = getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
 		require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err1))
 		require.Equal(t, job.SnapshotVer, uint64(0))
 		return nil
@@ -185,7 +185,7 @@ func TestReorg(t *testing.T) {
 	require.NoError(t, err)
 	err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
 		m := meta.NewMeta(txn)
-		info1, err1 := getReorgInfo(d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+		info1, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
 		require.NoError(t, err1)
 		require.Equal(t, info1.currElement, info.currElement)
 		require.Equal(t, info1.StartKey, info.StartKey)
diff --git a/server/tidb_test.go b/server/tidb_test.go
index cd718bb819601..4c7081478aacb 100644
--- a/server/tidb_test.go
+++ b/server/tidb_test.go
@@ -1889,7 +1889,7 @@ func TestTopSQLStatementStats(t *testing.T) {
 
 type resourceTagChecker struct {
 	sync.Mutex
-	sqlDigests map[stmtstats.BinaryDigest]struct{}
+	sqlDigest2Reqs map[stmtstats.BinaryDigest]map[tikvrpc.CmdType]struct{}
 }
 
 func (c *resourceTagChecker) checkExist(t *testing.T, digest stmtstats.BinaryDigest, sqlStr string) {
@@ -1906,10 +1906,21 @@ func (c *resourceTagChecker) checkExist(t *testing.T, digest stmtstats.BinaryDig
 
 	c.Lock()
 	defer c.Unlock()
-	_, ok := c.sqlDigests[digest]
+	_, ok := c.sqlDigest2Reqs[digest]
 	require.True(t, ok, sqlStr)
 }
 
+func (c *resourceTagChecker) checkReqExist(t *testing.T, digest stmtstats.BinaryDigest, sqlStr string, reqs ...tikvrpc.CmdType) {
+	c.Lock()
+	defer c.Unlock()
+	reqMap, ok := c.sqlDigest2Reqs[digest]
+	require.True(t, ok, sqlStr)
+	for _, req := range reqs {
+		_, ok := reqMap[req]
+		require.True(t, ok, sqlStr+"--"+req.String())
+	}
+}
+
 func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.StatementStatsMap, *resourceTagChecker, chan struct{}, func()) {
 	// Prepare stmt stats.
 	stmtstats.SetupAggregator()
@@ -1956,7 +1967,7 @@ func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.S
 	})
 
 	tagChecker := &resourceTagChecker{
-		sqlDigests: make(map[stmtstats.BinaryDigest]struct{}),
+		sqlDigest2Reqs: make(map[stmtstats.BinaryDigest]map[tikvrpc.CmdType]struct{}),
 	}
 	unistore.UnistoreRPCClientSendHook = func(req *tikvrpc.Request) {
 		tag := req.GetResourceGroupTag()
@@ -1967,7 +1978,13 @@ func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.S
 		require.NoError(t, err)
 		tagChecker.Lock()
 		defer tagChecker.Unlock()
-		tagChecker.sqlDigests[stmtstats.BinaryDigest(sqlDigest)] = struct{}{}
+
+		reqMap, ok := tagChecker.sqlDigest2Reqs[stmtstats.BinaryDigest(sqlDigest)]
+		if !ok {
+			reqMap = make(map[tikvrpc.CmdType]struct{})
+		}
+		reqMap[req.Type] = struct{}{}
+		tagChecker.sqlDigest2Reqs[stmtstats.BinaryDigest(sqlDigest)] = reqMap
 	}
 
 	cleanFn := func() {
@@ -2093,6 +2110,10 @@ func TestTopSQLStatementStats2(t *testing.T) {
 			require.True(t, item.SumDurationNs > 1, sqlStr)
 			foundMap[digest.SQLDigest] = sqlStr
 			tagChecker.checkExist(t, digest.SQLDigest, sqlStr)
+			// This special check is used to test issue #33202.
+			if strings.Contains(strings.ToLower(sqlStr), "add index") {
+				tagChecker.checkReqExist(t, digest.SQLDigest, sqlStr, tikvrpc.CmdScan)
+			}
 		}
 	}
 	require.Equal(t, len(sqlDigests), len(foundMap), fmt.Sprintf("%v !=\n %v", sqlDigests, foundMap))
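For illustration only (not part of the patch): the reworked checker records, per SQL digest, the set of request types observed carrying its resource tag, which is what lets checkReqExist assert that an ADD INDEX statement produced tagged Scan requests (the regression tracked by #33202). A simplified version of that bookkeeping, with plain strings standing in for stmtstats.BinaryDigest and tikvrpc.CmdType:

package main

import (
	"fmt"
	"sync"
)

// tagRecorder is a simplified analogue of resourceTagChecker: it maps each SQL
// digest to the set of request types seen carrying that digest's tag.
type tagRecorder struct {
	sync.Mutex
	digest2Reqs map[string]map[string]struct{}
}

func (r *tagRecorder) record(digest, cmd string) {
	r.Lock()
	defer r.Unlock()
	reqs, ok := r.digest2Reqs[digest]
	if !ok {
		reqs = make(map[string]struct{})
		r.digest2Reqs[digest] = reqs
	}
	reqs[cmd] = struct{}{}
}

func (r *tagRecorder) sawReq(digest, cmd string) bool {
	r.Lock()
	defer r.Unlock()
	_, ok := r.digest2Reqs[digest][cmd] // indexing a missing digest yields a nil map, which is safe to read
	return ok
}

func main() {
	r := &tagRecorder{digest2Reqs: make(map[string]map[string]struct{})}
	r.record("add-index-digest", "Scan")
	r.record("add-index-digest", "Prewrite")
	fmt.Println("Scan tagged:  ", r.sawReq("add-index-digest", "Scan"))
	fmt.Println("Delete tagged:", r.sawReq("add-index-digest", "Delete"))
}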