Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions pkg/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import (
// DANGER: it is an internal function used by onCreateTable and onCreateTables, for reusing code. Be careful.
// 1. it expects the argument of job has been deserialized.
// 2. it won't call updateSchemaVersion, FinishTableJob and asyncNotifyEvent.
func createTable(jobCtx *jobContext, job *model.Job, args *model.CreateTableArgs) (*model.TableInfo, error) {
func createTable(jobCtx *jobContext, job *model.Job, r autoid.Requirement, args *model.CreateTableArgs) (*model.TableInfo, error) {
schemaID := job.SchemaID
tbInfo, fkCheck := args.TableInfo, args.FKCheck

Expand Down Expand Up @@ -146,12 +146,59 @@ func createTable(jobCtx *jobContext, job *model.Job, args *model.CreateTableArgs
return tbInfo, errors.Wrapf(err, "failed to notify PD the placement rules")
}

// Updating auto id meta kv is done in a separate txn.
// It's ok as these data are bind with table ID, and we won't use these
// table IDs until info schema version is updated.
if err := handleAutoIncID(r, job, tbInfo); err != nil {
return tbInfo, errors.Trace(err)
}

return tbInfo, nil
default:
return tbInfo, dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tbInfo.State)
}
}

type autoIDType struct {
End int64
Tp autoid.AllocatorType
}

// handleAutoIncID handles auto_increment option in DDL. It creates a ID counter for the table and initiates the counter to a proper value.
// For example if the option sets auto_increment to 10. The counter will be set to 9. So the next allocated ID will be 10.
func handleAutoIncID(r autoid.Requirement, job *model.Job, tbInfo *model.TableInfo) error {
allocs := autoid.NewAllocatorsFromTblInfo(r, job.SchemaID, tbInfo)

hs := make([]autoIDType, 0, 3)
if tbInfo.AutoIncID > 1 {
// Default tableAutoIncID base is 0.
// If the first ID is expected to greater than 1, we need to do rebase.
if tbInfo.SepAutoInc() {
hs = append(hs, autoIDType{tbInfo.AutoIncID - 1, autoid.AutoIncrementType})
} else {
hs = append(hs, autoIDType{tbInfo.AutoIncID - 1, autoid.RowIDAllocType})
}
}
if tbInfo.AutoIncIDExtra != 0 {
hs = append(hs, autoIDType{tbInfo.AutoIncIDExtra - 1, autoid.RowIDAllocType})
}
if tbInfo.AutoRandID > 1 {
// Default tableAutoRandID base is 0.
// If the first ID is expected to greater than 1, we need to do rebase.
hs = append(hs, autoIDType{tbInfo.AutoRandID - 1, autoid.AutoRandomType})
}

for _, h := range hs {
if alloc := allocs.Get(h.Tp); alloc != nil {
if err := alloc.Rebase(context.Background(), h.End, false); err != nil {
return errors.Trace(err)
}
}
}

return nil
}

func (w *worker) onCreateTable(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
if val.(bool) {
Expand All @@ -173,7 +220,10 @@ func (w *worker) onCreateTable(jobCtx *jobContext, job *model.Job) (ver int64, _
return w.createTableWithForeignKeys(jobCtx, job, args)
}

tbInfo, err = createTable(jobCtx, job, args)
tbInfo, err = createTable(jobCtx, job, &asAutoIDRequirement{
store: w.store,
autoidCli: w.autoidCli,
}, args)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -201,7 +251,10 @@ func (w *worker) createTableWithForeignKeys(jobCtx *jobContext, job *model.Job,
// the `tbInfo.State` with `model.StateNone`, so it's fine to just call the `createTable` with
// public state.
// when `br` restores table, the state of `tbInfo` will be public.
tbInfo, err = createTable(jobCtx, job, args)
tbInfo, err = createTable(jobCtx, job, &asAutoIDRequirement{
store: w.store,
autoidCli: w.autoidCli,
}, args)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -266,7 +319,10 @@ func (w *worker) onCreateTables(jobCtx *jobContext, job *model.Job) (int64, erro
}
tableInfos = append(tableInfos, tableInfo)
} else {
tbInfo, err := createTable(jobCtx, stubJob, tblArgs)
tbInfo, err := createTable(jobCtx, stubJob, &asAutoIDRequirement{
store: w.store,
autoidCli: w.autoidCli,
}, tblArgs)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down
70 changes: 11 additions & 59 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,47 +1145,6 @@ func getSharedInvolvingSchemaInfo(info *model.TableInfo) []model.InvolvingSchema
return ret
}

func (e *executor) createTableWithInfoPost(
ctx sessionctx.Context,
tbInfo *model.TableInfo,
schemaID int64,
scatterScope string,
) error {
var err error
var partitions []model.PartitionDefinition
if pi := tbInfo.GetPartitionInfo(); pi != nil {
partitions = pi.Definitions
}
preSplitAndScatter(ctx, e.store, tbInfo, partitions, scatterScope)
if tbInfo.AutoIncID > 1 {
// Default tableAutoIncID base is 0.
// If the first ID is expected to greater than 1, we need to do rebase.
newEnd := tbInfo.AutoIncID - 1
var allocType autoid.AllocatorType
if tbInfo.SepAutoInc() {
allocType = autoid.AutoIncrementType
} else {
allocType = autoid.RowIDAllocType
}
if err = e.handleAutoIncID(tbInfo, schemaID, newEnd, allocType); err != nil {
return errors.Trace(err)
}
}
// For issue https://github.com/pingcap/tidb/issues/46093
if tbInfo.AutoIncIDExtra != 0 {
if err = e.handleAutoIncID(tbInfo, schemaID, tbInfo.AutoIncIDExtra-1, autoid.RowIDAllocType); err != nil {
return errors.Trace(err)
}
}
if tbInfo.AutoRandID > 1 {
// Default tableAutoRandID base is 0.
// If the first ID is expected to greater than 1, we need to do rebase.
newEnd := tbInfo.AutoRandID - 1
err = e.handleAutoIncID(tbInfo, schemaID, newEnd, autoid.AutoRandomType)
}
return err
}

func (e *executor) CreateTableWithInfo(
ctx sessionctx.Context,
dbName pmodel.CIStr,
Expand Down Expand Up @@ -1215,9 +1174,9 @@ func (e *executor) CreateTableWithInfo(
if val, ok := jobW.GetSessionVars(variable.TiDBScatterRegion); ok {
scatterScope = val
}
err = e.createTableWithInfoPost(ctx, tbInfo, jobW.SchemaID, scatterScope)
}

preSplitAndScatterTable(ctx, e.store, tbInfo, scatterScope)
}
return errors.Trace(err)
}

Expand Down Expand Up @@ -1312,9 +1271,7 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context,
scatterScope = val
}
for _, tblArgs := range args.Tables {
if err = e.createTableWithInfoPost(ctx, tblArgs.TableInfo, jobW.SchemaID, scatterScope); err != nil {
return errors.Trace(err)
}
preSplitAndScatterTable(ctx, e.store, tblArgs.TableInfo, scatterScope)
}

return nil
Expand Down Expand Up @@ -1397,6 +1354,14 @@ func preSplitAndScatter(ctx sessionctx.Context, store kv.Storage, tbInfo *model.
}
}

func preSplitAndScatterTable(ctx sessionctx.Context, store kv.Storage, tbInfo *model.TableInfo, scatterScope string) {
var partitions []model.PartitionDefinition
if pi := tbInfo.GetPartitionInfo(); pi != nil {
partitions = pi.Definitions
}
preSplitAndScatter(ctx, store, tbInfo, partitions, scatterScope)
}

func (e *executor) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error {
logutil.DDLLogger().Info("get flashback cluster job", zap.Stringer("flashbackTS", oracle.GetTimeFromTS(flashbackTS)))
nowTS, err := ctx.GetStore().GetOracle().GetTimestamp(e.ctx, &oracle.Option{})
Expand Down Expand Up @@ -1531,19 +1496,6 @@ func checkCharsetAndCollation(cs string, co string) error {
return nil
}

// handleAutoIncID handles auto_increment option in DDL. It creates a ID counter for the table and initiates the counter to a proper value.
// For example if the option sets auto_increment to 10. The counter will be set to 9. So the next allocated ID will be 10.
func (e *executor) handleAutoIncID(tbInfo *model.TableInfo, schemaID int64, newEnd int64, tp autoid.AllocatorType) error {
allocs := autoid.NewAllocatorsFromTblInfo(e.getAutoIDRequirement(), schemaID, tbInfo)
if alloc := allocs.Get(tp); alloc != nil {
err := alloc.Rebase(context.Background(), newEnd, false)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func (e *executor) getAutoIDRequirement() autoid.Requirement {
return &asAutoIDRequirement{
store: e.store,
Expand Down
6 changes: 1 addition & 5 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,15 +577,11 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64,
}
})

var partitions []model.PartitionDefinition
if pi := tblInfo.GetPartitionInfo(); pi != nil {
partitions = tblInfo.GetPartitionInfo().Definitions
}
var scatterScope string
if val, ok := job.GetSessionVars(variable.TiDBScatterRegion); ok {
scatterScope = val
}
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, partitions, scatterScope)
preSplitAndScatterTable(w.sess.Context, jobCtx.store, tblInfo, scatterScope)

ver, err = updateSchemaVersion(jobCtx, job)
if err != nil {
Expand Down
25 changes: 23 additions & 2 deletions pkg/ddl/tests/fastcreatetable/fastcreatetable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,41 @@ func TestMergedJob(t *testing.T) {
wg.Run(func() {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExecToErr("create table t(a int)")
tk1.MustExecToErr("create table t1(id int AUTO_INCREMENT, c int) AUTO_INCREMENT 1000")
})
wg.Run(func() {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExecToErr("create table t1(a int)")
tk1.MustExecToErr("create table t(a int)")
})
require.Eventually(t, func() bool {
gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session())
require.NoError(t, err)
return len(gotJobs) == 2 && gotJobs[1].Type == model.ActionCreateTables
}, 10*time.Second, 100*time.Millisecond)

// below 2 jobs are merged into the third group, they will succeed together.
wg.Run(func() {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("create table t1(id int AUTO_INCREMENT, c int) AUTO_INCREMENT 100")
})
wg.Run(func() {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("create table t2(id int AUTO_INCREMENT, c int) AUTO_INCREMENT 100")
})
require.Eventually(t, func() bool {
gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session())
require.NoError(t, err)
return len(gotJobs) == 3 && gotJobs[2].Type == model.ActionCreateTables
}, 10*time.Second, 100*time.Millisecond)

// start to run the jobs
close(startSchedule)
wg.Wait()

// Test the correctness of auto id after failed merge job 2
tk.MustExec("insert into test.t1(c) values(1)")
tk.MustQuery("select * from test.t1").Check(testkit.Rows("100 1"))
}