From 89920c066a5258f0da22beb42852ecb2894a9c98 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 23 Apr 2024 12:27:10 +0800 Subject: [PATCH] This is an automated cherry-pick of #10932 Signed-off-by: ti-chi-bot --- cdc/entry/mounter.go | 32 +++ cdc/entry/mounter_test.go | 449 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 481 insertions(+) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 4a7d0f8320a..b1eb82c93bf 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -279,8 +279,13 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { func datum2Column( tableInfo *model.TableInfo, datums map[int64]types.Datum, tz *time.Location, +<<<<<<< HEAD ) ([]*model.Column, []types.Datum, []rowcodec.ColInfo, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) +======= +) ([]*model.ColumnData, []types.Datum, []*timodel.ColumnInfo, error) { + cols := make([]*model.ColumnData, len(tableInfo.RowColumnsOffset)) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) // columnInfos and rowColumnInfos hold different column metadata, @@ -307,7 +312,11 @@ func datum2Column( if exist { colValue, size, warn, err = formatColVal(colDatums, colInfo) } else { +<<<<<<< HEAD colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz) +======= + colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) } if err != nil { return nil, nil, nil, errors.Trace(err) @@ -349,7 +358,11 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info +<<<<<<< HEAD preCols, preRawCols, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz) +======= + preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -358,7 +371,11 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d var cols []*model.Column var rawCols []types.Datum if row.RowExist { +<<<<<<< HEAD cols, rawCols, extendColumnInfos, err = datum2Column(tableInfo, row.Row, m.tz) +======= + cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, m.tz) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -494,7 +511,13 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) ( // https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 // Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support // TODO: Check default expr support +<<<<<<< HEAD func getDefaultOrZeroValue(col *timodel.ColumnInfo, tz *time.Location) (types.Datum, any, int, string, error) { +======= +func getDefaultOrZeroValue( + col *timodel.ColumnInfo, tz *time.Location, +) (types.Datum, any, int, string, error) { +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) var ( d types.Datum err error @@ -513,6 +536,15 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo, tz *time.Location) (types.Da if err != nil { return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err) } + switch col.GetType() { + case mysql.TypeTimestamp: + t := d.GetMysqlTime() + err = t.ConvertTimeZone(time.UTC, tz) + if err != nil { + return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err) + } + d.SetMysqlTime(t) + } } else if !mysql.HasNotNullFlag(col.GetFlag()) { // NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx", // ref: https://github.com/pingcap/ticdc/issues/3929 diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index c446a1b4e30..fec0353773d 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -871,11 +871,17 @@ func TestGetDefaultZeroValue(t *testing.T) { FieldType: *ftTypeTimestampNotNull, } _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) +<<<<<<< HEAD sc := new(stmtctx.StatementContext) sc.TimeZone = tz expected, err := types.ParseTimeFromFloatString( sc, "2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) +======= + expected, err := types.ParseTimeFromFloatString( + types.DefaultStmtNoWarningContext, + "2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) require.NoError(t, err) require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + notnull + default") @@ -885,8 +891,13 @@ func TestGetDefaultZeroValue(t *testing.T) { } _, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz) expected, err = types.ParseTimeFromFloatString( +<<<<<<< HEAD sc, "2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) +======= + types.DefaultStmtNoWarningContext, + "2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal()) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) require.NoError(t, err) require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + null + default") @@ -909,6 +920,440 @@ func TestGetDefaultZeroValue(t *testing.T) { require.Equal(t, expectedSet.Value, val, "mysql.TypeSet + notnull + default") } +<<<<<<< HEAD +======= +func TestE2ERowLevelChecksum(t *testing.T) { + helper := NewSchemaTestHelper(t) + defer helper.Close() + + tk := helper.Tk() + // upstream TiDB enable checksum functionality + tk.MustExec("set global tidb_enable_row_level_checksum = 1") + helper.Tk().MustExec("use test") + + // changefeed enable checksum functionality + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness + filter, err := filter.NewFilter(replicaConfig, "") + require.NoError(t, err) + + ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) + require.NoError(t, err) + + changefeed := model.DefaultChangeFeedID("changefeed-test-decode-row") + schemaStorage, err := NewSchemaStorage(helper.Storage(), + ver.Ver, false, changefeed, util.RoleTester, filter) + require.NoError(t, err) + require.NotNil(t, schemaStorage) + + createTableSQL := `create table t ( + id int primary key auto_increment, + + c_tinyint tinyint null, + c_smallint smallint null, + c_mediumint mediumint null, + c_int int null, + c_bigint bigint null, + + c_unsigned_tinyint tinyint unsigned null, + c_unsigned_smallint smallint unsigned null, + c_unsigned_mediumint mediumint unsigned null, + c_unsigned_int int unsigned null, + c_unsigned_bigint bigint unsigned null, + + c_float float null, + c_double double null, + c_decimal decimal null, + c_decimal_2 decimal(10, 4) null, + + c_unsigned_float float unsigned null, + c_unsigned_double double unsigned null, + c_unsigned_decimal decimal unsigned null, + c_unsigned_decimal_2 decimal(10, 4) unsigned null, + + c_date date null, + c_datetime datetime null, + c_timestamp timestamp null, + c_time time null, + c_year year null, + + c_tinytext tinytext null, + c_text text null, + c_mediumtext mediumtext null, + c_longtext longtext null, + + c_tinyblob tinyblob null, + c_blob blob null, + c_mediumblob mediumblob null, + c_longblob longblob null, + + c_char char(16) null, + c_varchar varchar(16) null, + c_binary binary(16) null, + c_varbinary varbinary(16) null, + + c_enum enum ('a','b','c') null, + c_set set ('a','b','c') null, + c_bit bit(64) null, + c_json json null, + +-- gbk dmls + name varchar(128) CHARACTER SET gbk, + country char(32) CHARACTER SET gbk, + city varchar(64), + description text CHARACTER SET gbk, + image tinyblob +);` + job := helper.DDL2Job(createTableSQL) + err = schemaStorage.HandleDDLJob(job) + require.NoError(t, err) + + ts := schemaStorage.GetLastSnapshot().CurrentTs() + schemaStorage.AdvanceResolvedTs(ver.Ver) + + mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, replicaConfig.Integrity).(*mounter) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "t") + require.True(t, ok) + + tk.Session().GetSessionVars().EnableRowLevelChecksum = true + + insertDataSQL := `insert into t values ( + 2, + 1, 2, 3, 4, 5, + 1, 2, 3, 4, 5, + 2020.0202, 2020.0303, 2020.0404, 2021.1208, + 3.1415, 2.7182, 8000, 179394.233, + '2020-02-20', '2020-02-20 02:20:20', '2020-02-20 02:20:20', '02:20:20', '2020', + '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', '89504E470D0A1A0A', + x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + '89504E470D0A1A0A', '89504E470D0A1A0A', x'89504E470D0A1A0A', x'89504E470D0A1A0A', + 'b', 'b,c', b'1000001', '{ +"key1": "value1", +"key2": "value2", +"key3": "123" +}', + '测试', "中国", "上海", "你好,世界", 0xC4E3BAC3CAC0BDE7 +);` + tk.MustExec(insertDataSQL) + + key, value := getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID) + rawKV := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + Value: value, + StartTs: ts - 1, + CRTs: ts + 1, + } + row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV) + require.NoError(t, err) + require.NotNil(t, row) + require.NotNil(t, row.Checksum) + + expected, ok := mounter.decoder.GetChecksum() + require.True(t, ok) + require.Equal(t, expected, row.Checksum.Current) + require.False(t, row.Checksum.Corrupted) + + // avro encoder enable checksum functionality. + codecConfig := codecCommon.NewConfig(config.ProtocolAvro) + codecConfig.EnableTiDBExtension = true + codecConfig.EnableRowChecksum = true + codecConfig.AvroDecimalHandlingMode = "string" + codecConfig.AvroBigintUnsignedHandlingMode = "string" + + avroEncoder, err := avro.SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig) + defer avro.TeardownEncoderAndSchemaRegistry4Testing() + require.NoError(t, err) + + topic := "test.t" + + err = avroEncoder.AppendRowChangedEvent(ctx, topic, row, func() {}) + require.NoError(t, err) + msg := avroEncoder.Build() + require.Len(t, msg, 1) + + schemaM, err := avro.NewConfluentSchemaManager( + ctx, "http://127.0.0.1:8081", nil) + require.NoError(t, err) + + // decoder enable checksum functionality. + decoder := avro.NewDecoder(codecConfig, schemaM, topic) + err = decoder.AddKeyValue(msg[0].Key, msg[0].Value) + require.NoError(t, err) + + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, model.MessageTypeRow, messageType) + + row, err = decoder.NextRowChangedEvent() + // no error, checksum verification passed. + require.NoError(t, err) +} + +func TestTimezoneDefaultValue(t *testing.T) { + helper := NewSchemaTestHelper(t) + defer helper.Close() + + _ = helper.DDL2Event(`create table test.t(a int primary key)`) + insertEvent := helper.DML2Event(`insert into test.t values (1)`, "test", "t") + require.NotNil(t, insertEvent) + + tableInfo, ok := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t") + require.True(t, ok) + + key, oldValue := helper.getLastKeyValue(tableInfo.ID) + + _ = helper.DDL2Event(`alter table test.t add column b timestamp default '2023-02-09 13:00:00'`) + ts := helper.schemaStorage.GetLastSnapshot().CurrentTs() + rawKV := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + OldValue: oldValue, + StartTs: ts - 1, + CRTs: ts + 1, + } + polymorphicEvent := model.NewPolymorphicEvent(rawKV) + err := helper.mounter.DecodeEvent(context.Background(), polymorphicEvent) + require.NoError(t, err) + + event := polymorphicEvent.Row + require.NotNil(t, event) + require.Equal(t, "2023-02-09 13:00:00", event.PreColumns[1].Value.(string)) +} + +func TestVerifyChecksumTime(t *testing.T) { + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness + replicaConfig.Integrity.CorruptionHandleLevel = integrity.CorruptionHandleLevelError + + helper := NewSchemaTestHelperWithReplicaConfig(t, replicaConfig) + defer helper.Close() + + helper.Tk().MustExec("set global tidb_enable_row_level_checksum = 1") + helper.Tk().MustExec("use test") + + helper.Tk().MustExec("set global time_zone = '-5:00'") + _ = helper.DDL2Event(`CREATE table TBL2 (a int primary key, b TIMESTAMP)`) + event := helper.DML2Event(`INSERT INTO TBL2 VALUES (1, '2023-02-09 13:00:00')`, "test", "TBL2") + require.NotNil(t, event) + + _ = helper.DDL2Event("create table t (a timestamp primary key, b int)") + event = helper.DML2Event("insert into t values ('2023-02-09 13:00:00', 1)", "test", "t") + require.NotNil(t, event) +} + +func TestDecodeRowEnableChecksum(t *testing.T) { + helper := NewSchemaTestHelper(t) + defer helper.Close() + + tk := helper.Tk() + + tk.MustExec("set global tidb_enable_row_level_checksum = 1") + helper.Tk().MustExec("use test") + + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness + filter, err := filter.NewFilter(replicaConfig, "") + require.NoError(t, err) + + ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) + require.NoError(t, err) + + changefeed := model.DefaultChangeFeedID("changefeed-test-decode-row") + schemaStorage, err := NewSchemaStorage(helper.Storage(), + ver.Ver, false, changefeed, util.RoleTester, filter) + require.NoError(t, err) + require.NotNil(t, schemaStorage) + + createTableDDL := "create table t (id int primary key, a int)" + job := helper.DDL2Job(createTableDDL) + err = schemaStorage.HandleDDLJob(job) + require.NoError(t, err) + + ts := schemaStorage.GetLastSnapshot().CurrentTs() + schemaStorage.AdvanceResolvedTs(ver.Ver) + + mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, replicaConfig.Integrity).(*mounter) + + ctx := context.Background() + + tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "t") + require.True(t, ok) + + // row without checksum + tk.Session().GetSessionVars().EnableRowLevelChecksum = false + tk.MustExec("insert into t values (1, 10)") + + key, value := getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID) + rawKV := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + Value: value, + StartTs: ts - 1, + CRTs: ts + 1, + } + + row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV) + require.NoError(t, err) + require.NotNil(t, row) + // the upstream tidb does not enable checksum, so the checksum is nil + require.Nil(t, row.Checksum) + + // row with one checksum + tk.Session().GetSessionVars().EnableRowLevelChecksum = true + tk.MustExec("insert into t values (2, 20)") + + key, value = getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID) + rawKV = &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + Value: value, + StartTs: ts - 1, + CRTs: ts + 1, + } + row, err = mounter.unmarshalAndMountRowChanged(ctx, rawKV) + require.NoError(t, err) + require.NotNil(t, row) + require.NotNil(t, row.Checksum) + + expected, ok := mounter.decoder.GetChecksum() + require.True(t, ok) + require.Equal(t, expected, row.Checksum.Current) + require.False(t, row.Checksum.Corrupted) + + // row with 2 checksum + tk.MustExec("insert into t values (3, 30)") + job = helper.DDL2Job("alter table t change column a a varchar(10)") + err = schemaStorage.HandleDDLJob(job) + require.NoError(t, err) + + key, value = getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID) + rawKV = &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + Value: value, + StartTs: ts - 1, + CRTs: ts + 1, + } + row, err = mounter.unmarshalAndMountRowChanged(ctx, rawKV) + require.NoError(t, err) + require.NotNil(t, row) + require.NotNil(t, row.Checksum) + + first, ok := mounter.decoder.GetChecksum() + require.True(t, ok) + + extra, ok := mounter.decoder.GetExtraChecksum() + require.True(t, ok) + + if row.Checksum.Current != first { + require.Equal(t, extra, row.Checksum.Current) + } else { + require.Equal(t, first, row.Checksum.Current) + } + require.False(t, row.Checksum.Corrupted) + + // hack the table info to make the checksum corrupted + tableInfo.Columns[0].FieldType = *types.NewFieldType(mysql.TypeVarchar) + + // corrupt-handle-level default to warn, so no error, but the checksum is corrupted + row, err = mounter.unmarshalAndMountRowChanged(ctx, rawKV) + require.NoError(t, err) + require.NotNil(t, row.Checksum) + require.True(t, row.Checksum.Corrupted) + + mounter.integrity.CorruptionHandleLevel = integrity.CorruptionHandleLevelError + _, err = mounter.unmarshalAndMountRowChanged(ctx, rawKV) + require.Error(t, err) + require.ErrorIs(t, err, cerror.ErrCorruptedDataMutation) + + job = helper.DDL2Job("drop table t") + err = schemaStorage.HandleDDLJob(job) + require.NoError(t, err) +} + +func TestDecodeRow(t *testing.T) { + helper := NewSchemaTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("set @@tidb_enable_clustered_index=1;") + helper.Tk().MustExec("use test;") + + changefeed := model.DefaultChangeFeedID("changefeed-test-decode-row") + + ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) + require.NoError(t, err) + + cfg := config.GetDefaultReplicaConfig() + + filter, err := filter.NewFilter(cfg, "") + require.NoError(t, err) + + schemaStorage, err := NewSchemaStorage(helper.Storage(), + ver.Ver, false, changefeed, util.RoleTester, filter) + require.NoError(t, err) + + // apply ddl to schemaStorage + ddl := "create table test.student(id int primary key, name char(50), age int, gender char(10))" + job := helper.DDL2Job(ddl) + err = schemaStorage.HandleDDLJob(job) + require.NoError(t, err) + + ts := schemaStorage.GetLastSnapshot().CurrentTs() + + schemaStorage.AdvanceResolvedTs(ver.Ver) + + mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, cfg.Integrity).(*mounter) + + helper.Tk().MustExec(`insert into student values(1, "dongmen", 20, "male")`) + helper.Tk().MustExec(`update student set age = 27 where id = 1`) + + ctx := context.Background() + decodeAndCheckRowInTable := func(tableID int64, f func(key []byte, value []byte) *model.RawKVEntry) { + walkTableSpanInStore(t, helper.Storage(), tableID, func(key []byte, value []byte) { + rawKV := f(key, value) + + row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV) + require.NoError(t, err) + require.NotNil(t, row) + + if row.Columns != nil { + require.NotNil(t, mounter.decoder) + } + + if row.PreColumns != nil { + require.NotNil(t, mounter.preDecoder) + } + }) + } + + toRawKV := func(key []byte, value []byte) *model.RawKVEntry { + return &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + Value: value, + StartTs: ts - 1, + CRTs: ts + 1, + } + } + + tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "student") + require.True(t, ok) + + decodeAndCheckRowInTable(tableInfo.ID, toRawKV) + decodeAndCheckRowInTable(tableInfo.ID, toRawKV) + + job = helper.DDL2Job("drop table student") + err = schemaStorage.HandleDDLJob(job) + require.NoError(t, err) +} + +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) // TestDecodeEventIgnoreRow tests a PolymorphicEvent.Row is nil // if this event should be filter out by filter. func TestDecodeEventIgnoreRow(t *testing.T) { @@ -1154,7 +1599,11 @@ func TestBuildTableInfo(t *testing.T) { originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) require.NoError(t, err) cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) +<<<<<<< HEAD cols, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz) +======= + colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz) +>>>>>>> e61d080e34 (mounter(ticdc): timezone fill default value should also consider tz. (#10932)) require.NoError(t, err) recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)