Skip to content

Commit

Permalink
*(ticdc): optimize memory usage of RowChangedEvent (#10483)
Browse files Browse the repository at this point in the history
close #10386
  • Loading branch information
lidezhu authored Feb 20, 2024
1 parent e51f517 commit a8c7563
Show file tree
Hide file tree
Showing 75 changed files with 2,628 additions and 2,473 deletions.
78 changes: 27 additions & 51 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,27 +342,22 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum,
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
) ([]*model.ColumnData, []types.Datum, []*timodel.ColumnInfo, error) {
cols := make([]*model.ColumnData, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))

// columnInfos and rowColumnInfos hold different column metadata,
// they should have the same length and order.
// columnInfos should have the same length and order with cols
columnInfos := make([]*timodel.ColumnInfo, len(tableInfo.RowColumnsOffset))
rowColumnInfos := make([]rowcodec.ColInfo, len(tableInfo.RowColumnsOffset))

_, _, extendColumnInfos := tableInfo.GetRowColInfos()

for idx, colInfo := range tableInfo.Columns {
for _, colInfo := range tableInfo.Columns {
if !model.IsColCDCVisible(colInfo) {
log.Debug("skip the column which is not visible",
zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O))
continue
}

colName := colInfo.Name.O
colID := colInfo.ID
colDatums, exist := datums[colID]
colDatum, exist := datums[colID]

var (
colValue interface{}
Expand All @@ -371,36 +366,29 @@ func datum2Column(
err error
)
if exist {
colValue, size, warn, err = formatColVal(colDatums, colInfo)
colValue, size, warn, err = formatColVal(colDatum, colInfo)
} else {
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
return nil, nil, nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()),
zap.String("column", colInfo.Name.String()))
}

defaultValue := GetColumnDefaultValue(colInfo)
offset := tableInfo.RowColumnsOffset[colID]
rawCols[offset] = colDatums
cols[offset] = &model.Column{
Name: colName,
Type: colInfo.GetType(),
Charset: colInfo.GetCharset(),
Collation: colInfo.GetCollate(),
Value: colValue,
Default: defaultValue,
Flag: tableInfo.ColumnsFlag[colID],
rawCols[offset] = colDatum
cols[offset] = &model.ColumnData{
ColumnID: colID,
Value: colValue,
// ApproximateBytes = column data size + column struct size
ApproximateBytes: size + sizeOfEmptyColumn,
}
columnInfos[offset] = colInfo
rowColumnInfos[offset] = extendColumnInfos[idx]
}
return cols, rawCols, columnInfos, rowColumnInfos, nil
return cols, rawCols, columnInfos, nil
}

// return error if cannot get the expected checksum from the decoder
Expand Down Expand Up @@ -486,11 +474,10 @@ func (m *mounter) verifyChecksum(

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
extendColumnInfos []rowcodec.ColInfo
matched bool
err error
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
matched bool
err error

checksum *integrity.Checksum

Expand All @@ -500,14 +487,14 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d

// Decode previous columns.
var (
preCols []*model.Column
preCols []*model.ColumnData
preRawCols []types.Datum
preChecksum uint32
)
if row.PreRowExist {
// FIXME(leoppro): use the previous table info to mount the pre-column datums;
// the pre-columns and current columns in one event may use different table infos.
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow)
preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand All @@ -531,23 +518,23 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
}

var (
cols []*model.Column
rawCols []types.Datum
current uint32
cols []*model.ColumnData
rawCols []types.Datum
currentChecksum uint32
)
if row.RowExist {
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row)
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row)
if err != nil {
return nil, rawRow, errors.Trace(err)
}

current, checksumVersion, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
currentChecksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
Expand All @@ -568,9 +555,9 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d

// if both are 0, it means the checksum is not enabled
// so the checksum is nil to reduce memory allocation.
if preChecksum != 0 || current != 0 {
if preChecksum != 0 || currentChecksum != 0 {
checksum = &integrity.Checksum{
Current: current,
Current: currentChecksum,
Previous: preChecksum,
Corrupted: corrupted,
Version: checksumVersion,
Expand All @@ -583,7 +570,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
RowID: intRowID,
HandleKey: row.RecordID,
PhysicalTableID: row.PhysicalTableID,
ColInfos: extendColumnInfos,
TableInfo: tableInfo,
Columns: cols,
PreColumns: preCols,
Expand Down Expand Up @@ -739,16 +725,6 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, stri
return d, v, size, warn, err
}

// GetColumnDefaultValue returns the default definition of a column.
//
// The value reported by col.GetDefaultValue is used when it is non-nil;
// otherwise col.GetOriginDefaultValue is consulted. The chosen value is
// wrapped in a Datum and that Datum's value is returned.
func GetColumnDefaultValue(col *timodel.ColumnInfo) interface{} {
	def := col.GetDefaultValue()
	if def == nil {
		// No current default is set, so use the origin default instead.
		def = col.GetOriginDefaultValue()
	}
	datum := types.NewDatum(def)
	return datum.GetValue()
}

// DecodeTableID decodes the raw key to a table ID
func DecodeTableID(key []byte) (model.TableID, error) {
_, physicalTableID, err := decodeTableID(key)
Expand Down
13 changes: 9 additions & 4 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,12 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
t.Log("ApproximateBytes", tc.tableName, rows-1, row.ApproximateBytes())
// TODO: test column flag, column type and index columns
if len(row.Columns) != 0 {
checkSQL, params := prepareCheckSQL(t, tc.tableName, row.Columns)
checkSQL, params := prepareCheckSQL(t, tc.tableName, row.GetColumns())
result := tk.MustQuery(checkSQL, params...)
result.Check([][]interface{}{{"1"}})
}
if len(row.PreColumns) != 0 {
checkSQL, params := prepareCheckSQL(t, tc.tableName, row.PreColumns)
checkSQL, params := prepareCheckSQL(t, tc.tableName, row.GetPreColumns())
result := tk.MustQuery(checkSQL, params...)
result.Check([][]interface{}{{"1"}})
}
Expand Down Expand Up @@ -1014,7 +1014,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
for _, tc := range testCases {
_, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo)
require.Equal(t, tc.Res, val, tc.Name)
val = GetColumnDefaultValue(&tc.ColInfo)
val = model.GetColumnDefaultValue(&tc.ColInfo)
require.Equal(t, tc.Default, val, tc.Name)
}
}
Expand Down Expand Up @@ -1641,8 +1641,13 @@ func TestBuildTableInfo(t *testing.T) {
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
require.NoError(t, err)
e := model.RowChangedEvent{
TableInfo: cdcTableInfo,
Columns: colDatas,
}
cols := e.GetColumns()
recoveredTI := model.BuildTiDBTableInfo(cdcTableInfo.TableName.Table, cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
Expand Down
61 changes: 51 additions & 10 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/table/tables"
datumTypes "github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -89,6 +90,11 @@ type TableInfo struct {
// it's the same length and order with the model.TableInfo.Columns
rowColInfos []rowcodec.ColInfo
rowColFieldTps map[int64]*types.FieldType

// number of virtual columns
virtualColumnCount int
// rowColInfosWithoutVirtualCols is the same as rowColInfos, but without virtual columns
rowColInfosWithoutVirtualCols *[]rowcodec.ColInfo
}

// WrapTableInfo creates a TableInfo from a timodel.TableInfo
Expand Down Expand Up @@ -117,6 +123,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode

rowColumnsCurrentOffset := 0

ti.virtualColumnCount = 0
for i, col := range ti.Columns {
ti.columnsOffset[col.ID] = i
pkIsHandle := false
Expand All @@ -140,6 +147,8 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
ti.handleColID = append(ti.handleColID, id)
}
}
} else {
ti.virtualColumnCount += 1
}
ti.rowColInfos[i] = rowcodec.ColInfo{
ID: col.ID,
Expand Down Expand Up @@ -169,11 +178,32 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
}
}

ti.initRowColInfosWithoutVirtualCols()
ti.findHandleIndex()
ti.initColumnsFlag()
return ti
}

// initRowColInfosWithoutVirtualCols populates ti.rowColInfosWithoutVirtualCols.
//
// When virtualColumnCount is zero the field simply points at ti.rowColInfos,
// so no additional slice is allocated. Otherwise a new slice is built that
// holds, in ti.Columns order, the rowColInfos entry of every column for which
// IsColCDCVisible reports true. If the number of collected entries differs
// from len(ti.rowColInfos) - virtualColumnCount, the mismatch is reported
// through log.Panic.
func (ti *TableInfo) initRowColInfosWithoutVirtualCols() {
	if ti.virtualColumnCount == 0 {
		ti.rowColInfosWithoutVirtualCols = &ti.rowColInfos
		return
	}

	expected := len(ti.rowColInfos) - ti.virtualColumnCount
	visible := make([]rowcodec.ColInfo, 0, expected)
	for idx := range ti.Columns {
		if !IsColCDCVisible(ti.Columns[idx]) {
			continue
		}
		visible = append(visible, ti.rowColInfos[idx])
	}

	// Sanity check: the visibility filter and the virtual column counter
	// must agree on how many columns remain.
	if len(visible) != expected {
		log.Panic("invalid rowColInfosWithoutVirtualCols",
			zap.Int("len(colInfos)", len(visible)),
			zap.Int("len(ti.rowColInfos)", len(ti.rowColInfos)),
			zap.Int("ti.virtualColumnCount", ti.virtualColumnCount))
	}
	ti.rowColInfosWithoutVirtualCols = &visible
}

func (ti *TableInfo) findHandleIndex() {
if ti.HandleIndexID == HandleIndexPKIsHandle {
// pk is handle
Expand Down Expand Up @@ -295,16 +325,6 @@ func (ti *TableInfo) ForceGetColumnName(colID int64) string {
return ti.ForceGetColumnInfo(colID).Name.O
}

// ForceGetExtraColumnInfo returns the rowcodec.ColInfo stored for the column
// with the given ID.
//
// The caller must ensure `colID` exists: an ID that is missing from
// ti.columnsOffset is reported through log.Panic.
func (ti *TableInfo) ForceGetExtraColumnInfo(colID int64) rowcodec.ColInfo {
	idx, known := ti.columnsOffset[colID]
	if !known {
		log.Panic("invalid column id", zap.Int64("columnID", colID))
	}
	info := ti.rowColInfos[idx]
	return info
}

// ForceGetColumnIDByName returns the column ID for the given column name
// Caller must ensure `name` exists
func (ti *TableInfo) ForceGetColumnIDByName(name string) int64 {
Expand Down Expand Up @@ -349,6 +369,12 @@ func (ti *TableInfo) GetRowColInfos() ([]int64, map[int64]*types.FieldType, []ro
return ti.handleColID, ti.rowColFieldTps, ti.rowColInfos
}

// GetColInfosForRowChangedEvent returns the column infos of the non-virtual
// columns. The column order in the result is the same as the order in the
// corresponding RowChangedEvent.
//
// The slice behind ti.rowColInfosWithoutVirtualCols is returned as is, not
// copied.
func (ti *TableInfo) GetColInfosForRowChangedEvent() []rowcodec.ColInfo {
	visible := ti.rowColInfosWithoutVirtualCols
	return *visible
}

// IsColCDCVisible returns whether the col is visible for CDC
func IsColCDCVisible(col *model.ColumnInfo) bool {
// this column is a virtual generated column
Expand All @@ -363,6 +389,11 @@ func (ti *TableInfo) HasUniqueColumn() bool {
return ti.hasUniqueColumn
}

// HasVirtualColumns reports whether the table has at least one virtual
// column, i.e. whether ti.virtualColumnCount is positive.
func (ti *TableInfo) HasVirtualColumns() bool {
	hasVirtual := ti.virtualColumnCount >= 1
	return hasVirtual
}

// IsEligible returns whether the table is an eligible table
func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
Expand Down Expand Up @@ -475,3 +506,13 @@ func (ti *TableInfo) GetPrimaryKeyColumnNames() []string {
}
return result
}

// GetColumnDefaultValue returns the default definition of a column.
//
// The value reported by col.GetDefaultValue is used when it is non-nil;
// otherwise col.GetOriginDefaultValue is consulted. The chosen value is
// wrapped in a Datum and that Datum's value is returned.
func GetColumnDefaultValue(col *model.ColumnInfo) interface{} {
	def := col.GetDefaultValue()
	if def == nil {
		// No current default is set, so use the origin default instead.
		def = col.GetOriginDefaultValue()
	}
	datum := datumTypes.NewDatum(def)
	return datum.GetValue()
}
Loading

0 comments on commit a8c7563

Please sign in to comment.