Skip to content

Commit

Permalink
ddl: Add dbInfo and tblInfo in some operations of DDL (pingcap#1870)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Oct 26, 2016
1 parent 04b97b6 commit b80ba78
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 49 deletions.
9 changes: 5 additions & 4 deletions ddl/bg_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ func (d *ddl) prepareBgJob(t *meta.Meta, ddlJob *model.Job) error {
Type: ddlJob.Type,
}

if len(ddlJob.Args) > 0 {
// ddlJob.Args[0] is the schema version that isn't necessary in background job and it will make
// the background job of dropping schema become more complicated to handle.
job.Args = ddlJob.Args[1:]
if len(ddlJob.Args) >= 2 {
// ddlJob.Args[0] is the schema version that isn't necessary in background job and
// ddlJob.Args[1] is the table information or the database information.
// They will make the background job of dropping schema become more complicated to handle.
job.Args = ddlJob.Args[2:]
}

err := t.EnQueueBgJob(job)
Expand Down
4 changes: 2 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error {
// finish this job
job.SchemaState = model.StatePublic
job.State = model.JobDone
addFinishInfo(job, ver, tblInfo)
addTableHistoryInfo(job, ver, tblInfo)
return nil
default:
return ErrInvalidColumnState.Gen("invalid column state %v", columnInfo.State)
Expand Down Expand Up @@ -292,7 +292,7 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error {
// finish this job
job.SchemaState = model.StateNone
job.State = model.JobDone
addFinishInfo(job, ver, tblInfo)
addTableHistoryInfo(job, ver, tblInfo)
return nil
default:
return ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (d *ddl) checkOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
if flag == bgJobFlag {
// Background job is serial processing, so we can extend the owner timeout to make sure
// a batch of rows will be processed before timeout. So here we use 20 * lease to check its timeout.
maxTimeout := int64(20 * d.lease)
maxTimeout = int64(20 * d.lease)
// If 20 * lease is greater than maxBgOwnerTimeout, we will use default maxBgOwnerTimeout.
if maxTimeout > maxBgOwnerTimeout {
maxTimeout = maxBgOwnerTimeout
Expand Down
4 changes: 2 additions & 2 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (d *ddl) onCreateForeignKey(t *meta.Meta, job *model.Job) error {
}
// finish this job
job.State = model.JobDone
addFinishInfo(job, ver, nil)
addTableHistoryInfo(job, ver, tblInfo)
return nil
default:
return ErrInvalidForeignKeyState.Gen("invalid fk state %v", fkInfo.State)
Expand Down Expand Up @@ -113,7 +113,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) error {
}
// finish this job
job.State = model.JobDone
addFinishInfo(job, ver, nil)
addTableHistoryInfo(job, ver, tblInfo)
return nil
default:
return ErrInvalidForeignKeyState.Gen("invalid fk state %v", fkInfo.State)
Expand Down
4 changes: 2 additions & 2 deletions ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func testDropForeignKey(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo,
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v})
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

Expand Down Expand Up @@ -157,7 +157,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
c.Assert(err, IsNil)
c.Assert(checkOK, IsTrue)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v})
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})

checkOK = false
tc.onJobUpdated = func(job *model.Job) {
Expand Down
4 changes: 2 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
// finish this job
job.SchemaState = model.StatePublic
job.State = model.JobDone
addFinishInfo(job, ver, tblInfo)
addTableHistoryInfo(job, ver, tblInfo)
return nil
default:
return ErrInvalidIndexState.Gen("invalid index state %v", tblInfo.State)
Expand Down Expand Up @@ -343,7 +343,7 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
} else {
job.State = model.JobDone
}
addFinishInfo(job, ver, tblInfo)
addTableHistoryInfo(job, ver, tblInfo)
return nil
default:
return ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
Expand Down
16 changes: 9 additions & 7 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,15 @@ func (d *ddl) delKeysWithPrefix(prefix kv.Key, jobType JobType, job *model.Job,
return count, nil
}

// addFinishInfo adds schema version and table information that are used for binlog.
// tblInfo is added in the following operations: add column, drop column, add index, drop index.
func addFinishInfo(job *model.Job, ver int64, tblInfo *model.TableInfo) {
if tblInfo == nil {
job.Args = []interface{}{ver}
return
}
// addDBHistoryInfo adds schema version and schema information that are used for binlog.
// dbInfo is added in the following operations: create database, drop database.
func addDBHistoryInfo(job *model.Job, ver int64, dbInfo *model.DBInfo) {
job.Args = []interface{}{ver, dbInfo}
}

// addTableHistoryInfo adds schema version and table information that are used for binlog.
// tblInfo is added except for the following operations: create database, drop database.
func addTableHistoryInfo(job *model.Job, ver int64, tblInfo *model.TableInfo) {
job.Args = []interface{}{ver, tblInfo}
}

Expand Down
10 changes: 4 additions & 6 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error {
}
// finish this job
job.State = model.JobDone
addFinishInfo(job, ver, nil)
addDBHistoryInfo(job, ver, dbInfo)
return nil
default:
// we can't enter here.
Expand Down Expand Up @@ -99,7 +99,7 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error {
dbInfo.State = model.StateDeleteOnly
err = t.UpdateDatabase(dbInfo)
case model.StateDeleteOnly:
dbInfo.State = model.StateDeleteReorganization
dbInfo.State = model.StateNone
tables, err := t.ListTables(job.SchemaID)
if err != nil {
return errors.Trace(err)
Expand All @@ -111,11 +111,9 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error {
}

// finish this job
addDBHistoryInfo(job, ver, dbInfo)
if len(tables) > 0 {
ids := getIDs(tables)
job.Args = []interface{}{ver, ids}
} else {
addFinishInfo(job, ver, nil)
job.Args = append(job.Args, getIDs(tables))
}
job.State = model.JobDone
job.SchemaState = model.StateNone
Expand Down
37 changes: 27 additions & 10 deletions ddl/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ func testCreateSchema(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo) *
Type: model.ActionCreateSchema,
Args: []interface{}{dbInfo},
}

err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)

v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v})
dbInfo.State = model.StatePublic
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, db: dbInfo})
dbInfo.State = model.StateNone
return job
}

Expand All @@ -63,9 +65,9 @@ func testDropSchema(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo) (*m
SchemaID: dbInfo.ID,
Type: model.ActionDropSchema,
}

err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)

ver := getSchemaVer(c, ctx)
return job, ver
}
Expand Down Expand Up @@ -148,10 +150,21 @@ func getSchemaVer(c *C, ctx context.Context) int64 {

type historyJobArgs struct {
ver int64
db *model.DBInfo
tbl *model.TableInfo
tblIDs map[int64]struct{}
}

func checkEqualTable(c *C, t1, t2 *model.TableInfo) {
c.Assert(t1.ID, Equals, t2.ID)
c.Assert(t1.Name, Equals, t2.Name)
c.Assert(t1.Charset, Equals, t2.Charset)
c.Assert(t1.Collate, Equals, t2.Collate)
c.Assert(t1.PKIsHandle, DeepEquals, t2.PKIsHandle)
c.Assert(t1.Comment, DeepEquals, t2.Comment)
c.Assert(t1.AutoIncID, DeepEquals, t2.AutoIncID)
}

func checkHistoryJobArgs(c *C, ctx context.Context, id int64, args *historyJobArgs) {
txn, err := ctx.GetTxn(true)
c.Assert(err, IsNil)
Expand All @@ -162,20 +175,24 @@ func checkHistoryJobArgs(c *C, ctx context.Context, id int64, args *historyJobAr
var v int64
var ids []int64
tbl := &model.TableInfo{}
if args.tbl == nil && len(args.tblIDs) == 0 {
historyJob.DecodeArgs(&v)
if args.tbl != nil {
historyJob.DecodeArgs(&v, &tbl)
c.Assert(v, Equals, args.ver)
checkEqualTable(c, tbl, args.tbl)
return
}
// alter table DDL
if args.tbl != nil {
historyJob.DecodeArgs(&v, &tbl)
// only for create schema job
db := &model.DBInfo{}
if args.db != nil && len(args.tblIDs) == 0 {
historyJob.DecodeArgs(&v, &db)
c.Assert(v, Equals, args.ver)
c.Assert(db, DeepEquals, args.db)
return
}
// only for drop schema job
historyJob.DecodeArgs(&v, &ids)
historyJob.DecodeArgs(&v, &db, &ids)
c.Assert(v, Equals, args.ver)
c.Assert(db, DeepEquals, args.db)
for _, id := range ids {
c.Assert(args.tblIDs, HasKey, id)
delete(args.tblIDs, id)
Expand Down Expand Up @@ -243,7 +260,7 @@ func (s *testSchemaSuite) TestSchema(c *C) {
ids := make(map[int64]struct{})
ids[tblInfo1.ID] = struct{}{}
ids[tblInfo2.ID] = struct{}{}
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tblIDs: ids})
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, db: dbInfo, tblIDs: ids})
// check background ddl info
time.Sleep(testLease * 400)
verifyBgJobState(c, d, job, model.JobDone)
Expand Down
6 changes: 3 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) error {
}
// finish this job
job.State = model.JobDone
addFinishInfo(job, ver, nil)
addTableHistoryInfo(job, ver, tbInfo)
return nil
default:
return ErrInvalidTableState.Gen("invalid table state %v", tbInfo.State)
Expand Down Expand Up @@ -134,7 +134,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) error {
// finish this job
job.State = model.JobDone
job.SchemaState = model.StateNone
addFinishInfo(job, ver, nil)
addTableHistoryInfo(job, ver, tblInfo)
default:
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
}
Expand Down Expand Up @@ -215,6 +215,6 @@ func (d *ddl) onTruncateTable(t *meta.Meta, job *model.Job) error {
return errors.Trace(err)
}
job.State = model.JobDone
addFinishInfo(job, ver, nil)
addTableHistoryInfo(job, ver, tblInfo)
return nil
}
46 changes: 36 additions & 10 deletions ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ func testCreateTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tb
Type: model.ActionCreateTable,
Args: []interface{}{tblInfo},
}

err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)

v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v})
tblInfo.State = model.StatePublic
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
tblInfo.State = model.StateNone
return job
}

Expand All @@ -91,11 +93,29 @@ func testDropTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tblI
TableID: tblInfo.ID,
Type: model.ActionDropTable,
}

err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)

v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v})
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func testTruncateTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job {
newTableID, err := d.genGlobalID()
c.Assert(err, IsNil)
job := &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionTruncateTable,
Args: []interface{}{newTableID},
}
err = d.doDDLJob(ctx, job)
c.Assert(err, IsNil)

v := getSchemaVer(c, ctx)
tblInfo.ID = newTableID
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

Expand Down Expand Up @@ -164,7 +184,7 @@ func (s *testTableSuite) TestTable(c *C) {
testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic)
testCheckJobDone(c, d, job, true)

// create an existing table.
// Create an existing table.
newTblInfo := testTableInfo(c, d, "t", 3)
job = &model.Job{
SchemaID: s.dbInfo.ID,
Expand All @@ -176,7 +196,7 @@ func (s *testTableSuite) TestTable(c *C) {
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)

// to drop a table with defaultBatchSize+10 records.
// To drop a table with defaultBatchSize+10 records.
tbl := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
for i := 1; i <= defaultBatchSize+10; i++ {
_, err = tbl.AddRecord(ctx, types.MakeDatums(i, i, i))
Expand Down Expand Up @@ -206,11 +226,20 @@ func (s *testTableSuite) TestTable(c *C) {
job = testDropTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckJobDone(c, d, job, false)

// check background ddl info
// Check background ddl info.
time.Sleep(testLease * 200)
verifyBgJobState(c, d, job, model.JobDone)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
c.Assert(updatedCount, Equals, 2)

// for truncate table
tblInfo = testTableInfo(c, d, "tt", 3)
job = testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic)
testCheckJobDone(c, d, job, true)
job = testTruncateTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic)
testCheckJobDone(c, d, job, false)
}

func (s *testTableSuite) TestTableResume(c *C) {
Expand All @@ -220,14 +249,12 @@ func (s *testTableSuite) TestTableResume(c *C) {
testCheckOwner(c, d, true, ddlJobFlag)

tblInfo := testTableInfo(c, d, "t1", 3)

job := &model.Job{
SchemaID: s.dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionCreateTable,
Args: []interface{}{tblInfo},
}

testRunInterruptedJob(c, d, job)
testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic)

Expand All @@ -236,7 +263,6 @@ func (s *testTableSuite) TestTableResume(c *C) {
TableID: tblInfo.ID,
Type: model.ActionDropTable,
}

testRunInterruptedJob(c, d, job)
testCheckTableState(c, d, s.dbInfo, tblInfo, model.StateNone)
}

0 comments on commit b80ba78

Please sign in to comment.