diff --git a/pkg/sink/codec/debezium/codec_test.go b/pkg/sink/codec/debezium/codec_test.go index bcd7317b12c..6e8529164ce 100644 --- a/pkg/sink/codec/debezium/codec_test.go +++ b/pkg/sink/codec/debezium/codec_test.go @@ -33,6 +33,7 @@ func TestEncodeInsert(t *testing.T) { nowFunc: func() time.Time { return time.Unix(1701326309, 0) }, } codec.config.DebeziumDisableSchema = true + codec.config.DebeziumOutputOldValue = false tableInfo := model.BuildTableInfo("test", "table1", []*model.Column{{ Name: "tiny", @@ -356,6 +357,40 @@ func TestEncodeUpdate(t *testing.T) { } } `, buf.String()) + + codec.config.DebeziumOutputOldValue = false + codec.config.DebeziumDisableSchema = true + buf.Reset() + err = codec.EncodeRowChangedEvent(e, buf) + require.Nil(t, err) + require.JSONEq(t, ` + { + "payload": { + "source": { + "version": "2.4.0.Final", + "connector": "TiCDC", + "name": "test-cluster", + "ts_ms": 0, + "snapshot": "false", + "db": "test", + "table": "table1", + "server_id": 0, + "gtid": null, + "file": "", + "pos": 0, + "row": 0, + "thread": 0, + "query": null, + "commit_ts": 1, + "cluster_id": "test-cluster" + }, + "ts_ms": 1701326309000, + "transaction": null, + "op": "u", + "after": { "tiny": 1 } + } + } + `, buf.String()) } func TestEncodeDelete(t *testing.T) { @@ -364,6 +399,7 @@ func TestEncodeDelete(t *testing.T) { clusterID: "test-cluster", nowFunc: func() time.Time { return time.Unix(1701326309, 0) }, } + codec.config.DebeziumOutputOldValue = false codec.config.DebeziumDisableSchema = true tableInfo := model.BuildTableInfo("test", "table1", []*model.Column{{ diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index 55bbf4c6614..3f4b77d21a0 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -195,6 +195,31 @@ var ( }, tableInfoWithManyCols), } + updateEvent = &model.RowChangedEvent{ + CommitTs: 1, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "col1", + Value: []byte("aa"), + }, + { + Name: "col2", + Value: []byte("bb"), + }, + }, tableInfo), + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "col1", + Value: []byte("aaa"), + }, + { + Name: "col2", + Value: []byte("bbb"), + }, + }, tableInfo), + } + testCaseDDL = &model.DDLEvent{ CommitTs: 417318403368288260, TableInfo: &model.TableInfo{ @@ -378,6 +403,7 @@ func TestEncodeDecodeE2E(t *testing.T) { topic := "test" codecConfig := common.NewConfig(config.ProtocolOpen) + codecConfig.OpenOutputOldValue = false builder, err := NewBatchEncoderBuilder(ctx, codecConfig) require.NoError(t, err) encoder := builder.Build() @@ -572,3 +598,36 @@ func TestE2EClaimCheckMessage(t *testing.T) { require.Equal(t, column.Value, decodedColumn.Value) } } + +func TestE2EOutputOldValueFalse(t *testing.T) { + t.Parallel() + + codecConfig := common.NewConfig(config.ProtocolOpen) + codecConfig.OpenOutputOldValue = false + + codecConfig.MaxMessageBytes = 251 + + ctx := context.Background() + builder, err := NewBatchEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + topic := "test" + err = encoder.AppendRowChangedEvent(ctx, topic, updateEvent, nil) + require.NoError(t, err) + + message := encoder.Build()[0] + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, messageType, model.MessageTypeRow) + + decoded, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.Nil(t, decoded.PreColumns) +}