From e034d0bf899cfa4d5cecb065d060364512be4cf6 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 19 Feb 2024 22:52:55 +0800
Subject: [PATCH] ddl: set jobs dependency by schema and table name (#49699)
 (#49784)

close pingcap/tidb#49498
---
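Scheduling note: the DDL owner previously serialized queued jobs only by the
schema and table IDs recorded on them (see the removed getJobSQL in
pkg/ddl/job_table.go below), so a CREATE TABLE issued right after a DROP TABLE
of the same name carried a different table ID and the two jobs were not
ordered against each other. This patch records the involved schema and table
names on every job (model.InvolvingSchemaInfo) and holds back any job whose
names overlap with those of a running job, where "*" (model.InvolvingAll)
matches everything and "" (model.InvolvingNone) matches nothing. Below is a
minimal, self-contained sketch of the matching rule; it is illustrative only,
and the authoritative implementation is runningJobs.checkRunnable in
pkg/ddl/ddl_running_jobs.go:

package main

import "fmt"

const (
	involvingAll  = "*" // the job affects every database or table
	involvingNone = ""  // the job affects no database or table
)

type schemaInfo struct{ database, table string }

// conflicts reports whether a pending job involving `pending` collides with
// the running jobs, given as database -> table -> struct{}.
func conflicts(running map[string]map[string]struct{}, pending []schemaInfo) bool {
	for _, info := range pending {
		if _, ok := running[involvingAll]; ok {
			return true // a cluster-wide job (FLASHBACK CLUSTER) blocks everything
		}
		if info.database == involvingNone {
			continue // e.g. placement-policy jobs touch no schema
		}
		if tbls, ok := running[info.database]; ok {
			if _, ok := tbls[involvingAll]; ok {
				return true // a whole-database job (e.g. RECOVER SCHEMA) is running
			}
			if info.table == involvingNone {
				continue
			}
			if _, ok := tbls[info.table]; ok {
				return true // the same table is already being changed
			}
		}
	}
	return false
}

func main() {
	running := map[string]map[string]struct{}{
		"db1": {"t1": {}},
		"db2": {involvingAll: {}},
	}
	fmt.Println(conflicts(running, []schemaInfo{{"db1", "t1"}})) // true: same table
	fmt.Println(conflicts(running, []schemaInfo{{"db1", "t2"}})) // false: different table
	fmt.Println(conflicts(running, []schemaInfo{{"db2", "t9"}})) // true: db2 is wholly claimed
}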
 pkg/ddl/BUILD.bazel              |   2 +
 pkg/ddl/ddl.go                   |  13 +---
 pkg/ddl/ddl_api.go               | 121 ++++++++++++++++++++++++-------
 pkg/ddl/ddl_api_test.go          |  68 +++++++++++++++++
 pkg/ddl/ddl_running_jobs.go      | 113 +++++++++++++++++++++++++++++
 pkg/ddl/ddl_running_jobs_test.go | 112 ++++++++++++++++++++++++++++
 pkg/ddl/ddl_worker.go            |   1 +
 pkg/ddl/job_table.go             |  85 ++++++----------------
 pkg/ddl/mock.go                  |   6 ++
 pkg/parser/model/ddl.go          |  29 ++++++++
 pkg/parser/model/ddl_test.go     |   2 +-
 11 files changed, 455 insertions(+), 97 deletions(-)
 create mode 100644 pkg/ddl/ddl_running_jobs.go
 create mode 100644 pkg/ddl/ddl_running_jobs_test.go

diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel
index d802df3fa6723..287083e6d07a2 100644
--- a/pkg/ddl/BUILD.bazel
+++ b/pkg/ddl/BUILD.bazel
@@ -29,6 +29,7 @@ go_library(
         "ddl.go",
         "ddl_algorithm.go",
         "ddl_api.go",
+        "ddl_running_jobs.go",
         "ddl_tiflash_api.go",
         "ddl_worker.go",
         "ddl_workerpool.go",
@@ -199,6 +200,7 @@ go_test(
         "ddl_algorithm_test.go",
         "ddl_api_test.go",
         "ddl_error_test.go",
+        "ddl_running_jobs_test.go",
        "ddl_test.go",
        "ddl_worker_test.go",
        "ddl_workerpool_test.go",
diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go
index 10a4931b63011..9df97c3c208e9 100644
--- a/pkg/ddl/ddl.go
+++ b/pkg/ddl/ddl.go
@@ -360,13 +360,9 @@ type ddlCtx struct {
 	*waitSchemaSyncedController
 	*schemaVersionManager
-	// recording the running jobs.
-	runningJobs struct {
-		sync.RWMutex
-		ids map[int64]struct{}
-	}
-	// It holds the running DDL jobs ID.
-	runningJobIDs []string
+
+	runningJobs *runningJobs
+
 	// reorgCtx is used for reorganization.
 	reorgCtx reorgContexts
 	// backfillCtx is used for backfill workers.
@@ -658,7 +654,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
 		autoidCli:                  opt.AutoIDClient,
 		schemaVersionManager:       newSchemaVersionManager(),
 		waitSchemaSyncedController: newWaitSchemaSyncedController(),
-		runningJobIDs:              make([]string, 0, jobRecordCapacity),
+		runningJobs:                newRunningJobs(),
 	}
 	ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx)
 	ddlCtx.jobCtx.jobCtxMap = make(map[int64]*JobContext)
@@ -666,7 +662,6 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
 	ddlCtx.mu.interceptor = &BaseInterceptor{}
 	ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
 	ddlCtx.ctx, ddlCtx.cancel = context.WithCancel(ctx)
-	ddlCtx.runningJobs.ids = make(map[int64]struct{})
 
 	d := &ddl{
 		ddlCtx:            ddlCtx,
diff --git a/pkg/ddl/ddl_api.go b/pkg/ddl/ddl_api.go
index 10bba5e2c8770..b2f4b78febda2 100644
--- a/pkg/ddl/ddl_api.go
+++ b/pkg/ddl/ddl_api.go
@@ -478,7 +478,8 @@ func (d *ddl) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, place
 		return nil
 	}
 
-	if tb.Meta().TempTableType != model.TempTableNone {
+	tblInfo := tb.Meta()
+	if tblInfo.TempTableType != model.TempTableNone {
 		return errors.Trace(dbterror.ErrOptOnTemporaryTable.GenWithStackByArgs("placement"))
 	}
 
@@ -489,9 +490,9 @@
 	job := &model.Job{
 		SchemaID:   schema.ID,
-		TableID:    tb.Meta().ID,
+		TableID:    tblInfo.ID,
 		SchemaName: schema.Name.L,
-		TableName:  tb.Meta().Name.L,
+		TableName:  tblInfo.Name.L,
 		Type:       model.ActionAlterTablePlacement,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{placementPolicyRef},
@@ -654,6 +655,10 @@ func (d *ddl) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *RecoverSc
 		Type:       model.ActionRecoverSchema,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{recoverSchemaInfo, recoverCheckFlagNone},
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
+			Database: recoverSchemaInfo.Name.L,
+			Table:    model.InvolvingAll,
+		}},
 	}
 	err := d.DoDDLJob(ctx, job)
 	err = d.callHookOnChanged(job, err)
@@ -2823,6 +2828,11 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context,
 			return errors.Trace(fmt.Errorf("except table info"))
 		}
 		args = append(args, info)
+		jobs.InvolvingSchemaInfo = append(jobs.InvolvingSchemaInfo,
+			model.InvolvingSchemaInfo{
+				Database: dbName.L,
+				Table:    info.Name.L,
+			})
 	}
 	if len(args) == 0 {
 		return nil
@@ -2887,6 +2897,11 @@ func (d *ddl) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy *mode
 		Type:       model.ActionCreatePlacementPolicy,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{policy, onExist == OnExistReplace},
+		// CREATE PLACEMENT POLICY does not affect any schemas or tables.
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
+			Database: model.InvolvingNone,
+			Table:    model.InvolvingNone,
+		}},
 	}
 	err = d.DoDDLJob(ctx, job)
 	err = d.callHookOnChanged(job, err)
@@ -2950,6 +2965,11 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
 			0, /* commitTS */
 			variable.On, /* tidb_ttl_job_enable */
 			[]kv.KeyRange{} /* flashback key_ranges */},
+		// FLASHBACK CLUSTER affects all schemas and tables.
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
+			Database: model.InvolvingAll,
+			Table:    model.InvolvingAll,
+		}},
 	}
 	err = d.DoDDLJob(ctx, job)
 	err = d.callHookOnChanged(job, err)
@@ -3881,10 +3901,10 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
 	if err != nil {
 		return errors.Trace(err)
 	}
+	tbInfo := t.Meta()
 	var actionType model.ActionType
 	switch tp {
 	case autoid.AutoRandomType:
-		tbInfo := t.Meta()
 		pkCol := tbInfo.GetPkColInfo()
 		if tbInfo.AutoRandomBits == 0 || pkCol == nil {
 			return errors.Trace(dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomRebaseNotApplicable))
@@ -3918,9 +3938,9 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
 	}
 	job := &model.Job{
 		SchemaID:   schema.ID,
-		TableID:    t.Meta().ID,
+		TableID:    tbInfo.ID,
 		SchemaName: schema.Name.L,
-		TableName:  t.Meta().Name.L,
+		TableName:  tbInfo.Name.L,
 		Type:       actionType,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{newBase, force},
@@ -3953,14 +3973,15 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
 	if err != nil {
 		return errors.Trace(err)
 	}
-	if t.Meta().TempTableType != model.TempTableNone {
+	tbInfo := t.Meta()
+	if tbInfo.TempTableType != model.TempTableNone {
 		return dbterror.ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits")
 	}
-	if uVal == t.Meta().ShardRowIDBits {
+	if uVal == tbInfo.ShardRowIDBits {
 		// Nothing need to do.
 		return nil
 	}
-	if uVal > 0 && t.Meta().HasClusteredIndex() {
+	if uVal > 0 && tbInfo.HasClusteredIndex() {
 		return dbterror.ErrUnsupportedShardRowIDBits
 	}
 	err = verifyNoOverflowShardBits(d.sessPool, t, uVal)
@@ -3970,9 +3991,9 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
 	job := &model.Job{
 		Type:       model.ActionShardRowID,
 		SchemaID:   schema.ID,
-		TableID:    t.Meta().ID,
+		TableID:    tbInfo.ID,
 		SchemaName: schema.Name.L,
-		TableName:  t.Meta().Name.L,
+		TableName:  tbInfo.Name.L,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{uVal},
 	}
@@ -4133,6 +4154,7 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
 	if err != nil {
 		return errors.Trace(err)
 	}
+	tbInfo := t.Meta()
 	if err = checkAddColumnTooManyColumns(len(t.Cols()) + 1); err != nil {
 		return errors.Trace(err)
 	}
@@ -4144,16 +4166,16 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
 	if col == nil {
 		return nil
 	}
-	err = CheckAfterPositionExists(t.Meta(), spec.Position)
+	err = CheckAfterPositionExists(tbInfo, spec.Position)
 	if err != nil {
 		return errors.Trace(err)
 	}
 
 	job := &model.Job{
 		SchemaID:   schema.ID,
-		TableID:    t.Meta().ID,
+		TableID:    tbInfo.ID,
 		SchemaName: schema.Name.L,
-		TableName:  t.Meta().Name.L,
+		TableName:  tbInfo.Name.L,
 		Type:       model.ActionAddColumn,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{col, spec.Position, 0, spec.IfNotExists},
@@ -5032,6 +5054,10 @@ func (d *ddl) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{defID, ptSchema.ID, ptMeta.ID, partName, spec.WithValidation},
 		CtxVars:    []interface{}{[]int64{ntSchema.ID, ptSchema.ID}, []int64{ntMeta.ID, ptMeta.ID}},
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
+			{Database: ptSchema.Name.L, Table: ptMeta.Name.L},
+			{Database: ntSchema.Name.L, Table: ntMeta.Name.L},
+		},
 	}
 
 	err = d.DoDDLJob(ctx, job)
@@ -6837,6 +6863,10 @@ func (d *ddl) renameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident,
 		BinlogInfo: &model.HistoryInfo{},
 		Args: []interface{}{schemas[0].ID,
 			newIdent.Name, schemas[0].Name},
 		CtxVars: []interface{}{[]int64{schemas[0].ID, schemas[1].ID}, []int64{tableID}},
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
+			{Database: schemas[0].Name.L, Table: oldIdent.Name.L},
+			{Database: schemas[1].Name.L, Table: newIdent.Name.L},
+		},
 	}
 
 	err = d.DoDDLJob(ctx, job)
@@ -6852,6 +6882,7 @@ func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id
 	newSchemaIDs := make([]int64, 0, len(oldIdents))
 	tableIDs := make([]int64, 0, len(oldIdents))
 	oldSchemaNames := make([]*model.CIStr, 0, len(oldIdents))
+	involveSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(oldIdents)*2)
 
 	var schemas []*model.DBInfo
 	var tableID int64
@@ -6876,16 +6907,22 @@ func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id
 		oldSchemaIDs = append(oldSchemaIDs, schemas[0].ID)
 		newSchemaIDs = append(newSchemaIDs, schemas[1].ID)
 		oldSchemaNames = append(oldSchemaNames, &schemas[0].Name)
+		involveSchemaInfo = append(involveSchemaInfo, model.InvolvingSchemaInfo{
+			Database: schemas[0].Name.L, Table: oldIdents[i].Name.L,
+		}, model.InvolvingSchemaInfo{
+			Database: schemas[1].Name.L, Table: newIdents[i].Name.L,
+		})
 	}
 
 	job := &model.Job{
-		SchemaID:   schemas[1].ID,
-		TableID:    tableIDs[0],
-		SchemaName: schemas[1].Name.L,
-		Type:       model.ActionRenameTables,
-		BinlogInfo: &model.HistoryInfo{},
-		Args:       []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, oldTableNames},
-		CtxVars:    []interface{}{append(oldSchemaIDs, newSchemaIDs...), tableIDs},
+		SchemaID:            schemas[1].ID,
+		TableID:             tableIDs[0],
+		SchemaName:          schemas[1].Name.L,
+		Type:                model.ActionRenameTables,
+		BinlogInfo:          &model.HistoryInfo{},
+		Args:                []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, oldTableNames},
+		CtxVars:             []interface{}{append(oldSchemaIDs, newSchemaIDs...), tableIDs},
+		InvolvingSchemaInfo: involveSchemaInfo,
 	}
 
 	err = d.DoDDLJob(ctx, job)
@@ -7888,6 +7925,7 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
 		SessionID: ctx.GetSessionVars().ConnectionID,
 	}
 	uniqueTableID := make(map[int64]struct{})
+	involveSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(stmt.TableLocks))
 	// Check whether the table was already locked by another.
 	for _, tl := range stmt.TableLocks {
 		tb := tl.Table
@@ -7912,6 +7950,10 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
 		}
 		uniqueTableID[t.Meta().ID] = struct{}{}
 		lockTables = append(lockTables, model.TableLockTpInfo{SchemaID: schema.ID, TableID: t.Meta().ID, Tp: tl.Type})
+		involveSchemaInfo = append(involveSchemaInfo, model.InvolvingSchemaInfo{
+			Database: schema.Name.L,
+			Table:    t.Meta().Name.L,
+		})
 	}
 
 	unlockTables := ctx.GetAllTableLocks()
@@ -7926,6 +7968,8 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
 		Type:       model.ActionLockTable,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{arg},
+
+		InvolvingSchemaInfo: involveSchemaInfo,
 	}
 	// AddTableLock here is avoiding this job was executed successfully but the session was killed before return.
 	ctx.AddTableLock(lockTables)
@@ -8554,6 +8598,10 @@ func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceG
 		Type:       model.ActionCreateResourceGroup,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{groupInfo, false},
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
+			Database: model.InvolvingNone,
+			Table:    model.InvolvingNone,
+		}},
 	}
 	err = d.DoDDLJob(ctx, job)
 	err = d.callHookOnChanged(job, err)
@@ -8600,6 +8648,10 @@ func (d *ddl) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGr
 		Type:       model.ActionDropResourceGroup,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{groupName},
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
+			Database: model.InvolvingNone,
+			Table:    model.InvolvingNone,
+		}},
 	}
 	err = d.DoDDLJob(ctx, job)
 	err = d.callHookOnChanged(job, err)
@@ -8652,6 +8704,10 @@ func (d *ddl) AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResource
 		Type:       model.ActionAlterResourceGroup,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{newGroupInfo},
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
+			Database: model.InvolvingNone,
+			Table:    model.InvolvingNone,
+		}},
 	}
 	err = d.DoDDLJob(ctx, job)
 	err = d.callHookOnChanged(job, err)
@@ -8712,6 +8768,10 @@ func (d *ddl) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacemen
 		Type:       model.ActionDropPlacementPolicy,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{policyName},
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
+			Database: model.InvolvingNone,
+			Table:    model.InvolvingNone,
+		}},
 	}
 	err = d.DoDDLJob(ctx, job)
 	err = d.callHookOnChanged(job, err)
@@ -8746,6 +8806,10 @@ func (d *ddl) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacem
 		Type:       model.ActionAlterPlacementPolicy,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{newPolicyInfo},
+		InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
+			Database: model.InvolvingNone,
+			Table:    model.InvolvingNone,
+		}},
 	}
 	err = d.DoDDLJob(ctx, job)
 	err = d.callHookOnChanged(job, err)
@@ -8945,8 +9009,9 @@ func (d *ddl) CreateCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constr
 	}
 	job := &model.Job{
 		SchemaID:   schema.ID,
-		TableID:    t.Meta().ID,
+		TableID:    tblInfo.ID,
 		SchemaName: schema.Name.L,
+		TableName:  tblInfo.Name.L,
 		Type:       model.ActionAddCheckConstraint,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{constraintInfo},
@@ -8968,16 +9033,18 @@ func (d *ddl) DropCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrNa
 	if err != nil {
 		return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
 	}
+	tblInfo := t.Meta()
 
-	constraintInfo := t.Meta().FindConstraintInfoByName(constrName.L)
+	constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
 	if constraintInfo == nil {
 		return dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName)
 	}
 
 	job := &model.Job{
 		SchemaID:   schema.ID,
-		TableID:    t.Meta().ID,
+		TableID:    tblInfo.ID,
 		SchemaName: schema.Name.L,
+		TableName:  tblInfo.Name.L,
 		Type:       model.ActionDropCheckConstraint,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{constrName},
@@ -8998,16 +9065,18 @@ func (d *ddl) AlterCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrN
 	if err != nil {
 		return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
 	}
+	tblInfo := t.Meta()
 
-	constraintInfo := t.Meta().FindConstraintInfoByName(constrName.L)
+	constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
 	if constraintInfo == nil {
 		return dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName)
 	}
 
 	job := &model.Job{
 		SchemaID:   schema.ID,
-		TableID:    t.Meta().ID,
+		TableID:    tblInfo.ID,
 		SchemaName: schema.Name.L,
+		TableName:  tblInfo.Name.L,
 		Type:       model.ActionAlterCheckConstraint,
 		BinlogInfo: &model.HistoryInfo{},
 		Args:       []interface{}{constrName, enforced},
diff --git a/pkg/ddl/ddl_api_test.go b/pkg/ddl/ddl_api_test.go
index e6dec271e9fac..fa1a4990d6e95 100644
--- a/pkg/ddl/ddl_api_test.go
+++ b/pkg/ddl/ddl_api_test.go
@@ -17,14 +17,19 @@ package ddl_test
 import (
 	"cmp"
 	"context"
+	"fmt"
 	"slices"
+	"sync"
 	"testing"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/ddl"
+	"github.com/pingcap/tidb/pkg/ddl/util/callback"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/session"
 	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/pingcap/tidb/pkg/util/chunk"
 	"github.com/stretchr/testify/require"
 )
 
@@ -145,3 +150,66 @@ func enQueueDDLJobs(t *testing.T, sess session.Session, txn kv.Transaction, jobT
 		require.NoError(t, err)
 	}
 }
+
+func TestCreateDropCreateTable(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk1 := testkit.NewTestKit(t, store)
+	tk1.MustExec("use test")
+
+	tk.MustExec("create table t (a int);")
+
+	wg := sync.WaitGroup{}
+	var createErr error
+	var fpErr error
+	var createTable bool
+
+	originHook := dom.DDL().GetHook()
+	onJobUpdated := func(job *model.Job) {
+		if job.Type == model.ActionDropTable && job.SchemaState == model.StateWriteOnly && !createTable {
+			// Slow down the DROP TABLE job once it reaches write-only, then
+			// issue a concurrent CREATE TABLE with the same name.
+			fpErr = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockOwnerCheckAllVersionSlow", fmt.Sprintf("return(%d)", job.ID))
+			wg.Add(1)
+			go func() {
+				_, createErr = tk1.Exec("create table t (b int);")
+				wg.Done()
+			}()
+			createTable = true
+		}
+	}
+	hook := &callback.TestDDLCallback{}
+	hook.OnJobUpdatedExported.Store(&onJobUpdated)
+	dom.DDL().SetHook(hook)
+	tk.MustExec("drop table t;")
+	dom.DDL().SetHook(originHook)
+
+	wg.Wait()
+	require.NoError(t, createErr)
+	require.NoError(t, fpErr)
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockOwnerCheckAllVersionSlow"))
+
+	rs := tk.MustQuery("admin show ddl jobs 3;").Rows()
+	create1JobID := rs[0][0].(string)
+	dropJobID := rs[1][0].(string)
+	create0JobID := rs[2][0].(string)
+	jobRecordSet, err := tk.Exec("select job_meta from mysql.tidb_ddl_history where job_id in (?, ?, ?);",
+		create1JobID, dropJobID, create0JobID)
+	require.NoError(t, err)
+
+	var finishTSs []uint64
+	req := jobRecordSet.NewChunk(nil)
+	err = jobRecordSet.Next(context.Background(), req)
+	require.Greater(t, req.NumRows(), 0)
+	require.NoError(t, err)
+	iter := chunk.NewIterator4Chunk(req.CopyConstruct())
+	for row := iter.Begin(); row != iter.End(); row = iter.Next() {
+		jobMeta := row.GetBytes(0)
+		job := model.Job{}
+		err = job.Decode(jobMeta)
+		require.NoError(t, err)
+		finishTSs = append(finishTSs, job.BinlogInfo.FinishedTS)
+	}
+	create1TS, dropTS, create0TS := finishTSs[0], finishTSs[1], finishTSs[2]
+	require.Less(t, create0TS, dropTS, "first create should finish before drop")
+	require.Less(t, dropTS, create1TS, "second create should finish after drop")
+}
diff --git a/pkg/ddl/ddl_running_jobs.go b/pkg/ddl/ddl_running_jobs.go
new file mode 100644
index 0000000000000..95faa765bee6e
--- /dev/null
+++ b/pkg/ddl/ddl_running_jobs.go
@@ -0,0 +1,113 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copyright 2013 The ql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSES/QL-LICENSE file.
+
+package ddl
+
+import (
+	"strconv"
+	"strings"
+	"sync"
+
+	"github.com/pingcap/tidb/pkg/parser/model"
+)
+
+// runningJobs tracks the DDL jobs that are currently being executed, together
+// with the schemas and tables they involve, so conflicting jobs can be held
+// back until the running ones finish.
+type runningJobs struct {
+	sync.RWMutex
+	ids           map[int64]struct{}
+	runningSchema map[string]map[string]struct{} // database -> table -> struct{}
+	runningJobIDs string                         // cached comma-separated ID list for SQL filters
+}
+
+func newRunningJobs() *runningJobs {
+	return &runningJobs{
+		ids:           make(map[int64]struct{}),
+		runningSchema: make(map[string]map[string]struct{}),
+	}
+}
+
+// add records a job, and the schema/table names it involves, as running.
+func (j *runningJobs) add(job *model.Job) {
+	j.Lock()
+	defer j.Unlock()
+	j.ids[job.ID] = struct{}{}
+	j.updateInternalRunningJobIDs()
+	for _, info := range job.GetInvolvingSchemaInfo() {
+		if _, ok := j.runningSchema[info.Database]; !ok {
+			j.runningSchema[info.Database] = make(map[string]struct{})
+		}
+		j.runningSchema[info.Database][info.Table] = struct{}{}
+	}
+}
+
+// remove drops a finished job and releases the names it involved.
+func (j *runningJobs) remove(job *model.Job) {
+	j.Lock()
+	defer j.Unlock()
+	delete(j.ids, job.ID)
+	j.updateInternalRunningJobIDs()
+	for _, info := range job.GetInvolvingSchemaInfo() {
+		if db, ok := j.runningSchema[info.Database]; ok {
+			delete(db, info.Table)
+		}
+		if len(j.runningSchema[info.Database]) == 0 {
+			delete(j.runningSchema, info.Database)
+		}
+	}
+}
+
+// checkRunnable returns false if the job involves a schema or table that a
+// running job also involves, honoring the InvolvingAll/InvolvingNone wildcards.
+func (j *runningJobs) checkRunnable(job *model.Job) bool {
+	j.RLock()
+	defer j.RUnlock()
+	for _, info := range job.GetInvolvingSchemaInfo() {
+		if _, ok := j.runningSchema[model.InvolvingAll]; ok {
+			return false
+		}
+		if info.Database == model.InvolvingNone {
+			continue
+		}
+		if tbls, ok := j.runningSchema[info.Database]; ok {
+			if _, ok := tbls[model.InvolvingAll]; ok {
+				return false
+			}
+			if info.Table == model.InvolvingNone {
+				continue
+			}
+			if _, ok := tbls[info.Table]; ok {
+				return false
+			}
+		}
+	}
+	return true
+}
+
+// allIDs returns the comma-separated list of running job IDs.
+func (j *runningJobs) allIDs() string {
+	j.RLock()
+	defer j.RUnlock()
+	return j.runningJobIDs
+}
+
+// updateInternalRunningJobIDs rebuilds the cached ID list; callers must hold
+// the write lock.
+func (j *runningJobs) updateInternalRunningJobIDs() {
+	var sb strings.Builder
+	i := 0
+	for id := range j.ids {
+		sb.WriteString(strconv.Itoa(int(id)))
+		if i != len(j.ids)-1 {
+			sb.WriteString(",")
+		}
+		i++
+	}
+	j.runningJobIDs = sb.String()
+}
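The runningJobIDs string maintained by updateInternalRunningJobIDs above is
spliced into the owner's job-fetch query in getJob (see the pkg/ddl/job_table.go
hunks below). A small, self-contained sketch of that splicing, with a
hard-coded ID list standing in for a live runningJobs value:

package main

import "fmt"

func main() {
	const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in
		(select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing)
		and %s reorg %s order by processing desc, job_id`

	runningIDs := "1,2,3" // what runningJobs.allIDs() would return
	var excludedJobIDs string
	if len(runningIDs) > 0 {
		excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", runningIDs)
	}
	// "not" selects general jobs ("not reorg"); reorg jobs pass "" instead.
	fmt.Printf(getJobSQL+"\n", "not", excludedJobIDs)
}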
diff --git a/pkg/ddl/ddl_running_jobs_test.go b/pkg/ddl/ddl_running_jobs_test.go
new file mode 100644
index 0000000000000..7fe2f5c46a7b0
--- /dev/null
+++ b/pkg/ddl/ddl_running_jobs_test.go
@@ -0,0 +1,112 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copyright 2013 The ql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSES/QL-LICENSE file.
+
+package ddl
+
+import (
+	"sort"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/pingcap/tidb/pkg/parser/model"
+	"github.com/stretchr/testify/require"
+)
+
+func TestRunningJobs(t *testing.T) {
+	mkJob := func(id int64, schemaTableNames ...string) *model.Job {
+		var schemaInfos []model.InvolvingSchemaInfo
+		for _, schemaTableName := range schemaTableNames {
+			ss := strings.Split(schemaTableName, ".")
+			schemaInfos = append(schemaInfos, model.InvolvingSchemaInfo{
+				Database: ss[0],
+				Table:    ss[1],
+			})
+		}
+		return &model.Job{
+			ID:                  id,
+			InvolvingSchemaInfo: schemaInfos,
+		}
+	}
+	orderedAllIDs := func(ids string) string {
+		ss := strings.Split(ids, ",")
+		ssid := make([]int, len(ss))
+		for i := range ss {
+			id, err := strconv.Atoi(ss[i])
+			require.NoError(t, err)
+			ssid[i] = id
+		}
+		sort.Ints(ssid)
+		for i := range ssid {
+			ss[i] = strconv.Itoa(ssid[i])
+		}
+		return strings.Join(ss, ",")
+	}
+
+	j := newRunningJobs()
+	require.Equal(t, "", j.allIDs())
+
+	runnable := j.checkRunnable(mkJob(0, "db1.t1"))
+	require.True(t, runnable)
+	job1 := mkJob(1, "db1.t1", "db1.t2")
+	job2 := mkJob(2, "db2.t3")
+	j.add(job1)
+	j.add(job2)
+	require.Equal(t, "1,2", orderedAllIDs(j.allIDs()))
+	runnable = j.checkRunnable(mkJob(0, "db1.t1"))
+	require.False(t, runnable)
+	runnable = j.checkRunnable(mkJob(0, "db1.t2"))
+	require.False(t, runnable)
+	runnable = j.checkRunnable(mkJob(0, "db3.t4", "db1.t1"))
+	require.False(t, runnable)
+	runnable = j.checkRunnable(mkJob(0, "db3.t4", "db4.t5"))
+	require.True(t, runnable)
+
+	job3 := mkJob(3, "db1.*")
+	j.add(job3)
+	require.Equal(t, "1,2,3", orderedAllIDs(j.allIDs()))
+	runnable = j.checkRunnable(mkJob(0, "db1.t100"))
+	require.False(t, runnable)
+
+	job4 := mkJob(4, "db4.")
+	j.add(job4)
+	require.Equal(t, "1,2,3,4", orderedAllIDs(j.allIDs()))
+	runnable = j.checkRunnable(mkJob(0, "db4.t100"))
+	require.True(t, runnable)
+
+	job5 := mkJob(5, "*.*")
+	j.add(job5)
+	require.Equal(t, "1,2,3,4,5", orderedAllIDs(j.allIDs()))
+	runnable = j.checkRunnable(mkJob(0, "db100.t100"))
+	require.False(t, runnable)
+
+	j.remove(job5)
+	require.Equal(t, "1,2,3,4", orderedAllIDs(j.allIDs()))
+	runnable = j.checkRunnable(mkJob(0, "db100.t100"))
+	require.True(t, runnable)
+
+	j.remove(job3)
+	require.Equal(t, "1,2,4", orderedAllIDs(j.allIDs()))
+	runnable = j.checkRunnable(mkJob(0, "db1.t100"))
+	require.True(t, runnable)
+
+	j.remove(job1)
+	require.Equal(t, "2,4", orderedAllIDs(j.allIDs()))
+	runnable = j.checkRunnable(mkJob(0, "db1.t1"))
+	require.True(t, runnable)
+}
diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go
index 3cf938d33c250..074506df392b5 100644
--- a/pkg/ddl/ddl_worker.go
+++ b/pkg/ddl/ddl_worker.go
@@ -440,6 +440,7 @@ func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) e
 	}
 	// Reduce this txn entry size.
 	job.BinlogInfo.Clean()
+	job.InvolvingSchemaInfo = nil
 	job.Error = toTError(err)
 	job.ErrorCount++
 	job.SchemaState = model.StateNone
diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go
index 10c7f4d7f0bcd..2a90b31b41a98 100644
--- a/pkg/ddl/job_table.go
+++ b/pkg/ddl/job_table.go
@@ -60,35 +60,6 @@ func init() {
 	}
 }
 
-func (dc *ddlCtx) insertRunningDDLJobMap(id int64) {
-	dc.runningJobs.Lock()
-	defer dc.runningJobs.Unlock()
-	dc.runningJobs.ids[id] = struct{}{}
-}
-
-func (dc *ddlCtx) deleteRunningDDLJobMap(id int64) {
-	dc.runningJobs.Lock()
-	defer dc.runningJobs.Unlock()
-	delete(dc.runningJobs.ids, id)
-}
-
-func (dc *ddlCtx) excludeJobIDs() string {
-	dc.runningJobs.RLock()
-	defer dc.runningJobs.RUnlock()
-	if len(dc.runningJobs.ids) == 0 {
-		return ""
-	}
-	dc.runningJobIDs = dc.runningJobIDs[:0]
-	for id := range dc.runningJobs.ids {
-		dc.runningJobIDs = append(dc.runningJobIDs, strconv.Itoa(int(id)))
-	}
-	return fmt.Sprintf("and job_id not in (%s)", strings.Join(dc.runningJobIDs, ","))
-}
-
-const (
-	getJobSQL = "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing) and %s reorg %s order by processing desc, job_id"
-)
-
 type jobType int
 
 func (t jobType) String() string {
@@ -113,7 +84,14 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool
 		not = ""
 		label = "get_job_reorg"
 	}
-	sql := fmt.Sprintf(getJobSQL, not, d.excludeJobIDs())
+	const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in
+		(select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing)
+		and %s reorg %s order by processing desc, job_id`
+	var excludedJobIDs string
+	if ids := d.runningJobs.allIDs(); len(ids) > 0 {
+		excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids)
+	}
+	sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs)
 	rows, err := se.Execute(context.Background(), sql, label)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -160,10 +138,8 @@ func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool
 }
 
 func hasSysDB(job *model.Job) bool {
-	sNames := job2SchemaNames(job)
-	// TODO: Handle for the name is empty, like ActionCreatePlacementPolicy.
-	for _, name := range sNames {
-		if tidb_util.IsSysDB(name) {
+	for _, info := range job.GetInvolvingSchemaInfo() {
+		if tidb_util.IsSysDB(info.Database) {
 			return true
 		}
 	}
@@ -225,26 +201,29 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
 func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
 	return d.getJob(sess, general, func(job *model.Job) (bool, error) {
+		if !d.runningJobs.checkRunnable(job) {
+			return false, nil
+		}
 		if job.Type == model.ActionDropSchema {
 			// Check if there is any reorg job on this schema.
 			sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing limit 1", strconv.Quote(strconv.FormatInt(job.SchemaID, 10)))
-			return d.NoConflictJob(sess, sql)
+			rows, err := sess.Execute(d.ctx, sql, "check conflict jobs")
+			return len(rows) == 0, err
 		}
 		// Check if there is any running job works on the same table.
 		sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+
 			"(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0)"+
 			"or (type = %d and processing)", job.ID, model.ActionFlashbackCluster)
-		return d.NoConflictJob(sess, sql)
+		rows, err := sess.Execute(d.ctx, sql, "check conflict jobs")
+		return len(rows) == 0, err
 	})
 }
 
-func (*ddl) NoConflictJob(se *sess.Session, sql string) (bool, error) {
-	rows, err := se.Execute(context.Background(), sql, "check conflict jobs")
-	return len(rows) == 0, err
-}
-
 func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
 	return d.getJob(sess, reorg, func(job *model.Job) (bool, error) {
+		if !d.runningJobs.checkRunnable(job) {
+			return false, nil
+		}
 		if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
 			job.ReorgMeta != nil &&
 			job.ReorgMeta.IsFastReorg &&
@@ -261,7 +240,8 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
 			"or (CONCAT(',', table_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and processing) "+
 			"or (type = %d and processing) limit 1",
 			strconv.Quote(strconv.FormatInt(job.SchemaID, 10)), model.ActionDropSchema, strconv.Quote(strconv.FormatInt(job.TableID, 10)), model.ActionFlashbackCluster)
-		return d.NoConflictJob(sess, sql)
+		rows, err := sess.Execute(d.ctx, sql, "check conflict jobs")
+		return len(rows) == 0, err
 	})
 }
 
@@ -380,11 +360,11 @@ func (d *ddl) loadDDLJobAndRun(se *sess.Session, pool *workerPool, getJob func(*
 // delivery2worker owns the worker, need to put it back to the pool in this function.
 func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 	injectFailPointForGetJob(job)
-	d.insertRunningDDLJobMap(job.ID)
+	d.runningJobs.add(job)
 	d.wg.Run(func() {
 		metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
 		defer func() {
-			d.deleteRunningDDLJobMap(job.ID)
+			d.runningJobs.remove(job)
 			asyncNotify(d.ddlJobCh)
 			metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
 		}()
@@ -560,23 +540,6 @@ func job2UniqueIDs(job *model.Job, schema bool) string {
 	return strconv.FormatInt(job.TableID, 10)
 }
 
-func job2SchemaNames(job *model.Job) []string {
-	if job.Type == model.ActionRenameTable {
-		var oldSchemaID int64
-		var oldSchemaName model.CIStr
-		var tableName model.CIStr
-		// TODO: Handle this error
-		_ = job.DecodeArgs(&oldSchemaID, &tableName, &oldSchemaName)
-		names := make([]string, 0, 2)
-		names = append(names, strings.ToLower(job.SchemaName))
-		names = append(names, oldSchemaName.O)
-		return names
-	}
-	// TODO: consider about model.ActionRenameTables and model.ActionExchangeTablePartition, which need to get the schema names.
-	return []string{job.SchemaName}
-}
-
 func (w *worker) deleteDDLJob(job *model.Job) error {
 	sql := fmt.Sprintf("delete from mysql.tidb_ddl_job where job_id = %d", job.ID)
 	_, err := w.sess.Execute(context.Background(), sql, "delete_job")
diff --git a/pkg/ddl/mock.go b/pkg/ddl/mock.go
index 95071ad080133..edfe2abaa7eba 100644
--- a/pkg/ddl/mock.go
+++ b/pkg/ddl/mock.go
@@ -115,6 +115,12 @@ func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, jobID int6
 	ticker := time.NewTicker(mockCheckVersInterval)
 	defer ticker.Stop()
 
+	failpoint.Inject("mockOwnerCheckAllVersionSlow", func(val failpoint.Value) {
+		if v, ok := val.(int); ok && v == int(jobID) {
+			time.Sleep(2 * time.Second)
+		}
+	})
+
 	for {
 		select {
 		case <-ctx.Done():
diff --git a/pkg/parser/model/ddl.go b/pkg/parser/model/ddl.go
index f0aa1af73e1c8..86a471d6fda78 100644
--- a/pkg/parser/model/ddl.go
+++ b/pkg/parser/model/ddl.go
@@ -442,6 +442,11 @@ type Job struct {
 	// Collate is the collation the DDL Job is created.
 	Collate string `json:"collate"`
 
+	// InvolvingSchemaInfo indicates the schema info involved in the job.
+	// nil means fallback to use job.SchemaName/TableName.
+	// Keep unchanged after initialization.
+	InvolvingSchemaInfo []InvolvingSchemaInfo `json:"involving_schema_info,omitempty"`
+
 	// AdminOperator indicates where the Admin command comes, by the TiDB
 	// itself (AdminCommandBySystem) or by user (AdminCommandByEndUser).
 	AdminOperator AdminCommandOperator `json:"admin_operator"`
@@ -450,6 +455,20 @@ type Job struct {
 	TraceInfo *TraceInfo `json:"trace_info"`
 }
 
+// InvolvingSchemaInfo is the schema/table info involved in a DDL job. The
+// names should be stored in lower case.
+type InvolvingSchemaInfo struct {
+	Database string `json:"database"`
+	Table    string `json:"table"`
+}
+
+const (
+	// InvolvingAll means all schemas/tables are affected.
+	InvolvingAll = "*"
+	// InvolvingNone means no schema/table is affected.
+	InvolvingNone = ""
+)
+
 // FinishTableJob is called when a job is finished.
 // It updates the job's state information and adds tblInfo to the binlog.
 func (job *Job) FinishTableJob(jobState JobState, schemaState SchemaState, ver int64, tblInfo *TableInfo) {
@@ -866,6 +885,16 @@ func (job *Job) IsRollbackable() bool {
 	return true
 }
 
+// GetInvolvingSchemaInfo returns the schema info involved in the job.
+func (job *Job) GetInvolvingSchemaInfo() []InvolvingSchemaInfo {
+	if len(job.InvolvingSchemaInfo) > 0 {
+		return job.InvolvingSchemaInfo
+	}
+	return []InvolvingSchemaInfo{
+		{Database: job.SchemaName, Table: job.TableName},
+	}
+}
+
 // JobState is for job state.
 type JobState int32
 
diff --git a/pkg/parser/model/ddl_test.go b/pkg/parser/model/ddl_test.go
index 66550b407c578..a3fd1c4f56657 100644
--- a/pkg/parser/model/ddl_test.go
+++ b/pkg/parser/model/ddl_test.go
@@ -50,7 +50,7 @@ func TestJobSize(t *testing.T) {
 - SubJob.ToProxyJob()
 `
 	job := model.Job{}
-	require.Equal(t, 336, int(unsafe.Sizeof(job)), msg)
+	require.Equal(t, 360, int(unsafe.Sizeof(job)), msg)
 }
 
 func TestBackfillMetaCodec(t *testing.T) {
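
A note on compatibility: jobs written by older versions, or jobs that never
fill the new field, carry no InvolvingSchemaInfo, so Job.GetInvolvingSchemaInfo
above falls back to the single SchemaName/TableName pair. A self-contained
sketch of that behavior, using local stand-ins for the model types:

package main

import "fmt"

// Local stand-ins for model.InvolvingSchemaInfo and the relevant Job fields.
type involvingSchemaInfo struct{ Database, Table string }

type job struct {
	SchemaName, TableName string
	Involving             []involvingSchemaInfo
}

// getInvolvingSchemaInfo mirrors Job.GetInvolvingSchemaInfo in the patch:
// an empty slice falls back to the job's own schema and table name.
func (j *job) getInvolvingSchemaInfo() []involvingSchemaInfo {
	if len(j.Involving) > 0 {
		return j.Involving
	}
	return []involvingSchemaInfo{{Database: j.SchemaName, Table: j.TableName}}
}

func main() {
	// An ordinary ALTER TABLE job: fallback path, one entry.
	alter := &job{SchemaName: "db1", TableName: "t1"}
	fmt.Println(alter.getInvolvingSchemaInfo()) // [{db1 t1}]

	// RENAME TABLE db1.t1 TO db2.t2: both names are recorded explicitly,
	// matching what renameTable now populates in this patch.
	rename := &job{
		SchemaName: "db2", TableName: "t2",
		Involving: []involvingSchemaInfo{{"db1", "t1"}, {"db2", "t2"}},
	}
	fmt.Println(rename.getInvolvingSchemaInfo()) // [{db1 t1} {db2 t2}]
}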