Skip to content

Commit ac0fc22

Browse files
joechenrhti-chi-bot
authored andcommitted
ddl: move auto id handling from submitter to executor (#62449)
close #60804
1 parent e4e814f commit ac0fc22

File tree

4 files changed

+95
-70
lines changed

4 files changed

+95
-70
lines changed

pkg/ddl/create_table.go

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ import (
5656
// DANGER: it is an internal function used by onCreateTable and onCreateTables, for reusing code. Be careful.
5757
// 1. it expects the argument of job has been deserialized.
5858
// 2. it won't call updateSchemaVersion, FinishTableJob and asyncNotifyEvent.
59-
func createTable(jobCtx *jobContext, job *model.Job, args *model.CreateTableArgs) (*model.TableInfo, error) {
59+
func createTable(jobCtx *jobContext, job *model.Job, r autoid.Requirement, args *model.CreateTableArgs) (*model.TableInfo, error) {
6060
schemaID := job.SchemaID
6161
tbInfo, fkCheck := args.TableInfo, args.FKCheck
6262

@@ -146,12 +146,59 @@ func createTable(jobCtx *jobContext, job *model.Job, args *model.CreateTableArgs
146146
return tbInfo, errors.Wrapf(err, "failed to notify PD the placement rules")
147147
}
148148

149+
// Updating auto id meta kv is done in a separate txn.
150+
// It's ok as these data are bind with table ID, and we won't use these
151+
// table IDs until info schema version is updated.
152+
if err := handleAutoIncID(r, job, tbInfo); err != nil {
153+
return tbInfo, errors.Trace(err)
154+
}
155+
149156
return tbInfo, nil
150157
default:
151158
return tbInfo, dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tbInfo.State)
152159
}
153160
}
154161

162+
type autoIDType struct {
163+
End int64
164+
Tp autoid.AllocatorType
165+
}
166+
167+
// handleAutoIncID handles auto_increment option in DDL. It creates a ID counter for the table and initiates the counter to a proper value.
168+
// For example if the option sets auto_increment to 10. The counter will be set to 9. So the next allocated ID will be 10.
169+
func handleAutoIncID(r autoid.Requirement, job *model.Job, tbInfo *model.TableInfo) error {
170+
allocs := autoid.NewAllocatorsFromTblInfo(r, job.SchemaID, tbInfo)
171+
172+
hs := make([]autoIDType, 0, 3)
173+
if tbInfo.AutoIncID > 1 {
174+
// Default tableAutoIncID base is 0.
175+
// If the first ID is expected to greater than 1, we need to do rebase.
176+
if tbInfo.SepAutoInc() {
177+
hs = append(hs, autoIDType{tbInfo.AutoIncID - 1, autoid.AutoIncrementType})
178+
} else {
179+
hs = append(hs, autoIDType{tbInfo.AutoIncID - 1, autoid.RowIDAllocType})
180+
}
181+
}
182+
if tbInfo.AutoIncIDExtra != 0 {
183+
hs = append(hs, autoIDType{tbInfo.AutoIncIDExtra - 1, autoid.RowIDAllocType})
184+
}
185+
if tbInfo.AutoRandID > 1 {
186+
// Default tableAutoRandID base is 0.
187+
// If the first ID is expected to greater than 1, we need to do rebase.
188+
hs = append(hs, autoIDType{tbInfo.AutoRandID - 1, autoid.AutoRandomType})
189+
}
190+
191+
for _, h := range hs {
192+
if alloc := allocs.Get(h.Tp); alloc != nil {
193+
if err := alloc.Rebase(context.Background(), h.End, false); err != nil {
194+
return errors.Trace(err)
195+
}
196+
}
197+
}
198+
199+
return nil
200+
}
201+
155202
func (w *worker) onCreateTable(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
156203
failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
157204
if val.(bool) {
@@ -173,7 +220,10 @@ func (w *worker) onCreateTable(jobCtx *jobContext, job *model.Job) (ver int64, _
173220
return w.createTableWithForeignKeys(jobCtx, job, args)
174221
}
175222

176-
tbInfo, err = createTable(jobCtx, job, args)
223+
tbInfo, err = createTable(jobCtx, job, &asAutoIDRequirement{
224+
store: w.store,
225+
autoidCli: w.autoidCli,
226+
}, args)
177227
if err != nil {
178228
return ver, errors.Trace(err)
179229
}
@@ -201,7 +251,10 @@ func (w *worker) createTableWithForeignKeys(jobCtx *jobContext, job *model.Job,
201251
// the `tbInfo.State` with `model.StateNone`, so it's fine to just call the `createTable` with
202252
// public state.
203253
// when `br` restores table, the state of `tbInfo` will be public.
204-
tbInfo, err = createTable(jobCtx, job, args)
254+
tbInfo, err = createTable(jobCtx, job, &asAutoIDRequirement{
255+
store: w.store,
256+
autoidCli: w.autoidCli,
257+
}, args)
205258
if err != nil {
206259
return ver, errors.Trace(err)
207260
}
@@ -266,7 +319,10 @@ func (w *worker) onCreateTables(jobCtx *jobContext, job *model.Job) (int64, erro
266319
}
267320
tableInfos = append(tableInfos, tableInfo)
268321
} else {
269-
tbInfo, err := createTable(jobCtx, stubJob, tblArgs)
322+
tbInfo, err := createTable(jobCtx, stubJob, &asAutoIDRequirement{
323+
store: w.store,
324+
autoidCli: w.autoidCli,
325+
}, tblArgs)
270326
if err != nil {
271327
job.State = model.JobStateCancelled
272328
return ver, errors.Trace(err)

pkg/ddl/executor.go

Lines changed: 11 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,47 +1145,6 @@ func getSharedInvolvingSchemaInfo(info *model.TableInfo) []model.InvolvingSchema
11451145
return ret
11461146
}
11471147

1148-
func (e *executor) createTableWithInfoPost(
1149-
ctx sessionctx.Context,
1150-
tbInfo *model.TableInfo,
1151-
schemaID int64,
1152-
scatterScope string,
1153-
) error {
1154-
var err error
1155-
var partitions []model.PartitionDefinition
1156-
if pi := tbInfo.GetPartitionInfo(); pi != nil {
1157-
partitions = pi.Definitions
1158-
}
1159-
preSplitAndScatter(ctx, e.store, tbInfo, partitions, scatterScope)
1160-
if tbInfo.AutoIncID > 1 {
1161-
// Default tableAutoIncID base is 0.
1162-
// If the first ID is expected to greater than 1, we need to do rebase.
1163-
newEnd := tbInfo.AutoIncID - 1
1164-
var allocType autoid.AllocatorType
1165-
if tbInfo.SepAutoInc() {
1166-
allocType = autoid.AutoIncrementType
1167-
} else {
1168-
allocType = autoid.RowIDAllocType
1169-
}
1170-
if err = e.handleAutoIncID(tbInfo, schemaID, newEnd, allocType); err != nil {
1171-
return errors.Trace(err)
1172-
}
1173-
}
1174-
// For issue https://github.com/pingcap/tidb/issues/46093
1175-
if tbInfo.AutoIncIDExtra != 0 {
1176-
if err = e.handleAutoIncID(tbInfo, schemaID, tbInfo.AutoIncIDExtra-1, autoid.RowIDAllocType); err != nil {
1177-
return errors.Trace(err)
1178-
}
1179-
}
1180-
if tbInfo.AutoRandID > 1 {
1181-
// Default tableAutoRandID base is 0.
1182-
// If the first ID is expected to greater than 1, we need to do rebase.
1183-
newEnd := tbInfo.AutoRandID - 1
1184-
err = e.handleAutoIncID(tbInfo, schemaID, newEnd, autoid.AutoRandomType)
1185-
}
1186-
return err
1187-
}
1188-
11891148
func (e *executor) CreateTableWithInfo(
11901149
ctx sessionctx.Context,
11911150
dbName pmodel.CIStr,
@@ -1215,9 +1174,9 @@ func (e *executor) CreateTableWithInfo(
12151174
if val, ok := jobW.GetSessionVars(variable.TiDBScatterRegion); ok {
12161175
scatterScope = val
12171176
}
1218-
err = e.createTableWithInfoPost(ctx, tbInfo, jobW.SchemaID, scatterScope)
1219-
}
12201177

1178+
preSplitAndScatterTable(ctx, e.store, tbInfo, scatterScope)
1179+
}
12211180
return errors.Trace(err)
12221181
}
12231182

@@ -1312,9 +1271,7 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context,
13121271
scatterScope = val
13131272
}
13141273
for _, tblArgs := range args.Tables {
1315-
if err = e.createTableWithInfoPost(ctx, tblArgs.TableInfo, jobW.SchemaID, scatterScope); err != nil {
1316-
return errors.Trace(err)
1317-
}
1274+
preSplitAndScatterTable(ctx, e.store, tblArgs.TableInfo, scatterScope)
13181275
}
13191276

13201277
return nil
@@ -1397,6 +1354,14 @@ func preSplitAndScatter(ctx sessionctx.Context, store kv.Storage, tbInfo *model.
13971354
}
13981355
}
13991356

1357+
func preSplitAndScatterTable(ctx sessionctx.Context, store kv.Storage, tbInfo *model.TableInfo, scatterScope string) {
1358+
var partitions []model.PartitionDefinition
1359+
if pi := tbInfo.GetPartitionInfo(); pi != nil {
1360+
partitions = pi.Definitions
1361+
}
1362+
preSplitAndScatter(ctx, store, tbInfo, partitions, scatterScope)
1363+
}
1364+
14001365
func (e *executor) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error {
14011366
logutil.DDLLogger().Info("get flashback cluster job", zap.Stringer("flashbackTS", oracle.GetTimeFromTS(flashbackTS)))
14021367
nowTS, err := ctx.GetStore().GetOracle().GetTimestamp(e.ctx, &oracle.Option{})
@@ -1531,19 +1496,6 @@ func checkCharsetAndCollation(cs string, co string) error {
15311496
return nil
15321497
}
15331498

1534-
// handleAutoIncID handles auto_increment option in DDL. It creates a ID counter for the table and initiates the counter to a proper value.
1535-
// For example if the option sets auto_increment to 10. The counter will be set to 9. So the next allocated ID will be 10.
1536-
func (e *executor) handleAutoIncID(tbInfo *model.TableInfo, schemaID int64, newEnd int64, tp autoid.AllocatorType) error {
1537-
allocs := autoid.NewAllocatorsFromTblInfo(e.getAutoIDRequirement(), schemaID, tbInfo)
1538-
if alloc := allocs.Get(tp); alloc != nil {
1539-
err := alloc.Rebase(context.Background(), newEnd, false)
1540-
if err != nil {
1541-
return errors.Trace(err)
1542-
}
1543-
}
1544-
return nil
1545-
}
1546-
15471499
func (e *executor) getAutoIDRequirement() autoid.Requirement {
15481500
return &asAutoIDRequirement{
15491501
store: e.store,

pkg/ddl/table.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -577,15 +577,11 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64,
577577
}
578578
})
579579

580-
var partitions []model.PartitionDefinition
581-
if pi := tblInfo.GetPartitionInfo(); pi != nil {
582-
partitions = tblInfo.GetPartitionInfo().Definitions
583-
}
584580
var scatterScope string
585581
if val, ok := job.GetSessionVars(variable.TiDBScatterRegion); ok {
586582
scatterScope = val
587583
}
588-
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, partitions, scatterScope)
584+
preSplitAndScatterTable(w.sess.Context, jobCtx.store, tblInfo, scatterScope)
589585

590586
ver, err = updateSchemaVersion(jobCtx, job)
591587
if err != nil {

pkg/ddl/tests/fastcreatetable/fastcreatetable_test.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,20 +130,41 @@ func TestMergedJob(t *testing.T) {
130130
wg.Run(func() {
131131
tk1 := testkit.NewTestKit(t, store)
132132
tk1.MustExec("use test")
133-
tk1.MustExecToErr("create table t(a int)")
133+
tk1.MustExecToErr("create table t1(id int AUTO_INCREMENT, c int) AUTO_INCREMENT 1000")
134134
})
135135
wg.Run(func() {
136136
tk1 := testkit.NewTestKit(t, store)
137137
tk1.MustExec("use test")
138-
tk1.MustExecToErr("create table t1(a int)")
138+
tk1.MustExecToErr("create table t(a int)")
139139
})
140140
require.Eventually(t, func() bool {
141141
gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session())
142142
require.NoError(t, err)
143143
return len(gotJobs) == 2 && gotJobs[1].Type == model.ActionCreateTables
144144
}, 10*time.Second, 100*time.Millisecond)
145145

146+
// below 2 jobs are merged into the third group, they will succeed together.
147+
wg.Run(func() {
148+
tk1 := testkit.NewTestKit(t, store)
149+
tk1.MustExec("use test")
150+
tk1.MustExec("create table t1(id int AUTO_INCREMENT, c int) AUTO_INCREMENT 100")
151+
})
152+
wg.Run(func() {
153+
tk1 := testkit.NewTestKit(t, store)
154+
tk1.MustExec("use test")
155+
tk1.MustExec("create table t2(id int AUTO_INCREMENT, c int) AUTO_INCREMENT 100")
156+
})
157+
require.Eventually(t, func() bool {
158+
gotJobs, err := ddl.GetAllDDLJobs(ctx, tk.Session())
159+
require.NoError(t, err)
160+
return len(gotJobs) == 3 && gotJobs[2].Type == model.ActionCreateTables
161+
}, 10*time.Second, 100*time.Millisecond)
162+
146163
// start to run the jobs
147164
close(startSchedule)
148165
wg.Wait()
166+
167+
// Test the correctness of auto id after failed merge job 2
168+
tk.MustExec("insert into test.t1(c) values(1)")
169+
tk.MustQuery("select * from test.t1").Check(testkit.Rows("100 1"))
149170
}

0 commit comments

Comments
 (0)