Skip to content

Commit

Permalink
delete actionAddColumns and actionDropColumns (pingcap#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 authored Mar 17, 2022
1 parent aadf31f commit c897a72
Show file tree
Hide file tree
Showing 16 changed files with 36 additions and 903 deletions.
271 changes: 0 additions & 271 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,283 +237,12 @@ func locateOffsetForColumn(pos *ast.ColumnPosition, tblInfo *model.TableInfo) (o
}
}

func checkAddColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, []*model.ColumnInfo, []*ast.ColumnPosition, []int, []bool, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}
columns := []*model.ColumnInfo{}
positions := []*ast.ColumnPosition{}
offsets := []int{}
ifNotExists := []bool{}
err = job.DecodeArgs(&columns, &positions, &offsets, &ifNotExists)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}

columnInfos := make([]*model.ColumnInfo, 0, len(columns))
newColumns := make([]*model.ColumnInfo, 0, len(columns))
newPositions := make([]*ast.ColumnPosition, 0, len(columns))
newOffsets := make([]int, 0, len(columns))
newIfNotExists := make([]bool, 0, len(columns))
for i, col := range columns {
columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
if ifNotExists[i] {
// TODO: Should return a warning.
logutil.BgLogger().Warn("[ddl] check add columns, duplicate column", zap.Stringer("col", col.Name))
continue
}
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}
columnInfos = append(columnInfos, columnInfo)
}
newColumns = append(newColumns, columns[i])
newPositions = append(newPositions, positions[i])
newOffsets = append(newOffsets, offsets[i])
newIfNotExists = append(newIfNotExists, ifNotExists[i])
}
return tblInfo, columnInfos, newColumns, newPositions, newOffsets, newIfNotExists, nil
}

func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) {
for i := range columnInfos {
columnInfos[i].State = state
}
}

func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
for _, indexInfo := range indexInfos {
indexInfo.State = state
}
}

func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ver, errors.New("occur an error before decode args"))
}
})

tblInfo, columnInfos, columns, positions, offsets, ifNotExists, err := checkAddColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if len(columnInfos) == 0 {
if len(columns) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}
for i := range columns {
columnInfo, pos, err := createColumnInfo(tblInfo, columns[i], positions[i])
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] run add columns job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo))
positions[i] = pos
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
columnInfos = append(columnInfos, columnInfo)
}
// Set arg to job.
job.Args = []interface{}{columnInfos, positions, offsets, ifNotExists}
}

originalState := columnInfos[0].State
switch columnInfos[0].State {
case model.StateNone:
// none -> delete only
setColumnsState(columnInfos, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> write only
setColumnsState(columnInfos, model.StateWriteOnly)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> reorganization
setColumnsState(columnInfos, model.StateWriteReorganization)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offsets.
for i, newCol := range tblInfo.Columns[len(tblInfo.Columns)-len(positions):] {
offset, err := locateOffsetForColumn(positions[i], tblInfo)
if err != nil {
return ver, errors.Trace(err)
}
if offset != -1 {
tblInfo.MoveColumnInfo(newCol.Offset, offset)
}
}
setColumnsState(columnInfos, model.StatePublic)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
asyncNotifyEvent(d, &ddlutil.Event{Tp: model.ActionAddColumns, TableInfo: tblInfo, ColumnInfos: columnInfos})
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfos[0].State)
}

return ver, errors.Trace(err)
}

func onDropColumns(t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, colInfos, delCount, idxInfos, err := checkDropColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if len(colInfos) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}

originalState := colInfos[0].State
switch colInfos[0].State {
case model.StatePublic:
// public -> write only
setColumnsState(colInfos, model.StateWriteOnly)
setIndicesState(idxInfos, model.StateWriteOnly)
for _, colInfo := range colInfos {
err = checkDropColumnForStatePublic(tblInfo, colInfo)
if err != nil {
return ver, errors.Trace(err)
}
}
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> delete only
setColumnsState(colInfos, model.StateDeleteOnly)
if len(idxInfos) > 0 {
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if !indexInfoContains(idx.ID, idxInfos) {
newIndices = append(newIndices, idx)
}
}
tblInfo.Indices = newIndices
}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.Args = append(job.Args, indexInfosToIDList(idxInfos))
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> reorganization
setColumnsState(colInfos, model.StateDeleteReorganization)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-delCount]
setColumnsState(colInfos, model.StateNone)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}

// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.Args = append(job.Args, getPartitionIDs(tblInfo))
}
default:
err = dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State)
}
return ver, errors.Trace(err)
}

func checkDropColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, int, []*model.IndexInfo, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, 0, nil, errors.Trace(err)
}

var colNames []model.CIStr
var ifExists []bool
// indexIds is used to make sure we don't truncate args when decoding the rawArgs.
var indexIds []int64
err = job.DecodeArgs(&colNames, &ifExists, &indexIds)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, 0, nil, errors.Trace(err)
}

newColNames := make([]model.CIStr, 0, len(colNames))
colInfos := make([]*model.ColumnInfo, 0, len(colNames))
newIfExists := make([]bool, 0, len(colNames))
indexInfos := make([]*model.IndexInfo, 0)
for i, colName := range colNames {
colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil || colInfo.Hidden {
if ifExists[i] {
// TODO: Should return a warning.
logutil.BgLogger().Warn(fmt.Sprintf("column %s doesn't exist", colName))
continue
}
job.State = model.JobStateCancelled
return nil, nil, 0, nil, dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}
if err = isDroppableColumn(job.MultiSchemaInfo != nil, tblInfo, colName); err != nil {
job.State = model.JobStateCancelled
return nil, nil, 0, nil, errors.Trace(err)
}
newColNames = append(newColNames, colName)
newIfExists = append(newIfExists, ifExists[i])
colInfos = append(colInfos, colInfo)
idxInfos := listIndicesWithColumn(colName.L, tblInfo.Indices)
indexInfos = append(indexInfos, idxInfos...)
}
job.Args = []interface{}{newColNames, newIfExists}
if len(indexIds) > 0 {
job.Args = append(job.Args, indexIds)
}
return tblInfo, colInfos, len(colInfos), indexInfos, nil
}

func checkDropColumnForStatePublic(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) (err error) {
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, colInfo.Offset)
Expand Down
Loading

0 comments on commit c897a72

Please sign in to comment.