This is an automated cherry-pick of pingcap#38738
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
mjonss authored and ti-chi-bot committed May 25, 2023
1 parent 9dd1a1a commit 921a1e8
Showing 12 changed files with 566 additions and 5 deletions.
317 changes: 317 additions & 0 deletions ddl/backfilling.go
@@ -902,6 +902,323 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
	return nil
}

<<<<<<< HEAD
=======
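// addBatchBackfillJobs builds one BackfillJob (with its BackfillMeta) per task in
// batchTasks, assigning consecutive IDs starting from *id, and inserts them into the
// backfill table. The instance ID is only set when the task is not distributed.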
func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool,
	batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error {
	bJobs = bJobs[:0]
	instanceID := ""
	if notDistTask {
		instanceID = reorgInfo.d.uuid
	}
	// TODO: Adjust the number of ranges(region) for each task.
	for _, task := range batchTasks {
		bm := &model.BackfillMeta{
			PhysicalTableID: reorgInfo.PhysicalTableID,
			IsUnique: isUnique,
			EndInclude: task.endInclude,
			ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp,
			SQLMode: reorgInfo.ReorgMeta.SQLMode,
			Location: reorgInfo.ReorgMeta.Location,
			JobMeta: &model.JobMeta{
				SchemaID: reorgInfo.Job.SchemaID,
				TableID: reorgInfo.Job.TableID,
				Query: reorgInfo.Job.Query,
			},
		}
		bj := &BackfillJob{
			ID: *id,
			JobID: reorgInfo.Job.ID,
			EleID: reorgInfo.currElement.ID,
			EleKey: reorgInfo.currElement.TypeKey,
			Tp: bfWorkerType,
			State: model.JobStateNone,
			InstanceID: instanceID,
			CurrKey: task.startKey,
			StartKey: task.startKey,
			EndKey: task.endKey,
			Meta: bm,
		}
		*id++
		bJobs = append(bJobs, bj)
	}
	if err := AddBackfillJobs(sess, bJobs); err != nil {
		return errors.Trace(err)
	}
	return nil
}

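// splitTableToBackfillJobs splits the key range of the physical table into ranges,
// converts at most genTaskBatch ranges at a time into backfill jobs, and waits for the
// number of pending jobs to drop below minGenTaskBatch before generating the next batch.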
func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool,
	bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error {
	endKey := reorgInfo.EndKey
	isFirstOps := true
	bJobs := make([]*BackfillJob, 0, genTaskBatch)
	for {
		kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey)
		if err != nil {
			return errors.Trace(err)
		}
		batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch)
		if len(batchTasks) == 0 {
			break
		}
		notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt)
		if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil {
			return errors.Trace(err)
		}
		isFirstOps = false

		remains := kvRanges[len(batchTasks):]
		// TODO: After adding backfillCh do asyncNotify(dc.backfillJobCh).
		logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table",
			zap.Int("batchTasksCnt", len(batchTasks)),
			zap.Int("totalRegionCnt", len(kvRanges)),
			zap.Int("remainRegionCnt", len(remains)),
			zap.String("startHandle", hex.EncodeToString(startKey)),
			zap.String("endHandle", hex.EncodeToString(endKey)))

		if len(remains) == 0 {
			break
		}

		for {
			bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
			if err != nil {
				return errors.Trace(err)
			}
			if bJobCnt < minGenTaskBatch {
				break
			}
			time.Sleep(retrySQLInterval)
		}
		startKey = remains[0].StartKey
	}
	return nil
}

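// controlWritePhysicalTableRecord cleans up any interrupted backfill jobs, resumes from
// the largest existing backfill job if there is one, splits the remaining key range into
// backfill jobs, and then polls until all jobs for the current element are finished and
// synced by every TiDB instance.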
func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
	startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
	if startKey == nil && endKey == nil {
		return nil
	}

	if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil {
		return errors.Trace(err)
	}

	currBackfillJobID := int64(1)
	err := checkAndHandleInterruptedBackfillJobs(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
	if err != nil {
		return errors.Trace(err)
	}
	maxBfJob, err := GetMaxBackfillJob(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
	if err != nil {
		return errors.Trace(err)
	}
	if maxBfJob != nil {
		startKey = maxBfJob.EndKey
		currBackfillJobID = maxBfJob.ID + 1
	}

	var isUnique bool
	if bfWorkerType == typeAddIndexWorker {
		idxInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID)
		isUnique = idxInfo.Unique
	}
	err = dc.splitTableToBackfillJobs(sess, reorgInfo, t, isUnique, bfWorkerType, startKey, currBackfillJobID)
	if err != nil {
		return errors.Trace(err)
	}

	var backfillJobFinished bool
	jobID := reorgInfo.Job.ID
	ticker := time.NewTicker(300 * time.Millisecond)
	defer ticker.Stop()
	for {
		if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil {
			return errors.Trace(err)
		}

		select {
		case <-ticker.C:
			if !backfillJobFinished {
				err := checkAndHandleInterruptedBackfillJobs(sess, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
				if err != nil {
					logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", jobID), zap.Error(err))
					return errors.Trace(err)
				}

				bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false)
				if err != nil {
					logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", jobID), zap.Error(err))
					return errors.Trace(err)
				}
				if bfJob == nil {
					backfillJobFinished = true
					logutil.BgLogger().Info("[ddl] finish backfill jobs", zap.Int64("job ID", jobID))
				}
			}
			if backfillJobFinished {
				// TODO: Consider whether these backfill jobs are always out of sync.
				isSynced, err := checkJobIsSynced(sess, jobID)
				if err != nil {
					logutil.BgLogger().Warn("[ddl] checkJobIsSynced failed", zap.Int64("job ID", jobID), zap.Error(err))
					return errors.Trace(err)
				}
				if isSynced {
					logutil.BgLogger().Info("[ddl] sync backfill jobs", zap.Int64("job ID", jobID))
					return nil
				}
			}
		case <-dc.ctx.Done():
			return dc.ctx.Err()
		}
	}
}

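// checkJobIsSynced reports whether all TiDB instances have synchronized the backfill
// history of the given DDL job, retrying up to retrySQLTimes on failure.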
func checkJobIsSynced(sess *session, jobID int64) (bool, error) {
	var err error
	var unsyncedInstanceIDs []string
	for i := 0; i < retrySQLTimes; i++ {
		unsyncedInstanceIDs, err = getUnsyncedInstanceIDs(sess, jobID, "check_backfill_history_job_sync")
		if err == nil && len(unsyncedInstanceIDs) == 0 {
			return true, nil
		}

		logutil.BgLogger().Info("[ddl] checkJobIsSynced failed",
			zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err))
		time.Sleep(retrySQLInterval)
	}

	return false, errors.Trace(err)
}

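// checkAndHandleInterruptedBackfillJobs looks for interrupted backfill jobs of the
// current element; if any exist, it moves them to the history table and returns the
// recorded error message.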
func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64, currEleKey []byte) (err error) {
	var bJobs []*BackfillJob
	for i := 0; i < retrySQLTimes; i++ {
		bJobs, err = GetInterruptedBackfillJobsForOneEle(sess, jobID, currEleID, currEleKey)
		if err == nil {
			break
		}
		logutil.BgLogger().Info("[ddl] getInterruptedBackfillJobsForOneEle failed", zap.Error(err))
		time.Sleep(retrySQLInterval)
	}
	if err != nil {
		return errors.Trace(err)
	}
	if len(bJobs) == 0 {
		return nil
	}

	for i := 0; i < retrySQLTimes; i++ {
		err = MoveBackfillJobsToHistoryTable(sess, bJobs[0])
		if err == nil {
			return errors.Errorf(bJobs[0].Meta.ErrMsg)
		}
		logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err))
		time.Sleep(retrySQLInterval)
	}
	return errors.Trace(err)
}

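// checkBackfillJobCount handles interrupted backfill jobs first, then returns the number
// of backfill jobs that still belong to the given DDL job and element.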
func checkBackfillJobCount(sess *session, jobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) {
	err = checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey)
	if err != nil {
		return 0, errors.Trace(err)
	}

	backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
		jobID, currEleID, currEleKey), "check_backfill_job_count")
	if err != nil {
		return 0, errors.Trace(err)
	}

	return backfillJobCnt, nil
}

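// getBackfillJobWithRetry fetches a single backfill job for the given DDL job and
// element (the one with the largest ID when isDesc is true), retrying on SQL errors.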
func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID int64, currEleKey []byte, isDesc bool) (*BackfillJob, error) {
	var err error
	var bJobs []*BackfillJob
	descStr := ""
	if isDesc {
		descStr = "order by id desc"
	}
	for i := 0; i < retrySQLTimes; i++ {
		bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' %s limit 1",
			jobID, currEleID, currEleKey, descStr), "check_backfill_job_state")
		if err != nil {
			logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err))
			continue
		}

		if len(bJobs) != 0 {
			return bJobs[0], nil
		}
		break
	}
	return nil, errors.Trace(err)
}

// GetMaxBackfillJob gets the max backfill job in BackfillTable and BackfillHistoryTable.
func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) {
	bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, true)
	if err != nil {
		return nil, errors.Trace(err)
	}
	hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, jobID, currEleID, currEleKey, true)
	if err != nil {
		return nil, errors.Trace(err)
	}

	if bfJob == nil {
		return hJob, nil
	}
	if hJob == nil {
		return bfJob, nil
	}
	if bfJob.ID > hJob.ID {
		return bfJob, nil
	}
	return hJob, nil
}

// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table.
func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error {
	s, ok := sctx.(*session)
	if !ok {
		return errors.Errorf("sess ctx:%#v convert session failed", sctx)
	}

	return s.runInTxn(func(se *session) error {
		// TODO: Consider batch by batch update backfill jobs and insert backfill history jobs.
		bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
			bfJob.JobID, bfJob.EleID, bfJob.EleKey), "update_backfill_job")
		if err != nil {
			return errors.Trace(err)
		}
		if len(bJobs) == 0 {
			return nil
		}

		txn, err := se.txn()
		if err != nil {
			return errors.Trace(err)
		}
		startTS := txn.StartTS()
		err = RemoveBackfillJob(se, true, bJobs[0])
		if err == nil {
			for _, bj := range bJobs {
				bj.State = model.JobStateCancelled
				bj.FinishTS = startTS
			}
			err = AddBackfillHistoryJob(se, bJobs)
		}
		logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs)))
		return errors.Trace(err)
	})
}

>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738))
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)

13 changes: 13 additions & 0 deletions ddl/column.go
@@ -811,6 +811,7 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j
func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
	oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
	job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
<<<<<<< HEAD
	var rh *reorgHandler
	if w.concurrentDDL {
		sctx, err1 := w.sessPool.get()
@@ -834,6 +835,18 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
		rh = newReorgHandler(newMeta, newSession(sctx), w.concurrentDDL)
	} else {
		rh = newReorgHandler(t, w.sess, w.concurrentDDL)
=======
	sctx, err1 := w.sessPool.get()
	if err1 != nil {
		err = errors.Trace(err1)
		return
	}
	defer w.sessPool.put(sctx)
	rh := newReorgHandler(newSession(sctx))
	dbInfo, err := t.GetDatabase(job.SchemaID)
	if err != nil {
		return false, ver, errors.Trace(err)
>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738))
	}
	reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs), false)
	if err != nil || reorgInfo.first {
6 changes: 6 additions & 0 deletions ddl/column_type_change_test.go
@@ -2427,6 +2427,7 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) {
	tk.MustExec("admin check table t")
}

<<<<<<< HEAD
func TestFixDDLTxnWillConflictWithReorgTxnNotConcurrent(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk0 := testkit.NewTestKit(t, store)
@@ -2448,14 +2449,19 @@ func TestFixDDLTxnWillConflictWithReorgTxnNotConcurrent(t *testing.T) {
	tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint"))
}

=======
>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738))
func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")

	tk.MustExec("create table t (a int)")
	tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF")
<<<<<<< HEAD
	defer tk.MustExec("set global tidb_ddl_enable_fast_reorg = default")
=======
>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738))
	tk.MustExec("alter table t add index(a)")
	tk.MustExec("set @@sql_mode=''")
	tk.MustExec("insert into t values(128),(129)")
3 changes: 3 additions & 0 deletions ddl/db_partition_test.go
@@ -4556,6 +4556,7 @@ func TestAlterModifyPartitionColTruncateWarning(t *testing.T) {
		"Warning 1265 Data truncated for column 'a', value is ' 654321'"))
}

<<<<<<< HEAD
func TestIssue40135Ver2(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
@@ -4593,6 +4594,8 @@ func TestIssue40135Ver2(t *testing.T) {
	tk.MustExec("admin check table t40135")
}

=======
>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738))
func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
