Skip to content

Commit

Permalink
cdc: remove some unnecessary fields from RowChangedEvent (#10517)
Browse files Browse the repository at this point in the history
ref #10386
  • Loading branch information
lidezhu authored Jan 24, 2024
1 parent d5ab31e commit 3e4522d
Show file tree
Hide file tree
Showing 97 changed files with 1,736 additions and 858 deletions.
24 changes: 8 additions & 16 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
}
}

schemaName := tableInfo.TableName.Schema
tableName := tableInfo.TableName.Table
var intRowID int64
if row.RecordID.IsInt() {
intRowID = row.RecordID.IntValue()
Expand All @@ -580,23 +578,17 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
}

return &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: intRowID,
Table: &model.TableName{
Schema: schemaName,
Table: tableName,
TableID: row.PhysicalTableID,
IsPartition: tableInfo.GetPartitionInfo() != nil,
},
ColInfos: extendColumnInfos,
TableInfo: tableInfo,
Columns: cols,
PreColumns: preCols,
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: intRowID,
PhysicalTableID: row.PhysicalTableID,
ColInfos: extendColumnInfos,
TableInfo: tableInfo,
Columns: cols,
PreColumns: preCols,

Checksum: checksum,

IndexColumns: tableInfo.IndexColumnsOffset,
ApproximateDataSize: dataSize,
}, rawRow, nil
}
Expand Down
10 changes: 5 additions & 5 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
return
}
rows++
require.Equal(t, row.Table.Table, tc.tableName)
require.Equal(t, row.Table.Schema, "test")
require.Equal(t, row.TableInfo.GetTableName(), tc.tableName)
require.Equal(t, row.TableInfo.GetSchemaName(), "test")
// [TODO] check size and reopen this check
// require.Equal(t, rowBytes[rows-1], row.ApproximateBytes(), row)
t.Log("ApproximateBytes", tc.tableName, rows-1, row.ApproximateBytes())
Expand Down Expand Up @@ -1491,10 +1491,10 @@ func TestDecodeEventIgnoreRow(t *testing.T) {
}
row := pEvent.Row
rows++
require.Equal(t, row.Table.Schema, "test")
require.Equal(t, row.TableInfo.GetSchemaName(), "test")
// Currently DML events can only be filtered by table, so we only check the row's table.
require.NotContains(t, ignoredTables, row.Table.Table)
require.Contains(t, tables, row.Table.Table)
require.NotContains(t, ignoredTables, row.TableInfo.GetTableName())
require.Contains(t, tables, row.TableInfo.GetTableName())
})
return rows
}
Expand Down
29 changes: 7 additions & 22 deletions cdc/model/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,6 @@ func MarshalRedoLog(r *model.RedoLog, b []byte) (o []byte, err error) {
return
}

// MarshalRowAsRedoLog wraps the given RowChangedEvent in a RedoLog of type
// RedoLogTypeRow and marshals that log with MarshalRedoLog, passing b through
// unchanged. The results are whatever MarshalRedoLog returns.
func MarshalRowAsRedoLog(r *model.RowChangedEvent, b []byte) (o []byte, err error) {
	return MarshalRedoLog(&model.RedoLog{
		Type:    model.RedoLogTypeRow,
		RedoRow: model.RedoRowChangedEvent{Row: r},
	}, b)
}

// MarshalDDLAsRedoLog converts a DDLEvent into RedoLog, and then marshals it.
func MarshalDDLAsRedoLog(d *model.DDLEvent, b []byte) (o []byte, err error) {
log := &model.RedoLog{
Expand All @@ -187,19 +178,13 @@ func decodeVersion(bts []byte) (uint16, []byte) {
func redoLogFromV1(rv1 *codecv1.RedoLog) (r *model.RedoLog) {
r = &model.RedoLog{Type: (model.RedoLogType)(rv1.Type)}
if rv1.RedoRow != nil && rv1.RedoRow.Row != nil {
r.RedoRow.Row = &model.RowChangedEvent{
StartTs: rv1.RedoRow.Row.StartTs,
CommitTs: rv1.RedoRow.Row.CommitTs,
RowID: rv1.RedoRow.Row.RowID,
Table: tableNameFromV1(rv1.RedoRow.Row.Table),
ColInfos: rv1.RedoRow.Row.ColInfos,
TableInfo: rv1.RedoRow.Row.TableInfo,
Columns: make([]*model.Column, 0, len(rv1.RedoRow.Row.Columns)),
PreColumns: make([]*model.Column, 0, len(rv1.RedoRow.Row.PreColumns)),
IndexColumns: rv1.RedoRow.Row.IndexColumns,
ApproximateDataSize: rv1.RedoRow.Row.ApproximateDataSize,
SplitTxn: rv1.RedoRow.Row.SplitTxn,
ReplicatingTs: rv1.RedoRow.Row.ReplicatingTs,
r.RedoRow.Row = &model.RowChangedEventInRedoLog{
StartTs: rv1.RedoRow.Row.StartTs,
CommitTs: rv1.RedoRow.Row.CommitTs,
Table: tableNameFromV1(rv1.RedoRow.Row.Table),
Columns: make([]*model.Column, 0, len(rv1.RedoRow.Row.Columns)),
PreColumns: make([]*model.Column, 0, len(rv1.RedoRow.Row.PreColumns)),
IndexColumns: rv1.RedoRow.Row.IndexColumns,
}
for _, c := range rv1.RedoRow.Row.Columns {
r.RedoRow.Row.Columns = append(r.RedoRow.Row.Columns, columnFromV1(c))
Expand Down
7 changes: 3 additions & 4 deletions cdc/model/codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestV1toV2(t *testing.T) {

rv2 = &model.RedoLog{
RedoRow: model.RedoRowChangedEvent{
Row: &model.RowChangedEvent{
Row: &model.RowChangedEventInRedoLog{
StartTs: 1,
CommitTs: 2,
Table: &model.TableName{
Expand All @@ -77,7 +77,6 @@ func TestV1toV2(t *testing.T) {
TableID: 1,
IsPartition: false,
},
TableInfo: nil,
Columns: []*model.Column{
{
Name: "column",
Expand Down Expand Up @@ -136,7 +135,7 @@ func TestV1toV2(t *testing.T) {
func TestRowRedoConvert(t *testing.T) {
t.Parallel()

row := &model.RowChangedEvent{
row := &model.RowChangedEventInRedoLog{
StartTs: 100,
CommitTs: 120,
Table: &model.TableName{Schema: "test", Table: "table1", TableID: 57},
Expand Down Expand Up @@ -196,7 +195,7 @@ func TestRowRedoConvert(t *testing.T) {
func TestRowRedoConvertWithEmptySlice(t *testing.T) {
t.Parallel()

row := &model.RowChangedEvent{
row := &model.RowChangedEventInRedoLog{
StartTs: 100,
CommitTs: 120,
Table: &model.TableName{Schema: "test", Table: "table1", TableID: 57},
Expand Down
25 changes: 25 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,31 @@ func (ti *TableInfo) ForceGetColumnIDByName(name string) int64 {
return colID
}

// GetSchemaName returns the Schema field of the table's TableName.
func (ti *TableInfo) GetSchemaName() string {
	schema := ti.TableName.Schema
	return schema
}

// GetTableName returns the Table field of the table's TableName.
func (ti *TableInfo) GetTableName() string {
	table := ti.TableName.Table
	return table
}

// GetSchemaNamePtr returns a pointer to the Schema field of the table's
// TableName. The pointer aliases the field itself, so a write through it
// changes the schema name stored in this TableInfo.
func (ti *TableInfo) GetSchemaNamePtr() *string {
	schema := &ti.TableName.Schema
	return schema
}

// GetTableNamePtr returns a pointer to the Table field of the table's
// TableName. The pointer aliases the field itself, so a write through it
// changes the table name stored in this TableInfo.
func (ti *TableInfo) GetTableNamePtr() *string {
	table := &ti.TableName.Table
	return table
}

// IsPartitionTable reports whether the table is a partition table, as recorded
// in the IsPartition flag of its TableName.
func (ti *TableInfo) IsPartitionTable() bool {
	isPartition := ti.TableName.IsPartition
	return isPartition
}

// String renders a one-line summary of the table: its ID, its name, the number
// of columns, the number of indices and the PKIsHandle flag.
func (ti *TableInfo) String() string {
	return fmt.Sprintf(
		"TableInfo, ID: %d, Name:%s, ColNum: %d, IdxNum: %d, PKIsHandle: %t",
		ti.ID,
		ti.TableName,
		len(ti.Columns),
		len(ti.Indices),
		ti.PKIsHandle,
	)
}
Expand Down
8 changes: 8 additions & 0 deletions cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package model
import (
"testing"

"github.com/pingcap/tidb/pkg/parser/model"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
parser_types "github.com/pingcap/tidb/pkg/parser/types"
Expand Down Expand Up @@ -174,6 +175,9 @@ func TestTableInfoGetterFuncs(t *testing.T) {
},
IsCommonHandle: false,
PKIsHandle: false,
Partition: &model.PartitionInfo{
Enable: true,
},
}
info := WrapTableInfo(1, "test", 0, &tbl)

Expand All @@ -185,6 +189,10 @@ func TestTableInfoGetterFuncs(t *testing.T) {

require.Equal(t, "TableInfo, ID: 1071, Name:test.t1, ColNum: 3, IdxNum: 1, PKIsHandle: false", info.String())

require.Equal(t, "test", info.GetSchemaName())
require.Equal(t, "t1", info.GetTableName())
require.True(t, info.IsPartitionTable())

handleColIDs, fts, colInfos := info.GetRowColInfos()
require.Equal(t, []int64{-1}, handleColIDs)
require.Equal(t, 3, len(fts))
Expand Down
70 changes: 46 additions & 24 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error {

// RedoRowChangedEvent represents the DML event used in RedoLog.
// It pairs a pointer to the row event with two RedoColumn slices, one for the
// columns and one for the pre-columns.
// NOTE(review): Row, Columns and PreColumns each appear twice below, which looks
// like the before/after lines of a diff. The two variants differ only in the
// pointer type of Row — confirm which one the repository actually holds.
type RedoRowChangedEvent struct {
Row *RowChangedEvent `msg:"row"`
Columns []RedoColumn `msg:"columns"`
PreColumns []RedoColumn `msg:"pre-columns"`
Row *RowChangedEventInRedoLog `msg:"row"`
Columns []RedoColumn `msg:"columns"`
PreColumns []RedoColumn `msg:"pre-columns"`
}

// RedoDDLEvent represents DDL event used in redo log persistent
Expand All @@ -291,8 +291,21 @@ type RedoDDLEvent struct {

// ToRedoLog converts row changed event to redo log.
// It builds a RowChangedEventInRedoLog from this event and wraps it in a
// RedoLog whose Type is RedoLogTypeRow.
// r.TableInfo must be non-nil: its IndexColumnsOffset field is read directly
// below, which would panic on a nil pointer.
func (r *RowChangedEvent) ToRedoLog() *RedoLog {
rowInRedoLog := &RowChangedEventInRedoLog{
StartTs: r.StartTs,
CommitTs: r.CommitTs,
// The schema name, table name and partition flag are read from TableInfo,
// while the table ID is taken from the event's own PhysicalTableID field.
Table: &TableName{
Schema: r.TableInfo.GetSchemaName(),
Table: r.TableInfo.GetTableName(),
TableID: r.PhysicalTableID,
IsPartition: r.TableInfo.IsPartitionTable(),
},
// Columns and PreColumns are assigned as-is, so the redo row shares these
// slices with r rather than holding copies.
Columns: r.Columns,
PreColumns: r.PreColumns,
IndexColumns: r.TableInfo.IndexColumnsOffset,
}
return &RedoLog{
// NOTE(review): the two RedoRow lines below look like the old and new versions
// of a single diff line; a compilable literal can keep only one of them —
// confirm against the repository.
RedoRow: RedoRowChangedEvent{Row: r},
RedoRow: RedoRowChangedEvent{Row: rowInRedoLog},
Type: RedoLogTypeRow,
}
}
Expand All @@ -312,10 +325,10 @@ type RowChangedEvent struct {

RowID int64 `json:"row-id" msg:"-"` // Deprecated. It is empty when the RowID comes from clustered index table.

// Table contains the table name and table ID.
// NOTICE: We store the physical table ID here, not the logical table ID.
Table *TableName `json:"table" msg:"table"`
PhysicalTableID int64 `json:"physical-tbl-id" msg:"physical-tbl-id"`

ColInfos []rowcodec.ColInfo `json:"column-infos" msg:"-"`

// NOTICE: We probably store the logical ID inside TableInfo's TableName,
// not the physical ID.
// For normal table, there is only one ID, which is the physical ID.
Expand All @@ -328,9 +341,8 @@ type RowChangedEvent struct {
// So be careful when using the TableInfo.
TableInfo *TableInfo `json:"-" msg:"-"`

Columns []*Column `json:"columns" msg:"columns"`
PreColumns []*Column `json:"pre-columns" msg:"pre-columns"`
IndexColumns [][]int `json:"-" msg:"index-columns"`
Columns []*Column `json:"columns" msg:"columns"`
PreColumns []*Column `json:"pre-columns" msg:"pre-columns"`

// Checksum for the event, only not nil if the upstream TiDB enable the row level checksum
// and TiCDC set the integrity check level to the correctness.
Expand All @@ -346,6 +358,20 @@ type RowChangedEvent struct {
ReplicatingTs Ts `json:"-" msg:"-"`
}

// RowChangedEventInRedoLog is used to store RowChangedEvent in redo log v2 format.
// It carries the event's timestamps, its table identity, the column data and
// the index-column layout.
type RowChangedEventInRedoLog struct {
// StartTs and CommitTs are the start and commit timestamps of the event.
StartTs uint64 `json:"start-ts" msg:"start-ts"`
CommitTs uint64 `json:"commit-ts" msg:"commit-ts"`

// Table contains the table name and table ID.
// NOTICE: We store the physical table ID here, not the logical table ID.
Table *TableName `json:"table" msg:"table"`

// Columns and PreColumns hold the row's column data; presumably the values
// after and before the change respectively — TODO confirm against the mounter.
Columns []*Column `json:"columns" msg:"columns"`
PreColumns []*Column `json:"pre-columns" msg:"pre-columns"`
// IndexColumns is excluded from JSON encoding (json:"-") but still carries a
// msg tag ("index-columns").
IndexColumns [][]int `json:"-" msg:"index-columns"`
}

// txnRows represents a set of events that belong to the same transaction.
type txnRows []*RowChangedEvent

Expand Down Expand Up @@ -480,12 +506,8 @@ func (r *RowChangedEvent) WithHandlePrimaryFlag(colNames map[string]struct{}) {
// ApproximateBytes returns approximate bytes in memory consumed by the event.
func (r *RowChangedEvent) ApproximateBytes() int {
const sizeOfRowEvent = int(unsafe.Sizeof(*r))
const sizeOfTable = int(unsafe.Sizeof(*r.Table))
const sizeOfIndexes = int(unsafe.Sizeof(r.IndexColumns[0]))
const sizeOfInt = int(unsafe.Sizeof(int(0)))

// Size of table name
size := len(r.Table.Schema) + len(r.Table.Table) + sizeOfTable
size := 0
// Size of cols
for i := range r.Columns {
size += r.Columns[i].ApproximateBytes
Expand All @@ -496,11 +518,6 @@ func (r *RowChangedEvent) ApproximateBytes() int {
size += r.PreColumns[i].ApproximateBytes
}
}
// Size of index columns
for i := range r.IndexColumns {
size += len(r.IndexColumns[i]) * sizeOfInt
size += sizeOfIndexes
}
// Size of an empty row event
size += sizeOfRowEvent
return size
Expand Down Expand Up @@ -858,8 +875,8 @@ func NewBootstrapDDLEvent(tableInfo *TableInfo) *DDLEvent {
//
//msgp:ignore SingleTableTxn
type SingleTableTxn struct {
Table *TableName
TableInfo *TableInfo
PhysicalTableID int64
TableInfo *TableInfo
// TableInfoVersion is the version of the table info, it is used to generate data path
// in storage sink. Generally, TableInfoVersion equals to `SingleTableTxn.TableInfo.Version`.
// Besides, if one table is just scheduled to a new processor, the TableInfoVersion should be
Expand All @@ -876,6 +893,11 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
return t.CommitTs
}

// GetPhysicalTableID returns the PhysicalTableID stored on this transaction.
func (t *SingleTableTxn) GetPhysicalTableID() int64 {
	id := t.PhysicalTableID
	return id
}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {
if !t.shouldSplitUpdateEvent(scheme) {
Expand Down Expand Up @@ -1000,11 +1022,11 @@ func splitUpdateEvent(

// Append adds a row changed event into SingleTableTxn
func (t *SingleTableTxn) Append(row *RowChangedEvent) {
if row.StartTs != t.StartTs || row.CommitTs != t.CommitTs || row.Table.TableID != t.Table.TableID {
if row.StartTs != t.StartTs || row.CommitTs != t.CommitTs || row.PhysicalTableID != t.GetPhysicalTableID() {
log.Panic("unexpected row change event",
zap.Uint64("startTs", t.StartTs),
zap.Uint64("commitTs", t.CommitTs),
zap.Any("table", t.Table),
zap.Any("table", t.GetPhysicalTableID()),
zap.Any("row", row))
}
t.Rows = append(t.Rows, row)
Expand Down
Loading

0 comments on commit 3e4522d

Please sign in to comment.