topsql: fix the issue that TopSQL doesn't catch the scan_rows of DDL executions (pingcap#33370)

close pingcap#33202
crazycs520 authored Mar 30, 2022
1 parent b35b30b commit 191a2dc
Showing 8 changed files with 69 additions and 35 deletions.
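Before the file-by-file diff, here is a condensed, standalone sketch of the mechanism this commit relies on. It is not TiDB code: `request`, `tagger`, and the sha256 digest are simplified stand-ins for `tikvrpc.Request`, `tikvrpc.ResourceGroupTagger`, and `parser.NormalizeDigest`. It illustrates the point the diff makes: the DDL job context caches the SQL digest once and hands out a tagger closure, and that closure must be attached to every carrier that issues KV requests — the job transaction and, after this fix, the backfill snapshot — so that Scan requests (and hence scan_rows) are attributed to the DDL statement by TopSQL.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// request and tagger are simplified stand-ins for tikvrpc.Request and
// tikvrpc.ResourceGroupTagger; they are not the real TiDB types.
type request struct {
	resourceGroupTag []byte
}

type tagger func(req *request)

// jobContext mirrors the cache this commit introduces: the digest of the
// running DDL job's SQL is computed once and reused for every KV request.
type jobContext struct {
	cacheSQL    string
	cacheDigest []byte
}

func (c *jobContext) setDDLLabel(query string) {
	if query != c.cacheSQL || c.cacheDigest == nil {
		sum := sha256.Sum256([]byte(query)) // stand-in for parser.NormalizeDigest
		c.cacheDigest = sum[:]
		c.cacheSQL = query
	}
}

// getResourceGroupTagger returns a closure that stamps outgoing requests
// with the cached digest, or nil when there is nothing to tag.
func (c *jobContext) getResourceGroupTagger() tagger {
	if c.cacheDigest == nil {
		return nil
	}
	digest := c.cacheDigest
	return func(req *request) {
		req.resourceGroupTag = digest
	}
}

func main() {
	ctx := &jobContext{}
	ctx.setDDLLabel("ALTER TABLE t ADD INDEX idx_a (a)")

	// The crux of the fix: apply the same tagger to every request path,
	// including the snapshot used by backfill scans, not only the txn.
	if tag := ctx.getResourceGroupTagger(); tag != nil {
		req := &request{}
		tag(req)
		fmt.Printf("request tagged with digest %x...\n", req.resourceGroupTag[:4])
	}
}
```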
5 changes: 4 additions & 1 deletion ddl/backfilling.go
@@ -709,7 +709,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
 // recordIterFunc is used for low-level record iteration.
 type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)
 
-func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64,
+func iterateSnapshotRows(ctx *jobContext, store kv.Storage, priority int, t table.Table, version uint64,
     startKey kv.Key, endKey kv.Key, fn recordIterFunc) error {
     var firstKey kv.Key
     if startKey == nil {
@@ -728,6 +728,9 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version
     ver := kv.Version{Ver: version}
     snap := store.GetSnapshot(ver)
     snap.SetOption(kv.Priority, priority)
+    if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
+        snap.SetOption(kv.ResourceGroupTagger, tagger)
+    }
 
     it, err := snap.Iter(firstKey, upperBound)
     if err != nil {
10 changes: 6 additions & 4 deletions ddl/column.go
@@ -1009,7 +1009,7 @@ func (w *worker) doModifyColumnTypeWithData(
         return ver, errors.Trace(err)
     }
 
-    reorgInfo, err := getReorgInfo(d, t, job, tbl, BuildElements(changingCol, changingIdxs))
+    reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
     if err != nil || reorgInfo.first {
         // If we run reorg firstly, we should update the job snapshot version
         // and then run the reorg next time.
@@ -1148,7 +1148,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
     if err != nil {
         return errors.Trace(err)
     }
-    originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
+    originalStartHandle, originalEndHandle, err := getTableRange(w.jobContext, reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
     if err != nil {
         return errors.Trace(err)
     }
@@ -1255,7 +1255,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
     taskDone := false
     var lastAccessedHandle kv.Key
     oprStartTime := startTime
-    err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
+    err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
         func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
             oprEndTime := time.Now()
             logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in updateColumnWorker fetchRowColVals", 0)
@@ -1392,7 +1392,9 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
     taskCtx.addedCount = 0
     taskCtx.scanCount = 0
     txn.SetOption(kv.Priority, w.priority)
-    w.ddlWorker.setResourceGroupTaggerForTopSQL(txn)
+    if tagger := w.ddlWorker.getResourceGroupTaggerForTopSQL(); tagger != nil {
+        txn.SetOption(kv.ResourceGroupTagger, tagger)
+    }
 
     rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
     if err != nil {
24 changes: 14 additions & 10 deletions ddl/ddl_worker.go
@@ -97,11 +97,11 @@ type worker struct {
     lockSeqNum bool
 
     *ddlCtx
-    ddlJobCache
+    *jobContext
 }
 
-// ddlJobCache is a cache for each DDL job.
-type ddlJobCache struct {
+// jobContext is the ddl job execution context.
+type jobContext struct {
     // below fields are cache for top sql
     ddlJobCtx          context.Context
     cacheSQL           string
@@ -115,7 +115,7 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan
         tp:       tp,
         ddlJobCh: make(chan struct{}, 1),
         ctx:      ctx,
-        ddlJobCache: ddlJobCache{
+        jobContext: &jobContext{
             ddlJobCtx:          context.Background(),
             cacheSQL:           "",
             cacheNormalizedSQL: "",
@@ -518,30 +518,32 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta {
     return meta.NewMeta(txn)
 }
 
-func (w *worker) setDDLLabelForTopSQL(job *model.Job) {
+func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) {
     if !topsqlstate.TopSQLEnabled() || job == nil {
+        w.cacheDigest = nil
+        w.ddlJobCtx = context.Background()
         return
     }
 
-    if job.Query != w.cacheSQL {
+    if job.Query != w.cacheSQL || w.cacheDigest == nil {
         w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query)
         w.cacheSQL = job.Query
     }
 
     w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, "", nil, false)
 }
 
-func (w *worker) setResourceGroupTaggerForTopSQL(txn kv.Transaction) {
+func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
     if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil {
-        return
+        return nil
     }
 
     digest := w.cacheDigest
     tagger := func(req *tikvrpc.Request) {
         req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, nil,
             resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req)))
     }
-    txn.SetOption(kv.ResourceGroupTagger, tikvrpc.ResourceGroupTagger(tagger))
+    return tagger
 }
 
 // handleDDLJobQueue handles DDL jobs in DDL Job queue.
@@ -579,7 +581,9 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
         }
 
         w.setDDLLabelForTopSQL(job)
-        w.setResourceGroupTaggerForTopSQL(txn)
+        if tagger := w.getResourceGroupTaggerForTopSQL(); tagger != nil {
+            txn.SetOption(kv.ResourceGroupTagger, tagger)
+        }
         if isDone, err1 := isDependencyJobDone(t, job); err1 != nil || !isDone {
             return errors.Trace(err1)
         }
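A note on the design choice in ddl/ddl_worker.go above: the old setResourceGroupTaggerForTopSQL could only tag the kv.Transaction it was handed, while getResourceGroupTaggerForTopSQL returns the tagger closure to the caller, so the same closure can now also be set as the kv.ResourceGroupTagger option on a kv.Snapshot. Since the backfill workers read rows through a snapshot in iterateSnapshotRows rather than through the job transaction, that snapshot path is precisely where Scan requests were previously untagged and invisible to TopSQL.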
16 changes: 10 additions & 6 deletions ddl/index.go
@@ -558,7 +558,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
     }
 
     elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
-    reorgInfo, err := getReorgInfo(d, t, job, tbl, elements)
+    reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements)
     if err != nil || reorgInfo.first {
         // If we run reorg firstly, we should update the job snapshot version
         // and then run the reorg next time.
@@ -1141,7 +1141,7 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac
     // taskDone means that the reorged handle is out of taskRange.endHandle.
     taskDone := false
     oprStartTime := startTime
-    err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
+    err := iterateSnapshotRows(w.ddlWorker.jobContext, w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startKey, taskRange.endKey,
         func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
             oprEndTime := time.Now()
             logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in baseIndexWorker fetchRowColVals", 0)
@@ -1297,7 +1297,9 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
     taskCtx.addedCount = 0
     taskCtx.scanCount = 0
     txn.SetOption(kv.Priority, w.priority)
-    w.ddlWorker.setResourceGroupTaggerForTopSQL(txn)
+    if tagger := w.ddlWorker.getResourceGroupTaggerForTopSQL(); tagger != nil {
+        txn.SetOption(kv.ResourceGroupTagger, tagger)
+    }
 
     idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
     if err != nil {
@@ -1408,7 +1410,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
     if err != nil {
         return false, errors.Trace(err)
     }
-    start, end, err := getTableRange(reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
+    start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
     if err != nil {
         return false, errors.Trace(err)
     }
@@ -1509,7 +1511,9 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
     taskCtx.addedCount = 0
     taskCtx.scanCount = 0
     txn.SetOption(kv.Priority, w.priority)
-    w.ddlWorker.setResourceGroupTaggerForTopSQL(txn)
+    if tagger := w.ddlWorker.getResourceGroupTaggerForTopSQL(); tagger != nil {
+        txn.SetOption(kv.ResourceGroupTagger, tagger)
+    }
 
     idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
     if err != nil {
@@ -1590,7 +1594,7 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r
     if err != nil {
         return false, errors.Trace(err)
     }
-    start, end, err := getTableRange(reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
+    start, end, err := getTableRange(w.jobContext, reorg.d, t.GetPartition(pid), currentVer.Ver, reorg.Job.Priority)
     if err != nil {
         return false, errors.Trace(err)
     }
2 changes: 1 addition & 1 deletion ddl/partition.go
@@ -1124,7 +1124,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
             elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey})
         }
     }
-    reorgInfo, err := getReorgInfoFromPartitions(d, t, job, tbl, physicalTableIDs, elements)
+    reorgInfo, err := getReorgInfoFromPartitions(w.jobContext, d, t, job, tbl, physicalTableIDs, elements)
 
     if err != nil || reorgInfo.first {
         // If we run reorg firstly, we should update the job snapshot version
12 changes: 6 additions & 6 deletions ddl/reorg.go
@@ -542,9 +542,9 @@ func buildCommonHandleFromChunkRow(sctx *stmtctx.StatementContext, tblInfo *mode
 }
 
 // getTableRange gets the start and end handle of a table (or partition).
-func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
+func getTableRange(ctx *jobContext, d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, priority int) (startHandleKey, endHandleKey kv.Key, err error) {
     // Get the start handle of this partition.
-    err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, nil, nil,
+    err = iterateSnapshotRows(ctx, d.store, priority, tbl, snapshotVer, nil, nil,
         func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (bool, error) {
             startHandleKey = rowKey
             return false, nil
@@ -579,7 +579,7 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
     return ver, nil
 }
 
-func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
+func getReorgInfo(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
     var (
         element *meta.Element
         start   kv.Key
@@ -616,7 +616,7 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elem
         } else {
             tb = tbl.(table.PhysicalTable)
         }
-        start, end, err = getTableRange(d, tb, ver.Ver, job.Priority)
+        start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
         if err != nil {
             return nil, errors.Trace(err)
         }
@@ -671,7 +671,7 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elem
     return &info, nil
 }
 
-func getReorgInfoFromPartitions(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
+func getReorgInfoFromPartitions(ctx *jobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
     var (
         element *meta.Element
         start   kv.Key
@@ -688,7 +688,7 @@ func getReorgInfoFromPartitions(d *ddlCtx, t *meta.Meta, job *model.Job, tbl tab
     }
     pid = partitionIDs[0]
     tb := tbl.(table.PartitionedTable).GetPartition(pid)
-    start, end, err = getTableRange(d, tb, ver.Ver, job.Priority)
+    start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
     if err != nil {
         return nil, errors.Trace(err)
     }
6 changes: 3 additions & 3 deletions ddl/reorg_test.go
@@ -143,7 +143,7 @@ func TestReorg(t *testing.T) {
     require.NoError(t, err)
 
     m = meta.NewMeta(txn)
-    info, err1 := getReorgInfo(d.ddlCtx, m, job, mockTbl, nil)
+    info, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, nil)
     require.NoError(t, err1)
     require.Equal(t, info.StartKey, kv.Key(handle.Encoded()))
     require.Equal(t, info.currElement, e)
@@ -174,7 +174,7 @@ func TestReorg(t *testing.T) {
     err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
         m := meta.NewMeta(txn)
         var err1 error
-        _, err1 = getReorgInfo(d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+        _, err1 = getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
         require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err1))
         require.Equal(t, job.SnapshotVer, uint64(0))
         return nil
@@ -185,7 +185,7 @@ func TestReorg(t *testing.T) {
     require.NoError(t, err)
     err = kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
         m := meta.NewMeta(txn)
-        info1, err1 := getReorgInfo(d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
+        info1, err1 := getReorgInfo(&jobContext{}, d.ddlCtx, m, job, mockTbl, []*meta.Element{element})
        require.NoError(t, err1)
         require.Equal(t, info1.currElement, info.currElement)
         require.Equal(t, info1.StartKey, info.StartKey)
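A note on the test change above: passing &jobContext{} suffices because a zero-value context has a nil cacheDigest, so getResourceGroupTaggerForTopSQL returns nil and the snapshot iteration simply runs untagged, leaving the existing reorg test behavior unchanged.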
29 changes: 25 additions & 4 deletions server/tidb_test.go
@@ -1889,7 +1889,7 @@ func TestTopSQLStatementStats(t *testing.T) {
 
 type resourceTagChecker struct {
     sync.Mutex
-    sqlDigests map[stmtstats.BinaryDigest]struct{}
+    sqlDigest2Reqs map[stmtstats.BinaryDigest]map[tikvrpc.CmdType]struct{}
 }
 
 func (c *resourceTagChecker) checkExist(t *testing.T, digest stmtstats.BinaryDigest, sqlStr string) {
@@ -1906,10 +1906,21 @@ func (c *resourceTagChecker) checkExist(t *testing.T, digest stmtstats.BinaryDig
 
     c.Lock()
     defer c.Unlock()
-    _, ok := c.sqlDigests[digest]
+    _, ok := c.sqlDigest2Reqs[digest]
     require.True(t, ok, sqlStr)
 }
 
+func (c *resourceTagChecker) checkReqExist(t *testing.T, digest stmtstats.BinaryDigest, sqlStr string, reqs ...tikvrpc.CmdType) {
+    c.Lock()
+    defer c.Unlock()
+    reqMap, ok := c.sqlDigest2Reqs[digest]
+    require.True(t, ok, sqlStr)
+    for _, req := range reqs {
+        _, ok := reqMap[req]
+        require.True(t, ok, sqlStr+"--"+req.String())
+    }
+}
+
 func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.StatementStatsMap, *resourceTagChecker, chan struct{}, func()) {
     // Prepare stmt stats.
     stmtstats.SetupAggregator()
@@ -1956,7 +1967,7 @@ func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.S
     })
 
     tagChecker := &resourceTagChecker{
-        sqlDigests: make(map[stmtstats.BinaryDigest]struct{}),
+        sqlDigest2Reqs: make(map[stmtstats.BinaryDigest]map[tikvrpc.CmdType]struct{}),
     }
     unistore.UnistoreRPCClientSendHook = func(req *tikvrpc.Request) {
         tag := req.GetResourceGroupTag()
@@ -1967,7 +1978,13 @@ func setupForTestTopSQLStatementStats(t *testing.T) (*tidbTestSuite, stmtstats.S
         require.NoError(t, err)
         tagChecker.Lock()
         defer tagChecker.Unlock()
-        tagChecker.sqlDigests[stmtstats.BinaryDigest(sqlDigest)] = struct{}{}
+
+        reqMap, ok := tagChecker.sqlDigest2Reqs[stmtstats.BinaryDigest(sqlDigest)]
+        if !ok {
+            reqMap = make(map[tikvrpc.CmdType]struct{})
+        }
+        reqMap[req.Type] = struct{}{}
+        tagChecker.sqlDigest2Reqs[stmtstats.BinaryDigest(sqlDigest)] = reqMap
     }
 
     cleanFn := func() {
@@ -2093,6 +2110,10 @@ func TestTopSQLStatementStats2(t *testing.T) {
             require.True(t, item.SumDurationNs > 1, sqlStr)
             foundMap[digest.SQLDigest] = sqlStr
             tagChecker.checkExist(t, digest.SQLDigest, sqlStr)
+            // This extra check covers issue #33202.
+            if strings.Contains(strings.ToLower(sqlStr), "add index") {
+                tagChecker.checkReqExist(t, digest.SQLDigest, sqlStr, tikvrpc.CmdScan)
+            }
         }
     }
     require.Equal(t, len(sqlDigests), len(foundMap), fmt.Sprintf("%v !=\n %v", sqlDigests, foundMap))
