Skip to content

Commit

Permalink
ddl: refine cancel/rollback ddl job code. (pingcap#8858)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Jan 3, 2019
1 parent 91cdbf2 commit 0147e0c
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 164 deletions.
92 changes: 55 additions & 37 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,42 +121,52 @@ func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *
return colInfo, position, nil
}

func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo, *ast.ColumnPosition, int, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
return nil, nil, nil, nil, 0, errors.Trace(err)
}
// gofail: var errorBeforeDecodeArgs bool
// if errorBeforeDecodeArgs {
// return ver, errors.New("occur an error before decode args")
// }
col := &model.ColumnInfo{}
pos := &ast.ColumnPosition{}
offset := 0
err = job.DecodeArgs(col, pos, &offset)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
return nil, nil, nil, nil, 0, errors.Trace(err)
}

columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
return nil, nil, nil, nil, 0, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}
} else {
}
return tblInfo, columnInfo, col, pos, offset, nil
}

func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

// gofail: var errorBeforeDecodeArgs bool
// if errorBeforeDecodeArgs {
// return ver, errors.New("occur an error before decode args")
// }

tblInfo, columnInfo, col, pos, offset, err := checkAddColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if columnInfo == nil {
columnInfo, offset, err = createColumnInfo(tblInfo, col, pos)
if err != nil {
job.State = model.JobStateCancelled
Expand Down Expand Up @@ -211,26 +221,8 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
}

func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}

var colName model.CIStr
err = job.DecodeArgs(&colName)
tblInfo, colInfo, err := checkDropColumn(t, job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}
if err = isDroppableColumn(tblInfo, colName); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand Down Expand Up @@ -275,6 +267,32 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

func checkDropColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
return nil, nil, errors.Trace(err)
}

var colName model.CIStr
err = job.DecodeArgs(&colName)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, errors.Trace(err)
}

colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil {
job.State = model.JobStateCancelled
return nil, nil, ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}
if err = isDroppableColumn(tblInfo, colName); err != nil {
job.State = model.JobStateCancelled
return nil, nil, errors.Trace(err)
}
return tblInfo, colInfo, nil
}

func onSetDefaultValue(t *meta.Meta, job *model.Job) (ver int64, _ error) {
newCol := &model.ColumnInfo{}
err := job.DecodeArgs(newCol)
Expand Down
38 changes: 23 additions & 15 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,24 +360,11 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int
}

func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
tblInfo, indexInfo, err := checkDropIndex(t, job)
if err != nil {
return ver, errors.Trace(err)
}

var indexName model.CIStr
if err = job.DecodeArgs(&indexName); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}

originalState := indexInfo.State
switch indexInfo.State {
case model.StatePublic:
Expand All @@ -399,7 +386,7 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// reorganization -> absent
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if idx.Name.L != indexName.L {
if idx.Name.L != indexInfo.Name.L {
newIndices = append(newIndices, idx)
}
}
Expand Down Expand Up @@ -428,6 +415,27 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
return nil, nil, errors.Trace(err)
}

var indexName model.CIStr
if err = job.DecodeArgs(&indexName); err != nil {
job.State = model.JobStateCancelled
return nil, nil, errors.Trace(err)
}

indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return nil, nil, ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}
return tblInfo, indexInfo, nil
}

const (
// DefaultTaskHandleCnt is default batch size of adding indices.
DefaultTaskHandleCnt = 128
Expand Down
102 changes: 43 additions & 59 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/util/schemautil"
Expand Down Expand Up @@ -78,34 +77,15 @@ func convertNotStartAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, occuredE

func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
job.State = model.JobStateRollingback
col := &model.ColumnInfo{}
pos := &ast.ColumnPosition{}
offset := 0
err = job.DecodeArgs(col, pos, &offset)
tblInfo, columnInfo, col, _, _, err := checkAddColumn(t, job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo == nil {
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
}

if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}

originalState := columnInfo.State
columnInfo.State = model.StateDeleteOnly
job.SchemaState = model.StateDeleteOnly
Expand All @@ -119,57 +99,29 @@ func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
}

func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, err := getTableInfo(t, job, job.SchemaID)
tblInfo, colInfo, err := checkDropColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}

var colName model.CIStr
err = job.DecodeArgs(&colName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}

// StatePublic means when the job is not running yet.
if colInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
} else {
// In the state of drop column `write only -> delete only -> reorganization`,
// We can not rollback now, so just continue to drop column.
job.State = model.JobStateRunning
return ver, errors.Trace(nil)
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errors.Trace(errCancelledDDLJob)
// In the state of drop column `write only -> delete only -> reorganization`,
// We can not rollback now, so just continue to drop column.
job.State = model.JobStateRunning
return ver, nil
}

func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}

var indexName model.CIStr
err = job.DecodeArgs(&indexName)
tblInfo, indexInfo, err := checkDropIndex(t, job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

indexInfo := schemautil.FindIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}

originalState := indexInfo.State
switch indexInfo.State {
case model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone:
Expand Down Expand Up @@ -208,6 +160,36 @@ func rollingbackAddindex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return
}

func rollingbackDropTable(t *meta.Meta, job *model.Job) error {
tblInfo, err := checkTableExist(t, job, job.SchemaID)
if err != nil {
return errors.Trace(err)
}
// To simplify the rollback logic, cannot be canceled after job start to run.
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
if tblInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
return errCancelledDDLJob
}
job.State = model.JobStateRunning
return nil
}

func rollingbackDropSchema(t *meta.Meta, job *model.Job) error {
dbInfo, err := checkDropSchema(t, job)
if err != nil {
return errors.Trace(err)
}
// To simplify the rollback logic, cannot be canceled after job start to run.
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
if dbInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
return errCancelledDDLJob
}
job.State = model.JobStateRunning
return nil
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
Expand All @@ -218,8 +200,10 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
ver, err = rollingbackDropColumn(t, job)
case model.ActionDropIndex:
ver, err = rollingbackDropIndex(t, job)
case model.ActionDropTable, model.ActionDropSchema:
job.State = model.JobStateRollingback
case model.ActionDropTable:
err = rollingbackDropTable(t, job)
case model.ActionDropSchema:
err = rollingbackDropSchema(t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
28 changes: 13 additions & 15 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,10 @@ func onCreateSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
dbInfo, err := t.GetDatabase(job.SchemaID)
dbInfo, err := checkDropSchema(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if dbInfo == nil {
job.State = model.JobStateCancelled
return ver, infoschema.ErrDatabaseDropExists.GenWithStackByArgs("")
}

if job.IsRollingback() {
// To simplify the rollback logic, cannot be canceled after job start to run.
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
if dbInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
}
job.State = model.JobStateRunning
}

ver, err = updateSchemaVersion(t, job)
if err != nil {
Expand Down Expand Up @@ -134,6 +120,18 @@ func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

func checkDropSchema(t *meta.Meta, job *model.Job) (*model.DBInfo, error) {
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return nil, errors.Trace(err)
}
if dbInfo == nil {
job.State = model.JobStateCancelled
return nil, infoschema.ErrDatabaseDropExists.GenWithStackByArgs("")
}
return dbInfo, nil
}

func getIDs(tables []*model.TableInfo) []int64 {
ids := make([]int64, 0, len(tables))
for _, t := range tables {
Expand Down
Loading

0 comments on commit 0147e0c

Please sign in to comment.