Skip to content

Commit

Permalink
sink(ticdc): Add output-old-value config (#10915)
Browse files Browse the repository at this point in the history
close #10916
  • Loading branch information
sdojjy authored Apr 24, 2024
1 parent 344a5d8 commit 295a39a
Show file tree
Hide file tree
Showing 18 changed files with 348 additions and 22 deletions.
41 changes: 40 additions & 1 deletion cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,18 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
}
}
var debeziumConfig *config.DebeziumConfig
if c.Sink.DebeziumConfig != nil {
debeziumConfig = &config.DebeziumConfig{
OutputOldValue: c.Sink.DebeziumConfig.OutputOldValue,
}
}
var openProtocolConfig *config.OpenProtocolConfig
if c.Sink.OpenProtocolConfig != nil {
openProtocolConfig = &config.OpenProtocolConfig{
OutputOldValue: c.Sink.OpenProtocolConfig.OutputOldValue,
}
}

res.Sink = &config.SinkConfig{
DispatchRules: dispatchRules,
Expand All @@ -457,6 +469,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
PulsarConfig: pulsarConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: c.Sink.SafeMode,
OpenProtocol: openProtocolConfig,
Debezium: debeziumConfig,
}

if c.Sink.TxnAtomicity != nil {
Expand Down Expand Up @@ -717,7 +731,18 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
}
}

var debeziumConfig *DebeziumConfig
if cloned.Sink.Debezium != nil {
debeziumConfig = &DebeziumConfig{
OutputOldValue: cloned.Sink.Debezium.OutputOldValue,
}
}
var openProtocolConfig *OpenProtocolConfig
if cloned.Sink.OpenProtocol != nil {
openProtocolConfig = &OpenProtocolConfig{
OutputOldValue: cloned.Sink.OpenProtocol.OutputOldValue,
}
}
res.Sink = &SinkConfig{
Protocol: cloned.Sink.Protocol,
SchemaRegistry: cloned.Sink.SchemaRegistry,
Expand All @@ -738,6 +763,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
PulsarConfig: pulsarConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: cloned.Sink.SafeMode,
DebeziumConfig: debeziumConfig,
OpenProtocolConfig: openProtocolConfig,
}

if cloned.Sink.TxnAtomicity != nil {
Expand Down Expand Up @@ -925,6 +952,8 @@ type SinkConfig struct {
SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"`
SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"`
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down Expand Up @@ -1267,3 +1296,13 @@ type GlueSchemaRegistryConfig struct {
SecretAccessKey string `json:"secret_access_key,omitempty"`
Token string `json:"token,omitempty"`
}

// OpenProtocolConfig represents the configurations for open protocol encoding
type OpenProtocolConfig struct {
OutputOldValue bool `json:"output_old_value"`
}

// DebeziumConfig represents the configurations for debezium protocol encoding
type DebeziumConfig struct {
OutputOldValue bool `json:"output_old_value"`
}
2 changes: 2 additions & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ var defaultAPIConfig = &ReplicaConfig{
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true},
DebeziumConfig: &DebeziumConfig{OutputOldValue: true},
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
46 changes: 46 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,14 @@ var doc = `{
}
}
},
"config.DebeziumConfig": {
"type": "object",
"properties": {
"output-old-value": {
"type": "boolean"
}
}
},
"config.DispatchRule": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1754,6 +1762,14 @@ var doc = `{
}
}
},
"config.OpenProtocolConfig": {
"type": "object",
"properties": {
"output-old-value": {
"type": "boolean"
}
}
},
"config.PulsarConfig": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1856,6 +1872,10 @@ var doc = `{
"description": "DateSeparator is only available when the downstream is Storage.",
"type": "string"
},
"debezium": {
"description": "DebeziumConfig related configurations",
"$ref": "#/definitions/config.DebeziumConfig"
},
"debezium-disable-schema": {
"description": "Debezium only. Whether schema should be excluded in the output.",
"type": "boolean"
Expand Down Expand Up @@ -1897,6 +1917,10 @@ var doc = `{
"description": "OnlyOutputUpdatedColumns is only available when the downstream is MQ.",
"type": "boolean"
},
"open": {
"description": "OpenProtocol related configurations",
"$ref": "#/definitions/config.OpenProtocolConfig"
},
"protocol": {
"description": "Protocol is NOT available when the downstream is DB.",
"type": "string"
Expand Down Expand Up @@ -2519,6 +2543,14 @@ var doc = `{
}
}
},
"v2.DebeziumConfig": {
"type": "object",
"properties": {
"output_old_value": {
"type": "boolean"
}
}
},
"v2.DispatchRule": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2842,6 +2874,14 @@ var doc = `{
}
}
},
"v2.OpenProtocolConfig": {
"type": "object",
"properties": {
"output_old_value": {
"type": "boolean"
}
}
},
"v2.ProcessorCommonInfo": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -3106,6 +3146,9 @@ var doc = `{
"date_separator": {
"type": "string"
},
"debezium": {
"$ref": "#/definitions/v2.DebeziumConfig"
},
"debezium_disable_schema": {
"type": "boolean"
},
Expand Down Expand Up @@ -3139,6 +3182,9 @@ var doc = `{
"only_output_updated_columns": {
"type": "boolean"
},
"open": {
"$ref": "#/definitions/v2.OpenProtocolConfig"
},
"protocol": {
"type": "string"
},
Expand Down
46 changes: 46 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,14 @@
}
}
},
"config.DebeziumConfig": {
"type": "object",
"properties": {
"output-old-value": {
"type": "boolean"
}
}
},
"config.DispatchRule": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1735,6 +1743,14 @@
}
}
},
"config.OpenProtocolConfig": {
"type": "object",
"properties": {
"output-old-value": {
"type": "boolean"
}
}
},
"config.PulsarConfig": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1837,6 +1853,10 @@
"description": "DateSeparator is only available when the downstream is Storage.",
"type": "string"
},
"debezium": {
"description": "DebeziumConfig related configurations",
"$ref": "#/definitions/config.DebeziumConfig"
},
"debezium-disable-schema": {
"description": "Debezium only. Whether schema should be excluded in the output.",
"type": "boolean"
Expand Down Expand Up @@ -1878,6 +1898,10 @@
"description": "OnlyOutputUpdatedColumns is only available when the downstream is MQ.",
"type": "boolean"
},
"open": {
"description": "OpenProtocol related configurations",
"$ref": "#/definitions/config.OpenProtocolConfig"
},
"protocol": {
"description": "Protocol is NOT available when the downstream is DB.",
"type": "string"
Expand Down Expand Up @@ -2500,6 +2524,14 @@
}
}
},
"v2.DebeziumConfig": {
"type": "object",
"properties": {
"output_old_value": {
"type": "boolean"
}
}
},
"v2.DispatchRule": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2823,6 +2855,14 @@
}
}
},
"v2.OpenProtocolConfig": {
"type": "object",
"properties": {
"output_old_value": {
"type": "boolean"
}
}
},
"v2.ProcessorCommonInfo": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -3087,6 +3127,9 @@
"date_separator": {
"type": "string"
},
"debezium": {
"$ref": "#/definitions/v2.DebeziumConfig"
},
"debezium_disable_schema": {
"type": "boolean"
},
Expand Down Expand Up @@ -3120,6 +3163,9 @@
"only_output_updated_columns": {
"type": "boolean"
},
"open": {
"$ref": "#/definitions/v2.OpenProtocolConfig"
},
"protocol": {
"type": "string"
},
Expand Down
30 changes: 30 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ definitions:
type: string
type: array
type: object
config.DebeziumConfig:
properties:
output-old-value:
type: boolean
type: object
config.DispatchRule:
properties:
columns:
Expand Down Expand Up @@ -249,6 +254,11 @@ definitions:
description: OAuth2Scope scope
type: string
type: object
config.OpenProtocolConfig:
properties:
output-old-value:
type: boolean
type: object
config.PulsarConfig:
properties:
auth-tls-certificate-path:
Expand Down Expand Up @@ -342,6 +352,9 @@ definitions:
date-separator:
description: DateSeparator is only available when the downstream is Storage.
type: string
debezium:
$ref: '#/definitions/config.DebeziumConfig'
description: DebeziumConfig related configurations
debezium-disable-schema:
description: Debezium only. Whether schema should be excluded in the output.
type: boolean
Expand Down Expand Up @@ -377,6 +390,9 @@ definitions:
description: OnlyOutputUpdatedColumns is only available when the downstream
is MQ.
type: boolean
open:
$ref: '#/definitions/config.OpenProtocolConfig'
description: OpenProtocol related configurations
protocol:
description: Protocol is NOT available when the downstream is DB.
type: string
Expand Down Expand Up @@ -807,6 +823,11 @@ definitions:
memory_quota_percentage:
type: integer
type: object
v2.DebeziumConfig:
properties:
output_old_value:
type: boolean
type: object
v2.DispatchRule:
properties:
columns:
Expand Down Expand Up @@ -1021,6 +1042,11 @@ definitions:
write_timeout:
type: string
type: object
v2.OpenProtocolConfig:
properties:
output_old_value:
type: boolean
type: object
v2.ProcessorCommonInfo:
properties:
capture_id:
Expand Down Expand Up @@ -1195,6 +1221,8 @@ definitions:
$ref: '#/definitions/v2.CSVConfig'
date_separator:
type: string
debezium:
$ref: '#/definitions/v2.DebeziumConfig'
debezium_disable_schema:
type: boolean
delete_only_output_handle_key_columns:
Expand All @@ -1217,6 +1245,8 @@ definitions:
$ref: '#/definitions/v2.MySQLConfig'
only_output_updated_columns:
type: boolean
open:
$ref: '#/definitions/v2.OpenProtocolConfig'
protocol:
type: string
pulsar_config:
Expand Down
4 changes: 4 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
}, cfg.Sink)
}

Expand Down Expand Up @@ -252,6 +254,8 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
}, cfg.Sink)
}

Expand Down
Loading

0 comments on commit 295a39a

Please sign in to comment.