From 1eed6c08e8d47157f3a5330df4f40500c0254b41 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sun, 28 Apr 2024 17:52:27 +0800 Subject: [PATCH] sink(ticdc): Add output-old-value config (#10915) (#10964) close pingcap/tiflow#10916 --- cdc/api/v2/model.go | 41 +++++++++++++- cdc/api/v2/model_test.go | 2 + docs/swagger/docs.go | 46 +++++++++++++++ docs/swagger/swagger.json | 46 +++++++++++++++ docs/swagger/swagger.yaml | 30 ++++++++++ pkg/cmd/util/helper_test.go | 4 ++ pkg/config/config_test_data.go | 24 +++++++- pkg/config/replica_config.go | 2 + pkg/config/replica_config_test.go | 6 ++ pkg/config/sink.go | 16 ++++++ pkg/orchestrator/reactor_state_test.go | 6 ++ pkg/sink/codec/common/config.go | 15 +++++ pkg/sink/codec/debezium/codec.go | 4 +- pkg/sink/codec/debezium/codec_test.go | 36 ++++++++++++ .../codec/open/open_protocol_encoder_test.go | 56 +++++++++++++++++++ pkg/sink/codec/open/open_protocol_message.go | 7 ++- tests/integration_tests/api_v2/cases.go | 4 ++ tests/integration_tests/api_v2/model.go | 42 +++++++++----- 18 files changed, 365 insertions(+), 22 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index cdfc0f1303a..75f1207ff78 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -436,6 +436,18 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, } } + var debeziumConfig *config.DebeziumConfig + if c.Sink.DebeziumConfig != nil { + debeziumConfig = &config.DebeziumConfig{ + OutputOldValue: c.Sink.DebeziumConfig.OutputOldValue, + } + } + var openProtocolConfig *config.OpenProtocolConfig + if c.Sink.OpenProtocolConfig != nil { + openProtocolConfig = &config.OpenProtocolConfig{ + OutputOldValue: c.Sink.OpenProtocolConfig.OutputOldValue, + } + } res.Sink = &config.SinkConfig{ DispatchRules: dispatchRules, @@ -457,6 +469,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( PulsarConfig: pulsarConfig, CloudStorageConfig: cloudStorageConfig, SafeMode: c.Sink.SafeMode, + OpenProtocol: openProtocolConfig, + Debezium: debeziumConfig, } if c.Sink.TxnAtomicity != nil { @@ -717,7 +731,18 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, } } - + var debeziumConfig *DebeziumConfig + if cloned.Sink.Debezium != nil { + debeziumConfig = &DebeziumConfig{ + OutputOldValue: cloned.Sink.Debezium.OutputOldValue, + } + } + var openProtocolConfig *OpenProtocolConfig + if cloned.Sink.OpenProtocol != nil { + openProtocolConfig = &OpenProtocolConfig{ + OutputOldValue: cloned.Sink.OpenProtocol.OutputOldValue, + } + } res.Sink = &SinkConfig{ Protocol: cloned.Sink.Protocol, SchemaRegistry: cloned.Sink.SchemaRegistry, @@ -738,6 +763,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { PulsarConfig: pulsarConfig, CloudStorageConfig: cloudStorageConfig, SafeMode: cloned.Sink.SafeMode, + DebeziumConfig: debeziumConfig, + OpenProtocolConfig: openProtocolConfig, } if cloned.Sink.TxnAtomicity != nil { @@ -925,6 +952,8 @@ type SinkConfig struct { SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"` SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"` DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"` + DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"` + OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"` } // CSVConfig denotes the csv config @@ -1267,3 +1296,13 @@ type GlueSchemaRegistryConfig struct { SecretAccessKey string `json:"secret_access_key,omitempty"` Token string `json:"token,omitempty"` } + +// OpenProtocolConfig represents the configurations for open protocol encoding +type OpenProtocolConfig struct { + OutputOldValue bool `json:"output_old_value"` +} + +// DebeziumConfig represents the configurations for debezium protocol encoding +type DebeziumConfig struct { + OutputOldValue bool `json:"output_old_value"` +} diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index ffb061b0a6d..27eff85db1d 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -63,6 +63,8 @@ var defaultAPIConfig = &ReplicaConfig{ SendBootstrapInMsgCount: util.AddressOf(int32(10000)), SendBootstrapToAllPartition: util.AddressOf(true), DebeziumDisableSchema: util.AddressOf(false), + OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true}, + DebeziumConfig: &DebeziumConfig{OutputOldValue: true}, }, Consistent: &ConsistentConfig{ Level: "none", diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 5538290ee95..4f605257db2 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1492,6 +1492,14 @@ var doc = `{ } } }, + "config.DebeziumConfig": { + "type": "object", + "properties": { + "output-old-value": { + "type": "boolean" + } + } + }, "config.DispatchRule": { "type": "object", "properties": { @@ -1754,6 +1762,14 @@ var doc = `{ } } }, + "config.OpenProtocolConfig": { + "type": "object", + "properties": { + "output-old-value": { + "type": "boolean" + } + } + }, "config.PulsarConfig": { "type": "object", "properties": { @@ -1856,6 +1872,10 @@ var doc = `{ "description": "DateSeparator is only available when the downstream is Storage.", "type": "string" }, + "debezium": { + "description": "DebeziumConfig related configurations", + "$ref": "#/definitions/config.DebeziumConfig" + }, "debezium-disable-schema": { "description": "Debezium only. Whether schema should be excluded in the output.", "type": "boolean" @@ -1897,6 +1917,10 @@ var doc = `{ "description": "OnlyOutputUpdatedColumns is only available when the downstream is MQ.", "type": "boolean" }, + "open": { + "description": "OpenProtocol related configurations", + "$ref": "#/definitions/config.OpenProtocolConfig" + }, "protocol": { "description": "Protocol is NOT available when the downstream is DB.", "type": "string" @@ -2519,6 +2543,14 @@ var doc = `{ } } }, + "v2.DebeziumConfig": { + "type": "object", + "properties": { + "output_old_value": { + "type": "boolean" + } + } + }, "v2.DispatchRule": { "type": "object", "properties": { @@ -2842,6 +2874,14 @@ var doc = `{ } } }, + "v2.OpenProtocolConfig": { + "type": "object", + "properties": { + "output_old_value": { + "type": "boolean" + } + } + }, "v2.ProcessorCommonInfo": { "type": "object", "properties": { @@ -3106,6 +3146,9 @@ var doc = `{ "date_separator": { "type": "string" }, + "debezium": { + "$ref": "#/definitions/v2.DebeziumConfig" + }, "debezium_disable_schema": { "type": "boolean" }, @@ -3139,6 +3182,9 @@ var doc = `{ "only_output_updated_columns": { "type": "boolean" }, + "open": { + "$ref": "#/definitions/v2.OpenProtocolConfig" + }, "protocol": { "type": "string" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 05a3f7e1b14..38f0c2008e5 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1473,6 +1473,14 @@ } } }, + "config.DebeziumConfig": { + "type": "object", + "properties": { + "output-old-value": { + "type": "boolean" + } + } + }, "config.DispatchRule": { "type": "object", "properties": { @@ -1735,6 +1743,14 @@ } } }, + "config.OpenProtocolConfig": { + "type": "object", + "properties": { + "output-old-value": { + "type": "boolean" + } + } + }, "config.PulsarConfig": { "type": "object", "properties": { @@ -1837,6 +1853,10 @@ "description": "DateSeparator is only available when the downstream is Storage.", "type": "string" }, + "debezium": { + "description": "DebeziumConfig related configurations", + "$ref": "#/definitions/config.DebeziumConfig" + }, "debezium-disable-schema": { "description": "Debezium only. Whether schema should be excluded in the output.", "type": "boolean" @@ -1878,6 +1898,10 @@ "description": "OnlyOutputUpdatedColumns is only available when the downstream is MQ.", "type": "boolean" }, + "open": { + "description": "OpenProtocol related configurations", + "$ref": "#/definitions/config.OpenProtocolConfig" + }, "protocol": { "description": "Protocol is NOT available when the downstream is DB.", "type": "string" @@ -2500,6 +2524,14 @@ } } }, + "v2.DebeziumConfig": { + "type": "object", + "properties": { + "output_old_value": { + "type": "boolean" + } + } + }, "v2.DispatchRule": { "type": "object", "properties": { @@ -2823,6 +2855,14 @@ } } }, + "v2.OpenProtocolConfig": { + "type": "object", + "properties": { + "output_old_value": { + "type": "boolean" + } + } + }, "v2.ProcessorCommonInfo": { "type": "object", "properties": { @@ -3087,6 +3127,9 @@ "date_separator": { "type": "string" }, + "debezium": { + "$ref": "#/definitions/v2.DebeziumConfig" + }, "debezium_disable_schema": { "type": "boolean" }, @@ -3120,6 +3163,9 @@ "only_output_updated_columns": { "type": "boolean" }, + "open": { + "$ref": "#/definitions/v2.OpenProtocolConfig" + }, "protocol": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 36693b13470..11113b710e8 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -69,6 +69,11 @@ definitions: type: string type: array type: object + config.DebeziumConfig: + properties: + output-old-value: + type: boolean + type: object config.DispatchRule: properties: columns: @@ -249,6 +254,11 @@ definitions: description: OAuth2Scope scope type: string type: object + config.OpenProtocolConfig: + properties: + output-old-value: + type: boolean + type: object config.PulsarConfig: properties: auth-tls-certificate-path: @@ -342,6 +352,9 @@ definitions: date-separator: description: DateSeparator is only available when the downstream is Storage. type: string + debezium: + $ref: '#/definitions/config.DebeziumConfig' + description: DebeziumConfig related configurations debezium-disable-schema: description: Debezium only. Whether schema should be excluded in the output. type: boolean @@ -377,6 +390,9 @@ definitions: description: OnlyOutputUpdatedColumns is only available when the downstream is MQ. type: boolean + open: + $ref: '#/definitions/config.OpenProtocolConfig' + description: OpenProtocol related configurations protocol: description: Protocol is NOT available when the downstream is DB. type: string @@ -807,6 +823,11 @@ definitions: memory_quota_percentage: type: integer type: object + v2.DebeziumConfig: + properties: + output_old_value: + type: boolean + type: object v2.DispatchRule: properties: columns: @@ -1021,6 +1042,11 @@ definitions: write_timeout: type: string type: object + v2.OpenProtocolConfig: + properties: + output_old_value: + type: boolean + type: object v2.ProcessorCommonInfo: properties: capture_id: @@ -1195,6 +1221,8 @@ definitions: $ref: '#/definitions/v2.CSVConfig' date_separator: type: string + debezium: + $ref: '#/definitions/v2.DebeziumConfig' debezium_disable_schema: type: boolean delete_only_output_handle_key_columns: @@ -1217,6 +1245,8 @@ definitions: $ref: '#/definitions/v2.MySQLConfig' only_output_updated_columns: type: boolean + open: + $ref: '#/definitions/v2.OpenProtocolConfig' protocol: type: string pulsar_config: diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index f4eaaa41006..b6ee7dfef06 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -214,6 +214,8 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { SendBootstrapInMsgCount: util.AddressOf(int32(10000)), SendBootstrapToAllPartition: util.AddressOf(true), DebeziumDisableSchema: util.AddressOf(false), + OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true}, + Debezium: &config.DebeziumConfig{OutputOldValue: true}, }, cfg.Sink) } @@ -252,6 +254,8 @@ func TestAndWriteStorageSinkTOML(t *testing.T) { SendBootstrapInMsgCount: util.AddressOf(int32(10000)), SendBootstrapToAllPartition: util.AddressOf(true), DebeziumDisableSchema: util.AddressOf(false), + OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true}, + Debezium: &config.DebeziumConfig{OutputOldValue: true}, }, cfg.Sink) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 198531f695c..5fccf453993 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -70,7 +70,13 @@ const ( "send-bootstrap-interval-in-sec": 120, "send-bootstrap-in-msg-count": 10000, "send-bootstrap-to-all-partition": true, - "debezium-disable-schema": false + "debezium-disable-schema": false, + "open": { + "output-old-value": true + }, + "debezium": { + "output-old-value": true + } }, "consistent": { "level": "none", @@ -331,7 +337,13 @@ const ( "send-bootstrap-interval-in-sec": 120, "send-bootstrap-in-msg-count": 10000, "send-bootstrap-to-all-partition": true, - "debezium-disable-schema": false + "debezium-disable-schema": false, + "open": { + "output-old-value": true + }, + "debezium": { + "output-old-value": true + } }, "consistent": { "level": "none", @@ -499,7 +511,13 @@ const ( "send-bootstrap-interval-in-sec": 120, "send-bootstrap-in-msg-count": 10000, "send-bootstrap-to-all-partition": true, - "debezium-disable-schema": false + "debezium-disable-schema": false, + "open": { + "output-old-value": true + }, + "debezium": { + "output-old-value": true + } }, "consistent": { "level": "none", diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 5f5924363d1..c2c436a61ee 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -78,6 +78,8 @@ var defaultReplicaConfig = &ReplicaConfig{ SendBootstrapInMsgCount: util.AddressOf(DefaultSendBootstrapInMsgCount), SendBootstrapToAllPartition: util.AddressOf(DefaultSendBootstrapToAllPartition), DebeziumDisableSchema: util.AddressOf(false), + OpenProtocol: &OpenProtocolConfig{OutputOldValue: true}, + Debezium: &DebeziumConfig{OutputOldValue: true}, }, Consistent: &ConsistentConfig{ Level: "none", diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 16dba7e9a27..84c83227678 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -144,6 +144,12 @@ func TestReplicaConfigMarshal(t *testing.T) { FileSize: aws.Int(1024), OutputColumnID: aws.Bool(false), } + conf.Sink.Debezium = &DebeziumConfig{ + OutputOldValue: true, + } + conf.Sink.OpenProtocol = &OpenProtocolConfig{ + OutputOldValue: true, + } b, err := conf.Marshal() require.NoError(t, err) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 0fe50106fef..44e1a3dbcd3 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -193,6 +193,12 @@ type SinkConfig struct { // Debezium only. Whether schema should be excluded in the output. DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"` + + // OpenProtocol related configurations + OpenProtocol *OpenProtocolConfig `toml:"open" json:"open,omitempty"` + + // DebeziumConfig related configurations + Debezium *DebeziumConfig `toml:"debezium" json:"debezium,omitempty"` } // MaskSensitiveData masks sensitive data in SinkConfig @@ -923,3 +929,13 @@ func (g *GlueSchemaRegistryConfig) Validate() error { func (g *GlueSchemaRegistryConfig) NoCredentials() bool { return g.AccessKey == "" && g.SecretAccessKey == "" && g.Token == "" } + +// OpenProtocolConfig represents the configurations for open protocol encoding +type OpenProtocolConfig struct { + OutputOldValue bool `toml:"output-old-value" json:"output-old-value"` +} + +// DebeziumConfig represents the configurations for debezium protocol encoding +type DebeziumConfig struct { + OutputOldValue bool `toml:"output-old-value" json:"output-old-value"` +} diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index 6a4eec2c5e6..362a004df3b 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -131,6 +131,8 @@ func TestChangefeedStateUpdate(t *testing.T) { SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount, SendBootstrapToAllPartition: config.GetDefaultReplicaConfig().Sink.SendBootstrapToAllPartition, DebeziumDisableSchema: config.GetDefaultReplicaConfig().Sink.DebeziumDisableSchema, + Debezium: config.GetDefaultReplicaConfig().Sink.Debezium, + OpenProtocol: config.GetDefaultReplicaConfig().Sink.OpenProtocol, }, Consistent: config.GetDefaultReplicaConfig().Consistent, Integrity: config.GetDefaultReplicaConfig().Integrity, @@ -198,6 +200,8 @@ func TestChangefeedStateUpdate(t *testing.T) { SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount, SendBootstrapToAllPartition: config.GetDefaultReplicaConfig().Sink.SendBootstrapToAllPartition, DebeziumDisableSchema: config.GetDefaultReplicaConfig().Sink.DebeziumDisableSchema, + Debezium: config.GetDefaultReplicaConfig().Sink.Debezium, + OpenProtocol: config.GetDefaultReplicaConfig().Sink.OpenProtocol, }, Scheduler: config.GetDefaultReplicaConfig().Scheduler, Integrity: config.GetDefaultReplicaConfig().Integrity, @@ -271,6 +275,8 @@ func TestChangefeedStateUpdate(t *testing.T) { SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount, SendBootstrapToAllPartition: config.GetDefaultReplicaConfig().Sink.SendBootstrapToAllPartition, DebeziumDisableSchema: config.GetDefaultReplicaConfig().Sink.DebeziumDisableSchema, + Debezium: config.GetDefaultReplicaConfig().Sink.Debezium, + OpenProtocol: config.GetDefaultReplicaConfig().Sink.OpenProtocol, }, Consistent: config.GetDefaultReplicaConfig().Consistent, Scheduler: config.GetDefaultReplicaConfig().Scheduler, diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index bbd60f7c31e..be97c6a20c5 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -76,6 +76,8 @@ type Config struct { // for open protocol OnlyOutputUpdatedColumns bool + // Whether old value should be excluded in the output. + OpenOutputOldValue bool // for the simple protocol, can be "json" and "avro", default to "json" EncodingFormat EncodingFormatType @@ -85,6 +87,8 @@ type Config struct { // Debezium only. Whether schema should be excluded in the output. DebeziumDisableSchema bool + // Debezium only. Whether before value should be included in the output. + DebeziumOutputOldValue bool } // EncodingFormatType is the type of encoding format @@ -120,6 +124,11 @@ func NewConfig(protocol config.Protocol) *Config { EncodingFormat: EncodingFormatJSON, TimeZone: time.Local, + + // default value is true + DebeziumOutputOldValue: true, + OpenOutputOldValue: true, + DebeziumDisableSchema: false, } } @@ -233,6 +242,12 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er `force-replicate must be disabled, when the large message handle is enabled, large message handle: "%s"`, c.LargeMessageHandle.LargeMessageHandleOption) } + if replicaConfig.Sink.OpenProtocol != nil { + c.OpenOutputOldValue = replicaConfig.Sink.OpenProtocol.OutputOldValue + } + if replicaConfig.Sink.Debezium != nil { + c.DebeziumOutputOldValue = replicaConfig.Sink.Debezium.OutputOldValue + } } if urlParameter.OnlyOutputUpdatedColumns != nil { c.OnlyOutputUpdatedColumns = *urlParameter.OnlyOutputUpdatedColumns diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index 372ac581c3d..22a9bca7940 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -584,7 +584,9 @@ func (c *dbzCodec) EncodeRowChangedEvent( err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreColumns(), e.TableInfo) } else if e.IsUpdate() { jWriter.WriteStringField("op", "u") - err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreColumns(), e.TableInfo) + if c.config.DebeziumOutputOldValue { + err = c.writeDebeziumFieldValues(jWriter, "before", e.GetPreColumns(), e.TableInfo) + } if err == nil { err = c.writeDebeziumFieldValues(jWriter, "after", e.GetColumns(), e.TableInfo) } diff --git a/pkg/sink/codec/debezium/codec_test.go b/pkg/sink/codec/debezium/codec_test.go index bcd7317b12c..6e8529164ce 100644 --- a/pkg/sink/codec/debezium/codec_test.go +++ b/pkg/sink/codec/debezium/codec_test.go @@ -33,6 +33,7 @@ func TestEncodeInsert(t *testing.T) { nowFunc: func() time.Time { return time.Unix(1701326309, 0) }, } codec.config.DebeziumDisableSchema = true + codec.config.DebeziumOutputOldValue = false tableInfo := model.BuildTableInfo("test", "table1", []*model.Column{{ Name: "tiny", @@ -356,6 +357,40 @@ func TestEncodeUpdate(t *testing.T) { } } `, buf.String()) + + codec.config.DebeziumOutputOldValue = false + codec.config.DebeziumDisableSchema = true + buf.Reset() + err = codec.EncodeRowChangedEvent(e, buf) + require.Nil(t, err) + require.JSONEq(t, ` + { + "payload": { + "source": { + "version": "2.4.0.Final", + "connector": "TiCDC", + "name": "test-cluster", + "ts_ms": 0, + "snapshot": "false", + "db": "test", + "table": "table1", + "server_id": 0, + "gtid": null, + "file": "", + "pos": 0, + "row": 0, + "thread": 0, + "query": null, + "commit_ts": 1, + "cluster_id": "test-cluster" + }, + "ts_ms": 1701326309000, + "transaction": null, + "op": "u", + "after": { "tiny": 1 } + } + } + `, buf.String()) } func TestEncodeDelete(t *testing.T) { @@ -364,6 +399,7 @@ func TestEncodeDelete(t *testing.T) { clusterID: "test-cluster", nowFunc: func() time.Time { return time.Unix(1701326309, 0) }, } + codec.config.DebeziumOutputOldValue = false codec.config.DebeziumDisableSchema = true tableInfo := model.BuildTableInfo("test", "table1", []*model.Column{{ diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index 55bbf4c6614..ebf833dfa17 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -205,6 +205,30 @@ var ( Query: "create table person(id int, name varchar(32), tiny tinyint unsigned, comment text, primary key(id))", Type: timodel.ActionCreateTable, } + updateEvent = &model.RowChangedEvent{ + CommitTs: 1, + TableInfo: tableInfo, + Columns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "col1", + Value: []byte("aa"), + }, + { + Name: "col2", + Value: []byte("bb"), + }, + }, tableInfo), + PreColumns: model.Columns2ColumnDatas([]*model.Column{ + { + Name: "col1", + Value: []byte("aaa"), + }, + { + Name: "col2", + Value: []byte("bbb"), + }, + }, tableInfo), + } ) func TestMaxMessageBytes(t *testing.T) { @@ -378,6 +402,7 @@ func TestEncodeDecodeE2E(t *testing.T) { topic := "test" codecConfig := common.NewConfig(config.ProtocolOpen) + codecConfig.OpenOutputOldValue = false builder, err := NewBatchEncoderBuilder(ctx, codecConfig) require.NoError(t, err) encoder := builder.Build() @@ -572,3 +597,34 @@ func TestE2EClaimCheckMessage(t *testing.T) { require.Equal(t, column.Value, decodedColumn.Value) } } + +func TestOutputOldValueFalse(t *testing.T) { + ctx := context.Background() + topic := "test" + + codecConfig := common.NewConfig(config.ProtocolOpen) + codecConfig.OpenOutputOldValue = false + builder, err := NewBatchEncoderBuilder(ctx, codecConfig) + require.NoError(t, err) + encoder := builder.Build() + + err = encoder.AppendRowChangedEvent(ctx, topic, updateEvent, func() {}) + require.NoError(t, err) + + message := encoder.Build()[0] + + decoder, err := NewBatchDecoder(ctx, codecConfig, nil) + require.NoError(t, err) + + err = decoder.AddKeyValue(message.Key, message.Value) + require.NoError(t, err) + + messageType, hasNext, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, hasNext) + require.Equal(t, messageType, model.MessageTypeRow) + + decoded, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.Nil(t, decoded.PreColumns) +} diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go index a24d5d6e74f..062553292e6 100644 --- a/pkg/sink/codec/open/open_protocol_message.go +++ b/pkg/sink/codec/open/open_protocol_message.go @@ -122,8 +122,11 @@ func rowChangeToMsg( } } else if e.IsUpdate() { value.Update = rowChangeColumns2CodecColumns(e.GetColumns(), largeMessageOnlyHandleKeyColumns) - value.PreColumns = rowChangeColumns2CodecColumns(e.GetPreColumns(), largeMessageOnlyHandleKeyColumns) - if largeMessageOnlyHandleKeyColumns && (len(value.Update) == 0 || len(value.PreColumns) == 0) { + if config.OpenOutputOldValue { + value.PreColumns = rowChangeColumns2CodecColumns(e.GetPreColumns(), largeMessageOnlyHandleKeyColumns) + } + if largeMessageOnlyHandleKeyColumns && (len(value.Update) == 0 || + (len(value.PreColumns) == 0 && config.OpenOutputOldValue)) { return nil, nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found handle key columns for the update event") } if config.OnlyOutputUpdatedColumns { diff --git a/tests/integration_tests/api_v2/cases.go b/tests/integration_tests/api_v2/cases.go index 1602cf6b0d5..229f6e436a8 100644 --- a/tests/integration_tests/api_v2/cases.go +++ b/tests/integration_tests/api_v2/cases.go @@ -78,6 +78,8 @@ var customReplicaConfig = &ReplicaConfig{ SendBootstrapInMsgCount: util.AddressOf(int32(10000)), SendBootstrapToAllPartition: util.AddressOf(true), DebeziumDisableSchema: util.AddressOf(true), + OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true}, + DebeziumConfig: &DebeziumConfig{OutputOldValue: true}, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: false, @@ -129,6 +131,8 @@ var defaultReplicaConfig = &ReplicaConfig{ SendBootstrapInMsgCount: util.AddressOf(int32(10000)), SendBootstrapToAllPartition: util.AddressOf(true), DebeziumDisableSchema: util.AddressOf(false), + OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true}, + DebeziumConfig: &DebeziumConfig{OutputOldValue: true}, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: false, diff --git a/tests/integration_tests/api_v2/model.go b/tests/integration_tests/api_v2/model.go index 1e86af8a620..6be3335af63 100644 --- a/tests/integration_tests/api_v2/model.go +++ b/tests/integration_tests/api_v2/model.go @@ -219,21 +219,23 @@ type Table struct { // SinkConfig represents sink config for a changefeed // This is a duplicate of config.SinkConfig type SinkConfig struct { - Protocol string `json:"protocol,omitempty"` - SchemaRegistry string `json:"schema_registry,omitempty"` - CSVConfig *CSVConfig `json:"csv,omitempty"` - DispatchRules []*DispatchRule `json:"dispatchers,omitempty"` - ColumnSelectors []*ColumnSelector `json:"column_selectors,omitempty"` - TxnAtomicity string `json:"transaction_atomicity"` - EncoderConcurrency *int `json:"encoder_concurrency,omitempty"` - Terminator string `json:"terminator"` - DateSeparator string `json:"date_separator,omitempty"` - EnablePartitionSeparator *bool `json:"enable_partition_separator,omitempty"` - ContentCompatible *bool `json:"content_compatible"` - SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"` - SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"` - SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"` - DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"` + Protocol string `json:"protocol,omitempty"` + SchemaRegistry string `json:"schema_registry,omitempty"` + CSVConfig *CSVConfig `json:"csv,omitempty"` + DispatchRules []*DispatchRule `json:"dispatchers,omitempty"` + ColumnSelectors []*ColumnSelector `json:"column_selectors,omitempty"` + TxnAtomicity string `json:"transaction_atomicity"` + EncoderConcurrency *int `json:"encoder_concurrency,omitempty"` + Terminator string `json:"terminator"` + DateSeparator string `json:"date_separator,omitempty"` + EnablePartitionSeparator *bool `json:"enable_partition_separator,omitempty"` + ContentCompatible *bool `json:"content_compatible"` + SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"` + SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"` + SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"` + DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"` + DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"` + OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"` } // CSVConfig denotes the csv config @@ -370,3 +372,13 @@ type Capture struct { AdvertiseAddr string `json:"address"` ClusterID string `json:"cluster_id"` } + +// OpenProtocolConfig represents the configurations for open protocol encoding +type OpenProtocolConfig struct { + OutputOldValue bool `json:"output_old_value"` +} + +// DebeziumConfig represents the configurations for debezium protocol encoding +type DebeziumConfig struct { + OutputOldValue bool `json:"output_old_value"` +}