*: Add backfill job related tables and operations #39616

Merged
merged 17 commits into from Dec 12, 2022
99 changes: 74 additions & 25 deletions ddl/backfilling.go
@@ -31,33 +31,97 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tidb/util/topsql"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

type backfillWorkerType byte
type backfillerType byte

const (
typeAddIndexWorker backfillWorkerType = 0
typeUpdateColumnWorker backfillWorkerType = 1
typeCleanUpIndexWorker backfillWorkerType = 2
typeAddIndexMergeTmpWorker backfillWorkerType = 3
typeAddIndexWorker backfillerType = 0
typeUpdateColumnWorker backfillerType = 1
typeCleanUpIndexWorker backfillerType = 2
typeAddIndexMergeTmpWorker backfillerType = 3

// InstanceLease is the default lease duration held by a backfill job's executing instance.
InstanceLease = 1 * time.Minute
)

func (bWT backfillerType) String() string {
switch bWT {
case typeAddIndexWorker:
return "add index"
case typeUpdateColumnWorker:
return "update column"
case typeCleanUpIndexWorker:
return "clean up index"
case typeAddIndexMergeTmpWorker:
return "merge temporary index"
default:
return "unknown"
}
}

// BackfillJob represents one record of the tidb_ddl_backfill table.
type BackfillJob struct {
ID int64
JobID int64
EleID int64
EleKey []byte
Tp backfillerType
State model.JobState
StoreID int64
InstanceID string
InstanceLease types.Time
// range info
CurrKey []byte
StartKey []byte
EndKey []byte

StartTS uint64
FinishTS uint64
RowCount int64
Meta *model.BackfillMeta
}

// AbbrStr returns a brief summary of the BackfillJob, omitting the Meta field.
func (bj *BackfillJob) AbbrStr() string {
return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s",
bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease)
}
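
For illustration, a minimal usage sketch (field values are invented; within this package it would print something like the comment shows):

bj := &BackfillJob{
	ID:         1,
	JobID:      101,
	EleID:      1,
	Tp:         typeAddIndexWorker,
	State:      model.JobStateRunning,
	InstanceID: "tidb-0",
}
// Prints something like:
// ID:1, JobID:101, EleID:1, Type:add index, State:running, InstanceID:tidb-0, InstanceLease:0000-00-00 00:00:00
fmt.Println(bj.AbbrStr())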

// GetOracleTime returns the current time derived from the transaction's start TS.
func GetOracleTime(se *session) (time.Time, error) {
txn, err := se.Txn(true)
if err != nil {
return time.Time{}, err
}
return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil
}

// GetLeaseGoTime returns a types.Time of currTime plus the given lease duration.
func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
leaseTime := currTime.Add(lease)
return types.NewTime(types.FromGoTime(leaseTime.In(time.UTC)), mysql.TypeTimestamp, types.MaxFsp)
}
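
These two helpers are meant to compose: the current time comes from the TSO rather than the local clock, so every TiDB instance computes comparable lease timestamps. A minimal sketch, assuming a *session se obtained from the session pool:

// Sketch only: surrounding error handling shortened.
currTime, err := GetOracleTime(se) // TSO-derived, consistent across instances
if err != nil {
	return err
}
// The executing instance would be considered alive until this timestamp expires;
// with InstanceLease = 1 minute, the lease has to be refreshed periodically.
leaseUntil := GetLeaseGoTime(currTime, InstanceLease)
_ = leaseUntil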

// Currently the DDL jobs that need backfilling include:
// 1: add-index
// 2: modify-column-type
@@ -110,21 +174,6 @@ const (
// Instead, the work is divided into batches, and each kv transaction completes the
// backfilling of a single batch.
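
Roughly, the batching described above amounts to the following sketch (task, backfillOneBatch, and batchSize are illustrative names, not this file's actual identifiers):

for !task.isDone() {
	var nextKey kv.Key
	var added int64
	err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
		var err error
		// Backfill at most batchSize rows starting at task.currKey inside one txn.
		nextKey, added, err = backfillOneBatch(txn, task.currKey, batchSize)
		return err
	})
	if err != nil {
		return err
	}
	// Advance the watermark only after the batch transaction commits.
	task.currKey = nextKey
	task.addedCount += added
}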

func (bWT backfillWorkerType) String() string {
switch bWT {
case typeAddIndexWorker:
return "add index"
case typeUpdateColumnWorker:
return "update column"
case typeCleanUpIndexWorker:
return "clean up index"
case typeAddIndexMergeTmpWorker:
return "merge temporary index"
default:
return "unknown"
}
}

type backfiller interface {
BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error)
AddMetricInfo(float64)
@@ -191,13 +240,13 @@ type backfillWorker struct {
resultCh chan *backfillResult
table table.Table
priority int
tp backfillWorkerType
tp backfillerType
ctx context.Context
cancel func()
}

func newBackfillWorker(ctx context.Context, sessCtx sessionctx.Context, id int, t table.PhysicalTable,
reorgInfo *reorgInfo, tp backfillWorkerType) *backfillWorker {
reorgInfo *reorgInfo, tp backfillerType) *backfillWorker {
bfCtx, cancel := context.WithCancel(ctx)
return &backfillWorker{
id: id,
@@ -611,7 +660,7 @@ type backfillScheduler struct {
ctx context.Context
reorgInfo *reorgInfo
sessPool *sessionPool
tp backfillWorkerType
tp backfillerType
tbl table.PhysicalTable
decodeColMap map[int64]decoder.Column
jobCtx *JobContext
@@ -628,7 +677,7 @@ type backfillScheduler struct {
const backfillTaskChanSize = 1024

func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool,
tp backfillWorkerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column,
tp backfillerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column,
jobCtx *JobContext) *backfillScheduler {
return &backfillScheduler{
ctx: ctx,
@@ -805,7 +854,7 @@ func (b *backfillScheduler) Close() {
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

46 changes: 46 additions & 0 deletions ddl/constant.go
@@ -25,6 +25,10 @@ const (
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"
// BackfillTable stores the information of backfill jobs.
BackfillTable = "tidb_ddl_backfill"
// BackfillHistoryTable stores the information of history backfill jobs.
BackfillHistoryTable = "tidb_ddl_backfill_history"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
@@ -34,11 +38,53 @@ const (
HistoryTableID = meta.MaxInt48 - 3
// MDLTableID is the table ID of `tidb_mdl_info`.
MDLTableID = meta.MaxInt48 - 4
// BackfillTableID is the table ID of `tidb_ddl_backfill`.
BackfillTableID = meta.MaxInt48 - 5
// BackfillHistoryTableID is the table ID of `tidb_ddl_backfill_history`.
BackfillHistoryTableID = meta.MaxInt48 - 6

// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))"
// ReorgTableSQL is the CREATE TABLE SQL of `tidb_ddl_reorg`.
ReorgTableSQL = "create table " + ReorgTable + "(job_id bigint not null, ele_id bigint, ele_type blob, start_key blob, end_key blob, physical_id bigint, reorg_meta longblob, unique key(job_id, ele_id, ele_type(20)))"
// HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`.
HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))"
// BackfillTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill`.
BackfillTableSQL = "create table " + BackfillTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
type int,
exec_id blob default null,
exec_lease Time,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
// BackfillHistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill_history`.
BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
type int,
exec_id blob default null,
exec_lease Time,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
)
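
To make the schema concrete, a hedged SQL sketch of how rows in these tables might be manipulated, assuming they live in the mysql schema like the other tidb_ddl_* tables (column values are invented, and the PR's actual operations go through the DDL session helpers rather than raw SQL like this):

-- Claim a backfill job for one instance (illustrative only):
UPDATE mysql.tidb_ddl_backfill
SET exec_id = 'tidb-0', exec_lease = '2022-12-12 00:01:00'
WHERE ddl_job_id = 101 AND ele_id = 1 AND id = 1;

-- A finished job's row would be moved into the history table:
INSERT INTO mysql.tidb_ddl_backfill_history
SELECT * FROM mysql.tidb_ddl_backfill
WHERE ddl_job_id = 101 AND ele_id = 1 AND id = 1;
DELETE FROM mysql.tidb_ddl_backfill
WHERE ddl_job_id = 101 AND ele_id = 1 AND id = 1;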
3 changes: 3 additions & 0 deletions ddl/ddl_test.go
@@ -56,6 +56,9 @@ func (d *ddl) SetInterceptor(i Interceptor) {
// JobNeedGCForTest is only used for test.
var JobNeedGCForTest = jobNeedGC

// NewSession is only used for test.
var NewSession = newSession

// GetMaxRowID is used for test.
func GetMaxRowID(store kv.Storage, priority int, t table.Table, startHandle, endHandle kv.Key) (kv.Key, error) {
return getRangeEndKey(NewJobContext(), store, priority, t.RecordPrefix(), startHandle, endHandle)
1 change: 1 addition & 0 deletions ddl/ddl_worker.go
@@ -425,6 +425,7 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
jobTasks[i] = job
injectModifyJobArgFailPoint(job)
}

sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
err = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
}