ddl: improve backfill more general (#41093) #41290

Merged

24 changes: 9 additions & 15 deletions ddl/backfilling.go
@@ -90,7 +90,7 @@ func (bT backfillerType) String() string {
}
}

// BackfillJob is for a tidb_ddl_backfill table's record.
// BackfillJob is for a tidb_background_subtask table's record.
type BackfillJob struct {
ID int64
JobID int64
@@ -101,15 +101,9 @@ type BackfillJob struct {
State model.JobState
InstanceID string
InstanceLease types.Time
// range info
CurrKey []byte
StartKey []byte
EndKey []byte

StartTS uint64
FinishTS uint64
RowCount int64
Meta *model.BackfillMeta
StartTS uint64
StateUpdateTS uint64
Meta *model.BackfillMeta
}

// AbbrStr returns the BackfillJob's info without the Meta info.
@@ -321,7 +315,7 @@ func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey
if err != nil {
return err
}
bfJob.CurrKey = nextKey
bfJob.Meta.CurrKey = nextKey
bfJob.InstanceID = execID
bfJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease)
return w.backfiller.UpdateTask(bfJob)
@@ -475,7 +469,7 @@ func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResul
// Change the batch size dynamically.
w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller)
task.bfJob.RowCount = int64(result.addedCount)
task.bfJob.Meta.RowCount = int64(result.addedCount)
if result.err != nil {
logutil.BgLogger().Warn("[ddl] backfill worker runTask failed",
zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(result.err))
@@ -1142,6 +1136,8 @@ func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJob
Type: reorgInfo.Job.Type,
Query: reorgInfo.Job.Query,
},
StartKey: task.startKey,
EndKey: task.endKey,
}
bj := &BackfillJob{
ID: sJobCtx.currBackfillJobID.Add(1),
@@ -1152,11 +1148,9 @@
Tp: sJobCtx.bfWorkerType,
State: model.JobStateNone,
InstanceID: instanceID,
CurrKey: task.startKey,
StartKey: task.startKey,
EndKey: task.endKey,
Meta: bm,
}
bj.Meta.CurrKey = task.startKey
bJobs = append(bJobs, bj)
}
if err := AddBackfillJobs(sess, bJobs); err != nil {
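The struct change above moves every range and progress field (CurrKey, StartKey, EndKey, RowCount) off BackfillJob and into its Meta. A minimal sketch of the new access pattern, assuming the exported ddl and parser/model packages of that era and using only fields visible in this diff:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/ddl"
	"github.com/pingcap/tidb/parser/model"
)

// describeRange reads the range/progress of a backfill job. Before this PR the
// same data was read from bj.StartKey, bj.EndKey, bj.CurrKey and bj.RowCount.
func describeRange(bj *ddl.BackfillJob) string {
	return fmt.Sprintf("job %d ele %d: [%x, %x) curr=%x rows=%d",
		bj.JobID, bj.EleID,
		bj.Meta.StartKey, bj.Meta.EndKey, bj.Meta.CurrKey, bj.Meta.RowCount)
}

func main() {
	// Illustrative values only.
	bj := &ddl.BackfillJob{
		JobID: 41093,
		EleID: 1,
		Meta:  &model.BackfillMeta{StartKey: []byte("a"), EndKey: []byte("z"), CurrKey: []byte("a")},
	}
	fmt.Println(describeRange(bj))
}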
82 changes: 37 additions & 45 deletions ddl/constant.go
@@ -25,10 +25,10 @@ const (
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"
// BackfillTable stores the information of backfill jobs.
BackfillTable = "tidb_ddl_backfill"
// BackfillHistoryTable stores the information of history backfill jobs.
BackfillHistoryTable = "tidb_ddl_backfill_history"
// BackgroundSubtaskTable stores the information of backfill jobs.
BackgroundSubtaskTable = "tidb_background_subtask"
// BackgroundSubtaskHistoryTable stores the information of history backfill jobs.
BackgroundSubtaskHistoryTable = "tidb_background_subtask_history"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
@@ -38,53 +38,45 @@
HistoryTableID = meta.MaxInt48 - 3
// MDLTableID is the table ID of `tidb_mdl_info`.
MDLTableID = meta.MaxInt48 - 4
// BackfillTableID is the table ID of `tidb_ddl_backfill`.
BackfillTableID = meta.MaxInt48 - 5
// BackfillHistoryTableID is the table ID of `tidb_ddl_backfill_history`.
BackfillHistoryTableID = meta.MaxInt48 - 6
// BackgroundSubtaskTableID is the table ID of `tidb_background_subtask`.
BackgroundSubtaskTableID = meta.MaxInt48 - 5
// BackgroundSubtaskHistoryTableID is the table ID of `tidb_background_subtask_history`.
BackgroundSubtaskHistoryTableID = meta.MaxInt48 - 6

// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))"
// ReorgTableSQL is the CREATE TABLE SQL of `tidb_ddl_reorg`.
ReorgTableSQL = "create table " + ReorgTable + "(job_id bigint not null, ele_id bigint, ele_type blob, start_key blob, end_key blob, physical_id bigint, reorg_meta longblob, unique key(job_id, ele_id, ele_type(20)))"
// HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`.
HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))"
// BackfillTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill`.
BackfillTableSQL = "create table " + BackfillTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
ddl_physical_tid bigint,
// BackgroundSubtaskTableSQL is the CREATE TABLE SQL of `tidb_background_subtask`.
BackgroundSubtaskTableSQL = "create table " + BackgroundSubtaskTable + `(
id bigint not null auto_increment primary key,
namespace varchar(256),
task_key varchar(256),
ddl_physical_tid bigint(20),
type int,
exec_id blob default null,
exec_lease timestamp,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
// BackfillHistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill_history`.
BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
ddl_physical_tid bigint,
type int,
exec_id blob default null,
exec_lease timestamp,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
exec_id varchar(256),
exec_expired timestamp,
state varchar(64) not null,
checkpoint longblob not null,
start_time bigint,
state_update_time bigint,
meta longblob,
unique key(namespace, task_key))`
// BackgroundSubtaskHistoryTableSQL is the CREATE TABLE SQL of `tidb_background_subtask_history`.
BackgroundSubtaskHistoryTableSQL = "create table " + BackgroundSubtaskHistoryTable + `(
id bigint not null auto_increment primary key,
namespace varchar(256),
task_key varchar(256),
ddl_physical_tid bigint(20),
type int,
exec_id varchar(256),
exec_expired timestamp,
state varchar(64) not null,
checkpoint longblob not null,
start_time bigint,
state_update_time bigint,
meta longblob,
unique key(namespace, task_key))`
)
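The old backfill tables identified a row by the ddl_job_id, ele_id, ele_key and id columns; the new schema collapses that identity into a single task_key string under unique key(namespace, task_key). Later hunks in this diff build the key as "<ddl_job_id>_<hex(ele_key)>_<ele_id>_<backfill_id>" with fmt.Sprintf, matching it exactly to address one subtask or with a LIKE prefix to address all subtasks of one (job, element) pair. A standalone sketch of that format — the helper functions here are illustrative, the PR itself formats the strings inline:

package main

import (
	"encoding/hex"
	"fmt"
)

// backfillTaskKey mirrors the exact-match format used by UpdateTask in ddl/index.go:
// "<ddl_job_id>_<hex(ele_key)>_<ele_id>_<backfill_id>".
func backfillTaskKey(jobID int64, eleKey []byte, eleID, backfillID int64) string {
	return fmt.Sprintf("%d_%s_%d_%d", jobID, hex.EncodeToString(eleKey), eleID, backfillID)
}

// backfillTaskKeyPrefix mirrors the LIKE pattern used in ddl/dist_owner.go to
// select every subtask of one (job, element) pair.
func backfillTaskKeyPrefix(jobID int64, eleKey []byte, eleID int64) string {
	return fmt.Sprintf("%d_%s_%d_%%", jobID, hex.EncodeToString(eleKey), eleID)
}

func main() {
	eleKey := []byte("_idx_")
	fmt.Println(backfillTaskKey(41093, eleKey, 1, 7))    // 41093_5f6964785f_1_7
	fmt.Println(backfillTaskKeyPrefix(41093, eleKey, 1)) // 41093_5f6964785f_1_%
}

Presumably the hex encoding is what keeps arbitrary element keys printable and free of the underscore separator inside the composite string.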
2 changes: 1 addition & 1 deletion ddl/ddl.go
@@ -545,7 +545,7 @@ func (dc *ddlCtx) setReorgCtxForBackfill(bfJob *BackfillJob) {
rc := dc.getReorgCtx(bfJob.JobID)
if rc == nil {
ele := &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}
dc.newReorgCtx(bfJob.JobID, bfJob.StartKey, ele, bfJob.RowCount)
dc.newReorgCtx(bfJob.JobID, bfJob.Meta.StartKey, ele, bfJob.Meta.RowCount)
} else {
rc.references.Add(1)
}
3 changes: 2 additions & 1 deletion ddl/ddl_worker_test.go
@@ -317,7 +317,8 @@ func TestUsingReorgCtx(t *testing.T) {
wg := util.WaitGroupWrapper{}
wg.Run(func() {
jobID := int64(1)
bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil}
m := &model.BackfillMeta{StartKey: []byte("skey"), RowCount: 1}
bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil, Meta: m}
for i := 0; i < 100; i++ {
d.(ddl.DDLForTest).SetReorgCtxForBackfill(bfJob)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
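A consequence of the ddl.go hunk above: setReorgCtxForBackfill now reads bfJob.Meta.StartKey and bfJob.Meta.RowCount, so any BackfillJob handed to it needs a non-nil Meta, which is exactly what the test now supplies. A minimal sketch of constructing such a job (field set taken from the test above, values illustrative, import paths assumed):

package main

import (
	"github.com/pingcap/tidb/ddl"
	"github.com/pingcap/tidb/parser/model"
)

func main() {
	// Meta must be populated; a nil Meta would panic inside
	// setReorgCtxForBackfill after this change.
	m := &model.BackfillMeta{StartKey: []byte("skey"), RowCount: 1}
	bfJob := &ddl.BackfillJob{JobID: 1, EleID: 1, EleKey: nil, Meta: m}
	_ = bfJob // in the test this is passed to SetReorgCtxForBackfill
}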
4 changes: 2 additions & 2 deletions ddl/dist_backfilling.go
@@ -229,8 +229,8 @@ func (dc *ddlCtx) backfillJob2Task(t table.Table, bfJob *BackfillJob) (*reorgBac
physicalTable: pt,
// TODO: Remove these fields after remove the old logic.
sqlQuery: bfJob.Meta.Query,
startKey: bfJob.StartKey,
endKey: bfJob.EndKey,
startKey: bfJob.Meta.StartKey,
endKey: bfJob.Meta.EndKey,
endInclude: bfJob.Meta.EndInclude,
priority: bfJob.Meta.Priority}, nil
}
36 changes: 17 additions & 19 deletions ddl/dist_owner.go
@@ -343,7 +343,7 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC
return errors.Trace(err)
}

bfJobs, err := getBackfillJobWithRetry(sess, BackfillTable, ddlJobID, currEle.ID, currEle.TypeKey)
bfJobs, err := getBackfillJobWithRetry(sess, BackgroundSubtaskTable, ddlJobID, currEle.ID, currEle.TypeKey)
if err != nil {
logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle), zap.Error(err))
return errors.Trace(err)
@@ -380,10 +380,10 @@ func checkJobIsFinished(sess *session, ddlJobID int64) (bool, error) {
return true, nil
}

logutil.BgLogger().Info("[ddl] checkJobIsSynced failed",
zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err))
time.Sleep(RetrySQLInterval)
}
logutil.BgLogger().Info("[ddl] checkJobIsSynced failed",
zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", retrySQLTimes), zap.Error(err))

return false, errors.Trace(err)
}
@@ -393,8 +393,8 @@ func GetBackfillErr(sess *session, ddlJobID, currEleID int64, currEleKey []byte)
var err error
var metas []*model.BackfillMeta
for i := 0; i < retrySQLTimes; i++ {
metas, err = GetBackfillMetas(sess, BackfillHistoryTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s",
ddlJobID, currEleID, wrapKey2String(currEleKey)), "get_backfill_job_metas")
metas, err = GetBackfillMetas(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
ddlJobID, hex.EncodeToString(currEleKey), currEleID), "get_backfill_job_metas")
if err == nil {
for _, m := range metas {
if m.Error != nil {
@@ -445,9 +445,9 @@ func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey
return 0, errors.Trace(err)
}

backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable,
fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and ddl_physical_tid = %d",
ddlJobID, currEleID, wrapKey2String(currEleKey), pTblID), "check_backfill_job_count")
backfillJobCnt, err = GetBackfillJobCount(sess, BackgroundSubtaskTable,
fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_count")
if err != nil {
return 0, errors.Trace(err)
}
@@ -459,31 +459,29 @@ func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleI
var err error
var bJobs []*BackfillJob
for i := 0; i < retrySQLTimes; i++ {
bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s limit 1",
ddlJobID, currEleID, wrapKey2String(currEleKey)), "check_backfill_job_state")
bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" limit 1",
ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_state")
if err != nil {
logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err))
time.Sleep(RetrySQLInterval)
continue
}

return bJobs, nil
}
return nil, errors.Trace(err)
}

// GetPhysicalTableMetas gets the max backfill metas per physical table in BackfillTable and BackfillHistoryTable.
// GetPhysicalTableMetas gets the max backfill metas per physical table in BackgroundSubtaskTable and BackgroundSubtaskHistoryTable.
func GetPhysicalTableMetas(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (map[int64]*BackfillJobRangeMeta, error) {
condition := fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", ddlJobID, currEleID, wrapKey2String(currEleKey))
pTblMs, err := GetBackfillIDAndMetas(sess, BackfillTable, condition, "get_ptbl_metas")
condition := fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", ddlJobID, hex.EncodeToString(currEleKey), currEleID)
pTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskTable, condition, "get_ptbl_metas")
if err != nil {
return nil, errors.Trace(err)
}
hPTblMs, err := GetBackfillIDAndMetas(sess, BackfillHistoryTable, condition, "get_ptbl_metas")
hPTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskHistoryTable, condition, "get_ptbl_metas")
if err != nil {
return nil, errors.Trace(err)
}

metaMap := make(map[int64]*BackfillJobRangeMeta, len(pTblMs)+len(hPTblMs))
for _, m := range pTblMs {
metaMap[m.PhyTblID] = m
@@ -506,8 +504,8 @@ func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob)

return s.runInTxn(func(se *session) error {
// TODO: Consider batch by batch update backfill jobs and insert backfill history jobs.
bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s",
bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey)), "update_backfill_job")
bJobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID), "update_backfill_job")
if err != nil {
return errors.Trace(err)
}
@@ -524,7 +522,7 @@ func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob)
if err == nil {
for _, bj := range bJobs {
bj.State = model.JobStateCancelled
bj.FinishTS = startTS
bj.StateUpdateTS = startTS
}
err = AddBackfillHistoryJob(se, bJobs)
}
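Besides the switch to task_key matching, checkJobIsFinished above also moves the "checkJobIsSynced failed" log out of the retry loop: it now fires once, with tryTimes equal to the full retry budget, instead of on every iteration. A self-contained sketch of the resulting pattern (names and constants simplified; the real code queries unsynced TiDB instances via SQL):

package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	retrySQLTimes    = 3                     // illustrative; the ddl package defines its own retry budget
	retrySQLInterval = 10 * time.Millisecond // illustrative
)

// checkSynced stands in for the real "are all instances synced?" SQL check.
func checkSynced() (synced bool, unsynced []string, err error) {
	return false, []string{"tidb-0"}, errors.New("not synced yet")
}

func waitJobSynced() (bool, error) {
	var (
		unsynced []string
		err      error
	)
	for i := 0; i < retrySQLTimes; i++ {
		var ok bool
		ok, unsynced, err = checkSynced()
		if ok {
			return true, nil
		}
		time.Sleep(retrySQLInterval)
	}
	// Logged once with the full retry count, mirroring the hunk above.
	fmt.Printf("[ddl] checkJobIsSynced failed: unsyncedInstanceIDs=%v tryTimes=%d err=%v\n",
		unsynced, retrySQLTimes, err)
	return false, err
}

func main() {
	_, _ = waitJobSynced()
}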
8 changes: 4 additions & 4 deletions ddl/index.go
@@ -1374,8 +1374,8 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error {
s := newSession(w.backfillCtx.sessCtx)

return s.runInTxn(func(se *session) error {
jobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d",
bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey), bfJob.ID), "update_backfill_task")
jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%d_%s_%d_%d'",
bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID, bfJob.ID), "update_backfill_task")
if err != nil {
return err
}
@@ -1391,7 +1391,7 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error {
return err
}
bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease)
return updateBackfillJob(se, BackfillTable, bfJob, "update_backfill_task")
return updateBackfillJob(se, BackgroundSubtaskTable, bfJob, "update_backfill_task")
})
}

@@ -1402,7 +1402,7 @@ func (w *baseIndexWorker) FinishTask(bfJob *BackfillJob) error {
if err != nil {
return errors.Trace(err)
}
bfJob.FinishTS = txn.StartTS()
bfJob.StateUpdateTS = txn.StartTS()
err = RemoveBackfillJob(se, false, bfJob)
if err != nil {
return err