Skip to content

Commit

Permalink
Avoid error when a column is missing because of drop column
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku committed Nov 14, 2019
1 parent 98ea488 commit df35218
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 10 deletions.
4 changes: 2 additions & 2 deletions drainer/translator/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ func genInsert(schema string, table *model.TableInfo, row []byte) (event *pb.Eve

func genUpdate(schema string, table *model.TableInfo, row []byte) (event *pb.Event, err error) {
columns := writableColumns(table)
colsTypeMap := util.ToColumnTypeMap(columns)
colsMap := util.ToColumnMap(columns)

oldColumnValues, newColumnValues, err := DecodeOldAndNewRow(row, colsTypeMap, time.Local)
oldColumnValues, newColumnValues, err := DecodeOldAndNewRow(row, colsMap, time.Local)
if err != nil {
return nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name)
}
Expand Down
41 changes: 33 additions & 8 deletions drainer/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,16 @@ func getDefaultOrZeroValue(col *model.ColumnInfo) types.Datum {

// DecodeOldAndNewRow decodes a byte slice into datums with a existing row map.
// Row layout: colID1, value1, colID2, value2, .....
func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) {
func DecodeOldAndNewRow(b []byte, cols map[int64]*model.ColumnInfo, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) {
if b == nil {
return nil, nil, nil
}
if b[0] == codec.NilFlag {
return nil, nil, nil
}

cnt := 0
var (
cnt int
data []byte
err error
oldRow = make(map[int64]types.Datum, len(cols))
Expand All @@ -134,9 +134,9 @@ func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Loc
return nil, nil, errors.Trace(err)
}
id := cid.GetInt64()
ft, ok := cols[id]
col, ok := cols[id]
if ok {
v, err := tablecodec.DecodeColumnValue(data, ft, loc)
v, err := tablecodec.DecodeColumnValue(data, &col.FieldType, loc)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand All @@ -155,28 +155,53 @@ func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Loc
}
}

if cnt != len(cols)*2 || len(newRow) != len(oldRow) {
parsedCols := cnt / 2
isInvalid := len(newRow) != len(oldRow) || (len(cols) != parsedCols && len(cols)-1 != parsedCols)
if isInvalid {
return nil, nil, errors.Errorf("row data is corrupted %v", b)
}
if parsedCols == len(cols)-1 {
var missingCol *model.ColumnInfo
for colID, col := range cols {
_, inOld := oldRow[colID]
_, inNew := newRow[colID]
if !inOld && !inNew {
missingCol = col
break
}
}
// We can't find a column that's missing in both old and new
if missingCol == nil {
return nil, nil, errors.Errorf("row data is corrupted %v", b)
}
log.Info(
"Fill missing col with default val",
zap.String("name", missingCol.Name.O),
zap.Int64("id", missingCol.ID),
zap.Int("Tp", int(missingCol.FieldType.Tp)),
)
oldRow[missingCol.ID] = getDefaultOrZeroValue(missingCol)
newRow[missingCol.ID] = getDefaultOrZeroValue(missingCol)
}

return oldRow, newRow, nil
}

type updateDecoder struct {
colsTypes map[int64]*types.FieldType
columns map[int64]*model.ColumnInfo
}

func newUpdateDecoder(table *model.TableInfo) updateDecoder {
columns := writableColumns(table)
return updateDecoder{
colsTypes: util.ToColumnTypeMap(columns),
columns: util.ToColumnMap(columns),
}
}

// decode decodes a byte slice into datums with a existing row map.
// Row layout: colID1, value1, colID2, value2, .....
func (ud updateDecoder) decode(b []byte, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) {
return DecodeOldAndNewRow(b, ud.colsTypes, loc)
return DecodeOldAndNewRow(b, ud.columns, loc)
}

func fixType(data types.Datum, col *model.ColumnInfo) types.Datum {
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ func ToColumnTypeMap(columns []*model.ColumnInfo) map[int64]*types.FieldType {
return colTypeMap
}

// ToColumnMap return a map index by column id
func ToColumnMap(columns []*model.ColumnInfo) map[int64]*model.ColumnInfo {
colMap := make(map[int64]*model.ColumnInfo, len(columns))
for _, col := range columns {
colMap[col.ID] = col
}
return colMap
}

// RetryOnError defines a action with retry when fn returns error
func RetryOnError(retryCount int, sleepTime time.Duration, errStr string, fn func() error) error {
var err error
Expand Down

0 comments on commit df35218

Please sign in to comment.