Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, parser: Implement the write-reorg state split task related functions, and the related interfaces of backfill worker #39982

Merged
merged 13 commits into from
Jan 4, 2023
Merged
537 changes: 471 additions & 66 deletions ddl/backfilling.go

Large diffs are not rendered by default.

58 changes: 38 additions & 20 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
if err != nil {
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down Expand Up @@ -1059,7 +1059,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
TestReorgGoroutineRunning <- a
for {
time.Sleep(30 * time.Millisecond)
if w.getReorgCtx(reorgInfo.Job).isReorgCanceled() {
if w.getReorgCtx(reorgInfo.Job.ID).isReorgCanceled() {
// Job is cancelled. So it can't be done.
failpoint.Return(dbterror.ErrCancelledDDLJob)
}
Expand All @@ -1081,7 +1081,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
return errors.Trace(err)
}
//nolint:forcetypeassert
originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job.ID), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1104,11 +1104,11 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
// Then the handle range of the rest elements' is [originalStartHandle, originalEndHandle].
if i == startElementOffsetToResetHandle+1 {
reorgInfo.StartKey, reorgInfo.EndKey = originalStartHandle, originalEndHandle
w.getReorgCtx(reorgInfo.Job).setNextKey(reorgInfo.StartKey)
w.getReorgCtx(reorgInfo.Job.ID).setNextKey(reorgInfo.StartKey)
}

// Update the element in the reorgCtx to keep the atomic access for daemon-worker.
w.getReorgCtx(reorgInfo.Job).setCurrentElement(reorgInfo.elements[i+1])
w.getReorgCtx(reorgInfo.Job.ID).setCurrentElement(reorgInfo.elements[i+1])

// Update the element in the reorgInfo for updating the reorg meta below.
reorgInfo.currElement = reorgInfo.elements[i+1]
Expand All @@ -1132,7 +1132,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
}

type updateColumnWorker struct {
*backfillWorker
*backfillCtx
oldColInfo *model.ColumnInfo
newColInfo *model.ColumnInfo
metricCounter prometheus.Counter
Expand All @@ -1144,11 +1144,10 @@ type updateColumnWorker struct {
rowMap map[int64]types.Datum

// For SQL Mode and warnings.
sqlMode mysql.SQLMode
jobContext *JobContext
}

func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
func newUpdateColumnWorker(sessCtx sessionctx.Context, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
logutil.BgLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.String("reorgInfo", reorgInfo.String()))
Expand All @@ -1164,21 +1163,40 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &updateColumnWorker{
backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeUpdateColumnWorker),
oldColInfo: oldCol,
newColInfo: newCol,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
sqlMode: reorgInfo.ReorgMeta.SQLMode,
jobContext: jc,
backfillCtx: newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
oldColInfo: oldCol,
newColInfo: newCol,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
jobContext: jc,
}
}

func (w *updateColumnWorker) AddMetricInfo(cnt float64) {
w.metricCounter.Add(cnt)
}

func (*updateColumnWorker) String() string {
return typeUpdateColumnWorker.String()
}

func (*updateColumnWorker) GetTask() (*BackfillJob, error) {
panic("[ddl] update column worker GetTask function doesn't implement")
}

func (*updateColumnWorker) UpdateTask(*BackfillJob) error {
panic("[ddl] update column worker UpdateTask function doesn't implement")
}

func (*updateColumnWorker) FinishTask(*BackfillJob) error {
panic("[ddl] update column worker FinishTask function doesn't implement")
}

func (w *updateColumnWorker) GetCtx() *backfillCtx {
return w.backfillCtx
}

type rowRecord struct {
key []byte // It's used to lock a record. Record it to reduce the encoding time.
vals []byte // It's the record.
Expand All @@ -1204,8 +1222,8 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
taskDone := false
var lastAccessedHandle kv.Key
oprStartTime := startTime
err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, w.table.RecordPrefix(), txn.StartTS(), taskRange.startKey, taskRange.endKey,
func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
err := iterateSnapshotKeys(w.GetCtx().jobContext(taskRange.getJobID()), w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(),
txn.StartTS(), taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in updateColumnWorker fetchRowColVals", 0)
oprStartTime = oprEndTime
Expand Down Expand Up @@ -1346,8 +1364,8 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(kv.Priority, w.priority)
if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil {
txn.SetOption(kv.Priority, handleRange.priority)
if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}

Expand Down
22 changes: 11 additions & 11 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,15 +422,15 @@ func (dc *ddlCtx) isOwner() bool {
return isOwner
}

func (dc *ddlCtx) setDDLLabelForTopSQL(job *model.Job) {
func (dc *ddlCtx) setDDLLabelForTopSQL(jobID int64, jobQuery string) {
dc.jobCtx.Lock()
defer dc.jobCtx.Unlock()
ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
ctx, exists := dc.jobCtx.jobCtxMap[jobID]
if !exists {
ctx = NewJobContext()
dc.jobCtx.jobCtxMap[job.ID] = ctx
dc.jobCtx.jobCtxMap[jobID] = ctx
}
ctx.setDDLLabelForTopSQL(job)
ctx.setDDLLabelForTopSQL(jobQuery)
}

func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) {
Expand All @@ -444,10 +444,10 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) {
ctx.setDDLLabelForDiagnosis(job)
}

func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(job *model.Job) tikvrpc.ResourceGroupTagger {
func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(jobID int64) tikvrpc.ResourceGroupTagger {
dc.jobCtx.Lock()
defer dc.jobCtx.Unlock()
ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
ctx, exists := dc.jobCtx.jobCtxMap[jobID]
if !exists {
return nil
}
Expand All @@ -460,19 +460,19 @@ func (dc *ddlCtx) removeJobCtx(job *model.Job) {
delete(dc.jobCtx.jobCtxMap, job.ID)
}

func (dc *ddlCtx) jobContext(job *model.Job) *JobContext {
func (dc *ddlCtx) jobContext(jobID int64) *JobContext {
dc.jobCtx.RLock()
defer dc.jobCtx.RUnlock()
if jobContext, exists := dc.jobCtx.jobCtxMap[job.ID]; exists {
if jobContext, exists := dc.jobCtx.jobCtxMap[jobID]; exists {
return jobContext
}
return NewJobContext()
}

func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx {
func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
dc.reorgCtx.RLock()
defer dc.reorgCtx.RUnlock()
return dc.reorgCtx.reorgCtxMap[job.ID]
return dc.reorgCtx.reorgCtxMap[jobID]
}

func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx {
Expand All @@ -497,7 +497,7 @@ func (dc *ddlCtx) removeReorgCtx(job *model.Job) {
}

func (dc *ddlCtx) notifyReorgCancel(job *model.Job) {
rc := dc.getReorgCtx(job)
rc := dc.getReorgCtx(job.ID)
if rc == nil {
return
}
Expand Down
22 changes: 11 additions & 11 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,14 +721,14 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta {
return meta.NewMeta(txn)
}

func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) {
if !topsqlstate.TopSQLEnabled() || job == nil {
func (w *JobContext) setDDLLabelForTopSQL(jobQuery string) {
if !topsqlstate.TopSQLEnabled() || jobQuery == "" {
return
}

if job.Query != w.cacheSQL || w.cacheDigest == nil {
w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query)
w.cacheSQL = job.Query
if jobQuery != w.cacheSQL || w.cacheDigest == nil {
w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(jobQuery)
w.cacheSQL = jobQuery
w.ddlJobCtx = topsql.AttachAndRegisterSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, false)
} else {
topsql.AttachAndRegisterSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, false)
Expand Down Expand Up @@ -815,10 +815,10 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
if w.tp == addIdxWorker && job.IsRunning() {
txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull)
}
w.setDDLLabelForTopSQL(job)
w.setDDLLabelForTopSQL(job.ID, job.Query)
w.setDDLSourceForDiagnosis(job)
jobContext := w.jobContext(job)
if tagger := w.getResourceGroupTaggerForTopSQL(job); tagger != nil {
jobContext := w.jobContext(job.ID)
if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
t := meta.NewMeta(txn)
Expand Down Expand Up @@ -954,10 +954,10 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull)
}

w.setDDLLabelForTopSQL(job)
w.setDDLLabelForTopSQL(job.ID, job.Query)
w.setDDLSourceForDiagnosis(job)
jobContext := w.jobContext(job)
if tagger := w.getResourceGroupTaggerForTopSQL(job); tagger != nil {
jobContext := w.jobContext(job.ID)
if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
if isDone, err1 := isDependencyJobDone(t, job); err1 != nil || !isDone {
Expand Down
4 changes: 1 addition & 3 deletions ddl/ddl_workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"testing"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
)

Expand All @@ -36,10 +35,9 @@ func TestDDLWorkerPool(t *testing.T) {
}

func TestBackfillWorkerPool(t *testing.T) {
reorgInfo := &reorgInfo{Job: &model.Job{ID: 1}}
f := func() func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newBackfillWorker(context.Background(), nil, 1, nil, reorgInfo, typeAddIndexWorker)
wk := newBackfillWorker(context.Background(), 1, nil)
return wk, nil
}
}
Expand Down
Loading