diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go index 7ab9b8e72..17ba41938 100644 --- a/drainer/translator/pb.go +++ b/drainer/translator/pb.go @@ -153,9 +153,9 @@ func genInsert(schema string, table *model.TableInfo, row []byte) (event *pb.Eve func genUpdate(schema string, table *model.TableInfo, row []byte) (event *pb.Event, err error) { columns := writableColumns(table) - colsTypeMap := util.ToColumnTypeMap(columns) + colsMap := util.ToColumnMap(columns) - oldColumnValues, newColumnValues, err := DecodeOldAndNewRow(row, colsTypeMap, time.Local) + oldColumnValues, newColumnValues, err := DecodeOldAndNewRow(row, colsMap, time.Local) if err != nil { return nil, errors.Annotatef(err, "table `%s`.`%s`", schema, table.Name) } diff --git a/drainer/translator/translator.go b/drainer/translator/translator.go index 9d9659462..f1dc74073 100644 --- a/drainer/translator/translator.go +++ b/drainer/translator/translator.go @@ -103,7 +103,7 @@ func getDefaultOrZeroValue(col *model.ColumnInfo) types.Datum { // DecodeOldAndNewRow decodes a byte slice into datums with a existing row map. // Row layout: colID1, value1, colID2, value2, ..... -func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) { +func DecodeOldAndNewRow(b []byte, cols map[int64]*model.ColumnInfo, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) { if b == nil { return nil, nil, nil } @@ -111,8 +111,8 @@ func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Loc return nil, nil, nil } - cnt := 0 var ( + cnt int data []byte err error oldRow = make(map[int64]types.Datum, len(cols)) @@ -134,9 +134,9 @@ func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Loc return nil, nil, errors.Trace(err) } id := cid.GetInt64() - ft, ok := cols[id] + col, ok := cols[id] if ok { - v, err := tablecodec.DecodeColumnValue(data, ft, loc) + v, err := tablecodec.DecodeColumnValue(data, &col.FieldType, loc) if err != nil { return nil, nil, errors.Trace(err) } @@ -155,28 +155,53 @@ func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Loc } } - if cnt != len(cols)*2 || len(newRow) != len(oldRow) { + parsedCols := cnt / 2 + isInvalid := len(newRow) != len(oldRow) || (len(cols) != parsedCols && len(cols)-1 != parsedCols) + if isInvalid { return nil, nil, errors.Errorf("row data is corrupted %v", b) } + if parsedCols == len(cols)-1 { + var missingCol *model.ColumnInfo + for colID, col := range cols { + _, inOld := oldRow[colID] + _, inNew := newRow[colID] + if !inOld && !inNew { + missingCol = col + break + } + } + // We can't find a column that's missing in both old and new + if missingCol == nil { + return nil, nil, errors.Errorf("row data is corrupted %v", b) + } + log.Info( + "Fill missing col with default val", + zap.String("name", missingCol.Name.O), + zap.Int64("id", missingCol.ID), + zap.Int("Tp", int(missingCol.FieldType.Tp)), + ) + oldRow[missingCol.ID] = getDefaultOrZeroValue(missingCol) + newRow[missingCol.ID] = getDefaultOrZeroValue(missingCol) + } return oldRow, newRow, nil } type updateDecoder struct { - colsTypes map[int64]*types.FieldType + columns map[int64]*model.ColumnInfo } func newUpdateDecoder(table *model.TableInfo) updateDecoder { columns := writableColumns(table) return updateDecoder{ - colsTypes: util.ToColumnTypeMap(columns), + columns: util.ToColumnMap(columns), } } // decode decodes a byte slice into datums with a existing row map. // Row layout: colID1, value1, colID2, value2, ..... func (ud updateDecoder) decode(b []byte, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) { - return DecodeOldAndNewRow(b, ud.colsTypes, loc) + return DecodeOldAndNewRow(b, ud.columns, loc) } func fixType(data types.Datum, col *model.ColumnInfo) types.Datum { diff --git a/pkg/util/util.go b/pkg/util/util.go index 8cee45eda..6c717e9a9 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -143,6 +143,15 @@ func ToColumnTypeMap(columns []*model.ColumnInfo) map[int64]*types.FieldType { return colTypeMap } +// ToColumnMap return a map index by column id +func ToColumnMap(columns []*model.ColumnInfo) map[int64]*model.ColumnInfo { + colMap := make(map[int64]*model.ColumnInfo, len(columns)) + for _, col := range columns { + colMap[col.ID] = col + } + return colMap +} + // RetryOnError defines a action with retry when fn returns error func RetryOnError(retryCount int, sleepTime time.Duration, errStr string, fn func() error) error { var err error