Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*(ticdc): optimize memory usage of RowChangedEvent #10483

Merged
merged 8 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 27 additions & 51 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,27 +342,22 @@

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum,
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
) ([]*model.ColumnData, []types.Datum, []*timodel.ColumnInfo, error) {
cols := make([]*model.ColumnData, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))

// columnInfos and rowColumnInfos hold different column metadata,
// they should have the same length and order.
// columnInfos should have the same length and order with cols
columnInfos := make([]*timodel.ColumnInfo, len(tableInfo.RowColumnsOffset))
rowColumnInfos := make([]rowcodec.ColInfo, len(tableInfo.RowColumnsOffset))

_, _, extendColumnInfos := tableInfo.GetRowColInfos()

for idx, colInfo := range tableInfo.Columns {
for _, colInfo := range tableInfo.Columns {
if !model.IsColCDCVisible(colInfo) {
log.Debug("skip the column which is not visible",
zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O))
continue
}

colName := colInfo.Name.O
colID := colInfo.ID
colDatums, exist := datums[colID]
colDatum, exist := datums[colID]

var (
colValue interface{}
Expand All @@ -371,36 +366,29 @@
err error
)
if exist {
colValue, size, warn, err = formatColVal(colDatums, colInfo)
colValue, size, warn, err = formatColVal(colDatum, colInfo)
} else {
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
colDatum, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
return nil, nil, nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)

Check warning on line 374 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L374

Added line #L374 was not covered by tests
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()),
zap.String("column", colInfo.Name.String()))
}

defaultValue := GetColumnDefaultValue(colInfo)
offset := tableInfo.RowColumnsOffset[colID]
rawCols[offset] = colDatums
cols[offset] = &model.Column{
Name: colName,
Type: colInfo.GetType(),
Charset: colInfo.GetCharset(),
Collation: colInfo.GetCollate(),
Value: colValue,
Default: defaultValue,
Flag: tableInfo.ColumnsFlag[colID],
rawCols[offset] = colDatum
cols[offset] = &model.ColumnData{
ColumnID: colID,
Value: colValue,
// ApproximateBytes = column data size + column struct size
ApproximateBytes: size + sizeOfEmptyColumn,
}
columnInfos[offset] = colInfo
rowColumnInfos[offset] = extendColumnInfos[idx]
}
return cols, rawCols, columnInfos, rowColumnInfos, nil
return cols, rawCols, columnInfos, nil
}

// return error if cannot get the expected checksum from the decoder
Expand Down Expand Up @@ -486,11 +474,10 @@

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
extendColumnInfos []rowcodec.ColInfo
matched bool
err error
rawRow model.RowChangedDatums
columnInfos []*timodel.ColumnInfo
matched bool
err error

checksum *integrity.Checksum

Expand All @@ -500,14 +487,14 @@

// Decode previous columns.
var (
preCols []*model.Column
preCols []*model.ColumnData
preRawCols []types.Datum
preChecksum uint32
)
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow)
preCols, preRawCols, columnInfos, err = datum2Column(tableInfo, row.PreRow)

Check warning on line 497 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L497

Added line #L497 was not covered by tests
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand All @@ -531,23 +518,23 @@
}

var (
cols []*model.Column
rawCols []types.Datum
current uint32
cols []*model.ColumnData
rawCols []types.Datum
currentChecksum uint32
)
if row.RowExist {
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row)
cols, rawCols, columnInfos, err = datum2Column(tableInfo, row.Row)
if err != nil {
return nil, rawRow, errors.Trace(err)
}

current, checksumVersion, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
currentChecksum, checksumVersion, matched, err = m.verifyChecksum(columnInfos, rawCols, false)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
Expand All @@ -568,9 +555,9 @@

// if both are 0, it means the checksum is not enabled
// so the checksum is nil to reduce memory allocation.
if preChecksum != 0 || current != 0 {
if preChecksum != 0 || currentChecksum != 0 {
checksum = &integrity.Checksum{
Current: current,
Current: currentChecksum,
Previous: preChecksum,
Corrupted: corrupted,
Version: checksumVersion,
Expand All @@ -583,7 +570,6 @@
RowID: intRowID,
HandleKey: row.RecordID,
PhysicalTableID: row.PhysicalTableID,
ColInfos: extendColumnInfos,
TableInfo: tableInfo,
Columns: cols,
PreColumns: preCols,
Expand Down Expand Up @@ -739,16 +725,6 @@
return d, v, size, warn, err
}

// GetColumnDefaultValue returns the default definition of a column.
func GetColumnDefaultValue(col *timodel.ColumnInfo) interface{} {
defaultValue := col.GetDefaultValue()
if defaultValue == nil {
defaultValue = col.GetOriginDefaultValue()
}
defaultDatum := types.NewDatum(defaultValue)
return defaultDatum.GetValue()
}

// DecodeTableID decodes the raw key to a table ID
func DecodeTableID(key []byte) (model.TableID, error) {
_, physicalTableID, err := decodeTableID(key)
Expand Down
13 changes: 9 additions & 4 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,12 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
t.Log("ApproximateBytes", tc.tableName, rows-1, row.ApproximateBytes())
// TODO: test column flag, column type and index columns
if len(row.Columns) != 0 {
checkSQL, params := prepareCheckSQL(t, tc.tableName, row.Columns)
checkSQL, params := prepareCheckSQL(t, tc.tableName, row.GetColumns())
result := tk.MustQuery(checkSQL, params...)
result.Check([][]interface{}{{"1"}})
}
if len(row.PreColumns) != 0 {
checkSQL, params := prepareCheckSQL(t, tc.tableName, row.PreColumns)
checkSQL, params := prepareCheckSQL(t, tc.tableName, row.GetPreColumns())
result := tk.MustQuery(checkSQL, params...)
result.Check([][]interface{}{{"1"}})
}
Expand Down Expand Up @@ -1014,7 +1014,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
for _, tc := range testCases {
_, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo)
require.Equal(t, tc.Res, val, tc.Name)
val = GetColumnDefaultValue(&tc.ColInfo)
val = model.GetColumnDefaultValue(&tc.ColInfo)
require.Equal(t, tc.Default, val, tc.Name)
}
}
Expand Down Expand Up @@ -1641,8 +1641,13 @@ func TestBuildTableInfo(t *testing.T) {
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
lidezhu marked this conversation as resolved.
Show resolved Hide resolved
colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{})
require.NoError(t, err)
e := model.RowChangedEvent{
TableInfo: cdcTableInfo,
Columns: colDatas,
}
cols := e.GetColumns()
recoveredTI := model.BuildTiDBTableInfo(cdcTableInfo.TableName.Table, cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
Expand Down
61 changes: 51 additions & 10 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/table/tables"
datumTypes "github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -89,6 +90,11 @@
// it's the same length and order with the model.TableInfo.Columns
rowColInfos []rowcodec.ColInfo
rowColFieldTps map[int64]*types.FieldType

// number of virtual columns
virtualColumnCount int
// rowColInfosWithoutVirtualCols is the same as rowColInfos, but without virtual columns
rowColInfosWithoutVirtualCols *[]rowcodec.ColInfo
}

// WrapTableInfo creates a TableInfo from a timodel.TableInfo
Expand Down Expand Up @@ -117,6 +123,7 @@

rowColumnsCurrentOffset := 0

ti.virtualColumnCount = 0
for i, col := range ti.Columns {
ti.columnsOffset[col.ID] = i
pkIsHandle := false
Expand All @@ -140,6 +147,8 @@
ti.handleColID = append(ti.handleColID, id)
}
}
} else {
ti.virtualColumnCount += 1
}
ti.rowColInfos[i] = rowcodec.ColInfo{
ID: col.ID,
Expand Down Expand Up @@ -169,11 +178,32 @@
}
}

ti.initRowColInfosWithoutVirtualCols()
ti.findHandleIndex()
ti.initColumnsFlag()
return ti
}

func (ti *TableInfo) initRowColInfosWithoutVirtualCols() {
if ti.virtualColumnCount == 0 {
ti.rowColInfosWithoutVirtualCols = &ti.rowColInfos
return
}
colInfos := make([]rowcodec.ColInfo, 0, len(ti.rowColInfos)-ti.virtualColumnCount)
for i, col := range ti.Columns {
if IsColCDCVisible(col) {
colInfos = append(colInfos, ti.rowColInfos[i])
}
}
if len(colInfos) != len(ti.rowColInfos)-ti.virtualColumnCount {
log.Panic("invalid rowColInfosWithoutVirtualCols",
zap.Int("len(colInfos)", len(colInfos)),
zap.Int("len(ti.rowColInfos)", len(ti.rowColInfos)),
zap.Int("ti.virtualColumnCount", ti.virtualColumnCount))
}

Check warning on line 203 in cdc/model/schema_storage.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/schema_storage.go#L199-L203

Added lines #L199 - L203 were not covered by tests
ti.rowColInfosWithoutVirtualCols = &colInfos
}

func (ti *TableInfo) findHandleIndex() {
if ti.HandleIndexID == HandleIndexPKIsHandle {
// pk is handle
Expand Down Expand Up @@ -295,16 +325,6 @@
return ti.ForceGetColumnInfo(colID).Name.O
}

// ForceGetExtraColumnInfo return the extra column info by ID
// Caller must ensure `colID` exists
func (ti *TableInfo) ForceGetExtraColumnInfo(colID int64) rowcodec.ColInfo {
colOffset, ok := ti.columnsOffset[colID]
if !ok {
log.Panic("invalid column id", zap.Int64("columnID", colID))
}
return ti.rowColInfos[colOffset]
}

// ForceGetColumnIDByName return column ID by column name
// Caller must ensure `colID` exists
func (ti *TableInfo) ForceGetColumnIDByName(name string) int64 {
Expand Down Expand Up @@ -349,6 +369,12 @@
return ti.handleColID, ti.rowColFieldTps, ti.rowColInfos
}

// GetColInfosForRowChangedEvent return column infos for non-virtual columns
// The column order in the result is the same as the order in its corresponding RowChangedEvent
func (ti *TableInfo) GetColInfosForRowChangedEvent() []rowcodec.ColInfo {
return *ti.rowColInfosWithoutVirtualCols

Check warning on line 375 in cdc/model/schema_storage.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/schema_storage.go#L374-L375

Added lines #L374 - L375 were not covered by tests
}

// IsColCDCVisible returns whether the col is visible for CDC
func IsColCDCVisible(col *model.ColumnInfo) bool {
// this column is a virtual generated column
Expand All @@ -363,6 +389,11 @@
return ti.hasUniqueColumn
}

// HasVirtualColumns returns whether the table has virtual columns
func (ti *TableInfo) HasVirtualColumns() bool {
return ti.virtualColumnCount > 0

Check warning on line 394 in cdc/model/schema_storage.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/schema_storage.go#L393-L394

Added lines #L393 - L394 were not covered by tests
}

// IsEligible returns whether the table is a eligible table
func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
Expand Down Expand Up @@ -475,3 +506,13 @@
}
return result
}

// GetColumnDefaultValue returns the default definition of a column.
func GetColumnDefaultValue(col *model.ColumnInfo) interface{} {
defaultValue := col.GetDefaultValue()
if defaultValue == nil {
defaultValue = col.GetOriginDefaultValue()
}
defaultDatum := datumTypes.NewDatum(defaultValue)
return defaultDatum.GetValue()
}
Loading
Loading