Skip to content

Commit

Permalink
sink(ticdc): Add output-old-value config (#10915) (#10974)
Browse files Browse the repository at this point in the history
close #10916
  • Loading branch information
ti-chi-bot authored May 8, 2024
1 parent fd3a6e7 commit 6fd9cf2
Show file tree
Hide file tree
Showing 17 changed files with 195 additions and 20 deletions.
21 changes: 20 additions & 1 deletion cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
}
}
var openProtocolConfig *config.OpenProtocolConfig
if c.Sink.OpenProtocolConfig != nil {
openProtocolConfig = &config.OpenProtocolConfig{
OutputOldValue: c.Sink.OpenProtocolConfig.OutputOldValue,
}
}

res.Sink = &config.SinkConfig{
DispatchRules: dispatchRules,
Expand All @@ -478,6 +484,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
PulsarConfig: pulsarConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: c.Sink.SafeMode,
OpenProtocol: openProtocolConfig,
}

if c.Sink.TxnAtomicity != nil {
Expand Down Expand Up @@ -759,7 +766,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
}
}

var openProtocolConfig *OpenProtocolConfig
if cloned.Sink.OpenProtocol != nil {
openProtocolConfig = &OpenProtocolConfig{
OutputOldValue: cloned.Sink.OpenProtocol.OutputOldValue,
}
}
res.Sink = &SinkConfig{
Protocol: cloned.Sink.Protocol,
SchemaRegistry: cloned.Sink.SchemaRegistry,
Expand All @@ -779,6 +791,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
PulsarConfig: pulsarConfig,
CloudStorageConfig: cloudStorageConfig,
SafeMode: cloned.Sink.SafeMode,
OpenProtocolConfig: openProtocolConfig,
}

if cloned.Sink.TxnAtomicity != nil {
Expand Down Expand Up @@ -975,6 +988,7 @@ type SinkConfig struct {
SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"`
SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"`
SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down Expand Up @@ -1316,3 +1330,8 @@ type GlueSchemaRegistryConfig struct {
SecretAccessKey string `json:"secret_access_key,omitempty"`
Token string `json:"token,omitempty"`
}

// OpenProtocolConfig represents the configurations for open protocol encoding
type OpenProtocolConfig struct {
OutputOldValue bool `json:"output_old_value"`
}
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var defaultAPIConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true},
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
23 changes: 23 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1743,6 +1743,14 @@ var doc = `{
}
}
},
"config.OpenProtocolConfig": {
"type": "object",
"properties": {
"output-old-value": {
"type": "boolean"
}
}
},
"config.PulsarConfig": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1878,6 +1886,10 @@ var doc = `{
"description": "OnlyOutputUpdatedColumns is only available when the downstream is MQ.",
"type": "boolean"
},
"open": {
"description": "OpenProtocol related configurations",
"$ref": "#/definitions/config.OpenProtocolConfig"
},
"protocol": {
"description": "Protocol is NOT available when the downstream is DB.",
"type": "string"
Expand Down Expand Up @@ -2833,6 +2845,14 @@ var doc = `{
}
}
},
"v2.OpenProtocolConfig": {
"type": "object",
"properties": {
"output_old_value": {
"type": "boolean"
}
}
},
"v2.ProcessorCommonInfo": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -3120,6 +3140,9 @@ var doc = `{
"only_output_updated_columns": {
"type": "boolean"
},
"open": {
"$ref": "#/definitions/v2.OpenProtocolConfig"
},
"protocol": {
"type": "string"
},
Expand Down
23 changes: 23 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1724,6 +1724,14 @@
}
}
},
"config.OpenProtocolConfig": {
"type": "object",
"properties": {
"output-old-value": {
"type": "boolean"
}
}
},
"config.PulsarConfig": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1859,6 +1867,10 @@
"description": "OnlyOutputUpdatedColumns is only available when the downstream is MQ.",
"type": "boolean"
},
"open": {
"description": "OpenProtocol related configurations",
"$ref": "#/definitions/config.OpenProtocolConfig"
},
"protocol": {
"description": "Protocol is NOT available when the downstream is DB.",
"type": "string"
Expand Down Expand Up @@ -2814,6 +2826,14 @@
}
}
},
"v2.OpenProtocolConfig": {
"type": "object",
"properties": {
"output_old_value": {
"type": "boolean"
}
}
},
"v2.ProcessorCommonInfo": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -3101,6 +3121,9 @@
"only_output_updated_columns": {
"type": "boolean"
},
"open": {
"$ref": "#/definitions/v2.OpenProtocolConfig"
},
"protocol": {
"type": "string"
},
Expand Down
15 changes: 15 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ definitions:
description: OAuth2Scope scope
type: string
type: object
config.OpenProtocolConfig:
properties:
output-old-value:
type: boolean
type: object
config.PulsarConfig:
properties:
auth-tls-certificate-path:
Expand Down Expand Up @@ -360,6 +365,9 @@ definitions:
description: OnlyOutputUpdatedColumns is only available when the downstream
is MQ.
type: boolean
open:
$ref: '#/definitions/config.OpenProtocolConfig'
description: OpenProtocol related configurations
protocol:
description: Protocol is NOT available when the downstream is DB.
type: string
Expand Down Expand Up @@ -1003,6 +1011,11 @@ definitions:
write_timeout:
type: string
type: object
v2.OpenProtocolConfig:
properties:
output_old_value:
type: boolean
type: object
v2.ProcessorCommonInfo:
properties:
capture_id:
Expand Down Expand Up @@ -1192,6 +1205,8 @@ definitions:
$ref: '#/definitions/v2.MySQLConfig'
only_output_updated_columns:
type: boolean
open:
$ref: '#/definitions/v2.OpenProtocolConfig'
protocol:
type: string
pulsar_config:
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
}, cfg.Sink)
}

Expand Down Expand Up @@ -253,6 +254,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
}, cfg.Sink)
}

Expand Down
15 changes: 12 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ const (
"advance-timeout-in-sec": 150,
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true
"send-bootstrap-to-all-partition": true,
"open": {
"output-old-value": true
}
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -311,7 +314,10 @@ const (
"advance-timeout-in-sec": 150,
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true
"send-bootstrap-to-all-partition": true,
"open": {
"output-old-value": true
}
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -474,7 +480,10 @@ const (
"advance-timeout-in-sec": 150,
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true
"send-bootstrap-to-all-partition": true,
"open": {
"output-old-value": true
}
},
"consistent": {
"level": "none",
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var defaultReplicaConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(DefaultSendBootstrapIntervalInSec),
SendBootstrapInMsgCount: util.AddressOf(DefaultSendBootstrapInMsgCount),
SendBootstrapToAllPartition: util.AddressOf(DefaultSendBootstrapToAllPartition),
OpenProtocol: &OpenProtocolConfig{OutputOldValue: true},
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func TestReplicaConfigMarshal(t *testing.T) {
FileSize: aws.Int(1024),
OutputColumnID: aws.Bool(false),
}
conf.Sink.OpenProtocol = &OpenProtocolConfig{
OutputOldValue: true,
}

b, err := conf.Marshal()
require.NoError(t, err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ type SinkConfig struct {
// If set to false, bootstrap message will only be sent to the first partition of each topic.
// Default value is true.
SendBootstrapToAllPartition *bool `toml:"send-bootstrap-to-all-partition" json:"send-bootstrap-to-all-partition,omitempty"`

// OpenProtocol related configurations
OpenProtocol *OpenProtocolConfig `toml:"open" json:"open,omitempty"`
}

// MaskSensitiveData masks sensitive data in SinkConfig
Expand Down Expand Up @@ -901,3 +904,8 @@ func (g *GlueSchemaRegistryConfig) Validate() error {
func (g *GlueSchemaRegistryConfig) NoCredentials() bool {
return g.AccessKey == "" && g.SecretAccessKey == "" && g.Token == ""
}

// OpenProtocolConfig represents the configurations for open protocol encoding
type OpenProtocolConfig struct {
OutputOldValue bool `toml:"output-old-value" json:"output-old-value"`
}
3 changes: 3 additions & 0 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec,
SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount,
SendBootstrapToAllPartition: config.GetDefaultReplicaConfig().Sink.SendBootstrapToAllPartition,
OpenProtocol: config.GetDefaultReplicaConfig().Sink.OpenProtocol,
},
Consistent: config.GetDefaultReplicaConfig().Consistent,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Expand Down Expand Up @@ -195,6 +196,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec,
SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount,
SendBootstrapToAllPartition: config.GetDefaultReplicaConfig().Sink.SendBootstrapToAllPartition,
OpenProtocol: config.GetDefaultReplicaConfig().Sink.OpenProtocol,
},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Expand Down Expand Up @@ -267,6 +269,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec,
SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount,
SendBootstrapToAllPartition: config.GetDefaultReplicaConfig().Sink.SendBootstrapToAllPartition,
OpenProtocol: config.GetDefaultReplicaConfig().Sink.OpenProtocol,
},
Consistent: config.GetDefaultReplicaConfig().Consistent,
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Expand Down
8 changes: 8 additions & 0 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type Config struct {
TimeZone *time.Location
// for the simple protocol, can be "json" and "avro", default to "json"
EncodingFormat EncodingFormatType
// Whether old value should be excluded in the output.
OpenOutputOldValue bool
}

// EncodingFormatType is the type of encoding format
Expand Down Expand Up @@ -109,6 +111,9 @@ func NewConfig(protocol config.Protocol) *Config {

EncodingFormat: EncodingFormatJSON,
TimeZone: time.Local,

// default value is true
OpenOutputOldValue: true,
}
}

Expand Down Expand Up @@ -217,6 +222,9 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er
`force-replicate must be disabled, when the large message handle is enabled, large message handle: "%s"`,
c.LargeMessageHandle.LargeMessageHandleOption)
}
if replicaConfig.Sink.OpenProtocol != nil {
c.OpenOutputOldValue = replicaConfig.Sink.OpenProtocol.OutputOldValue
}
}
if urlParameter.OnlyOutputUpdatedColumns != nil {
c.OnlyOutputUpdatedColumns = *urlParameter.OnlyOutputUpdatedColumns
Expand Down
Loading

0 comments on commit 6fd9cf2

Please sign in to comment.