Skip to content

Commit

Permalink
ddl: remove onDropIndexes (pingcap#35960)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Jul 6, 2022
1 parent 637d00c commit d5898d0
Show file tree
Hide file tree
Showing 12 changed files with 53 additions and 331 deletions.
41 changes: 1 addition & 40 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,38 +51,6 @@ import (
"go.uber.org/zap"
)

// adjustColumnInfoInDropColumn is used to set the correct position of column info when dropping column.
// 1. The offset of column should to be set to the last of the columns.
// 2. The dropped column is moved to the end of tblInfo.Columns, due to it was not public any more.
func adjustColumnInfoInDropColumn(tblInfo *model.TableInfo, offset int) {
oldCols := tblInfo.Columns
// Adjust column offset.
offsetChanged := make(map[int]int, len(oldCols)-offset-1)
for i := offset + 1; i < len(oldCols); i++ {
offsetChanged[oldCols[i].Offset] = i - 1
oldCols[i].Offset = i - 1
}
oldCols[offset].Offset = len(oldCols) - 1
// For expression index, we drop hidden columns and index simultaneously.
// So we need to change the offset of expression index.
offsetChanged[offset] = len(oldCols) - 1
// Update index column offset info.
// TODO: There may be some corner cases for index column offsets, we may check this later.
for _, idx := range tblInfo.Indices {
for _, col := range idx.Columns {
newOffset, ok := offsetChanged[col.Offset]
if ok {
col.Offset = newOffset
}
}
}
newCols := make([]*model.ColumnInfo, 0, len(oldCols))
newCols = append(newCols, oldCols[:offset]...)
newCols = append(newCols, oldCols[offset+1:]...)
newCols = append(newCols, oldCols[offset])
tblInfo.Columns = newCols
}

func createColumnInfoWithPosCheck(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) {
// Check column name duplicate.
cols := tblInfo.Columns
Expand Down Expand Up @@ -250,12 +218,6 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) {
for i := range columnInfos {
columnInfos[i].State = state
}
}

// checkAfterPositionExists makes sure the column specified in AFTER clause is exists.
// For example, ALTER TABLE t ADD COLUMN c3 INT AFTER c1.
func checkAfterPositionExists(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error {
Expand All @@ -275,8 +237,6 @@ func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
}

func checkDropColumnForStatePublic(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) (err error) {
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, colInfo.Offset)
// When the dropping column has not-null flag and it hasn't the default value, we can backfill the column value like "add column".
// NOTE: If the state of StateWriteOnly can be rollbacked, we'd better reconsider the original default value.
// And we need consider the column without not-null flag.
Expand Down Expand Up @@ -320,6 +280,7 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
// public -> write only
colInfo.State = model.StateWriteOnly
setIndicesState(idxInfos, model.StateWriteOnly)
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
err = checkDropColumnForStatePublic(tblInfo, colInfo)
if err != nil {
return ver, errors.Trace(err)
Expand Down
10 changes: 0 additions & 10 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,16 +833,6 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
logutil.BgLogger().Info("[ddl] DDL job is failed", zap.Int64("jobID", jobID))
return errors.Trace(historyJob.Error)
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionDropIndexes) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
logutil.BgLogger().Info("[ddl] DDL job is cancelled", zap.Int64("jobID", jobID))
return nil
}
panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
}
}
Expand Down
50 changes: 1 addition & 49 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4031,6 +4031,7 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa
if err != nil {
return err
}

var multiSchemaInfo *model.MultiSchemaInfo
if variable.EnableChangeMultiSchema.Load() {
multiSchemaInfo = &model.MultiSchemaInfo{}
Expand Down Expand Up @@ -6137,55 +6138,6 @@ func (d *ddl) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
return errors.Trace(err)
}

func (d *ddl) DropIndexes(ctx sessionctx.Context, ti ast.Ident, specs []*ast.AlterTableSpec) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return err
}

if t.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
return errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Indexes"))
}
indexNames := make([]model.CIStr, 0, len(specs))
ifExists := make([]bool, 0, len(specs))
for _, spec := range specs {
var indexName model.CIStr
if spec.Tp == ast.AlterTableDropPrimaryKey {
indexName = model.NewCIStr(mysql.PrimaryKeyName)
} else {
indexName = model.NewCIStr(spec.Name)
}

indexInfo := t.Meta().FindIndexByName(indexName.L)
if indexInfo != nil {
_, err := checkIsDropPrimaryKey(indexName, indexInfo, t)
if err != nil {
return err
}
if err := checkDropIndexOnAutoIncrementColumn(t.Meta(), indexInfo); err != nil {
return errors.Trace(err)
}
}

indexNames = append(indexNames, indexName)
ifExists = append(ifExists, spec.IfExists)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionDropIndexes,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{indexNames, ifExists},
}

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

func checkIsDropPrimaryKey(indexName model.CIStr, indexInfo *model.IndexInfo, t table.Table) (bool, error) {
var isPK bool
if indexName.L == strings.ToLower(mysql.PrimaryKeyName) &&
Expand Down
1 change: 0 additions & 1 deletion ddl/ddl_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func TestIsJobRollbackable(t *testing.T) {
{model.ActionDropIndex, model.StateDeleteOnly, false},
{model.ActionDropSchema, model.StateDeleteOnly, false},
{model.ActionDropColumn, model.StateDeleteOnly, false},
{model.ActionDropIndexes, model.StateDeleteOnly, false},
}
job := &model.Job{}
for _, ca := range cases {
Expand Down
4 changes: 1 addition & 3 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func jobNeedGC(job *model.Job) bool {
// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
return true
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionModifyColumn, model.ActionDropIndexes:
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionModifyColumn:
return true
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
Expand Down Expand Up @@ -933,8 +933,6 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = w.onCreateIndex(d, t, job, true)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
ver, err = onDropIndex(d, t, job)
case model.ActionDropIndexes:
ver, err = onDropIndexes(d, t, job)
case model.ActionRenameIndex:
ver, err = onRenameIndex(d, t, job)
case model.ActionAddForeignKey:
Expand Down
18 changes: 0 additions & 18 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,24 +368,6 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
elemID := ea.allocForIndexID(tableID, indexID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID))
}
case model.ActionDropIndexes:
var indexIDs []int64
var partitionIDs []int64
if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil {
return errors.Trace(err)
}
// Remove data in TiKV.
if len(indexIDs) == 0 {
return nil
}
if len(partitionIDs) == 0 {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, ea)
}
for _, pID := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now, ea); err != nil {
return errors.Trace(err)
}
}
case model.ActionDropColumn:
var colName model.CIStr
var ifExists bool
Expand Down
159 changes: 0 additions & 159 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,165 +784,6 @@ func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.Inde
return tblInfo, indexInfo, false, nil
}

func onDropIndexes(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, indexNames, ifExists, err := getSchemaInfos(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable {
return ver, errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Indexes"))
}

indexInfos, err := checkDropIndexes(tblInfo, job, indexNames, ifExists)
if err != nil {
return ver, errors.Trace(err)
}

if len(indexInfos) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}

dependentHiddenCols := make([]*model.ColumnInfo, 0)
for _, indexInfo := range indexInfos {
for _, indexColumn := range indexInfo.Columns {
if tblInfo.Columns[indexColumn.Offset].Hidden {
dependentHiddenCols = append(dependentHiddenCols, tblInfo.Columns[indexColumn.Offset])
}
}
}

originalState := indexInfos[0].State
switch indexInfos[0].State {
case model.StatePublic:
// public -> write only
setIndicesState(indexInfos, model.StateWriteOnly)
setColumnsState(dependentHiddenCols, model.StateWriteOnly)
for _, colInfo := range dependentHiddenCols {
adjustColumnInfoInDropColumn(tblInfo, colInfo.Offset)
}
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> delete only
setIndicesState(indexInfos, model.StateDeleteOnly)
setColumnsState(dependentHiddenCols, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> reorganization
setIndicesState(indexInfos, model.StateDeleteReorganization)
setColumnsState(dependentHiddenCols, model.StateDeleteReorganization)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
indexIDs := make([]int64, 0, len(indexInfos))
indexNames := make(map[string]bool, len(indexInfos))
for _, indexInfo := range indexInfos {
indexNames[indexInfo.Name.L] = true
indexIDs = append(indexIDs, indexInfo.ID)
}

newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if _, ok := indexNames[idx.Name.L]; !ok {
newIndices = append(newIndices, idx)
}
}
tblInfo.Indices = newIndices

// Set column index flag.
for _, indexInfo := range indexInfos {
dropIndexColumnFlag(tblInfo, indexInfo)
}

tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]

ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != model.StateNone)
if err != nil {
return ver, errors.Trace(err)
}

job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.Args = append(job.Args, indexIDs, getPartitionIDs(tblInfo))
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", indexInfos[0].State)
}

return ver, errors.Trace(err)
}

func getSchemaInfos(t *meta.Meta, job *model.Job) (*model.TableInfo, []model.CIStr, []bool, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

var indexNames []model.CIStr
var ifExists []bool
if err = job.DecodeArgs(&indexNames, &ifExists); err != nil {
return nil, nil, nil, errors.Trace(err)
}

return tblInfo, indexNames, ifExists, nil
}

func checkDropIndexes(tblInfo *model.TableInfo, job *model.Job, indexNames []model.CIStr, ifExists []bool) ([]*model.IndexInfo, error) {
var warnings []*errors.Error
indexInfos := make([]*model.IndexInfo, 0, len(indexNames))
UniqueIndexNames := make(map[model.CIStr]bool, len(indexNames))
for i, indexName := range indexNames {
// Double check the index is exists.
indexInfo := tblInfo.FindIndexByName(indexName.L)
if indexInfo == nil {
if ifExists[i] {
warnings = append(warnings, toTError(dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)))
continue
}
job.State = model.JobStateCancelled
return nil, dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}

// Double check for drop index on auto_increment column.
if err := checkDropIndexOnAutoIncrementColumn(tblInfo, indexInfo); err != nil {
job.State = model.JobStateCancelled
return nil, autoid.ErrWrongAutoKey
}

// Check for dropping duplicate indexes.
if UniqueIndexNames[indexName] {
if !ifExists[i] {
job.State = model.JobStateCancelled
return nil, dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}
warnings = append(warnings, toTError(dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)))
}
UniqueIndexNames[indexName] = true

indexInfos = append(indexInfos, indexInfo)
}

// Check that drop primary index will not cause invisible implicit primary index.
if err := checkInvisibleIndexesOnPK(tblInfo, indexInfos, job); err != nil {
return nil, errors.Trace(err)
}

job.MultiSchemaInfo = &model.MultiSchemaInfo{Warnings: warnings}

return indexInfos, nil
}

func checkInvisibleIndexesOnPK(tblInfo *model.TableInfo, indexInfos []*model.IndexInfo, job *model.Job) error {
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, oidx := range tblInfo.Indices {
Expand Down
Loading

0 comments on commit d5898d0

Please sign in to comment.