From 617c1e61af74cd9d1d782eee17ea9bd40d9f1acf Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 10 Sep 2024 14:20:35 +0800 Subject: [PATCH] ddl: args v2 for create/drop/modify schema (#55919) ref pingcap/tidb#53930 --- .../restore/ingestrec/ingest_recorder_test.go | 1 + br/pkg/stream/rewrite_meta_rawkv_test.go | 49 ++-- pkg/ddl/delete_range.go | 7 +- pkg/ddl/executor.go | 22 +- pkg/ddl/foreign_key.go | 4 +- pkg/ddl/index_test.go | 6 +- pkg/ddl/job_scheduler.go | 1 + pkg/ddl/job_submitter.go | 11 +- pkg/ddl/job_submitter_test.go | 17 +- pkg/ddl/restart_test.go | 3 +- pkg/ddl/sanity_check.go | 8 +- pkg/ddl/schema.go | 30 +-- pkg/ddl/schema_test.go | 19 +- pkg/ddl/schema_version.go | 2 +- pkg/ddl/stat_test.go | 32 --- pkg/ddl/table.go | 16 +- pkg/ddl/table_test.go | 4 +- pkg/ddl/tests/partition/db_partition_test.go | 4 +- pkg/meta/model/BUILD.bazel | 3 +- pkg/meta/model/job.go | 30 ++- pkg/meta/model/job_args.go | 210 +++++++++++++++--- pkg/meta/model/job_args_test.go | 128 ++++++++--- pkg/meta/model/job_test.go | 14 +- pkg/store/gcworker/gc_worker.go | 12 +- 24 files changed, 437 insertions(+), 196 deletions(-) diff --git a/br/pkg/restore/ingestrec/ingest_recorder_test.go b/br/pkg/restore/ingestrec/ingest_recorder_test.go index 8cefa23108140..2dca9212c97bd 100644 --- a/br/pkg/restore/ingestrec/ingest_recorder_test.go +++ b/br/pkg/restore/ingestrec/ingest_recorder_test.go @@ -39,6 +39,7 @@ const ( func fakeJob(reorgTp model.ReorgType, jobTp model.ActionType, state model.JobState, rowCnt int64, indices []*model.IndexInfo, rawArgs json.RawMessage) *model.Job { return &model.Job{ + Version: model.JobVersion1, SchemaName: SchemaName, TableName: TableName, TableID: TableID, diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index 0c904ab18d489..0a756df2bf1f7 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -669,24 +669,25 @@ var ( ) var ( - dropSchemaJob = &model.Job{Type: model.ActionDropSchema, SchemaID: mDDLJobDBOldID, RawArgs: json.RawMessage(`[[71,72,73,74,75]]`)} - dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)} - dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)} - dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} - reorganizeTable0Partition1Job = &model.Job{Type: model.ActionReorganizePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} - removeTable0Partition1Job = &model.Job{Type: model.ActionRemovePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} - alterTable0Partition1Job = &model.Job{Type: model.ActionAlterTablePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} - rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} - rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} - addTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} - addTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} - dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)} - dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)} - dropTable0ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)} - dropTable1ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)} - modifyTable0ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)} - modifyTable1ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)} + dropSchemaJob *model.Job + dropTable0Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)} + dropTable1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)} + dropTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + reorganizeTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionReorganizePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + removeTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionRemovePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + alterTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionAlterTablePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + rollBackTable0IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} + rollBackTable1IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} + addTable0IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} + addTable1IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} + dropTable0IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)} + dropTable1IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)} + dropTable0ColumnJob = &model.Job{Version: model.JobVersion1, Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)} + dropTable1ColumnJob = &model.Job{Version: model.JobVersion1, Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)} + modifyTable0ColumnJob = &model.Job{Version: model.JobVersion1, Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)} + modifyTable1ColumnJob = &model.Job{Version: model.JobVersion1, Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)} multiSchemaChangeJob0 = &model.Job{ + Version: model.JobVersion1, Type: model.ActionMultiSchemaChange, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, @@ -704,6 +705,7 @@ var ( }, } multiSchemaChangeJob1 = &model.Job{ + Version: model.JobVersion1, Type: model.ActionMultiSchemaChange, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, @@ -722,6 +724,19 @@ var ( } ) +func genFinishedJob(job *model.Job, args model.FinishedJobArgs) *model.Job { + job.FillFinishedArgs(args) + bytes, _ := job.Encode(true) + resJob := &model.Job{} + _ = resJob.Decode(bytes) + return resJob +} + +func init() { + dropSchemaJob = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionDropSchema, + SchemaID: mDDLJobDBOldID}, &model.DropSchemaArgs{AllDroppedTableIDs: []int64{71, 72, 73, 74, 75}}) +} + type mockInsertDeleteRange struct { queryCh chan *PreDelRangeQuery } diff --git a/pkg/ddl/delete_range.go b/pkg/ddl/delete_range.go index a5b8df58ddb0e..8f9e103fe0152 100644 --- a/pkg/ddl/delete_range.go +++ b/pkg/ddl/delete_range.go @@ -283,10 +283,11 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap ctx = kv.WithInternalSourceType(ctx, getDDLRequestSource(job.Type)) switch job.Type { case model.ActionDropSchema: - var tableIDs []int64 - if err := job.DecodeArgs(&tableIDs); err != nil { + args, err := model.GetFinishedDropSchemaArgs(job) + if err != nil { return errors.Trace(err) } + tableIDs := args.AllDroppedTableIDs for i := 0; i < len(tableIDs); i += batchInsertDeleteRangeSize { batchEnd := len(tableIDs) if batchEnd > i+batchInsertDeleteRangeSize { @@ -315,7 +316,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID")) case model.ActionTruncateTable: tableID := job.TableID - args, err := model.GetTruncateTableArgsAfterRun(job) + args, err := model.GetFinishedTruncateTableArgs(job) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index f2322fa75dd87..222f26339116d 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -301,10 +301,10 @@ func (e *executor) CreateSchemaWithInfo( } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaName: dbInfo.Name.L, Type: model.ActionCreateSchema, BinlogInfo: &model.HistoryInfo{}, - Args: []any{dbInfo}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ Database: dbInfo.Name.L, @@ -312,6 +312,9 @@ func (e *executor) CreateSchemaWithInfo( }}, SQLMode: ctx.GetSessionVars().SQLMode, } + job.FillArgs(&model.CreateSchemaArgs{ + DBInfo: dbInfo, + }) if ref := dbInfo.PlacementPolicyRef; ref != nil { job.InvolvingSchemaInfo = append(job.InvolvingSchemaInfo, model.InvolvingSchemaInfo{ Policy: ref.Name.L, @@ -348,11 +351,11 @@ func (e *executor) ModifySchemaCharsetAndCollate(ctx sessionctx.Context, stmt *a } // Do the DDL job. job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, Type: model.ActionModifySchemaCharsetAndCollate, BinlogInfo: &model.HistoryInfo{}, - Args: []any{toCharset, toCollate}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ Database: dbInfo.Name.L, @@ -360,6 +363,10 @@ func (e *executor) ModifySchemaCharsetAndCollate(ctx sessionctx.Context, stmt *a }}, SQLMode: ctx.GetSessionVars().SQLMode, } + job.FillArgs(&model.ModifySchemaArgs{ + ToCharset: toCharset, + ToCollate: toCollate, + }) err = e.DoDDLJob(ctx, job) return errors.Trace(err) } @@ -383,11 +390,11 @@ func (e *executor) ModifySchemaDefaultPlacement(ctx sessionctx.Context, stmt *as // Do the DDL job. job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, Type: model.ActionModifySchemaDefaultPlacement, BinlogInfo: &model.HistoryInfo{}, - Args: []any{placementPolicyRef}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ Database: dbInfo.Name.L, @@ -395,6 +402,8 @@ func (e *executor) ModifySchemaDefaultPlacement(ctx sessionctx.Context, stmt *as }}, SQLMode: ctx.GetSessionVars().SQLMode, } + job.FillArgs(&model.ModifySchemaArgs{PolicyRef: placementPolicyRef}) + if placementPolicyRef != nil { job.InvolvingSchemaInfo = append(job.InvolvingSchemaInfo, model.InvolvingSchemaInfo{ Policy: placementPolicyRef.Name.L, @@ -741,12 +750,12 @@ func (e *executor) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt return err } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: old.ID, SchemaName: old.Name.L, SchemaState: old.State, Type: model.ActionDropSchema, BinlogInfo: &model.HistoryInfo{}, - Args: []any{fkCheck}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ Database: old.Name.L, @@ -754,6 +763,9 @@ func (e *executor) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt }}, SQLMode: ctx.GetSessionVars().SQLMode, } + job.FillArgs(&model.DropSchemaArgs{ + FKCheck: fkCheck, + }) err = e.DoDDLJob(ctx, job) if err != nil { @@ -6429,7 +6441,7 @@ func getTruncateTableNewTableID(job *model.Job) int64 { if job.Version == model.JobVersion1 { return job.Args[0].(int64) } - return job.ArgsV2.(*model.TruncateTableArgs).NewTableID + return job.Args[0].(*model.TruncateTableArgs).NewTableID } // HandleLockTablesOnSuccessSubmit handles the table lock for the job which is submitted diff --git a/pkg/ddl/foreign_key.go b/pkg/ddl/foreign_key.go index 65a5f01c3ae24..244919a5c7496 100644 --- a/pkg/ddl/foreign_key.go +++ b/pkg/ddl/foreign_key.go @@ -610,12 +610,12 @@ func checkDatabaseHasForeignKeyReferredInOwner(jobCtx *jobContext, job *model.Jo if !variable.EnableForeignKey.Load() { return nil } - var fkCheck bool - err := job.DecodeArgs(&fkCheck) + args, err := model.GetDropSchemaArgs(job) if err != nil { job.State = model.JobStateCancelled return errors.Trace(err) } + fkCheck := args.FKCheck if !fkCheck { return nil } diff --git a/pkg/ddl/index_test.go b/pkg/ddl/index_test.go index 6c5386015c690..eaf0b6850edaa 100644 --- a/pkg/ddl/index_test.go +++ b/pkg/ddl/index_test.go @@ -137,7 +137,11 @@ false]`), } for _, c := range cases { - job := &model.Job{RawArgs: c.raw} + job := &model.Job{ + Version: model.JobVersion1, + Type: model.ActionAddIndex, + RawArgs: c.raw, + } uniques, indexNames, specs, indexOptions, hiddenCols, err := decodeAddIndexArgs(job) require.NoError(t, err) require.Equal(t, c.uniques, uniques) diff --git a/pkg/ddl/job_scheduler.go b/pkg/ddl/job_scheduler.go index d30e9ef0732f7..ba66288fc83fa 100644 --- a/pkg/ddl/job_scheduler.go +++ b/pkg/ddl/job_scheduler.go @@ -399,6 +399,7 @@ func (s *jobScheduler) loadAndDeliverJobs(se *sess.Session) error { if err != nil { return errors.Trace(err) } + intest.Assert(job.Version > 0, "job version should be greater than 0") involving := job.GetInvolvingSchemaInfo() if targetPool.available() == 0 { diff --git a/pkg/ddl/job_submitter.go b/pkg/ddl/job_submitter.go index 060d2ee3e5603..b824ef9059ee4 100644 --- a/pkg/ddl/job_submitter.go +++ b/pkg/ddl/job_submitter.go @@ -523,7 +523,7 @@ func getRequiredGIDCount(jobWs []*JobWrapper) int { partCount := jobW.Args[3].(int) count += 1 + partCount } else { - count += 1 + len(jobW.ArgsV2.(*model.TruncateTableArgs).OldPartitionIDs) + count += 1 + len(jobW.Args[0].(*model.TruncateTableArgs).OldPartitionIDs) } } } @@ -550,7 +550,12 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) { } } case model.ActionCreateSchema: - dbInfo := jobW.Args[0].(*model.DBInfo) + var dbInfo *model.DBInfo + if jobW.Version == model.JobVersion1 { + dbInfo = jobW.Args[0].(*model.DBInfo) + } else { + dbInfo = jobW.Args[0].(*model.CreateSchemaArgs).DBInfo + } if !jobW.IDAllocated { dbInfo.ID = alloc.next() } @@ -603,7 +608,7 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) { } jobW.Args[2] = partIDs } else { - args := jobW.ArgsV2.(*model.TruncateTableArgs) + args := jobW.Args[0].(*model.TruncateTableArgs) args.NewTableID = alloc.next() partIDs := make([]int64, len(args.OldPartitionIDs)) for i := range partIDs { diff --git a/pkg/ddl/job_submitter_test.go b/pkg/ddl/job_submitter_test.go index 5342ae6d1e4eb..cdcddba8c68a7 100644 --- a/pkg/ddl/job_submitter_test.go +++ b/pkg/ddl/job_submitter_test.go @@ -156,11 +156,12 @@ func TestCombinedIDAllocation(t *testing.T) { genCreateDBJob := func() *model.Job { info := &model.DBInfo{} - return &model.Job{ - Version: model.JobVersion1, + j := &model.Job{ + Version: model.GetJobVerInUse(), Type: model.ActionCreateSchema, - Args: []any{info}, } + j.FillArgs(&model.CreateSchemaArgs{DBInfo: info}) + return j } genRGroupJob := func() *model.Job { @@ -413,10 +414,10 @@ func TestCombinedIDAllocation(t *testing.T) { } case model.ActionCreateSchema: require.Greater(t, j.SchemaID, initialGlobalID) - info := &model.DBInfo{} - require.NoError(t, j.DecodeArgs(info)) - uniqueIDs[info.ID] = struct{}{} - require.Equal(t, j.SchemaID, info.ID) + args, err := model.GetCreateSchemaArgs(j) + require.NoError(t, err) + uniqueIDs[args.DBInfo.ID] = struct{}{} + require.Equal(t, j.SchemaID, args.DBInfo.ID) case model.ActionCreateResourceGroup: info := &model.ResourceGroupInfo{} require.NoError(t, j.DecodeArgs(info)) @@ -449,7 +450,7 @@ func TestCombinedIDAllocation(t *testing.T) { checkPartitionInfo(info) checkID(info.NewTableID) case model.ActionTruncateTable: - args, err := model.GetTruncateTableArgsBeforeRun(j) + args, err := model.GetTruncateTableArgs(j) require.NoError(t, err) checkID(args.NewTableID) for _, id := range args.NewPartitionIDs { diff --git a/pkg/ddl/restart_test.go b/pkg/ddl/restart_test.go index 9edc884ff992f..47443ce57e35a 100644 --- a/pkg/ddl/restart_test.go +++ b/pkg/ddl/restart_test.go @@ -114,12 +114,13 @@ func TestSchemaResume(t *testing.T) { dbInfo, err := testSchemaInfo(store, "test_restart") require.NoError(t, err) job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, SchemaName: dbInfo.Name.L, Type: model.ActionCreateSchema, BinlogInfo: &model.HistoryInfo{}, - Args: []any{dbInfo}, } + job.FillArgs(&model.CreateSchemaArgs{DBInfo: dbInfo}) testRunInterruptedJob(t, store, dom, job) testCheckSchemaState(t, store, dbInfo, model.StatePublic) diff --git a/pkg/ddl/sanity_check.go b/pkg/ddl/sanity_check.go index 2fa182dd1a746..cfe4ea197a9dd 100644 --- a/pkg/ddl/sanity_check.go +++ b/pkg/ddl/sanity_check.go @@ -86,11 +86,11 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) { } switch job.Type { case model.ActionDropSchema: - var tableIDs []int64 - if err := job.DecodeArgs(&tableIDs); err != nil { + args, err := model.GetFinishedDropSchemaArgs(job) + if err != nil { return 0, errors.Trace(err) } - return len(tableIDs), nil + return len(args.AllDroppedTableIDs), nil case model.ActionDropTable: var startKey kv.Key var physicalTableIDs []int64 @@ -100,7 +100,7 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) { } return len(physicalTableIDs) + 1, nil case model.ActionTruncateTable: - args, err := model.GetTruncateTableArgsAfterRun(job) + args, err := model.GetFinishedTruncateTableArgs(job) if err != nil { return 0, errors.Trace(err) } diff --git a/pkg/ddl/schema.go b/pkg/ddl/schema.go index 629d2013cc481..d5f5001db55f1 100644 --- a/pkg/ddl/schema.go +++ b/pkg/ddl/schema.go @@ -29,17 +29,17 @@ import ( func onCreateSchema(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID - dbInfo := &model.DBInfo{} - if err := job.DecodeArgs(dbInfo); err != nil { + args, err := model.GetCreateSchemaArgs(job) + if err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } - + dbInfo := args.DBInfo dbInfo.ID = schemaID dbInfo.State = model.StateNone - err := checkSchemaNotExists(jobCtx.infoCache, schemaID, dbInfo) + err = checkSchemaNotExists(jobCtx.infoCache, schemaID, dbInfo) if err != nil { if infoschema.ErrDatabaseExists.Equal(err) { // The database already exists, can't create it, we should cancel this job now. @@ -86,8 +86,8 @@ func checkSchemaNotExists(infoCache *infoschema.InfoCache, schemaID int64, dbInf } func onModifySchemaCharsetAndCollate(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { - var toCharset, toCollate string - if err := job.DecodeArgs(&toCharset, &toCollate); err != nil { + args, err := model.GetModifySchemaArgs(job) + if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -97,13 +97,13 @@ func onModifySchemaCharsetAndCollate(jobCtx *jobContext, t *meta.Meta, job *mode return ver, errors.Trace(err) } - if dbInfo.Charset == toCharset && dbInfo.Collate == toCollate { + if dbInfo.Charset == args.ToCharset && dbInfo.Collate == args.ToCollate { job.FinishDBJob(model.JobStateDone, model.StatePublic, ver, dbInfo) return ver, nil } - dbInfo.Charset = toCharset - dbInfo.Collate = toCollate + dbInfo.Charset = args.ToCharset + dbInfo.Collate = args.ToCollate if err = t.UpdateDatabase(dbInfo); err != nil { return ver, errors.Trace(err) @@ -116,12 +116,13 @@ func onModifySchemaCharsetAndCollate(jobCtx *jobContext, t *meta.Meta, job *mode } func onModifySchemaDefaultPlacement(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { - var placementPolicyRef *model.PolicyRefInfo - if err := job.DecodeArgs(&placementPolicyRef); err != nil { + args, err := model.GetModifySchemaArgs(job) + if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + placementPolicyRef := args.PolicyRef dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job) if err != nil { return ver, errors.Trace(err) @@ -210,14 +211,15 @@ func onDropSchema(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, if err != nil { return ver, errors.Trace(err) } + // we only drop meta key of database, but not drop tables' meta keys. if err = t.DropDatabase(dbInfo.ID); err != nil { break } // Finish this job. - if len(tables) > 0 { - job.Args = append(job.Args, getIDs(tables)) - } + job.FillFinishedArgs(&model.DropSchemaArgs{ + AllDroppedTableIDs: getIDs(tables), + }) job.FinishDBJob(model.JobStateDone, model.StateNone, ver, dbInfo) default: // We can't enter here. diff --git a/pkg/ddl/schema_test.go b/pkg/ddl/schema_test.go index 29806bea8bd74..46705075b4842 100644 --- a/pkg/ddl/schema_test.go +++ b/pkg/ddl/schema_test.go @@ -136,15 +136,16 @@ func testSchemaInfo(store kv.Storage, name string) (*model.DBInfo, error) { func testCreateSchema(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTest, dbInfo *model.DBInfo) *model.Job { job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, Type: model.ActionCreateSchema, BinlogInfo: &model.HistoryInfo{}, - Args: []any{dbInfo}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ Database: dbInfo.Name.L, Table: model.InvolvingAll, }}, } + job.FillArgs(&model.CreateSchemaArgs{DBInfo: dbInfo}) ctx.SetValue(sessionctx.QueryString, "skip") require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) @@ -156,16 +157,18 @@ func testCreateSchema(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTes } func buildDropSchemaJob(dbInfo *model.DBInfo) *model.Job { - return &model.Job{ + j := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: dbInfo.ID, Type: model.ActionDropSchema, BinlogInfo: &model.HistoryInfo{}, - Args: []any{true}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ Database: dbInfo.Name.L, Table: model.InvolvingAll, }}, } + j.FillArgs(&model.DropSchemaArgs{FKCheck: true}) + return j } func testDropSchema(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTest, dbInfo *model.DBInfo) (*model.Job, int64) { @@ -270,6 +273,7 @@ func TestSchema(t *testing.T) { // Drop a non-existent database. job = &model.Job{ + Version: model.JobVersion1, SchemaID: dbInfo.ID, SchemaName: "test_schema", Type: model.ActionDropSchema, @@ -333,7 +337,9 @@ func TestSchemaWaitJob(t *testing.T) { require.NoError(t, err) schemaID := genIDs[0] doDDLJobErr(t, schemaID, 0, "test_schema", "", model.ActionCreateSchema, - []any{dbInfo}, testkit.NewTestKit(t, store).Session(), det2, store) + testkit.NewTestKit(t, store).Session(), det2, store, func(job *model.Job) { + job.FillArgs(&model.CreateSchemaArgs{DBInfo: dbInfo}) + }) } func doDDLJobErr( @@ -341,20 +347,21 @@ func doDDLJobErr( schemaID, tableID int64, schemaName, tableName string, tp model.ActionType, - args []any, ctx sessionctx.Context, d ddl.ExecutorForTest, store kv.Storage, + handler func(job *model.Job), ) *model.Job { job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: schemaID, SchemaName: schemaName, TableID: tableID, TableName: tableName, Type: tp, - Args: args, BinlogInfo: &model.HistoryInfo{}, } + handler(job) // TODO: check error detail ctx.SetValue(sessionctx.QueryString, "skip") require.Error(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true))) diff --git a/pkg/ddl/schema_version.go b/pkg/ddl/schema_version.go index fa603ddf51486..3c3cebb2783f8 100644 --- a/pkg/ddl/schema_version.go +++ b/pkg/ddl/schema_version.go @@ -51,7 +51,7 @@ func SetSchemaDiffForCreateTables(diff *model.SchemaDiff, job *model.Job) error // SetSchemaDiffForTruncateTable set SchemaDiff for ActionTruncateTable. func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job) error { // Truncate table has two table ID, should be handled differently. - args, err := model.GetTruncateTableArgsBeforeRun(job) + args, err := model.GetTruncateTableArgs(job) if err != nil { return errors.Trace(err) } diff --git a/pkg/ddl/stat_test.go b/pkg/ddl/stat_test.go index 45d490e2f1954..413d4e533ef14 100644 --- a/pkg/ddl/stat_test.go +++ b/pkg/ddl/stat_test.go @@ -16,25 +16,18 @@ package ddl_test import ( "context" - "encoding/json" "fmt" "strconv" "testing" - "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser/ast" - pmodel "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/parser/terror" sessiontypes "github.com/pingcap/tidb/pkg/session/types" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/external" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" - "github.com/pingcap/tidb/pkg/types" "github.com/stretchr/testify/require" ) @@ -56,10 +49,6 @@ func TestGetDDLInfo(t *testing.T) { SchemaID: dbInfo2.ID, Type: model.ActionCreateSchema, RowCount: 0, - - // although RawArgsV2 is not used in this test, it should be set for compare, - // as json.Unmarshal will set RawArgsV2 to "null". - RawArgsV2: json.RawMessage("null"), } job1 := &model.Job{ Version: model.JobVersion1, @@ -67,9 +56,6 @@ func TestGetDDLInfo(t *testing.T) { SchemaID: dbInfo2.ID, Type: model.ActionAddIndex, RowCount: 0, - - // same as above - RawArgsV2: json.RawMessage("null"), } err = addDDLJobs(sess, txn, job) @@ -105,24 +91,6 @@ func addDDLJobs(sess sessiontypes.Session, txn kv.Transaction, job *model.Job) e return err } -func buildCreateIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job { - return &model.Job{ - SchemaID: dbInfo.ID, - TableID: tblInfo.ID, - Type: model.ActionAddIndex, - BinlogInfo: &model.HistoryInfo{}, - Args: []any{unique, pmodel.NewCIStr(indexName), - []*ast.IndexPartSpecification{{ - Column: &ast.ColumnName{Name: pmodel.NewCIStr(colName)}, - Length: types.UnspecifiedLength}}}, - ReorgMeta: &model.DDLReorgMeta{ // Add index job must have this field. - SQLMode: mysql.SQLMode(0), - Warnings: make(map[errors.ErrorID]*terror.Error), - WarningsCount: make(map[errors.ErrorID]int64), - }, - } -} - func TestIssue42268(t *testing.T) { // issue 42268 missing table name in 'admin show ddl' result during drop table store := testkit.CreateMockStore(t) diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index 5223a8cfd9d4b..ed53ce502f9e4 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -426,8 +426,7 @@ func getTableInfo(t *meta.Meta, tableID, schemaID int64) (*model.TableInfo, erro // A background job will be created to delete old data. func (w *worker) onTruncateTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID - tableID := job.TableID - args, err := model.GetTruncateTableArgsBeforeRun(job) + args, err := model.GetTruncateTableArgs(job) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -578,15 +577,10 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, t *meta.Meta, job *model.Jo SchemaChangeEvent: util.NewTruncateTableEvent(tblInfo, oldTblInfo), } asyncNotifyEvent(jobCtx, truncateTableEvent, job) - if job.Version == model.JobVersion1 { - startKey := tablecodec.EncodeTablePrefix(tableID) - job.Args = []any{startKey, oldPartitionIDs} - } else { - // see truncateTableByReassignPartitionIDs for why they might change. - args.OldPartitionIDs = oldPartitionIDs - args.NewPartitionIDs = newPartitionIDs - job.ArgsV2 = args - } + // see truncateTableByReassignPartitionIDs for why they might change. + args.OldPartitionIDs = oldPartitionIDs + args.NewPartitionIDs = newPartitionIDs + job.FillFinishedArgs(args) return ver, nil } diff --git a/pkg/ddl/table_test.go b/pkg/ddl/table_test.go index c938138259ac6..cd60b7636c2da 100644 --- a/pkg/ddl/table_test.go +++ b/pkg/ddl/table_test.go @@ -219,7 +219,9 @@ func TestTable(t *testing.T) { newTblInfo, err := testTableInfo(store, "t", 3) require.NoError(t, err) doDDLJobErr(t, dbInfo.ID, newTblInfo.ID, dbInfo.Name.L, newTblInfo.Name.L, model.ActionCreateTable, - []any{newTblInfo}, ctx, de, store) + ctx, de, store, func(job *model.Job) { + job.Args = []any{newTblInfo} + }) ctx = testkit.NewTestKit(t, store).Session() txn, err := newTxn(ctx) diff --git a/pkg/ddl/tests/partition/db_partition_test.go b/pkg/ddl/tests/partition/db_partition_test.go index 20646e1da046a..a9e554a1ec007 100644 --- a/pkg/ddl/tests/partition/db_partition_test.go +++ b/pkg/ddl/tests/partition/db_partition_test.go @@ -2549,11 +2549,11 @@ func TestDropSchemaWithPartitionTable(t *testing.T) { require.Equal(t, "drop schema", row.GetString(3)) jobID := row.GetInt64(0) - var tableIDs []int64 historyJob, err := ddl.GetHistoryJobByID(tk.Session(), jobID) require.NoError(t, err) - err = historyJob.DecodeArgs(&tableIDs) + args, err := model.GetFinishedDropSchemaArgs(historyJob) require.NoError(t, err) + tableIDs := args.AllDroppedTableIDs // There is 2 partitions. require.Equal(t, 3, len(tableIDs)) diff --git a/pkg/meta/model/BUILD.bazel b/pkg/meta/model/BUILD.bazel index 829b184257af6..3a6b991dd107e 100644 --- a/pkg/meta/model/BUILD.bazel +++ b/pkg/meta/model/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/parser/mysql", "//pkg/parser/terror", "//pkg/parser/types", + "//pkg/util/intest", "@com_github_pingcap_errors//:errors", ], ) @@ -43,7 +44,7 @@ go_test( ], embed = [":model"], flaky = True, - shard_count = 25, + shard_count = 28, deps = [ "//pkg/parser/charset", "//pkg/parser/model", diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 5778e27d9bc5a..04cb601fb9a8c 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/util/intest" ) // ActionType is the type for DDL action. @@ -301,7 +302,6 @@ type Job struct { RowCount int64 `json:"row_count"` Mu sync.Mutex `json:"-"` - // below fields are used in JobVersion1 // CtxVars are variables attached to the job. It is for internal usage. // E.g. passing arguments between functions by one single *Job pointer. // for ExchangeTablePartition, RenameTables, RenameTable, it's [slice-of-db-id, slice-of-table-id] @@ -312,17 +312,13 @@ type Job struct { // - TruncateTable: [new-table-id, foreignKeyCheck, ... // - RenameTable: [old-db-id, new-table-name, old-db-name] // - ExchangeTablePartition: [partition-id, pt-db-id, pt-id, partition-name, with-validation] + // when Version is JobVersion2, Args contains a single element of type JobArgs. + // TODO make it private after we finish the migration to JobVersion2. Args []any `json:"-"` - // RawArgs : We must use json raw message to delay parsing special args. + // we use json raw message to delay parsing special args. + // the args are cleared out unless Job.FillFinishedArgs is called. RawArgs json.RawMessage `json:"raw_args"` - // below fields are used in JobVersion2 - // ArgsV2 is a pointer to a typed XXXArgs struct specific to the job type. - // see structs inside job_args.go. - ArgsV2 JobArgs `json:"-"` - // RawArgsV2 stores the raw json of ArgsV2. - RawArgsV2 json.RawMessage `json:"raw_args_v2"` - SchemaState SchemaState `json:"schema_state"` // SnapshotVer means snapshot version for this job. SnapshotVer uint64 `json:"snapshot_ver"` @@ -484,11 +480,16 @@ func (job *Job) GetWarnings() (map[errors.ErrorID]*terror.Error, map[errors.Erro return w, wc } -// FillArgs fills job args. +// FillArgs fills args for new job. func (job *Job) FillArgs(args JobArgs) { args.fillJob(job) } +// FillFinishedArgs fills args for finished job. +func (job *Job) FillFinishedArgs(args FinishedJobArgs) { + args.fillFinishedJob(job) +} + // Encode encodes job with json format. // updateRawArgs is used to determine whether to update the raw args. func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { @@ -512,11 +513,15 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { } } } else { - job.RawArgsV2, err = json.Marshal(job.ArgsV2) + var arg any + if len(job.Args) > 0 { + arg = job.Args[0] + } + job.RawArgs, err = json.Marshal(arg) if err != nil { return nil, errors.Trace(err) } - // TODO remember update sub-jobs' RawArgsV2 when we do it. + // TODO remember update sub-jobs' RawArgs when we do it. } } @@ -538,6 +543,7 @@ func (job *Job) Decode(b []byte) error { // DecodeArgs decodes serialized job arguments from job.RawArgs into the given // variables, and also save the result in job.Args. It's for JobVersion1. func (job *Job) DecodeArgs(args ...any) error { + intest.Assert(job.Version == JobVersion1, "Job.DecodeArgs is only used for JobVersion1") var rawArgs []json.RawMessage if err := json.Unmarshal(job.RawArgs, &rawArgs); err != nil { return errors.Trace(err) diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 9f26c1aba3f16..9c769bdd0730b 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -18,19 +18,21 @@ import ( "encoding/json" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/util/intest" ) // getOrDecodeArgsV2 get the argsV2 from job, if the argsV2 is nil, decode rawArgsV2 // and fill argsV2. func getOrDecodeArgsV2[T JobArgs](job *Job) (T, error) { - if job.ArgsV2 != nil { - return job.ArgsV2.(T), nil + intest.Assert(job.Version == JobVersion2, "job version is not v2") + if len(job.Args) > 0 { + return job.Args[0].(T), nil } var v T - if err := json.Unmarshal(job.RawArgsV2, &v); err != nil { + if err := json.Unmarshal(job.RawArgs, &v); err != nil { return v, errors.Trace(err) } - job.ArgsV2 = v + job.Args = []any{v} return v, nil } @@ -41,6 +43,146 @@ type JobArgs interface { fillJob(job *Job) } +// FinishedJobArgs is the interface for finished job arguments. +// in most cases, job args are cleared out after the job is finished, but some jobs +// will write some args back to the job for other components. +type FinishedJobArgs interface { + // fillFinishedJob fills the job args for finished job. we make it private to avoid + // calling it directly, use Job.FillFinishedArgs to fill the job args. + fillFinishedJob(job *Job) +} + +// CreateSchemaArgs is the arguments for create schema job. +type CreateSchemaArgs struct { + DBInfo *DBInfo `json:"db_info,omitempty"` +} + +func (a *CreateSchemaArgs) fillJob(job *Job) { + if job.Version == JobVersion1 { + job.Args = []any{a.DBInfo} + return + } + job.Args = []any{a} +} + +// GetCreateSchemaArgs gets the args for create schema job. +func GetCreateSchemaArgs(job *Job) (*CreateSchemaArgs, error) { + if job.Version == JobVersion1 { + dbInfo := &DBInfo{} + err := job.DecodeArgs(dbInfo) + if err != nil { + return nil, errors.Trace(err) + } + return &CreateSchemaArgs{DBInfo: dbInfo}, nil + } + + argsV2, err := getOrDecodeArgsV2[*CreateSchemaArgs](job) + if err != nil { + return nil, errors.Trace(err) + } + return argsV2, nil +} + +// DropSchemaArgs is the arguments for drop schema job. +type DropSchemaArgs struct { + // this is the args for job submission, it's invalid if the job is finished. + FKCheck bool `json:"fk_check,omitempty"` + // this is the args for finished job. this list include all partition IDs too. + AllDroppedTableIDs []int64 `json:"all_dropped_table_ids,omitempty"` +} + +func (a *DropSchemaArgs) fillJob(job *Job) { + if job.Version == JobVersion1 { + job.Args = []any{a.FKCheck} + return + } + job.Args = []any{a} +} + +func (a *DropSchemaArgs) fillFinishedJob(job *Job) { + if job.Version == JobVersion1 { + job.Args = []any{a.AllDroppedTableIDs} + return + } + job.Args = []any{a} +} + +// GetDropSchemaArgs gets the args for drop schema job. +func GetDropSchemaArgs(job *Job) (*DropSchemaArgs, error) { + return getDropSchemaArgs(job, false) +} + +// GetFinishedDropSchemaArgs gets the args for drop schema job after the job is finished. +func GetFinishedDropSchemaArgs(job *Job) (*DropSchemaArgs, error) { + return getDropSchemaArgs(job, true) +} + +func getDropSchemaArgs(job *Job, argsOfFinished bool) (*DropSchemaArgs, error) { + if job.Version == JobVersion1 { + if argsOfFinished { + var physicalTableIDs []int64 + if err := job.DecodeArgs(&physicalTableIDs); err != nil { + return nil, err + } + return &DropSchemaArgs{AllDroppedTableIDs: physicalTableIDs}, nil + } + var fkCheck bool + if err := job.DecodeArgs(&fkCheck); err != nil { + return nil, err + } + return &DropSchemaArgs{FKCheck: fkCheck}, nil + } + return getOrDecodeArgsV2[*DropSchemaArgs](job) +} + +// ModifySchemaArgs is the arguments for modify schema job. +type ModifySchemaArgs struct { + // below 2 are used for modify schema charset and collate. + ToCharset string `json:"to_charset,omitempty"` + ToCollate string `json:"to_collate,omitempty"` + // used for modify schema placement policy. + // might be nil, means set it to default. + PolicyRef *PolicyRefInfo `json:"policy_ref,omitempty"` +} + +func (a *ModifySchemaArgs) fillJob(job *Job) { + if job.Version == JobVersion1 { + if job.Type == ActionModifySchemaCharsetAndCollate { + job.Args = []any{a.ToCharset, a.ToCollate} + } else if job.Type == ActionModifySchemaDefaultPlacement { + job.Args = []any{a.PolicyRef} + } + return + } + job.Args = []any{a} +} + +// GetModifySchemaArgs gets the modify schema args. +func GetModifySchemaArgs(job *Job) (*ModifySchemaArgs, error) { + if job.Version == JobVersion1 { + var ( + toCharset string + toCollate string + policyRef *PolicyRefInfo + ) + if job.Type == ActionModifySchemaCharsetAndCollate { + if err := job.DecodeArgs(&toCharset, &toCollate); err != nil { + return nil, errors.Trace(err) + } + } else if job.Type == ActionModifySchemaDefaultPlacement { + if err := job.DecodeArgs(&policyRef); err != nil { + return nil, errors.Trace(err) + } + } + return &ModifySchemaArgs{ + ToCharset: toCharset, + ToCollate: toCollate, + PolicyRef: policyRef, + }, nil + } + return getOrDecodeArgsV2[*ModifySchemaArgs](job) +} + // TruncateTableArgs is the arguments for truncate table job. type TruncateTableArgs struct { FKCheck bool `json:"fk_check,omitempty"` @@ -62,13 +204,41 @@ func (a *TruncateTableArgs) fillJob(job *Job) { job.Args = []any{a.NewTableID, a.FKCheck, a.NewPartitionIDs, len(a.OldPartitionIDs)} return } - job.ArgsV2 = a + job.Args = []any{a} +} + +func (a *TruncateTableArgs) fillFinishedJob(job *Job) { + if job.Version == JobVersion1 { + // the first param is the start key of the old table, it's not used anywhere + // now, so we fill an empty byte slice here. + // we can call tablecodec.EncodeTablePrefix(tableID) to get it. + job.Args = []any{[]byte{}, a.OldPartitionIDs} + return + } + job.Args = []any{a} } -// GetTruncateTableArgsBeforeRun gets the truncate table args that we set before -// running the job. the args might be changed after the job run on JobVersion1. -func GetTruncateTableArgsBeforeRun(job *Job) (*TruncateTableArgs, error) { +// GetTruncateTableArgs gets the truncate table args. +func GetTruncateTableArgs(job *Job) (*TruncateTableArgs, error) { + return getTruncateTableArgs(job, false) +} + +// GetFinishedTruncateTableArgs gets the truncate table args after the job is finished. +func GetFinishedTruncateTableArgs(job *Job) (*TruncateTableArgs, error) { + return getTruncateTableArgs(job, true) +} + +func getTruncateTableArgs(job *Job, argsOfFinished bool) (*TruncateTableArgs, error) { if job.Version == JobVersion1 { + if argsOfFinished { + var startKey []byte + var oldPartitionIDs []int64 + if err := job.DecodeArgs(&startKey, &oldPartitionIDs); err != nil { + return nil, errors.Trace(err) + } + return &TruncateTableArgs{OldPartitionIDs: oldPartitionIDs}, nil + } + var ( newTableID int64 fkCheck bool @@ -85,27 +255,5 @@ func GetTruncateTableArgsBeforeRun(job *Job) (*TruncateTableArgs, error) { }, nil } - argsV2, err := getOrDecodeArgsV2[*TruncateTableArgs](job) - if err != nil { - return nil, errors.Trace(err) - } - return argsV2, nil -} - -// GetTruncateTableArgsAfterRun gets the truncate table args after running the job. -func GetTruncateTableArgsAfterRun(job *Job) (*TruncateTableArgs, error) { - if job.Version == JobVersion1 { - var startKey []byte - var oldPartitionIDs []int64 - if err := job.DecodeArgs(&startKey, &oldPartitionIDs); err != nil { - return nil, errors.Trace(err) - } - return &TruncateTableArgs{OldPartitionIDs: oldPartitionIDs}, nil - } - - argsV2, err := getOrDecodeArgsV2[*TruncateTableArgs](job) - if err != nil { - return nil, errors.Trace(err) - } - return argsV2, nil + return getOrDecodeArgsV2[*TruncateTableArgs](job) } diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index 8a7df15099449..4156a6c63ffe0 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -24,45 +24,122 @@ func TestGetOrDecodeArgsV2(t *testing.T) { j := &Job{ Version: JobVersion2, Type: ActionTruncateTable, - ArgsV2: &TruncateTableArgs{ + Args: []any{&TruncateTableArgs{ FKCheck: true, - }, + }}, } _, err := j.Encode(true) require.NoError(t, err) - require.NotNil(t, j.RawArgsV2) + require.NotNil(t, j.RawArgs) // return existing argsV2 argsV2, err := getOrDecodeArgsV2[*TruncateTableArgs](j) require.NoError(t, err) - require.Same(t, j.ArgsV2, argsV2) + require.Same(t, j.Args[0], argsV2) // unmarshal from json var argsBak *TruncateTableArgs - argsBak, j.ArgsV2 = j.ArgsV2.(*TruncateTableArgs), nil + argsBak, j.Args = j.Args[0].(*TruncateTableArgs), nil argsV2, err = getOrDecodeArgsV2[*TruncateTableArgs](j) require.NoError(t, err) require.NotNil(t, argsV2) require.NotSame(t, argsBak, argsV2) } -func TestGetTruncateTableArgs(t *testing.T) { +func getJobBytes(t *testing.T, inArgs JobArgs, ver JobVersion, tp ActionType) []byte { + j := &Job{ + Version: ver, + Type: tp, + } + j.FillArgs(inArgs) + bytes, err := j.Encode(true) + require.NoError(t, err) + return bytes +} + +func getFinishedJobBytes(t *testing.T, inArgs FinishedJobArgs, ver JobVersion, tp ActionType) []byte { + j := &Job{ + Version: ver, + Type: tp, + } + j.FillFinishedArgs(inArgs) + bytes, err := j.Encode(true) + require.NoError(t, err) + return bytes +} + +func TestCreateSchemaArgs(t *testing.T) { + inArgs := &CreateSchemaArgs{ + DBInfo: &DBInfo{ID: 100}, + } + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionCreateSchema))) + args, err := GetCreateSchemaArgs(j2) + require.NoError(t, err) + require.EqualValues(t, inArgs.DBInfo, args.DBInfo) + } +} + +func TestDropSchemaArgs(t *testing.T) { + inArgs := &DropSchemaArgs{ + FKCheck: true, + } + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionDropSchema))) + args, err := GetDropSchemaArgs(j2) + require.NoError(t, err) + require.EqualValues(t, inArgs.FKCheck, args.FKCheck) + } + + inArgs = &DropSchemaArgs{ + AllDroppedTableIDs: []int64{1, 2}, + } + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getFinishedJobBytes(t, inArgs, v, ActionDropSchema))) + args, err := GetFinishedDropSchemaArgs(j2) + require.NoError(t, err) + require.Equal(t, []int64{1, 2}, args.AllDroppedTableIDs) + } +} + +func TestModifySchemaArgs(t *testing.T) { + inArgs := &ModifySchemaArgs{ + ToCharset: "aa", + ToCollate: "bb", + } + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionModifySchemaCharsetAndCollate))) + args, err := GetModifySchemaArgs(j2) + require.NoError(t, err) + require.Equal(t, "aa", args.ToCharset) + require.Equal(t, "bb", args.ToCollate) + } + for _, inArgs = range []*ModifySchemaArgs{ + {PolicyRef: &PolicyRefInfo{ID: 123}}, + {}, + } { + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionModifySchemaDefaultPlacement))) + args, err := GetModifySchemaArgs(j2) + require.NoError(t, err) + require.EqualValues(t, inArgs.PolicyRef, args.PolicyRef) + } + } +} + +func TestTruncateTableArgs(t *testing.T) { inArgs := &TruncateTableArgs{ NewTableID: 1, FKCheck: true, NewPartitionIDs: []int64{2, 3}, } for _, v := range []JobVersion{JobVersion1, JobVersion2} { - j := &Job{ - Version: v, - Type: ActionTruncateTable, - } - j.FillArgs(inArgs) - bytes, err := j.Encode(true) - require.NoError(t, err) - j2 := &Job{} - err = j2.Decode(bytes) - require.NoError(t, err) - args, err := GetTruncateTableArgsBeforeRun(j2) + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionTruncateTable))) + args, err := GetTruncateTableArgs(j2) require.NoError(t, err) require.Equal(t, int64(1), args.NewTableID) require.Equal(t, true, args.FKCheck) @@ -73,22 +150,9 @@ func TestGetTruncateTableArgs(t *testing.T) { OldPartitionIDs: []int64{5, 6}, } for _, v := range []JobVersion{JobVersion1, JobVersion2} { - j := &Job{ - Version: v, - Type: ActionTruncateTable, - } - if v == JobVersion1 { - j.Args = []any{[]byte{}, inArgs.OldPartitionIDs} - } else { - j.ArgsV2 = inArgs - } - bytes, err := j.Encode(true) - require.NoError(t, err) - j2 := &Job{} - err = j2.Decode(bytes) - require.NoError(t, err) - args, err := GetTruncateTableArgsAfterRun(j2) + require.NoError(t, j2.Decode(getFinishedJobBytes(t, inArgs, v, ActionTruncateTable))) + args, err := GetFinishedTruncateTableArgs(j2) require.NoError(t, err) require.Equal(t, []int64{5, 6}, args.OldPartitionIDs) } diff --git a/pkg/meta/model/job_test.go b/pkg/meta/model/job_test.go index 185c617827e33..4d8247255bfa7 100644 --- a/pkg/meta/model/job_test.go +++ b/pkg/meta/model/job_test.go @@ -407,7 +407,7 @@ func TestJobSize(t *testing.T) { - SubJob.ToProxyJob() ` job := Job{} - require.Equal(t, 440, int(unsafe.Sizeof(job)), msg) + require.Equal(t, 400, int(unsafe.Sizeof(job)), msg) } func TestBackfillMetaCodec(t *testing.T) { @@ -527,17 +527,17 @@ func TestJobEncodeV2(t *testing.T) { j := &Job{ Version: JobVersion2, Type: ActionTruncateTable, - ArgsV2: &TruncateTableArgs{ + Args: []any{&TruncateTableArgs{ FKCheck: true, - }, + }}, } _, err := j.Encode(false) require.NoError(t, err) - require.Nil(t, j.RawArgsV2) + require.Nil(t, j.RawArgs) _, err = j.Encode(true) require.NoError(t, err) - require.NotNil(t, j.RawArgsV2) + require.NotNil(t, j.RawArgs) args := &TruncateTableArgs{} - require.NoError(t, json.Unmarshal(j.RawArgsV2, args)) - require.EqualValues(t, j.ArgsV2, args) + require.NoError(t, json.Unmarshal(j.RawArgs, args)) + require.EqualValues(t, j.Args[0], args) } diff --git a/pkg/store/gcworker/gc_worker.go b/pkg/store/gcworker/gc_worker.go index 0cbd2bc2868fd..d6dd8d4658266 100644 --- a/pkg/store/gcworker/gc_worker.go +++ b/pkg/store/gcworker/gc_worker.go @@ -1574,6 +1574,7 @@ func doGCPlacementRules(se sessiontypes.Session, _ uint64, return } historyJob = &model.Job{ + Version: model.JobVersion1, ID: dr.JobID, Type: model.ActionDropTable, TableID: int64(v.(int)), @@ -1601,17 +1602,23 @@ func doGCPlacementRules(se sessiontypes.Session, _ uint64, physicalTableIDs = append(physicalTableIDs, historyJob.TableID) case model.ActionTruncateTable: var args *model.TruncateTableArgs - args, err = model.GetTruncateTableArgsAfterRun(historyJob) + args, err = model.GetFinishedTruncateTableArgs(historyJob) if err != nil { return } physicalTableIDs = append(args.OldPartitionIDs, historyJob.TableID) - case model.ActionDropSchema, model.ActionDropTablePartition, model.ActionTruncateTablePartition, + case model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: if err = historyJob.DecodeArgs(&physicalTableIDs); err != nil { return } + case model.ActionDropSchema: + args, err2 := model.GetFinishedDropSchemaArgs(historyJob) + if err2 != nil { + return err2 + } + physicalTableIDs = args.AllDroppedTableIDs } // Skip table ids that's already successfully handled. @@ -1655,6 +1662,7 @@ func (w *GCWorker) doGCLabelRules(dr util.DelRangeTask) (err error) { return } historyJob = &model.Job{ + Version: model.JobVersion1, ID: dr.JobID, Type: model.ActionDropTable, RawArgs: args,