Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: set context correctly in the setDDLLabelForDiagnosis (#40090) #40131

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type reorgBackfillTask struct {
startKey kv.Key
endKey kv.Key
endInclude bool
source string
}

func (r *reorgBackfillTask) excludedEndKey() kv.Key {
Expand Down Expand Up @@ -498,6 +499,7 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
}
// Build reorg tasks.
job := reorgInfo.Job
source := getDDLRequestSource(job)
for i, keyRange := range kvRanges {
endKey := keyRange.EndKey
endK, err := getRangeEndKey(scheduler.jobCtx, dc.store, job.Priority, prefix, keyRange.StartKey, endKey)
Expand All @@ -515,7 +517,9 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
startKey: keyRange.StartKey,
endKey: endKey,
// If the boundaries overlap, we should ignore the preceding endKey.
endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1}
endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1,
source: source,
}
batchTasks = append(batchTasks, task)

if len(batchTasks) >= backfillTaskChanSize {
Expand Down
8 changes: 6 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,9 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) {
ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
if !exists {
ctx = NewJobContext()
ctx.setDDLLabelForDiagnosis(job)
dc.jobCtx.jobCtxMap[job.ID] = ctx
}
ctx.setDDLLabelForDiagnosis(job)
}

func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(job *model.Job) tikvrpc.ResourceGroupTagger {
Expand Down Expand Up @@ -1786,7 +1786,11 @@ func (s *session) execute(ctx context.Context, query string, label string) ([]ch
defer func() {
metrics.DDLJobTableDuration.WithLabelValues(label + "-" + metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()
rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(kv.WithInternalSourceType(ctx, kv.InternalTxnDDL), query)

if ctx.Value(kv.RequestSourceKey) == nil {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
}
rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
5 changes: 4 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewJobContext() *JobContext {
cacheSQL: "",
cacheNormalizedSQL: "",
cacheDigest: nil,
tp: "unknown",
tp: "",
}
}

Expand Down Expand Up @@ -760,6 +760,9 @@ func getDDLRequestSource(job *model.Job) string {
}

func (w *JobContext) setDDLLabelForDiagnosis(job *model.Job) {
if w.tp != "" {
return
}
w.tp = getDDLRequestSource(job)
w.ddlJobCtx = kv.WithInternalSourceType(w.ddlJobCtx, w.ddlJobSourceType())
}
Expand Down
9 changes: 8 additions & 1 deletion ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func (c *copReqSender) run() {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
}
rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey())
ctx := kv.WithInternalSourceType(p.ctx, task.source)
rs, err := p.copCtx.buildTableScan(ctx, ver.Ver, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
Expand Down Expand Up @@ -422,6 +423,12 @@ func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start,
SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()).
SetConcurrency(1).
Build()
builder.RequestSource.RequestSourceInternal = true
if source := ctx.Value(kv.RequestSourceKey); source != nil {
builder.RequestSource.RequestSourceType = source.(kv.RequestSource).RequestSourceType
} else {
builder.RequestSource.RequestSourceType = kv.InternalTxnDDL
}
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ func updateDDLJob2Table(sctx *session, job *model.Job, updateRawArgs bool) error
// getDDLReorgHandle gets DDL reorg handle.
func getDDLReorgHandle(sess *session, job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) {
sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
rows, err := sess.execute(context.Background(), sql, "get_handle")
ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job))
rows, err := sess.execute(ctx, sql, "get_handle")
if err != nil {
return nil, nil, nil, 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"testing"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
"github.com/stretchr/testify/require"
tikvutil "github.com/tikv/client-go/v2/util"
)

func TestKillStmt(t *testing.T) {
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestKillStmt(t *testing.T) {
func TestUserAttributes(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomain(t)
rootTK := testkit.NewTestKit(t, store)
ctx := context.WithValue(context.Background(), tikvutil.RequestSourceKey, tikvutil.RequestSource{RequestSourceInternal: true})
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnPrivilege)

// https://dev.mysql.com/doc/refman/8.0/en/create-user.html#create-user-comments-attributes
rootTK.MustExec(`CREATE USER testuser COMMENT '1234'`)
Expand Down
28 changes: 15 additions & 13 deletions kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,20 +195,22 @@ func BackOff(attempts uint) int {
func setRequestSourceForInnerTxn(ctx context.Context, txn Transaction) {
if source := ctx.Value(RequestSourceKey); source != nil {
requestSource := source.(RequestSource)
if !requestSource.RequestSourceInternal {
logutil.Logger(ctx).Warn("`RunInNewTxn` should be used by inner txn only")
if requestSource.RequestSourceType != "" {
if !requestSource.RequestSourceInternal {
logutil.Logger(ctx).Warn("`RunInNewTxn` should be used by inner txn only")
}
txn.SetOption(RequestSourceInternal, requestSource.RequestSourceInternal)
txn.SetOption(RequestSourceType, requestSource.RequestSourceType)
return
}
txn.SetOption(RequestSourceInternal, requestSource.RequestSourceInternal)
txn.SetOption(RequestSourceType, requestSource.RequestSourceType)
}
// panic in test mode in case there are requests without source in the future.
// log warnings in production mode.
if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
panic("unexpected no source type context, if you see this error, " +
"the `RequestSourceTypeKey` is missing in your context")
} else {
// panic in test mode in case there are requests without source in the future.
// log warnings in production mode.
if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
panic("unexpected no source type context, if you see this error, " +
"the `RequestSourceTypeKey` is missing in your context")
} else {
logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, " +
"the `RequestSourceTypeKey` is missing in the context")
}
logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, " +
"the `RequestSourceTypeKey` is missing in the context")
}
}
33 changes: 18 additions & 15 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4112,23 +4112,26 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo
} else {
s.sessionVars.RequestSourceType = stmtLabel
}
} else {
if source := ctx.Value(kv.RequestSourceKey); source != nil {
s.sessionVars.RequestSourceType = source.(kv.RequestSource).RequestSourceType
} else {
// panic in test mode in case there are requests without source in the future.
// log warnings in production mode.
if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
panic("unexpected no source type context, if you see this error, " +
"the `RequestSourceTypeKey` is missing in your context")
} else {
logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, "+
"the `RequestSourceTypeKey` is missing in the context",
zap.Bool("internal", s.isInternal()),
zap.String("sql", stmtNode.Text()))
}
return
}
if source := ctx.Value(kv.RequestSourceKey); source != nil {
requestSource := source.(kv.RequestSource)
if requestSource.RequestSourceType != "" {
s.sessionVars.RequestSourceType = requestSource.RequestSourceType
return
}
}
// panic in test mode in case there are requests without source in the future.
// log warnings in production mode.
if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
panic("unexpected no source type context, if you see this error, " +
"the `RequestSourceTypeKey` is missing in your context")
} else {
logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, "+
"the `RequestSourceTypeKey` is missing in the context",
zap.Bool("internal", s.isInternal()),
zap.String("sql", stmtNode.Text()))
}
}

// RemoveLockDDLJobs removes the DDL jobs which doesn't get the metadata lock from job2ver.
Expand Down