Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy committed Apr 23, 2024
1 parent 8ca0b11 commit 4c5e279
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 0 deletions.
36 changes: 36 additions & 0 deletions pkg/sink/codec/debezium/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestEncodeInsert(t *testing.T) {
nowFunc: func() time.Time { return time.Unix(1701326309, 0) },
}
codec.config.DebeziumDisableSchema = true
codec.config.DebeziumOutputOldValue = false

tableInfo := model.BuildTableInfo("test", "table1", []*model.Column{{
Name: "tiny",
Expand Down Expand Up @@ -356,6 +357,40 @@ func TestEncodeUpdate(t *testing.T) {
}
}
`, buf.String())

codec.config.DebeziumOutputOldValue = false
codec.config.DebeziumDisableSchema = true
buf.Reset()
err = codec.EncodeRowChangedEvent(e, buf)
require.Nil(t, err)
require.JSONEq(t, `
{
"payload": {
"source": {
"version": "2.4.0.Final",
"connector": "TiCDC",
"name": "test-cluster",
"ts_ms": 0,
"snapshot": "false",
"db": "test",
"table": "table1",
"server_id": 0,
"gtid": null,
"file": "",
"pos": 0,
"row": 0,
"thread": 0,
"query": null,
"commit_ts": 1,
"cluster_id": "test-cluster"
},
"ts_ms": 1701326309000,
"transaction": null,
"op": "u",
"after": { "tiny": 1 }
}
}
`, buf.String())
}

func TestEncodeDelete(t *testing.T) {
Expand All @@ -364,6 +399,7 @@ func TestEncodeDelete(t *testing.T) {
clusterID: "test-cluster",
nowFunc: func() time.Time { return time.Unix(1701326309, 0) },
}
codec.config.DebeziumOutputOldValue = false
codec.config.DebeziumDisableSchema = true

tableInfo := model.BuildTableInfo("test", "table1", []*model.Column{{
Expand Down
59 changes: 59 additions & 0 deletions pkg/sink/codec/open/open_protocol_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ var (
}, tableInfoWithManyCols),
}

updateEvent = &model.RowChangedEvent{
CommitTs: 1,
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas([]*model.Column{
{
Name: "col1",
Value: []byte("aa"),
},
{
Name: "col2",
Value: []byte("bb"),
},
}, tableInfo),
PreColumns: model.Columns2ColumnDatas([]*model.Column{
{
Name: "col1",
Value: []byte("aaa"),
},
{
Name: "col2",
Value: []byte("bbb"),
},
}, tableInfo),
}

testCaseDDL = &model.DDLEvent{
CommitTs: 417318403368288260,
TableInfo: &model.TableInfo{
Expand Down Expand Up @@ -378,6 +403,7 @@ func TestEncodeDecodeE2E(t *testing.T) {
topic := "test"

codecConfig := common.NewConfig(config.ProtocolOpen)
codecConfig.OpenOutputOldValue = false
builder, err := NewBatchEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()
Expand Down Expand Up @@ -572,3 +598,36 @@ func TestE2EClaimCheckMessage(t *testing.T) {
require.Equal(t, column.Value, decodedColumn.Value)
}
}

func TestE2EOutputOldValueFalse(t *testing.T) {
t.Parallel()

codecConfig := common.NewConfig(config.ProtocolOpen)
codecConfig.OpenOutputOldValue = false

codecConfig.MaxMessageBytes = 251

ctx := context.Background()
builder, err := NewBatchEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

topic := "test"
err = encoder.AppendRowChangedEvent(ctx, topic, updateEvent, nil)
require.NoError(t, err)

message := encoder.Build()[0]
decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
messageType, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, messageType, model.MessageTypeRow)

decoded, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.Nil(t, decoded.PreColumns)
}

0 comments on commit 4c5e279

Please sign in to comment.