Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter(ticdc): timezone fill default value should also consider tz. (#10932) #10949

Open
wants to merge 3 commits into
base: release-8.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@
}

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum,
tableInfo *model.TableInfo, datums map[int64]types.Datum, tz *time.Location,
) ([]*model.ColumnData, []types.Datum, []*timodel.ColumnInfo, error) {
cols := make([]*model.ColumnData, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))
Expand All @@ -368,7 +368,7 @@
if exist {
colValue, size, warn, err = formatColVal(colDatum, colInfo)
} else {
colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz)
}
if err != nil {
return nil, nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -504,7 +504,7 @@
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow)
preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -536,7 +536,7 @@
currentChecksum uint32
)
if row.RowExist {
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row)
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, m.tz)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -698,7 +698,9 @@
// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support
// TODO: Check default expr support
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, string, error) {
func getDefaultOrZeroValue(
col *timodel.ColumnInfo, tz *time.Location,
) (types.Datum, any, int, string, error) {
var (
d types.Datum
err error
Expand All @@ -715,6 +717,15 @@
if err != nil {
return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err)
}
switch col.GetType() {
case mysql.TypeTimestamp:
t := d.GetMysqlTime()
err = t.ConvertTimeZone(time.UTC, tz)
if err != nil {
return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err)
}

Check warning on line 726 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L725-L726

Added lines #L725 - L726 were not covered by tests
d.SetMysqlTime(t)
}
} else if !mysql.HasNotNullFlag(col.GetFlag()) {
// NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx",
// ref: https://github.com/pingcap/ticdc/issues/3929
Expand Down
55 changes: 45 additions & 10 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,48 +873,50 @@ func TestGetDefaultZeroValue(t *testing.T) {
},
}

tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
require.NoError(t, err)
for _, tc := range testCases {
_, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo)
_, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo, tz)
require.Equal(t, tc.Res, val, tc.Name)
}

colInfo := timodel.ColumnInfo{
OriginDefaultValue: "-3.14", // no float
FieldType: *ftTypeNewDecimalNotNull,
}
_, val, _, _, _ := getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ := getDefaultOrZeroValue(&colInfo, tz)
decimal := new(types.MyDecimal)
err := decimal.FromString([]byte("-3.14"))
err = decimal.FromString([]byte("-3.14"))
require.NoError(t, err)
require.Equal(t, decimal.String(), val, "mysql.TypeNewDecimal + notnull + default")

colInfo = timodel.ColumnInfo{
OriginDefaultValue: "2020-11-19 12:12:12",
FieldType: *ftTypeTimestampNotNull,
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expected, err := types.ParseTimeFromFloatString(
types.DefaultStmtNoWarningContext,
"2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
"2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
require.NoError(t, err)
require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + notnull + default")

colInfo = timodel.ColumnInfo{
OriginDefaultValue: "2020-11-19 12:12:12",
FieldType: *ftTypeTimestampNull,
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expected, err = types.ParseTimeFromFloatString(
types.DefaultStmtNoWarningContext,
"2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
"2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
require.NoError(t, err)
require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + null + default")

colInfo = timodel.ColumnInfo{
OriginDefaultValue: "e1",
FieldType: *ftTypeEnumNotNull,
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expectedEnum, err := types.ParseEnumName(colInfo.FieldType.GetElems(), "e1", colInfo.FieldType.GetCollate())
require.NoError(t, err)
require.Equal(t, expectedEnum.Value, val, "mysql.TypeEnum + notnull + default")
Expand All @@ -923,7 +925,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
OriginDefaultValue: "1,e",
FieldType: *ftTypeSetNotNull,
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expectedSet, err := types.ParseSetName(colInfo.FieldType.GetElems(), "1,e", colInfo.FieldType.GetCollate())
require.NoError(t, err)
require.Equal(t, expectedSet.Value, val, "mysql.TypeSet + notnull + default")
Expand Down Expand Up @@ -1102,6 +1104,37 @@ func TestE2ERowLevelChecksum(t *testing.T) {
require.NoError(t, err)
}

func TestTimezoneDefaultValue(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()

_ = helper.DDL2Event(`create table test.t(a int primary key)`)
insertEvent := helper.DML2Event(`insert into test.t values (1)`, "test", "t")
require.NotNil(t, insertEvent)

tableInfo, ok := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t")
require.True(t, ok)

key, oldValue := helper.getLastKeyValue(tableInfo.ID)

_ = helper.DDL2Event(`alter table test.t add column b timestamp default '2023-02-09 13:00:00'`)
ts := helper.schemaStorage.GetLastSnapshot().CurrentTs()
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
OldValue: oldValue,
StartTs: ts - 1,
CRTs: ts + 1,
}
polymorphicEvent := model.NewPolymorphicEvent(rawKV)
err := helper.mounter.DecodeEvent(context.Background(), polymorphicEvent)
require.NoError(t, err)

event := polymorphicEvent.Row
require.NotNil(t, event)
require.Equal(t, "2023-02-09 13:00:00", event.PreColumns[1].Value.(string))
}

func TestVerifyChecksumTime(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
Expand Down Expand Up @@ -1565,14 +1598,16 @@ func TestBuildTableInfo(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
require.NoError(t, err)
p := parser.New()
for i, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz)
require.NoError(t, err)
e := model.RowChangedEvent{
TableInfo: cdcTableInfo,
Expand Down
Loading