diff --git a/ddl/bg_worker.go b/ddl/bg_worker.go index 0efa7055922d6..55e8fa3b2ee16 100644 --- a/ddl/bg_worker.go +++ b/ddl/bg_worker.go @@ -109,10 +109,11 @@ func (d *ddl) prepareBgJob(t *meta.Meta, ddlJob *model.Job) error { Type: ddlJob.Type, } - if len(ddlJob.Args) > 0 { - // ddlJob.Args[0] is the schema version that isn't necessary in background job and it will make - // the background job of dropping schema become more complicated to handle. - job.Args = ddlJob.Args[1:] + if len(ddlJob.Args) >= 2 { + // ddlJob.Args[0] is the schema version that isn't necessary in background job and + // ddlJob.Args[1] is the table information or the database information. + // They will make the background job of dropping schema become more complicated to handle. + job.Args = ddlJob.Args[2:] } err := t.EnQueueBgJob(job) diff --git a/ddl/column.go b/ddl/column.go index 18b9b076f7578..5ec5b7444086a 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -193,7 +193,7 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error { // finish this job job.SchemaState = model.StatePublic job.State = model.JobDone - addFinishInfo(job, ver, tblInfo) + addTableHistoryInfo(job, ver, tblInfo) return nil default: return ErrInvalidColumnState.Gen("invalid column state %v", columnInfo.State) @@ -292,7 +292,7 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error { // finish this job job.SchemaState = model.StateNone job.State = model.JobDone - addFinishInfo(job, ver, tblInfo) + addTableHistoryInfo(job, ver, tblInfo) return nil default: return ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index cfd38539b241e..cefa3a3c190d5 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -174,7 +174,7 @@ func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) { if flag == bgJobFlag { // Background job is serial processing, so we can extend the owner timeout to make sure // a batch of rows will be processed before timeout. So here we use 20 * lease to check its timeout. - maxTimeout := int64(20 * d.lease) + maxTimeout = int64(20 * d.lease) // If 20 * lease is greater than maxBgOwnerTimeout, we will use default maxBgOwnerTimeout. if maxTimeout > maxBgOwnerTimeout { maxTimeout = maxBgOwnerTimeout diff --git a/ddl/foreign_key.go b/ddl/foreign_key.go index a939d37cf3d62..987bebb6f925d 100644 --- a/ddl/foreign_key.go +++ b/ddl/foreign_key.go @@ -52,7 +52,7 @@ func (d *ddl) onCreateForeignKey(t *meta.Meta, job *model.Job) error { } // finish this job job.State = model.JobDone - addFinishInfo(job, ver, nil) + addTableHistoryInfo(job, ver, tblInfo) return nil default: return ErrInvalidForeignKeyState.Gen("invalid fk state %v", fkInfo.State) @@ -113,7 +113,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) error { } // finish this job job.State = model.JobDone - addFinishInfo(job, ver, nil) + addTableHistoryInfo(job, ver, tblInfo) return nil default: return ErrInvalidForeignKeyState.Gen("invalid fk state %v", fkInfo.State) diff --git a/ddl/foreign_key_test.go b/ddl/foreign_key_test.go index 80fcbe6201f83..a04923faf8660 100644 --- a/ddl/foreign_key_test.go +++ b/ddl/foreign_key_test.go @@ -90,7 +90,7 @@ func testDropForeignKey(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, err := d.doDDLJob(ctx, job) c.Assert(err, IsNil) v := getSchemaVer(c, ctx) - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) return job } @@ -157,7 +157,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { c.Assert(err, IsNil) c.Assert(checkOK, IsTrue) v := getSchemaVer(c, ctx) - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) checkOK = false tc.onJobUpdated = func(job *model.Job) { diff --git a/ddl/index.go b/ddl/index.go index e0dbd48773f83..020369222dd18 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -229,7 +229,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error { // finish this job job.SchemaState = model.StatePublic job.State = model.JobDone - addFinishInfo(job, ver, tblInfo) + addTableHistoryInfo(job, ver, tblInfo) return nil default: return ErrInvalidIndexState.Gen("invalid index state %v", tblInfo.State) @@ -343,7 +343,7 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error { } else { job.State = model.JobDone } - addFinishInfo(job, ver, tblInfo) + addTableHistoryInfo(job, ver, tblInfo) return nil default: return ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State) diff --git a/ddl/reorg.go b/ddl/reorg.go index 6192ccd8d116a..2a157fce66e10 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -242,13 +242,15 @@ func (d *ddl) delKeysWithPrefix(prefix kv.Key, jobType JobType, job *model.Job, return count, nil } -// addFinishInfo adds schema version and table information that are used for binlog. -// tblInfo is added in the following operations: add column, drop column, add index, drop index. -func addFinishInfo(job *model.Job, ver int64, tblInfo *model.TableInfo) { - if tblInfo == nil { - job.Args = []interface{}{ver} - return - } +// addDBHistoryInfo adds schema version and schema information that are used for binlog. +// dbInfo is added in the following operations: create database, drop database. +func addDBHistoryInfo(job *model.Job, ver int64, dbInfo *model.DBInfo) { + job.Args = []interface{}{ver, dbInfo} +} + +// addTableHistoryInfo adds schema version and table information that are used for binlog. +// tblInfo is added except for the following operations: create database, drop database. +func addTableHistoryInfo(job *model.Job, ver int64, tblInfo *model.TableInfo) { job.Args = []interface{}{ver, tblInfo} } diff --git a/ddl/schema.go b/ddl/schema.go index 4e9645d0551f8..250b1bf249547 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -64,7 +64,7 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error { } // finish this job job.State = model.JobDone - addFinishInfo(job, ver, nil) + addDBHistoryInfo(job, ver, dbInfo) return nil default: // we can't enter here. @@ -99,7 +99,7 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error { dbInfo.State = model.StateDeleteOnly err = t.UpdateDatabase(dbInfo) case model.StateDeleteOnly: - dbInfo.State = model.StateDeleteReorganization + dbInfo.State = model.StateNone tables, err := t.ListTables(job.SchemaID) if err != nil { return errors.Trace(err) @@ -111,11 +111,9 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error { } // finish this job + addDBHistoryInfo(job, ver, dbInfo) if len(tables) > 0 { - ids := getIDs(tables) - job.Args = []interface{}{ver, ids} - } else { - addFinishInfo(job, ver, nil) + job.Args = append(job.Args, getIDs(tables)) } job.State = model.JobDone job.SchemaState = model.StateNone diff --git a/ddl/schema_test.go b/ddl/schema_test.go index 8208cf64ac8f2..05c24338a8480 100644 --- a/ddl/schema_test.go +++ b/ddl/schema_test.go @@ -50,11 +50,13 @@ func testCreateSchema(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo) * Type: model.ActionCreateSchema, Args: []interface{}{dbInfo}, } - err := d.doDDLJob(ctx, job) c.Assert(err, IsNil) + v := getSchemaVer(c, ctx) - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + dbInfo.State = model.StatePublic + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, db: dbInfo}) + dbInfo.State = model.StateNone return job } @@ -63,9 +65,9 @@ func testDropSchema(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo) (*m SchemaID: dbInfo.ID, Type: model.ActionDropSchema, } - err := d.doDDLJob(ctx, job) c.Assert(err, IsNil) + ver := getSchemaVer(c, ctx) return job, ver } @@ -148,10 +150,21 @@ func getSchemaVer(c *C, ctx context.Context) int64 { type historyJobArgs struct { ver int64 + db *model.DBInfo tbl *model.TableInfo tblIDs map[int64]struct{} } +func checkEqualTable(c *C, t1, t2 *model.TableInfo) { + c.Assert(t1.ID, Equals, t2.ID) + c.Assert(t1.Name, Equals, t2.Name) + c.Assert(t1.Charset, Equals, t2.Charset) + c.Assert(t1.Collate, Equals, t2.Collate) + c.Assert(t1.PKIsHandle, DeepEquals, t2.PKIsHandle) + c.Assert(t1.Comment, DeepEquals, t2.Comment) + c.Assert(t1.AutoIncID, DeepEquals, t2.AutoIncID) +} + func checkHistoryJobArgs(c *C, ctx context.Context, id int64, args *historyJobArgs) { txn, err := ctx.GetTxn(true) c.Assert(err, IsNil) @@ -162,20 +175,24 @@ func checkHistoryJobArgs(c *C, ctx context.Context, id int64, args *historyJobAr var v int64 var ids []int64 tbl := &model.TableInfo{} - if args.tbl == nil && len(args.tblIDs) == 0 { - historyJob.DecodeArgs(&v) + if args.tbl != nil { + historyJob.DecodeArgs(&v, &tbl) c.Assert(v, Equals, args.ver) + checkEqualTable(c, tbl, args.tbl) return } - // alter table DDL - if args.tbl != nil { - historyJob.DecodeArgs(&v, &tbl) + // only for create schema job + db := &model.DBInfo{} + if args.db != nil && len(args.tblIDs) == 0 { + historyJob.DecodeArgs(&v, &db) c.Assert(v, Equals, args.ver) + c.Assert(db, DeepEquals, args.db) return } // only for drop schema job - historyJob.DecodeArgs(&v, &ids) + historyJob.DecodeArgs(&v, &db, &ids) c.Assert(v, Equals, args.ver) + c.Assert(db, DeepEquals, args.db) for _, id := range ids { c.Assert(args.tblIDs, HasKey, id) delete(args.tblIDs, id) @@ -243,7 +260,7 @@ func (s *testSchemaSuite) TestSchema(c *C) { ids := make(map[int64]struct{}) ids[tblInfo1.ID] = struct{}{} ids[tblInfo2.ID] = struct{}{} - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tblIDs: ids}) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, db: dbInfo, tblIDs: ids}) // check background ddl info time.Sleep(testLease * 400) verifyBgJobState(c, d, job, model.JobDone) diff --git a/ddl/table.go b/ddl/table.go index 74760dfc0ab4f..9706902e4fef8 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -71,7 +71,7 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) error { } // finish this job job.State = model.JobDone - addFinishInfo(job, ver, nil) + addTableHistoryInfo(job, ver, tbInfo) return nil default: return ErrInvalidTableState.Gen("invalid table state %v", tbInfo.State) @@ -134,7 +134,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) error { // finish this job job.State = model.JobDone job.SchemaState = model.StateNone - addFinishInfo(job, ver, nil) + addTableHistoryInfo(job, ver, tblInfo) default: err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State) } @@ -215,6 +215,6 @@ func (d *ddl) onTruncateTable(t *meta.Meta, job *model.Job) error { return errors.Trace(err) } job.State = model.JobDone - addFinishInfo(job, ver, nil) + addTableHistoryInfo(job, ver, tblInfo) return nil } diff --git a/ddl/table_test.go b/ddl/table_test.go index f952fb32a95e0..3ec142322784d 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -77,11 +77,13 @@ func testCreateTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tb Type: model.ActionCreateTable, Args: []interface{}{tblInfo}, } - err := d.doDDLJob(ctx, job) c.Assert(err, IsNil) + v := getSchemaVer(c, ctx) - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + tblInfo.State = model.StatePublic + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + tblInfo.State = model.StateNone return job } @@ -91,11 +93,29 @@ func testDropTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tblI TableID: tblInfo.ID, Type: model.ActionDropTable, } - err := d.doDDLJob(ctx, job) c.Assert(err, IsNil) + v := getSchemaVer(c, ctx) - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + return job +} + +func testTruncateTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { + newTableID, err := d.genGlobalID() + c.Assert(err, IsNil) + job := &model.Job{ + SchemaID: dbInfo.ID, + TableID: tblInfo.ID, + Type: model.ActionTruncateTable, + Args: []interface{}{newTableID}, + } + err = d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + + v := getSchemaVer(c, ctx) + tblInfo.ID = newTableID + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) return job } @@ -164,7 +184,7 @@ func (s *testTableSuite) TestTable(c *C) { testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic) testCheckJobDone(c, d, job, true) - // create an existing table. + // Create an existing table. newTblInfo := testTableInfo(c, d, "t", 3) job = &model.Job{ SchemaID: s.dbInfo.ID, @@ -176,7 +196,7 @@ func (s *testTableSuite) TestTable(c *C) { c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) - // to drop a table with defaultBatchSize+10 records. + // To drop a table with defaultBatchSize+10 records. tbl := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) for i := 1; i <= defaultBatchSize+10; i++ { _, err = tbl.AddRecord(ctx, types.MakeDatums(i, i, i)) @@ -206,11 +226,20 @@ func (s *testTableSuite) TestTable(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - // check background ddl info + // Check background ddl info. time.Sleep(testLease * 200) verifyBgJobState(c, d, job, model.JobDone) c.Assert(errors.ErrorStack(checkErr), Equals, "") c.Assert(updatedCount, Equals, 2) + + // for truncate table + tblInfo = testTableInfo(c, d, "tt", 3) + job = testCreateTable(c, ctx, d, s.dbInfo, tblInfo) + testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic) + testCheckJobDone(c, d, job, true) + job = testTruncateTable(c, ctx, d, s.dbInfo, tblInfo) + testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic) + testCheckJobDone(c, d, job, false) } func (s *testTableSuite) TestTableResume(c *C) { @@ -220,14 +249,12 @@ func (s *testTableSuite) TestTableResume(c *C) { testCheckOwner(c, d, true, ddlJobFlag) tblInfo := testTableInfo(c, d, "t1", 3) - job := &model.Job{ SchemaID: s.dbInfo.ID, TableID: tblInfo.ID, Type: model.ActionCreateTable, Args: []interface{}{tblInfo}, } - testRunInterruptedJob(c, d, job) testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic) @@ -236,7 +263,6 @@ func (s *testTableSuite) TestTableResume(c *C) { TableID: tblInfo.ID, Type: model.ActionDropTable, } - testRunInterruptedJob(c, d, job) testCheckTableState(c, d, s.dbInfo, tblInfo, model.StateNone) }