From 9488ca9db6e96fd004230048626c0efcba84da18 Mon Sep 17 00:00:00 2001
From: Lynn
Date: Mon, 12 Dec 2022 14:34:51 +0800
Subject: [PATCH] *: Add backfill job related tables and operations (#39616)

close pingcap/tidb#37122
---
 ddl/backfilling.go                        |  99 ++++++--
 ddl/constant.go                           |  46 ++++
 ddl/ddl_test.go                           |   3 +
 ddl/ddl_worker.go                         |   1 +
 ddl/job_table.go                          | 224 +++++++++++++++++
 ddl/job_table_test.go                     | 278 ++++++++++++++++++----
 executor/infoschema_cluster_table_test.go |   2 +-
 meta/meta.go                              |  21 ++
 meta/meta_test.go                         |   6 +
 parser/model/ddl.go                       |  34 +++
 parser/model/ddl_test.go                  |  19 ++
 session/session.go                        |  26 +-
 session/session_test.go                   |   9 +
 13 files changed, 686 insertions(+), 82 deletions(-)

diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index f20e28f08b1f3..92d5ee943c647 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -31,6 +31,7 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
@@ -38,6 +39,7 @@ import (
 	"github.com/pingcap/tidb/store/driver/backoff"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/tablecodec"
+	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/dbterror"
 	"github.com/pingcap/tidb/util/logutil"
@@ -45,19 +47,81 @@ import (
 	decoder "github.com/pingcap/tidb/util/rowDecoder"
 	"github.com/pingcap/tidb/util/timeutil"
 	"github.com/pingcap/tidb/util/topsql"
+	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/zap"
 )
 
-type backfillWorkerType byte
+type backfillerType byte
 
 const (
-	typeAddIndexWorker         backfillWorkerType = 0
-	typeUpdateColumnWorker     backfillWorkerType = 1
-	typeCleanUpIndexWorker     backfillWorkerType = 2
-	typeAddIndexMergeTmpWorker backfillWorkerType = 3
+	typeAddIndexWorker         backfillerType = 0
+	typeUpdateColumnWorker     backfillerType = 1
+	typeCleanUpIndexWorker     backfillerType = 2
+	typeAddIndexMergeTmpWorker backfillerType = 3
+
+	// InstanceLease is the lease duration a backfill job is bound to its owning TiDB instance.
+	InstanceLease = 1 * time.Minute
 )
 
+func (bT backfillerType) String() string {
+	switch bT {
+	case typeAddIndexWorker:
+		return "add index"
+	case typeUpdateColumnWorker:
+		return "update column"
+	case typeCleanUpIndexWorker:
+		return "clean up index"
+	case typeAddIndexMergeTmpWorker:
+		return "merge temporary index"
+	default:
+		return "unknown"
+	}
+}
+
+// BackfillJob is for a tidb_ddl_backfill table's record.
+type BackfillJob struct {
+	ID            int64
+	JobID         int64
+	EleID         int64
+	EleKey        []byte
+	Tp            backfillerType
+	State         model.JobState
+	StoreID       int64
+	InstanceID    string
+	InstanceLease types.Time
+	// range info
+	CurrKey  []byte
+	StartKey []byte
+	EndKey   []byte
+
+	StartTS  uint64
+	FinishTS uint64
+	RowCount int64
+	Meta     *model.BackfillMeta
+}
+
+// AbbrStr returns the BackfillJob's info without the Meta info.
+func (bj *BackfillJob) AbbrStr() string {
+	return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s",
+		bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease)
+}
+
+// GetOracleTime returns the current time derived from the transaction's start TS.
+func GetOracleTime(se *session) (time.Time, error) {
+	txn, err := se.Txn(true)
+	if err != nil {
+		return time.Time{}, err
+	}
+	return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil
+}
+
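Note on the fields above: a claimed backfill row carries exec_id (the owning instance) and exec_lease (claim time plus InstanceLease), and liveness is judged against TSO-derived time rather than local clocks. A minimal sketch of the expiry check, assuming only the names introduced above (InstanceLease, GetOracleTime) and a ddl-package *session; leaseExpired itself is illustrative and not part of the patch:

    func leaseExpired(se *session, bj *BackfillJob) (bool, error) {
    	currTime, err := GetOracleTime(se) // TSO-derived, comparable across instances
    	if err != nil {
    		return false, err
    	}
    	leaseTime, err := bj.InstanceLease.GoTime(time.UTC) // types.Time -> time.Time
    	if err != nil {
    		return false, err
    	}
    	// Once TSO "now" passes claim-time+InstanceLease, another instance may take over.
    	return currTime.After(leaseTime), nil
    }
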
+// GetLeaseGoTime returns a types.Time by adding a lease to the current time.
+func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
+	leaseTime := currTime.Add(lease)
+	return types.NewTime(types.FromGoTime(leaseTime.In(time.UTC)), mysql.TypeTimestamp, types.MaxFsp)
+}
+
 // By now the DDL jobs that need backfilling include:
 // 1: add-index
 // 2: modify-column-type
@@ -110,21 +174,6 @@ const (
 // Instead, it is divided into batches, each time a kv transaction completes the backfilling
 // of a partial batch.
 
-func (bWT backfillWorkerType) String() string {
-	switch bWT {
-	case typeAddIndexWorker:
-		return "add index"
-	case typeUpdateColumnWorker:
-		return "update column"
-	case typeCleanUpIndexWorker:
-		return "clean up index"
-	case typeAddIndexMergeTmpWorker:
-		return "merge temporary index"
-	default:
-		return "unknown"
-	}
-}
-
 type backfiller interface {
 	BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error)
 	AddMetricInfo(float64)
@@ -191,13 +240,13 @@ type backfillWorker struct {
 	resultCh chan *backfillResult
 	table    table.Table
 	priority int
-	tp       backfillWorkerType
+	tp       backfillerType
 	ctx      context.Context
 	cancel   func()
 }
 
 func newBackfillWorker(ctx context.Context, sessCtx sessionctx.Context, id int, t table.PhysicalTable,
-	reorgInfo *reorgInfo, tp backfillWorkerType) *backfillWorker {
+	reorgInfo *reorgInfo, tp backfillerType) *backfillWorker {
 	bfCtx, cancel := context.WithCancel(ctx)
 	return &backfillWorker{
 		id:       id,
@@ -611,7 +660,7 @@ type backfillScheduler struct {
 	ctx          context.Context
 	reorgInfo    *reorgInfo
 	sessPool     *sessionPool
-	tp           backfillWorkerType
+	tp           backfillerType
 	tbl          table.PhysicalTable
 	decodeColMap map[int64]decoder.Column
 	jobCtx       *JobContext
@@ -628,7 +677,7 @@ type backfillScheduler struct {
 const backfillTaskChanSize = 1024
 
 func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool,
-	tp backfillWorkerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column,
+	tp backfillerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column,
 	jobCtx *JobContext) *backfillScheduler {
 	return &backfillScheduler{
 		ctx:          ctx,
@@ -805,7 +854,7 @@ func (b *backfillScheduler) Close() {
 //
 // The above operations are completed in a transaction.
 // Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
-func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
+func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
 	job := reorgInfo.Job
 	totalAddedCount := job.GetRowCount()
diff --git a/ddl/constant.go b/ddl/constant.go
index bf4d69fb8fd33..3fe6bf4a04ee6 100644
--- a/ddl/constant.go
+++ b/ddl/constant.go
@@ -25,6 +25,10 @@ const (
 	ReorgTable = "tidb_ddl_reorg"
 	// HistoryTable stores the history DDL jobs.
 	HistoryTable = "tidb_ddl_history"
+	// BackfillTable stores the information of backfill jobs.
+	BackfillTable = "tidb_ddl_backfill"
+	// BackfillHistoryTable stores the information of history backfill jobs.
+	BackfillHistoryTable = "tidb_ddl_backfill_history"
 
 	// JobTableID is the table ID of `tidb_ddl_job`.
 	JobTableID = meta.MaxInt48 - 1
@@ -34,6 +38,10 @@ const (
 	HistoryTableID = meta.MaxInt48 - 3
 	// MDLTableID is the table ID of `tidb_mdl_info`.
 	MDLTableID = meta.MaxInt48 - 4
+	// BackfillTableID is the table ID of `tidb_ddl_backfill`.
+	BackfillTableID = meta.MaxInt48 - 5
+	// BackfillHistoryTableID is the table ID of `tidb_ddl_backfill_history`.
+	BackfillHistoryTableID = meta.MaxInt48 - 6
 
 	// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
 	JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))"
@@ -41,4 +49,42 @@ const (
 	ReorgTableSQL = "create table " + ReorgTable + "(job_id bigint not null, ele_id bigint, ele_type blob, start_key blob, end_key blob, physical_id bigint, reorg_meta longblob, unique key(job_id, ele_id, ele_type(20)))"
 	// HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`.
 	HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))"
+	// BackfillTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill`.
+	BackfillTableSQL = "create table " + BackfillTable + `(
+		id bigint not null,
+		ddl_job_id bigint not null,
+		ele_id bigint not null,
+		ele_key blob,
+		store_id bigint,
+		type int,
+		exec_id blob default null,
+		exec_lease Time,
+		state int,
+		curr_key blob,
+		start_key blob,
+		end_key blob,
+		start_ts bigint,
+		finish_ts bigint,
+		row_count bigint,
+		backfill_meta longblob,
+		unique key(ddl_job_id, ele_id, ele_key(20), id))`
+	// BackfillHistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill_history`.
+	BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + `(
+		id bigint not null,
+		ddl_job_id bigint not null,
+		ele_id bigint not null,
+		ele_key blob,
+		store_id bigint,
+		type int,
+		exec_id blob default null,
+		exec_lease Time,
+		state int,
+		curr_key blob,
+		start_key blob,
+		end_key blob,
+		start_ts bigint,
+		finish_ts bigint,
+		row_count bigint,
+		backfill_meta longblob,
+		unique key(ddl_job_id, ele_id, ele_key(20), id))`
 )
diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go
index 9760608674c6f..a2e75119e4d12 100644
--- a/ddl/ddl_test.go
+++ b/ddl/ddl_test.go
@@ -56,6 +56,9 @@ func (d *ddl) SetInterceptor(i Interceptor) {
 // JobNeedGCForTest is only used for test.
 var JobNeedGCForTest = jobNeedGC
 
+// NewSession is only used for test.
+var NewSession = newSession
+
 // GetMaxRowID is used for test.
 func GetMaxRowID(store kv.Storage, priority int, t table.Table, startHandle, endHandle kv.Key) (kv.Key, error) {
 	return getRangeEndKey(NewJobContext(), store, priority, t.RecordPrefix(), startHandle, endHandle)
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 9f90ad1039525..8621dcb08361c 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -425,6 +425,7 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
 		jobTasks[i] = job
 		injectModifyJobArgFailPoint(job)
 	}
+	sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
 	err = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
 }
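The two CREATE TABLE statements above are deliberately column-for-column identical, so a finished row can be moved verbatim from tidb_ddl_backfill into tidb_ddl_backfill_history and looked up by the same unique key. A hypothetical refactor sketch that would make that invariant explicit (backfillJobColumns is an invented name, not part of this patch):

    // Shared column list for both backfill system tables.
    const backfillJobColumns = `(
    	id bigint not null,
    	ddl_job_id bigint not null,
    	ele_id bigint not null,
    	ele_key blob,
    	store_id bigint,
    	type int,
    	exec_id blob default null,
    	exec_lease Time,
    	state int,
    	curr_key blob,
    	start_key blob,
    	end_key blob,
    	start_ts bigint,
    	finish_ts bigint,
    	row_count bigint,
    	backfill_meta longblob,
    	unique key(ddl_job_id, ele_id, ele_key(20), id))`

    const (
    	BackfillTableSQL        = "create table " + BackfillTable + backfillJobColumns
    	BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + backfillJobColumns
    )
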
diff --git a/ddl/job_table.go b/ddl/job_table.go
index 5bd702232a9f6..a6e19b7f7edf0 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -31,6 +31,7 @@ import (
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/util/dbterror"
 	"github.com/pingcap/tidb/util/logutil"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.uber.org/zap"
@@ -496,6 +497,229 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) {
 	return jobs, nil
 }
 
+// AddBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table.
+func AddBackfillJobs(sess *session, backfillJobs []*BackfillJob) error {
+	return addBackfillJobs(sess, BackfillTable, backfillJobs)
+}
+
+// AddBackfillHistoryJob adds the backfill jobs to the tidb_ddl_backfill_history table.
+func AddBackfillHistoryJob(sess *session, backfillJobs []*BackfillJob) error {
+	return addBackfillJobs(sess, BackfillHistoryTable, backfillJobs)
+}
+
+// addBackfillJobs adds the backfill jobs to the given table.
+func addBackfillJobs(sess *session, tableName string, backfillJobs []*BackfillJob) error {
+	sqlPrefix := fmt.Sprintf(
+		"insert into mysql.%s(id, ddl_job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values", tableName)
+	var sql string
+	label := fmt.Sprintf("add_%s_job", tableName)
+	// Do runInTxn to get StartTS.
+	return runInTxn(newSession(sess), func(se *session) error {
+		txn, err := se.txn()
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		startTS := txn.StartTS()
+		for i, bj := range backfillJobs {
+			if tableName == BackfillTable {
+				bj.StartTS = startTS
+			}
+			if tableName == BackfillHistoryTable {
+				bj.FinishTS = startTS
+			}
+			metaBytes, err := bj.Meta.Encode()
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			if i == 0 {
+				sql = sqlPrefix + fmt.Sprintf("(%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
+					bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
+					bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, metaBytes)
+				continue
+			}
+			sql += fmt.Sprintf(", (%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
+				bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
+				bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, metaBytes)
+		}
+		_, err = sess.execute(context.Background(), sql, label)
+		return errors.Trace(err)
+	})
+}
+
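Producer-side usage sketch (not part of the patch): a DDL owner would split an element's key range into sub-ranges and persist one BackfillJob per range; the range-splitting and ID values here are illustrative, while StartTS is filled in by addBackfillJobs from the insert transaction:

    func enqueueBackfillJobs(se *session, jobID, eleID int64, eleKey []byte, ranges []kv.KeyRange) error {
    	bJobs := make([]*BackfillJob, 0, len(ranges))
    	for i, r := range ranges {
    		bJobs = append(bJobs, &BackfillJob{
    			ID:       int64(i),
    			JobID:    jobID,
    			EleID:    eleID,
    			EleKey:   eleKey,
    			Tp:       typeAddIndexWorker,
    			State:    model.JobStateNone,
    			CurrKey:  r.StartKey, // progress pointer starts at the range start
    			StartKey: r.StartKey,
    			EndKey:   r.EndKey,
    			Meta:     &model.BackfillMeta{EndInclude: false},
    		})
    	}
    	return AddBackfillJobs(se, bJobs)
    }
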
+// GetBackfillJobsForOneEle gets a batch of backfill jobs whose rows all belong to the same element.
+func GetBackfillJobsForOneEle(sess *session, batch int, excludedJobIDs []int64, lease time.Duration) ([]*BackfillJob, error) {
+	eJobIDsBuilder := strings.Builder{}
+	for i, id := range excludedJobIDs {
+		if i == 0 {
+			eJobIDsBuilder.WriteString(" and ddl_job_id not in (")
+		}
+		eJobIDsBuilder.WriteString(strconv.Itoa(int(id)))
+		if i == len(excludedJobIDs)-1 {
+			eJobIDsBuilder.WriteString(")")
+		} else {
+			eJobIDsBuilder.WriteString(", ")
+		}
+	}
+
+	var err error
+	var bJobs []*BackfillJob
+	s := newSession(sess)
+	err = runInTxn(s, func(se *session) error {
+		currTime, err := GetOracleTime(s)
+		if err != nil {
+			return err
+		}
+
+		bJobs, err = GetBackfillJobs(sess, BackfillTable,
+			fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') %s order by ddl_job_id, ele_key, ele_id limit %d",
+				currTime.Add(-lease), eJobIDsBuilder.String(), batch), "get_backfill_job")
+		return err
+	})
+	if err != nil || len(bJobs) == 0 {
+		return nil, err
+	}
+
+	validLen := 1
+	firstJobID, firstEleID, firstEleKey := bJobs[0].JobID, bJobs[0].EleID, bJobs[0].EleKey
+	for i := 1; i < len(bJobs); i++ {
+		if bJobs[i].JobID != firstJobID || bJobs[i].EleID != firstEleID || !bytes.Equal(bJobs[i].EleKey, firstEleKey) {
+			break
+		}
+		validLen++
+	}
+
+	return bJobs[:validLen], nil
+}
+
+// GetAndMarkBackfillJobsForOneEle gets a batch of backfill jobs whose rows all belong to the same element,
+// and updates these jobs with the instance ID and lease.
+func GetAndMarkBackfillJobsForOneEle(sess *session, batch int, jobID int64, uuid string, lease time.Duration) ([]*BackfillJob, error) {
+	var validLen int
+	var bJobs []*BackfillJob
+	s := newSession(sess)
+	err := runInTxn(s, func(se *session) error {
+		currTime, err := GetOracleTime(se)
+		if err != nil {
+			return err
+		}
+
+		bJobs, err = GetBackfillJobs(sess, BackfillTable,
+			fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_key, ele_id limit %d",
+				currTime.Add(-lease), jobID, batch), "get_mark_backfill_job")
+		if err != nil {
+			return err
+		}
+		if len(bJobs) == 0 {
+			return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job, lease is timeout")
+		}
+
+		validLen = 0
+		firstJobID, firstEleID, firstEleKey := bJobs[0].JobID, bJobs[0].EleID, bJobs[0].EleKey
+		for i := 0; i < len(bJobs); i++ {
+			if bJobs[i].JobID != firstJobID || bJobs[i].EleID != firstEleID || !bytes.Equal(bJobs[i].EleKey, firstEleKey) {
+				break
+			}
+			validLen++
+
+			bJobs[i].InstanceID = uuid
+			bJobs[i].InstanceLease = GetLeaseGoTime(currTime, lease)
+			// TODO: batch update
+			if err = updateBackfillJob(sess, BackfillTable, bJobs[i], "get_mark_backfill_job"); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+	if validLen == 0 {
+		return nil, err
+	}
+
+	return bJobs[:validLen], err
+}
+
+// GetInterruptedBackfillJobsForOneEle gets the interrupted backfill jobs that belong to a single element.
+func GetInterruptedBackfillJobsForOneEle(sess *session, jobID, eleID int64, eleKey []byte) ([]*BackfillJob, error) {
+	bJobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' and (state = %d or state = %d)",
+		jobID, eleID, eleKey, model.JobStateRollingback, model.JobStateCancelling), "get_interrupt_backfill_job")
+	if err != nil || len(bJobs) == 0 {
+		return nil, err
+	}
+	return bJobs, nil
+}
+
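Worker-side usage sketch (not part of the patch): an executing instance claims jobs with GetAndMarkBackfillJobsForOneEle, which atomically stamps its exec_id and a fresh lease inside one transaction. The batch size and error handling below are illustrative:

    func claimBackfillJobs(se *session, jobID int64, instanceID string) ([]*BackfillJob, error) {
    	bJobs, err := GetAndMarkBackfillJobsForOneEle(se, 8, jobID, instanceID, InstanceLease)
    	if err != nil {
    		if dbterror.ErrDDLJobNotFound.Equal(err) {
    			return nil, nil // nothing claimable: every row is leased out or done
    		}
    		return nil, err
    	}
    	// All returned jobs belong to one (ddl_job_id, ele_id, ele_key) element and
    	// now carry this instance's exec_id with exec_lease = now + InstanceLease.
    	return bJobs, nil
    }
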
+// GetBackfillJobCount gets the number of rows in the tblName table according to condition.
+func GetBackfillJobCount(sess *session, tblName, condition string, label string) (int, error) {
+	rows, err := sess.execute(context.Background(), fmt.Sprintf("select count(1) from mysql.%s where %s", tblName, condition), label)
+	if err != nil {
+		return 0, errors.Trace(err)
+	}
+	if len(rows) == 0 {
+		return 0, dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get wrong result cnt:%d", len(rows)))
+	}
+
+	return int(rows[0].GetInt64(0)), nil
+}
+
+// GetBackfillJobs gets the backfill jobs in the tblName table according to condition.
+func GetBackfillJobs(sess *session, tblName, condition string, label string) ([]*BackfillJob, error) {
+	rows, err := sess.execute(context.Background(), fmt.Sprintf("select * from mysql.%s where %s", tblName, condition), label)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	bJobs := make([]*BackfillJob, 0, len(rows))
+	for _, row := range rows {
+		bJob := BackfillJob{
+			ID:            row.GetInt64(0),
+			JobID:         row.GetInt64(1),
+			EleID:         row.GetInt64(2),
+			EleKey:        row.GetBytes(3),
+			StoreID:       row.GetInt64(4),
+			Tp:            backfillerType(row.GetInt64(5)),
+			InstanceID:    row.GetString(6),
+			InstanceLease: row.GetTime(7),
+			State:         model.JobState(row.GetInt64(8)),
+			CurrKey:       row.GetBytes(9),
+			StartKey:      row.GetBytes(10),
+			EndKey:        row.GetBytes(11),
+			StartTS:       row.GetUint64(12),
+			FinishTS:      row.GetUint64(13),
+			RowCount:      row.GetInt64(14),
+		}
+		bJob.Meta = &model.BackfillMeta{}
+		err = bJob.Meta.Decode(row.GetBytes(15))
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		bJobs = append(bJobs, &bJob)
+	}
+	return bJobs, nil
+}
+
+// RemoveBackfillJob removes the backfill jobs from the tidb_ddl_backfill table.
+// If isOneEle is true, it removes all jobs that share backfillJob's ddl_job_id, ele_id and ele_key.
+// Otherwise, it removes only backfillJob itself.
+func RemoveBackfillJob(sess *session, isOneEle bool, backfillJob *BackfillJob) error {
+	sql := fmt.Sprintf("delete from mysql.tidb_ddl_backfill where ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
+		backfillJob.JobID, backfillJob.EleID, backfillJob.EleKey)
+	if !isOneEle {
+		sql += fmt.Sprintf(" and id = %d", backfillJob.ID)
+	}
+	_, err := sess.execute(context.Background(), sql, "remove_backfill_job")
+	return err
+}
+
+func updateBackfillJob(sess *session, tableName string, backfillJob *BackfillJob, label string) error {
+	metaBytes, err := backfillJob.Meta.Encode()
+	if err != nil {
+		return err
+	}
+	sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_lease = '%s', state = %d, backfill_meta = '%s' where ddl_job_id = %d and ele_id = %d and ele_key = '%s' and id = %d",
+		tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State, metaBytes, backfillJob.JobID, backfillJob.EleID, backfillJob.EleKey, backfillJob.ID)
+	_, err = sess.execute(context.Background(), sql, label)
+	return err
+}
+
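A plausible completion path built from these primitives (a sketch under stated assumptions, not part of the patch): when a job's range is exhausted, its row moves to the history table, where FinishTS is stamped by addBackfillJobs from the insert transaction:

    func finishBackfillJob(se *session, bj *BackfillJob) error {
    	bj.State = model.JobStateDone
    	// isOneEle = false deletes only this job's row and keeps its sibling rows.
    	if err := RemoveBackfillJob(se, false, bj); err != nil {
    		return err
    	}
    	return AddBackfillHistoryJob(se, []*BackfillJob{bj})
    }
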
 // MoveJobFromQueue2Table move existing DDLs in queue to table.
 func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error {
 	sess, err := d.sessPool.get()
diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go
index 25d8d4300eacd..ca30cf903107d 100644
--- a/ddl/job_table_test.go
+++ b/ddl/job_table_test.go
@@ -15,16 +15,23 @@ package ddl_test
 
 import (
+	"context"
+	"fmt"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/ddl"
+	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/sessiontxn"
 	"github.com/pingcap/tidb/testkit"
+	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util"
+	"github.com/pingcap/tidb/util/dbterror"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/exp/slices"
 )
@@ -178,67 +185,236 @@ func check(t *testing.T, record []int64, ids ...int64) {
 	}
 }
 
-func TestAlwaysChoiceProcessingJob(t *testing.T) {
-	if !variable.EnableConcurrentDDL.Load() {
-		t.Skipf("test requires concurrent ddl")
+func makeAddIdxBackfillJobs(schemaID, tblID, jobID, eleID int64, cnt int, query string) []*ddl.BackfillJob {
+	bJobs := make([]*ddl.BackfillJob, 0, cnt)
+	for i := 0; i < cnt; i++ {
+		sKey := []byte(fmt.Sprintf("%d", i))
+		eKey := []byte(fmt.Sprintf("%d", i+1))
+		bm := &model.BackfillMeta{
+			EndInclude: true,
+			JobMeta: &model.JobMeta{
+				SchemaID: schemaID,
+				TableID:  tblID,
+				Query:    query,
+			},
+		}
+		bj := &ddl.BackfillJob{
+			ID:       int64(i),
+			JobID:    jobID,
+			EleID:    eleID,
+			EleKey:   meta.IndexElementKey,
+			State:    model.JobStateNone,
+			CurrKey:  sKey,
+			StartKey: sKey,
+			EndKey:   eKey,
+			Meta:     bm,
+		}
+		bJobs = append(bJobs, bj)
 	}
-	store, dom := testkit.CreateMockStoreAndDomain(t)
+	return bJobs
+}
 
-	d := dom.DDL()
+func equalBackfillJob(t *testing.T, a, b *ddl.BackfillJob, lessTime types.Time) {
+	require.Equal(t, a.ID, b.ID)
+	require.Equal(t, a.JobID, b.JobID)
+	require.Equal(t, a.EleID, b.EleID)
+	require.Equal(t, a.EleKey, b.EleKey)
+	require.Equal(t, a.StoreID, b.StoreID)
+	require.Equal(t, a.InstanceID, b.InstanceID)
+	require.GreaterOrEqual(t, b.InstanceLease.Compare(lessTime), 0)
+	require.Equal(t, a.State, b.State)
+	require.Equal(t, a.Meta, b.Meta)
+}
+
+func getIdxConditionStr(jobID, eleID int64) string {
+	return fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
+		jobID, eleID, meta.IndexElementKey)
+}
+
+func readInTxn(se sessionctx.Context, f func(sessionctx.Context)) (err error) {
+	err = sessiontxn.NewTxn(context.Background(), se)
+	if err != nil {
+		return err
+	}
+	f(se)
+	se.RollbackTxn(context.Background())
+	return nil
+}
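Why readInTxn exists: GetOracleTime derives "now" from a transaction's start TS, so calling it without an active transaction fails (the test below asserts the "[kv:8024]invalid transaction" error). The helper opens a transaction, runs the read, and rolls back. A usage sketch matching how the test uses it, with errors elided for brevity:

    var now time.Time
    _ = readInTxn(se, func(sessionctx.Context) {
    	now, _ = ddl.GetOracleTime(se) // valid only inside the wrapped txn
    })
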
 
+func TestSimpleExecBackfillJobs(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("use test")
-	tk.MustExec("create table t(a int, b int)")
+	d := dom.DDL()
+	se := ddl.NewSession(tk.Session())
 
-	ddlJobs := []string{
-		"alter table t add index idx(a)",
-		"alter table t add index idx(b)",
-	}
+	jobID1 := int64(2)
+	jobID2 := int64(3)
+	eleID1 := int64(4)
+	eleID2 := int64(5)
+	uuid := d.GetID()
+	instanceLease := ddl.InstanceLease
+	// test no backfill job
+	bJobs, err := ddl.GetBackfillJobsForOneEle(se, 1, []int64{jobID1, jobID2}, instanceLease)
+	require.NoError(t, err)
+	require.Nil(t, bJobs)
+	bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease)
+	require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job, lease is timeout").Error())
+	require.Nil(t, bJobs)
+	allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
+		jobID1, eleID2, meta.IndexElementKey), "check_backfill_job_count")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 0)
+	// add some backfill jobs to the table
+	cnt := 2
+	bjTestCases := make([]*ddl.BackfillJob, 0, cnt*3)
+	bJobs1 := makeAddIdxBackfillJobs(1, 2, jobID1, eleID1, cnt, "alter table add index idx(a)")
+	bJobs2 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID2, cnt, "alter table add index idx(b)")
+	bJobs3 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID1, cnt, "alter table add index idx(c)")
+	bjTestCases = append(bjTestCases, bJobs1...)
+	bjTestCases = append(bjTestCases, bJobs2...)
+	bjTestCases = append(bjTestCases, bJobs3...)
+	err = ddl.AddBackfillJobs(se, bjTestCases)
+	// ID     jobID     eleID     InstanceID
+	// --------------------------------------
+	// 0      jobID1    eleID1    uuid
+	// 1      jobID1    eleID1    ""
+	// 0      jobID2    eleID2    ""
+	// 1      jobID2    eleID2    ""
+	// 0      jobID2    eleID1    ""
+	// 1      jobID2    eleID1    ""
+	require.NoError(t, err)
+	// test get some backfill jobs
+	bJobs, err = ddl.GetBackfillJobsForOneEle(se, 1, []int64{jobID2 - 1, jobID2 + 1}, instanceLease)
+	require.NoError(t, err)
+	require.Len(t, bJobs, 1)
+	expectJob := bjTestCases[4]
+	if expectJob.ID != bJobs[0].ID {
+		expectJob = bjTestCases[5]
+	}
+	require.Equal(t, expectJob, bJobs[0])
+	previousTime, err := ddl.GetOracleTime(se)
+	require.EqualError(t, err, "[kv:8024]invalid transaction")
+	readInTxn(se, func(sessionctx.Context) {
+		previousTime, err = ddl.GetOracleTime(se)
+		require.NoError(t, err)
+	})
 
-	hook := &ddl.TestDDLCallback{}
-	var wg util.WaitGroupWrapper
-	wg.Add(1)
-	var once sync.Once
-	var idxa, idxb int64
-	hook.OnGetJobBeforeExported = func(jobType string) {
-		once.Do(func() {
-			var jobs []*model.Job
-			for i, job := range ddlJobs {
-				wg.Run(func() {
-					tk := testkit.NewTestKit(t, store)
-					tk.MustExec("use test")
-					recordSet, _ := tk.Exec(job)
-					if recordSet != nil {
-						require.NoError(t, recordSet.Close())
-					}
-				})
-				for {
-					time.Sleep(time.Millisecond * 100)
-					var err error
-					jobs, err = ddl.GetAllDDLJobs(testkit.NewTestKit(t, store).Session(), nil)
-					require.NoError(t, err)
-					if len(jobs) == i+1 {
-						break
-					}
-				}
-			}
-			idxa = jobs[0].ID
-			idxb = jobs[1].ID
-			require.Greater(t, idxb, idxa)
-			tk := testkit.NewTestKit(t, store)
-			tk.MustExec("update mysql.tidb_ddl_job set processing = 1 where job_id = ?", idxb)
-			wg.Done()
-		})
-	}
+	bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID2, uuid, instanceLease)
+	require.NoError(t, err)
+	require.Len(t, bJobs, 1)
+	expectJob = bjTestCases[4]
+	if expectJob.ID != bJobs[0].ID {
+		expectJob = bjTestCases[5]
+	}
+	expectJob.InstanceID = uuid
+	equalBackfillJob(t, expectJob, bJobs[0], ddl.GetLeaseGoTime(previousTime, instanceLease))
+	var currTime time.Time
+	readInTxn(se, func(sessionctx.Context) {
+		currTime, err = ddl.GetOracleTime(se)
+		require.NoError(t, err)
+	})
+	currGoTime := ddl.GetLeaseGoTime(currTime, instanceLease)
+	require.GreaterOrEqual(t, currGoTime.Compare(bJobs[0].InstanceLease), 0)
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, cnt)
 
-	record := make([]int64, 0, 16)
-	hook.OnGetJobAfterExported = func(jobType string, job *model.Job) {
-		// record the job schedule order
-		record = append(record, job.ID)
-	}
+	// remove a backfill job
+	err = ddl.RemoveBackfillJob(se, false, bJobs1[0])
+	// ID     jobID     eleID
+	// ------------------------
+	// 1      jobID1    eleID1
+	// 0      jobID2    eleID2
+	// 1      jobID2    eleID2
+	// 0      jobID2    eleID1
+	// 1      jobID2    eleID1
+	require.NoError(t, err)
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 1)
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, cnt)
+	// remove all backfill jobs
+	err = ddl.RemoveBackfillJob(se, true, bJobs2[0])
+	// ID     jobID     eleID
+	// ------------------------
+	// 1      jobID1    eleID1
+	// 0      jobID2    eleID1
+	// 1      jobID2    eleID1
+	require.NoError(t, err)
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 1)
+	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj")
+	require.NoError(t, err)
+	require.Equal(t, allCnt, 0)
+	// clean backfill job
+	err = ddl.RemoveBackfillJob(se, true, bJobs1[1])
+	require.NoError(t, err)
+	err = ddl.RemoveBackfillJob(se, true, bJobs3[0])
+	require.NoError(t, err)
+	// ID     jobID     eleID
+	// ------------------------
 
-	d.SetHook(hook)
-	wg.Wait()
+	// test history backfill jobs
+	err = ddl.AddBackfillHistoryJob(se, []*ddl.BackfillJob{bJobs2[0]})
+	require.NoError(t, err)
+	// ID     jobID     eleID
+	// ------------------------
+	// 0      jobID2    eleID2
+	readInTxn(se, func(sessionctx.Context) {
+		currTime, err = ddl.GetOracleTime(se)
+		require.NoError(t, err)
+	})
+	condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d order by ddl_job_id", currTime.Add(-instanceLease), jobID1)
+	bJobs, err = ddl.GetBackfillJobs(se, ddl.BackfillHistoryTable, condition, "test_get_bj")
+	require.NoError(t, err)
+	require.Len(t, bJobs, 1)
+	require.Greater(t, bJobs[0].FinishTS, uint64(0))
+
+	// test GetInterruptedBackfillJobsForOneEle
+	bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, meta.IndexElementKey)
+	require.NoError(t, err)
+	require.Nil(t, bJobs)
+	// ID     jobID     eleID
+	// ------------------------
+	// 0      jobID1    eleID1
+	// 1      jobID1    eleID1
+	// 0      jobID2    eleID2
+	// 1      jobID2    eleID2
+	err = ddl.AddBackfillJobs(se, bjTestCases)
+	require.NoError(t, err)
+	bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, meta.IndexElementKey)
+	require.NoError(t, err)
+	require.Nil(t, bJobs)
+	bJobs1[0].State = model.JobStateRollingback
+	bJobs1[0].ID = 2
+	bJobs1[0].InstanceID = uuid
+	bJobs1[1].State = model.JobStateCancelling
+	bJobs1[1].ID = 3
+	err = ddl.AddBackfillJobs(se, bJobs1)
+	require.NoError(t, err)
+	// ID     jobID     eleID     state
+	// ---------------------------------
+	// 0      jobID1    eleID1    JobStateNone
+	// 1      jobID1    eleID1    JobStateNone
+	// 0      jobID2    eleID2    JobStateNone
+	// 1      jobID2    eleID2    JobStateNone
+	// 0      jobID2    eleID1    JobStateNone
+	// 1      jobID2    eleID1    JobStateNone
+	// 2      jobID1    eleID1    JobStateRollingback
+	// 3      jobID1    eleID1    JobStateCancelling
+	bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, meta.IndexElementKey)
+	require.NoError(t, err)
+	require.Len(t, bJobs, 2)
+	equalBackfillJob(t, bJobs1[0], bJobs[0], types.ZeroTime)
+	equalBackfillJob(t, bJobs1[1], bJobs[1], types.ZeroTime)
 
-	check(t, record, idxb, idxa)
+	// test the BackfillJob's AbbrStr
+	require.Equal(t, fmt.Sprintf("ID:2, JobID:2, EleID:4, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr())
+	require.Equal(t, "ID:3, JobID:2, EleID:4, Type:add index, State:cancelling, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr())
+	require.Equal(t, "ID:0, JobID:3, EleID:5, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs2[0].AbbrStr())
+	require.Equal(t, "ID:1, JobID:3, EleID:5, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs2[1].AbbrStr())
 }
diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go
index bdf7ac14235e5..5729af8de9dd1 100644
--- a/executor/infoschema_cluster_table_test.go
+++ b/executor/infoschema_cluster_table_test.go
@@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
 		"test 2",
 	))
 	rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
-	result := 42
+	result := 44
 	require.Len(t, rows, result)
 
 	// More tests about the privileges.
diff --git a/meta/meta.go b/meta/meta.go
index 15c8869767e0d..9f262be8b464d 100644
--- a/meta/meta.go
+++ b/meta/meta.go
@@ -954,6 +954,27 @@ func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
 	return tableInfo, errors.Trace(err)
 }
 
+// CheckTableExists checks whether the table with the given dbID and tableID exists.
+func (m *Meta) CheckTableExists(dbID int64, tableID int64) (bool, error) {
+	// Check if db exists.
+	dbKey := m.dbKey(dbID)
+	if err := m.checkDBExists(dbKey); err != nil {
+		return false, errors.Trace(err)
+	}
+
+	// Check if table exists.
+	tableKey := m.tableKey(tableID)
+	v, err := m.txn.HGet(dbKey, tableKey)
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+	if v != nil {
+		return true, nil
+	}
+
+	return false, nil
+}
+
 // DDL job structure
 //	DDLJobList: list jobs
 //	DDLJobHistory: hash
diff --git a/meta/meta_test.go b/meta/meta_test.go
index 0529fdbcc5eda..28603f76189ad 100644
--- a/meta/meta_test.go
+++ b/meta/meta_test.go
@@ -252,10 +252,16 @@ func TestMeta(t *testing.T) {
 	table, err := m.GetTable(1, 1)
 	require.NoError(t, err)
 	require.Equal(t, tbInfo, table)
+	tblExist, err := m.CheckTableExists(1, 1)
+	require.NoError(t, err)
+	require.Equal(t, true, tblExist)
 
 	table, err = m.GetTable(1, 2)
 	require.NoError(t, err)
 	require.Nil(t, table)
+	tblExist, err = m.CheckTableExists(1, 2)
+	require.NoError(t, err)
+	require.Equal(t, false, tblExist)
 
 	tbInfo2 := &model.TableInfo{
 		ID:   2,
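CheckTableExists gives the bootstrap code a cheap way to tell a fresh cluster from one being upgraded. A sketch of the intended use, mirroring the session.go change further below (ensureBackfillTables is an invented name, and the transaction/meta setup is elided):

    func ensureBackfillTables(t *meta.Meta, dbID int64) (needCreate bool, err error) {
    	exists, err := t.CheckTableExists(dbID, ddl.BackfillTableID)
    	if err != nil {
    		return false, errors.Trace(err)
    	}
    	// Create tidb_ddl_backfill/_history only when a cluster that already
    	// has the other DDL system tables is missing the new ones.
    	return !exists, nil
    }
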
diff --git a/parser/model/ddl.go b/parser/model/ddl.go
index c9b36a9e9ef3a..7bb9eaef01e6f 100644
--- a/parser/model/ddl.go
+++ b/parser/model/ddl.go
@@ -407,6 +407,40 @@ func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) {
 	sub.SchemaVer = ver
 }
 
+// JobMeta is meta info of Job.
+type JobMeta struct {
+	SchemaID int64 `json:"schema_id"`
+	TableID  int64 `json:"table_id"`
+	// Query string of the ddl job.
+	Query string `json:"query"`
+	// Priority is only used to set the operation priority of adding indices.
+	Priority int `json:"priority"`
+}
+
+// BackfillMeta is meta info of the backfill job.
+type BackfillMeta struct {
+	EndInclude bool   `json:"end_include"`
+	ErrMsg     string `json:"err_msg"`
+
+	SQLMode       mysql.SQLMode                    `json:"sql_mode"`
+	Warnings      map[errors.ErrorID]*terror.Error `json:"warnings"`
+	WarningsCount map[errors.ErrorID]int64         `json:"warnings_count"`
+	Location      *TimeZoneLocation                `json:"location"`
+	*JobMeta      `json:"job_meta"`
+}
+
+// Encode encodes BackfillMeta in JSON format.
+func (bm *BackfillMeta) Encode() ([]byte, error) {
+	b, err := json.Marshal(bm)
+	return b, errors.Trace(err)
+}
+
+// Decode decodes BackfillMeta from the json buffer.
+func (bm *BackfillMeta) Decode(b []byte) error {
+	err := json.Unmarshal(b, bm)
+	return errors.Trace(err)
+}
+
 // Job is for a DDL operation.
 type Job struct {
 	ID int64 `json:"id"`
diff --git a/parser/model/ddl_test.go b/parser/model/ddl_test.go
index 7fddcca54e351..d67b6ac91175a 100644
--- a/parser/model/ddl_test.go
+++ b/parser/model/ddl_test.go
@@ -51,3 +51,22 @@ func TestJobSize(t *testing.T) {
 	job := model.Job{}
 	require.Equal(t, 288, int(unsafe.Sizeof(job)), msg)
 }
+
+func TestBackfillMetaCodec(t *testing.T) {
+	jm := &model.JobMeta{
+		SchemaID: 1,
+		TableID:  2,
+		Query:    "alter table t add index idx(a)",
+		Priority: 1,
+	}
+	bm := &model.BackfillMeta{
+		EndInclude: true,
+		ErrMsg:     "has a err",
+		JobMeta:    jm,
+	}
+	bmBytes, err := bm.Encode()
+	require.NoError(t, err)
+	bmRet := &model.BackfillMeta{}
+	err = bmRet.Decode(bmBytes)
+	require.NoError(t, err)
+	require.Equal(t, bm, bmRet)
+}
diff --git a/session/session.go b/session/session.go
index d5714d0c741d3..df01e592f7506 100644
--- a/session/session.go
+++ b/session/session.go
@@ -3084,6 +3084,14 @@ var (
 		{ddl.ReorgTableSQL, ddl.ReorgTableID},
 		{ddl.HistoryTableSQL, ddl.HistoryTableID},
 	}
+	// BackfillTables is a list of table definitions used in dist reorg DDL.
+	BackfillTables = []struct {
+		SQL string
+		id  int64
+	}{
+		{ddl.BackfillTableSQL, ddl.BackfillTableID},
+		{ddl.BackfillHistoryTableSQL, ddl.BackfillHistoryTableID},
+	}
 	mdlTable = "create table mysql.tidb_mdl_info(job_id BIGINT NOT NULL PRIMARY KEY, version BIGINT NOT NULL, table_ids text(65535));"
 )
 
@@ -3101,25 +3109,33 @@ func splitAndScatterTable(store kv.Storage, tableIDs []int64) {
 	}
 }
 
-// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history.
+// InitDDLJobTables creates tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, and, when upgrading
+// an existing cluster, tidb_ddl_backfill and tidb_ddl_backfill_history.
 func InitDDLJobTables(store kv.Storage) error {
 	return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
 		exists, err := t.CheckDDLTableExists()
-		if err != nil || exists {
+		if err != nil {
 			return errors.Trace(err)
 		}
 		dbID, err := t.CreateMySQLDatabaseIfNotExists()
 		if err != nil {
 			return err
 		}
-		tableIDs := make([]int64, 0, len(DDLJobTables))
-		for _, tbl := range DDLJobTables {
+		tables := append(DDLJobTables, BackfillTables...)
+		if exists {
+			tblExist, err := t.CheckTableExists(dbID, BackfillTables[0].id)
+			if err != nil || tblExist {
+				return errors.Trace(err)
+			}
+			tables = BackfillTables
+		}
+		tableIDs := make([]int64, 0, len(tables))
+		for _, tbl := range tables {
 			tableIDs = append(tableIDs, tbl.id)
 		}
 		splitAndScatterTable(store, tableIDs)
 		p := parser.New()
-		for _, tbl := range DDLJobTables {
+		for _, tbl := range tables {
 			stmt, err := p.ParseOneStmt(tbl.SQL, "", "")
 			if err != nil {
 				return errors.Trace(err)
diff --git a/session/session_test.go b/session/session_test.go
index 1c7c8f2ba7611..530aee66d4ead 100644
--- a/session/session_test.go
+++ b/session/session_test.go
@@ -73,4 +73,13 @@ func TestMetaTableRegion(t *testing.T) {
 	require.Equal(t, ddlJobTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.JobTableID))
 
 	require.NotEqual(t, ddlJobTableRegionID, ddlReorgTableRegionID)
+
+	ddlBackfillTableRegionID := tk.MustQuery("show table mysql.tidb_ddl_backfill regions").Rows()[0][0]
+	ddlBackfillTableRegionStartKey := tk.MustQuery("show table mysql.tidb_ddl_backfill regions").Rows()[0][1]
+	require.Equal(t, ddlBackfillTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackfillTableID))
+	ddlBackfillHistoryTableRegionID := tk.MustQuery("show table mysql.tidb_ddl_backfill_history regions").Rows()[0][0]
+	ddlBackfillHistoryTableRegionStartKey := tk.MustQuery("show table mysql.tidb_ddl_backfill_history regions").Rows()[0][1]
+	require.Equal(t, ddlBackfillHistoryTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackfillHistoryTableID))
+
+	require.NotEqual(t, ddlBackfillTableRegionID, ddlBackfillHistoryTableRegionID)
 }
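Context for the final assertions: the new system tables are pre-split so each lives in its own region, and a table's region start key is derived from its fixed table ID, which is why the reserved IDs (meta.MaxInt48-5 and -6) matter. A small sketch of the key the test checks, using only packages already imported above:

    // Matches row [0][1] of `show table mysql.tidb_ddl_backfill regions`.
    startKey := fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackfillTableID)
    _ = startKey // distinct from the tidb_ddl_backfill_history prefix, hence distinct regions
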