Skip to content

Commit

Permalink
ddl: set jobs dependency by schema and table name (#49699)
Browse files Browse the repository at this point in the history
close #49498
  • Loading branch information
tangenta authored Dec 26, 2023
1 parent 8e0a770 commit 2dfbaa8
Show file tree
Hide file tree
Showing 11 changed files with 455 additions and 97 deletions.
2 changes: 2 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"ddl.go",
"ddl_algorithm.go",
"ddl_api.go",
"ddl_running_jobs.go",
"ddl_tiflash_api.go",
"ddl_worker.go",
"ddl_workerpool.go",
Expand Down Expand Up @@ -204,6 +205,7 @@ go_test(
"ddl_algorithm_test.go",
"ddl_api_test.go",
"ddl_error_test.go",
"ddl_running_jobs_test.go",
"ddl_test.go",
"ddl_worker_test.go",
"ddl_workerpool_test.go",
Expand Down
13 changes: 4 additions & 9 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,9 @@ type ddlCtx struct {

*waitSchemaSyncedController
*schemaVersionManager
// recording the running jobs.
runningJobs struct {
sync.RWMutex
ids map[int64]struct{}
}
// It holds the running DDL jobs ID.
runningJobIDs []string

runningJobs *runningJobs

// reorgCtx is used for reorganization.
reorgCtx reorgContexts
// backfillCtx is used for backfill workers.
Expand Down Expand Up @@ -660,15 +656,14 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
autoidCli: opt.AutoIDClient,
schemaVersionManager: newSchemaVersionManager(),
waitSchemaSyncedController: newWaitSchemaSyncedController(),
runningJobIDs: make([]string, 0, jobRecordCapacity),
runningJobs: newRunningJobs(),
}
ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx)
ddlCtx.jobCtx.jobCtxMap = make(map[int64]*JobContext)
ddlCtx.mu.hook = opt.Hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
ddlCtx.ctx, ddlCtx.cancel = context.WithCancel(ctx)
ddlCtx.runningJobs.ids = make(map[int64]struct{})

d := &ddl{
ddlCtx: ddlCtx,
Expand Down
121 changes: 95 additions & 26 deletions pkg/ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ func (d *ddl) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, place
return nil
}

if tb.Meta().TempTableType != model.TempTableNone {
tblInfo := tb.Meta()
if tblInfo.TempTableType != model.TempTableNone {
return errors.Trace(dbterror.ErrOptOnTemporaryTable.GenWithStackByArgs("placement"))
}

Expand All @@ -491,9 +492,9 @@ func (d *ddl) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, place

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tb.Meta().Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionAlterTablePlacement,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{placementPolicyRef},
Expand Down Expand Up @@ -656,6 +657,10 @@ func (d *ddl) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *RecoverSc
Type: model.ActionRecoverSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{recoverSchemaInfo, recoverCheckFlagNone},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: recoverSchemaInfo.Name.L,
Table: model.InvolvingAll,
}},
}
err := d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -2829,6 +2834,11 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context,
return errors.Trace(fmt.Errorf("except table info"))
}
args = append(args, info)
jobs.InvolvingSchemaInfo = append(jobs.InvolvingSchemaInfo,
model.InvolvingSchemaInfo{
Database: dbName.L,
Table: info.Name.L,
})
}
if len(args) == 0 {
return nil
Expand Down Expand Up @@ -2893,6 +2903,11 @@ func (d *ddl) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy *mode
Type: model.ActionCreatePlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{policy, onExist == OnExistReplace},
// CREATE PLACEMENT does not affect any schemas or tables.
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -2956,6 +2971,11 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
0, /* commitTS */
variable.On, /* tidb_ttl_job_enable */
[]kv.KeyRange{} /* flashback key_ranges */},
// FLASHBACK CLUSTER affects all schemas and tables.
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingAll,
Table: model.InvolvingAll,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -3889,10 +3909,10 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
if err != nil {
return errors.Trace(err)
}
tbInfo := t.Meta()
var actionType model.ActionType
switch tp {
case autoid.AutoRandomType:
tbInfo := t.Meta()
pkCol := tbInfo.GetPkColInfo()
if tbInfo.AutoRandomBits == 0 || pkCol == nil {
return errors.Trace(dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomRebaseNotApplicable))
Expand Down Expand Up @@ -3926,9 +3946,9 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
TableName: tbInfo.Name.L,
Type: actionType,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newBase, force},
Expand Down Expand Up @@ -3961,14 +3981,15 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
if err != nil {
return errors.Trace(err)
}
if t.Meta().TempTableType != model.TempTableNone {
tbInfo := t.Meta()
if tbInfo.TempTableType != model.TempTableNone {
return dbterror.ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits")
}
if uVal == t.Meta().ShardRowIDBits {
if uVal == tbInfo.ShardRowIDBits {
// Nothing need to do.
return nil
}
if uVal > 0 && t.Meta().HasClusteredIndex() {
if uVal > 0 && tbInfo.HasClusteredIndex() {
return dbterror.ErrUnsupportedShardRowIDBits
}
err = verifyNoOverflowShardBits(d.sessPool, t, uVal)
Expand All @@ -3978,9 +3999,9 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
job := &model.Job{
Type: model.ActionShardRowID,
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
TableName: tbInfo.Name.L,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{uVal},
}
Expand Down Expand Up @@ -4141,6 +4162,7 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
if err != nil {
return errors.Trace(err)
}
tbInfo := t.Meta()
if err = checkAddColumnTooManyColumns(len(t.Cols()) + 1); err != nil {
return errors.Trace(err)
}
Expand All @@ -4152,16 +4174,16 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
if col == nil {
return nil
}
err = CheckAfterPositionExists(t.Meta(), spec.Position)
err = CheckAfterPositionExists(tbInfo, spec.Position)
if err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
TableName: tbInfo.Name.L,
Type: model.ActionAddColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col, spec.Position, 0, spec.IfNotExists},
Expand Down Expand Up @@ -5044,6 +5066,10 @@ func (d *ddl) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{defID, ptSchema.ID, ptMeta.ID, partName, spec.WithValidation},
CtxVars: []interface{}{[]int64{ntSchema.ID, ptSchema.ID}, []int64{ntMeta.ID, ptMeta.ID}},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
{Database: ptSchema.Name.L, Table: ptMeta.Name.L},
{Database: ntSchema.Name.L, Table: ntMeta.Name.L},
},
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -6849,6 +6875,10 @@ func (d *ddl) renameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{schemas[0].ID, newIdent.Name, schemas[0].Name},
CtxVars: []interface{}{[]int64{schemas[0].ID, schemas[1].ID}, []int64{tableID}},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
{Database: schemas[0].Name.L, Table: oldIdent.Name.L},
{Database: schemas[1].Name.L, Table: newIdent.Name.L},
},
}

err = d.DoDDLJob(ctx, job)
Expand All @@ -6864,6 +6894,7 @@ func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id
newSchemaIDs := make([]int64, 0, len(oldIdents))
tableIDs := make([]int64, 0, len(oldIdents))
oldSchemaNames := make([]*model.CIStr, 0, len(oldIdents))
involveSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(oldIdents)*2)

var schemas []*model.DBInfo
var tableID int64
Expand All @@ -6888,16 +6919,22 @@ func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id
oldSchemaIDs = append(oldSchemaIDs, schemas[0].ID)
newSchemaIDs = append(newSchemaIDs, schemas[1].ID)
oldSchemaNames = append(oldSchemaNames, &schemas[0].Name)
involveSchemaInfo = append(involveSchemaInfo, model.InvolvingSchemaInfo{
Database: schemas[0].Name.L, Table: oldIdents[i].Name.L,
}, model.InvolvingSchemaInfo{
Database: schemas[1].Name.L, Table: newIdents[i].Name.L,
})
}

job := &model.Job{
SchemaID: schemas[1].ID,
TableID: tableIDs[0],
SchemaName: schemas[1].Name.L,
Type: model.ActionRenameTables,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, oldTableNames},
CtxVars: []interface{}{append(oldSchemaIDs, newSchemaIDs...), tableIDs},
SchemaID: schemas[1].ID,
TableID: tableIDs[0],
SchemaName: schemas[1].Name.L,
Type: model.ActionRenameTables,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, oldTableNames},
CtxVars: []interface{}{append(oldSchemaIDs, newSchemaIDs...), tableIDs},
InvolvingSchemaInfo: involveSchemaInfo,
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -7901,6 +7938,7 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
SessionID: ctx.GetSessionVars().ConnectionID,
}
uniqueTableID := make(map[int64]struct{})
involveSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(stmt.TableLocks))
// Check whether the table was already locked by another.
for _, tl := range stmt.TableLocks {
tb := tl.Table
Expand All @@ -7925,6 +7963,10 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
}
uniqueTableID[t.Meta().ID] = struct{}{}
lockTables = append(lockTables, model.TableLockTpInfo{SchemaID: schema.ID, TableID: t.Meta().ID, Tp: tl.Type})
involveSchemaInfo = append(involveSchemaInfo, model.InvolvingSchemaInfo{
Database: schema.Name.L,
Table: t.Meta().Name.L,
})
}

unlockTables := ctx.GetAllTableLocks()
Expand All @@ -7939,6 +7981,8 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
Type: model.ActionLockTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{arg},

InvolvingSchemaInfo: involveSchemaInfo,
}
// AddTableLock here is avoiding this job was executed successfully but the session was killed before return.
ctx.AddTableLock(lockTables)
Expand Down Expand Up @@ -8567,6 +8611,10 @@ func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceG
Type: model.ActionCreateResourceGroup,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{groupInfo, false},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8613,6 +8661,10 @@ func (d *ddl) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGr
Type: model.ActionDropResourceGroup,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{groupName},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8665,6 +8717,10 @@ func (d *ddl) AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResource
Type: model.ActionAlterResourceGroup,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newGroupInfo},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8725,6 +8781,10 @@ func (d *ddl) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacemen
Type: model.ActionDropPlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{policyName},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8759,6 +8819,10 @@ func (d *ddl) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacem
Type: model.ActionAlterPlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newPolicyInfo},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8958,8 +9022,9 @@ func (d *ddl) CreateCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constr
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionAddCheckConstraint,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{constraintInfo},
Expand All @@ -8981,16 +9046,18 @@ func (d *ddl) DropCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrNa
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
}
tblInfo := t.Meta()

constraintInfo := t.Meta().FindConstraintInfoByName(constrName.L)
constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
if constraintInfo == nil {
return dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionDropCheckConstraint,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{constrName},
Expand All @@ -9011,16 +9078,18 @@ func (d *ddl) AlterCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrN
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
}
tblInfo := t.Meta()

constraintInfo := t.Meta().FindConstraintInfoByName(constrName.L)
constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
if constraintInfo == nil {
return dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionAlterCheckConstraint,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{constrName, enforced},
Expand Down
Loading

0 comments on commit 2dfbaa8

Please sign in to comment.