Skip to content

Commit

Permalink
implement memory writer
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Feb 11, 2023
1 parent be8b0c4 commit fd49fb3
Show file tree
Hide file tree
Showing 25 changed files with 1,705 additions and 468 deletions.
4 changes: 4 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
MaxLogSize: c.Consistent.MaxLogSize,
FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
Storage: c.Consistent.Storage,
UseFileBackend: c.Consistent.UseFileBackend,
}
}
if c.Sink != nil {
Expand Down Expand Up @@ -387,6 +388,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
MaxLogSize: cloned.Consistent.MaxLogSize,
FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
Storage: cloned.Consistent.Storage,
UseFileBackend: cloned.Consistent.UseFileBackend,
}
}
if cloned.Mounter != nil {
Expand Down Expand Up @@ -420,6 +422,7 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
MaxLogSize: 64,
FlushIntervalInMs: config.DefaultFlushIntervalInMs,
Storage: "",
UseFileBackend: true,
},
}
}
Expand Down Expand Up @@ -562,6 +565,7 @@ type ConsistentConfig struct {
MaxLogSize int64 `json:"max_log_size"`
FlushIntervalInMs int64 `json:"flush_interval"`
Storage string `json:"storage"`
UseFileBackend bool `json:"use_file_backend"`
}

// ChangefeedSchedulerConfig is per changefeed scheduler settings.
Expand Down
121 changes: 115 additions & 6 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package model

import (
"bytes"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -241,13 +242,127 @@ type RedoLog struct {
Type RedoLogType `msg:"type"`
}

// GetCommitTs returns the commit ts of the redo log.
func (r *RedoLog) GetCommitTs() Ts {
switch r.Type {
case RedoLogTypeRow:
return r.RedoRow.Row.CommitTs
case RedoLogTypeDDL:
return r.RedoDDL.DDL.CommitTs
default:
log.Panic("invalid redo log type", zap.Any("type", r.Type))
}
return 0
}

// RedoRowChangedEvent represents the DML event used in RedoLog
type RedoRowChangedEvent struct {
Row *RowChangedEvent `msg:"row"`
PreColumns []*RedoColumn `msg:"pre-columns"`
Columns []*RedoColumn `msg:"columns"`
}

// RedoDDLEvent represents DDL event used in redo log persistent
type RedoDDLEvent struct {
DDL *DDLEvent `msg:"ddl"`
Type byte `msg:"type"`
}

// ToRedoLog converts row changed event to redo log
func (row *RowChangedEvent) ToRedoLog() *RedoLog {
return &RedoLog{
RedoRow: RowToRedo(row),
Type: RedoLogTypeRow,
}
}

// ToRedoLog converts ddl event to redo log
func (ddl *DDLEvent) ToRedoLog() *RedoLog {
return &RedoLog{
RedoDDL: DDLToRedo(ddl),
Type: RedoLogTypeDDL,
}
}

// RowToRedo converts row changed event to redo log row
func RowToRedo(row *RowChangedEvent) *RedoRowChangedEvent {
redoLog := &RedoRowChangedEvent{
Row: row,
Columns: make([]*RedoColumn, 0, len(row.Columns)),
PreColumns: make([]*RedoColumn, 0, len(row.PreColumns)),
}
for _, column := range row.Columns {
var redoColumn *RedoColumn
if column != nil {
// workaround msgp issue:
// (Decode replaces empty slices with nil https://github.com/tinylib/msgp/issues/247)
// if []byte("") send with RowChangedEvent after UnmarshalMsg,
// the value will become nil, which is unexpected.
switch v := column.Value.(type) {
case []byte:
if bytes.Equal(v, []byte("")) {
column.Value = ""
}
}
redoColumn = &RedoColumn{Column: column, Flag: uint64(column.Flag)}
}
redoLog.Columns = append(redoLog.Columns, redoColumn)
}
for _, column := range row.PreColumns {
var redoColumn *RedoColumn
if column != nil {
switch v := column.Value.(type) {
case []byte:
if bytes.Equal(v, []byte("")) {
column.Value = ""
}
}
redoColumn = &RedoColumn{Column: column, Flag: uint64(column.Flag)}
}
redoLog.PreColumns = append(redoLog.PreColumns, redoColumn)
}
return redoLog
}

// LogToRow converts redo log row to row changed event
func LogToRow(redoLog *RedoRowChangedEvent) *RowChangedEvent {
row := redoLog.Row
row.Columns = make([]*Column, 0, len(redoLog.Columns))
row.PreColumns = make([]*Column, 0, len(redoLog.PreColumns))
for _, column := range redoLog.PreColumns {
if column == nil {
row.PreColumns = append(row.PreColumns, nil)
continue
}
column.Column.Flag = ColumnFlagType(column.Flag)
row.PreColumns = append(row.PreColumns, column.Column)
}
for _, column := range redoLog.Columns {
if column == nil {
row.Columns = append(row.Columns, nil)
continue
}
column.Column.Flag = ColumnFlagType(column.Flag)
row.Columns = append(row.Columns, column.Column)
}
return row
}

// DDLToRedo converts ddl event to redo log ddl
func DDLToRedo(ddl *DDLEvent) *RedoDDLEvent {
redoDDL := &RedoDDLEvent{
DDL: ddl,
Type: byte(ddl.Type),
}
return redoDDL
}

// LogToDDL converts redo log ddl to ddl event
func LogToDDL(redoDDL *RedoDDLEvent) *DDLEvent {
redoDDL.DDL.Type = model.ActionType(redoDDL.Type)
return redoDDL.DDL
}

// RowChangedEvent represents a row changed event
type RowChangedEvent struct {
StartTs uint64 `json:"start-ts" msg:"start-ts"`
Expand Down Expand Up @@ -595,12 +710,6 @@ type DDLEvent struct {
Done bool `msg:"-"`
}

// RedoDDLEvent represents DDL event used in redo log persistent
type RedoDDLEvent struct {
DDL *DDLEvent `msg:"ddl"`
Type byte `msg:"type"`
}

// FromJob fills the values with DDLEvent from DDL job
func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo, tableInfo *TableInfo) {
// populating DDLEvent of an `rename tables` job is handled in `FromRenameTablesJob()`
Expand Down
129 changes: 129 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package model
import (
"testing"

"github.com/pingcap/tidb/parser/charset"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
Expand Down Expand Up @@ -383,3 +384,131 @@ func TestDDLEventFromRenameTablesJob(t *testing.T) {
require.Len(t, event.TableInfo.TableInfo.Columns, 1)
require.Equal(t, event.Query, "RENAME TABLE `test1`.`t2` TO `test1`.`t20`")
}

func TestRowRedoConvert(t *testing.T) {
t.Parallel()
row := &RowChangedEvent{
StartTs: 100,
CommitTs: 120,
Table: &TableName{Schema: "test", Table: "table1", TableID: 57},
PreColumns: []*Column{{
Name: "a1",
Type: mysql.TypeLong,
Flag: BinaryFlag | MultipleKeyFlag | HandleKeyFlag,
Value: int64(1),
}, {
Name: "a2",
Type: mysql.TypeVarchar,
Value: []byte("char"),
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: BinaryFlag | MultipleKeyFlag | HandleKeyFlag,
Value: int64(1),
}, {
Name: "a4",
Type: mysql.TypeTinyBlob,
Charset: charset.CharsetGBK,
Value: []byte("你好"),
}, nil},
Columns: []*Column{{
Name: "a1",
Type: mysql.TypeLong,
Flag: BinaryFlag | MultipleKeyFlag | HandleKeyFlag,
Value: int64(2),
}, {
Name: "a2",
Type: mysql.TypeVarchar,
Value: []byte("char-updated"),
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: BinaryFlag | MultipleKeyFlag | HandleKeyFlag,
Value: int64(2),
}, {
Name: "a4",
Type: mysql.TypeTinyBlob,
Charset: charset.CharsetGBK,
Value: []byte("世界"),
}, nil},
IndexColumns: [][]int{{1, 3}},
}
rowRedo := RowToRedo(row)
require.Equal(t, 5, len(rowRedo.PreColumns))
require.Equal(t, 5, len(rowRedo.Columns))

redoLog := &RedoLog{
RedoRow: rowRedo,
Type: RedoLogTypeRow,
}
data, err := redoLog.MarshalMsg(nil)
require.Nil(t, err)
redoLog2 := &RedoLog{}
_, err = redoLog2.UnmarshalMsg(data)
require.Nil(t, err)
require.Equal(t, row, LogToRow(redoLog2.RedoRow))
}

func TestRowRedoConvertWithEmptySlice(t *testing.T) {
t.Parallel()
row := &RowChangedEvent{
StartTs: 100,
CommitTs: 120,
Table: &TableName{Schema: "test", Table: "table1", TableID: 57},
PreColumns: []*Column{{
Name: "a1",
Type: mysql.TypeLong,
Flag: BinaryFlag | MultipleKeyFlag | HandleKeyFlag,
Value: int64(1),
}, {
Name: "a2",
Type: mysql.TypeVarchar,
Value: []byte(""), // empty slice should be marshal and unmarshal safely
}},
Columns: []*Column{{
Name: "a1",
Type: mysql.TypeLong,
Flag: BinaryFlag | MultipleKeyFlag | HandleKeyFlag,
Value: int64(2),
}, {
Name: "a2",
Type: mysql.TypeVarchar,
Value: []byte(""),
}},
IndexColumns: [][]int{{1}},
}
rowRedo := RowToRedo(row)
redoLog := &RedoLog{
RedoRow: rowRedo,
Type: RedoLogTypeRow,
}
data, err := redoLog.MarshalMsg(nil)
require.Nil(t, err)

redoLog2 := &RedoLog{}
_, err = redoLog2.UnmarshalMsg(data)
require.Nil(t, err)
require.Equal(t, row, LogToRow(redoLog2.RedoRow))
}

func TestDDLRedoConvert(t *testing.T) {
t.Parallel()
ddl := &DDLEvent{
StartTs: 1020,
CommitTs: 1030,
Type: timodel.ActionAddColumn,
Query: "ALTER TABLE test.t1 ADD COLUMN a int",
}
redoDDL := DDLToRedo(ddl)

redoLog := &RedoLog{
RedoDDL: redoDDL,
Type: RedoLogTypeDDL,
}
data, err := redoLog.MarshalMsg(nil)
require.Nil(t, err)
redoLog2 := &RedoLog{}
_, err = redoLog2.UnmarshalMsg(data)
require.Nil(t, err)
require.Equal(t, ddl, LogToDDL(redoLog2.RedoDDL))
}
15 changes: 11 additions & 4 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,17 @@ func testChangefeedReleaseResource(
err := cf.tick(ctx, captures)
require.Nil(t, err)
cancel()
// check redo log dir is deleted
_, err = os.Stat(redoLogDir)
log.Error(err)
require.True(t, os.IsNotExist(err))

if cf.state.Info.Config.Consistent.UseFileBackend {
// check redo log dir is deleted
_, err = os.Stat(redoLogDir)
log.Error(err)
require.True(t, os.IsNotExist(err))
} else {
files, err := os.ReadDir(redoLogDir)
require.NoError(t, err)
require.Len(t, files, 1) // only delete mark
}
}

func TestExecRenameTablesDDL(t *testing.T) {
Expand Down
Loading

0 comments on commit fd49fb3

Please sign in to comment.