Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/release-8.1' into cherry-pick-…
Browse files Browse the repository at this point in the history
…10915-to-release-8.1
  • Loading branch information
sdojjy committed Apr 28, 2024
2 parents 24ac330 + d9b74fe commit 02841dd
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 95 deletions.
21 changes: 16 additions & 5 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
}

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum,
tableInfo *model.TableInfo, datums map[int64]types.Datum, tz *time.Location,
) ([]*model.ColumnData, []types.Datum, []*timodel.ColumnInfo, error) {
cols := make([]*model.ColumnData, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))
Expand All @@ -368,7 +368,7 @@ func datum2Column(
if exist {
colValue, size, warn, err = formatColVal(colDatum, colInfo)
} else {
colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz)
}
if err != nil {
return nil, nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -504,7 +504,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow)
preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -536,7 +536,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
currentChecksum uint32
)
if row.RowExist {
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row)
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row, m.tz)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -698,7 +698,9 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) (
// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support
// TODO: Check default expr support
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, string, error) {
func getDefaultOrZeroValue(
col *timodel.ColumnInfo, tz *time.Location,
) (types.Datum, any, int, string, error) {
var (
d types.Datum
err error
Expand All @@ -715,6 +717,15 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, stri
if err != nil {
return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err)
}
switch col.GetType() {
case mysql.TypeTimestamp:
t := d.GetMysqlTime()
err = t.ConvertTimeZone(time.UTC, tz)
if err != nil {
return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err)
}
d.SetMysqlTime(t)
}
} else if !mysql.HasNotNullFlag(col.GetFlag()) {
// NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx",
// ref: https://github.com/pingcap/ticdc/issues/3929
Expand Down
55 changes: 45 additions & 10 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,48 +873,50 @@ func TestGetDefaultZeroValue(t *testing.T) {
},
}

tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
require.NoError(t, err)
for _, tc := range testCases {
_, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo)
_, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo, tz)
require.Equal(t, tc.Res, val, tc.Name)
}

colInfo := timodel.ColumnInfo{
OriginDefaultValue: "-3.14", // no float
FieldType: *ftTypeNewDecimalNotNull,
}
_, val, _, _, _ := getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ := getDefaultOrZeroValue(&colInfo, tz)
decimal := new(types.MyDecimal)
err := decimal.FromString([]byte("-3.14"))
err = decimal.FromString([]byte("-3.14"))
require.NoError(t, err)
require.Equal(t, decimal.String(), val, "mysql.TypeNewDecimal + notnull + default")

colInfo = timodel.ColumnInfo{
OriginDefaultValue: "2020-11-19 12:12:12",
FieldType: *ftTypeTimestampNotNull,
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expected, err := types.ParseTimeFromFloatString(
types.DefaultStmtNoWarningContext,
"2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
"2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
require.NoError(t, err)
require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + notnull + default")

colInfo = timodel.ColumnInfo{
OriginDefaultValue: "2020-11-19 12:12:12",
FieldType: *ftTypeTimestampNull,
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expected, err = types.ParseTimeFromFloatString(
types.DefaultStmtNoWarningContext,
"2020-11-19 12:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
"2020-11-19 20:12:12", colInfo.FieldType.GetType(), colInfo.FieldType.GetDecimal())
require.NoError(t, err)
require.Equal(t, expected.String(), val, "mysql.TypeTimestamp + null + default")

colInfo = timodel.ColumnInfo{
OriginDefaultValue: "e1",
FieldType: *ftTypeEnumNotNull,
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expectedEnum, err := types.ParseEnumName(colInfo.FieldType.GetElems(), "e1", colInfo.FieldType.GetCollate())
require.NoError(t, err)
require.Equal(t, expectedEnum.Value, val, "mysql.TypeEnum + notnull + default")
Expand All @@ -923,7 +925,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
OriginDefaultValue: "1,e",
FieldType: *ftTypeSetNotNull,
}
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo)
_, val, _, _, _ = getDefaultOrZeroValue(&colInfo, tz)
expectedSet, err := types.ParseSetName(colInfo.FieldType.GetElems(), "1,e", colInfo.FieldType.GetCollate())
require.NoError(t, err)
require.Equal(t, expectedSet.Value, val, "mysql.TypeSet + notnull + default")
Expand Down Expand Up @@ -1102,6 +1104,37 @@ func TestE2ERowLevelChecksum(t *testing.T) {
require.NoError(t, err)
}

func TestTimezoneDefaultValue(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()

_ = helper.DDL2Event(`create table test.t(a int primary key)`)
insertEvent := helper.DML2Event(`insert into test.t values (1)`, "test", "t")
require.NotNil(t, insertEvent)

tableInfo, ok := helper.schemaStorage.GetLastSnapshot().TableByName("test", "t")
require.True(t, ok)

key, oldValue := helper.getLastKeyValue(tableInfo.ID)

_ = helper.DDL2Event(`alter table test.t add column b timestamp default '2023-02-09 13:00:00'`)
ts := helper.schemaStorage.GetLastSnapshot().CurrentTs()
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
OldValue: oldValue,
StartTs: ts - 1,
CRTs: ts + 1,
}
polymorphicEvent := model.NewPolymorphicEvent(rawKV)
err := helper.mounter.DecodeEvent(context.Background(), polymorphicEvent)
require.NoError(t, err)

event := polymorphicEvent.Row
require.NotNil(t, event)
require.Equal(t, "2023-02-09 13:00:00", event.PreColumns[1].Value.(string))
}

func TestVerifyChecksumTime(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
Expand Down Expand Up @@ -1565,14 +1598,16 @@ func TestBuildTableInfo(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
require.NoError(t, err)
p := parser.New()
for i, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz)
require.NoError(t, err)
e := model.RowChangedEvent{
TableInfo: cdcTableInfo,
Expand Down
6 changes: 1 addition & 5 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,11 +633,7 @@ LOOP2:
return errors.Trace(err)
}
c.latestInfo.Config.Sink.TiDBSourceID = sourceID
log.Info("set source id",
zap.Uint64("sourceID", sourceID),
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
)
log.Info("get sourceID from PD", zap.Uint64("sourceID", sourceID), zap.Stringer("changefeedID", c.id))

c.ddlSink = c.newSink(c.id, c.latestInfo, c.Throw(ctx), func(err error) {
select {
Expand Down
17 changes: 0 additions & 17 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,24 +223,7 @@ func (m *ddlManager) tick(
}

for _, event := range events {
// TODO: find a better place to do this check
// check if the ddl event is belong to an ineligible table.
// If so, we should ignore it.
if !filter.IsSchemaDDL(event.Type) {
ignore, err := m.schema.
IsIneligibleTable(ctx, event.TableInfo.TableName.TableID, event.CommitTs)
if err != nil {
return nil, nil, errors.Trace(err)
}
if ignore {
log.Warn("ignore the DDL event of ineligible table",
zap.String("changefeed", m.changfeedID.ID), zap.Any("ddl", event))
continue
}
}

tableName := event.TableInfo.TableName
// Add all valid DDL events to the pendingDDLs.
m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event)
}

Expand Down
20 changes: 12 additions & 8 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,6 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
if p.initialized {
return nil
}

// Here we use a separated context for sub-components, so we can custom the
// order of stopping all sub-components when closing the processor.
prcCtx := context.Background()
Expand All @@ -589,7 +588,11 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
if err != nil {
return errors.Trace(err)
}
p.filter, err = filter.NewFilter(p.latestInfo.Config, util.GetTimeZoneName(tz))

// Clone the config to avoid data race
cfConfig := p.latestInfo.Config.Clone()

p.filter, err = filter.NewFilter(cfConfig, util.GetTimeZoneName(tz))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -602,8 +605,8 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
p.ddlHandler.spawn(prcCtx)

p.mg.r = entry.NewMounterGroup(p.ddlHandler.r.schemaStorage,
p.latestInfo.Config.Mounter.WorkerNum,
p.filter, tz, p.changefeedID, p.latestInfo.Config.Integrity)
cfConfig.Mounter.WorkerNum,
p.filter, tz, p.changefeedID, cfConfig.Integrity)
p.mg.name = "MounterGroup"
p.mg.changefeedID = p.changefeedID
p.mg.spawn(prcCtx)
Expand All @@ -612,9 +615,10 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
if err != nil {
return errors.Trace(err)
}
p.latestInfo.Config.Sink.TiDBSourceID = sourceID
log.Info("get sourceID from PD", zap.Uint64("sourceID", sourceID), zap.Stringer("changefeedID", p.changefeedID))
cfConfig.Sink.TiDBSourceID = sourceID

p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent)
p.redo.r = redo.NewDMLManager(p.changefeedID, cfConfig.Consistent)
p.redo.name = "RedoManager"
p.redo.changefeedID = p.changefeedID
p.redo.spawn(prcCtx)
Expand All @@ -630,8 +634,8 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {

p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode),
util.GetOrZero(p.latestInfo.Config.EnableTableMonitor))
sortEngine, util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor))
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)
Expand Down
Loading

0 comments on commit 02841dd

Please sign in to comment.