Skip to content

Commit

Permalink
config(ticdc): add large message handle option config (#9347)
Browse files Browse the repository at this point in the history
close #9346
  • Loading branch information
3AceShowHand authored Jul 11, 2023
1 parent e6d77bf commit 5ac4bda
Show file tree
Hide file tree
Showing 20 changed files with 407 additions and 142 deletions.
101 changes: 64 additions & 37 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,16 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode,
}
}

var largeMessageHandle *config.LargeMessageHandleConfig
if c.Sink.KafkaConfig.LargeMessageHandle != nil {
oldConfig := c.Sink.KafkaConfig.LargeMessageHandle
largeMessageHandle = &config.LargeMessageHandleConfig{
LargeMessageHandleOption: oldConfig.LargeMessageHandleOption,
ClaimCheckStorageURI: oldConfig.ClaimCheckStorageURI,
}
}

kafkaConfig = &config.KafkaConfig{
PartitionNum: c.Sink.KafkaConfig.PartitionNum,
ReplicationFactor: c.Sink.KafkaConfig.ReplicationFactor,
Expand Down Expand Up @@ -320,6 +330,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
Key: c.Sink.KafkaConfig.Key,
InsecureSkipVerify: c.Sink.KafkaConfig.InsecureSkipVerify,
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
}
}
var mysqlConfig *config.MySQLConfig
Expand Down Expand Up @@ -365,7 +376,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: c.Sink.DeleteOnlyOutputHandleKeyColumns,
LargeMessageOnlyHandleKeyColumns: c.Sink.LargeMessageOnlyHandleKeyColumns,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
CloudStorageConfig: cloudStorageConfig,
Expand Down Expand Up @@ -500,6 +510,16 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode,
}
}

var largeMessageHandle *LargeMessageHandleConfig
if cloned.Sink.KafkaConfig.LargeMessageHandle != nil {
oldConfig := cloned.Sink.KafkaConfig.LargeMessageHandle
largeMessageHandle = &LargeMessageHandleConfig{
LargeMessageHandleOption: oldConfig.LargeMessageHandleOption,
ClaimCheckStorageURI: oldConfig.ClaimCheckStorageURI,
}
}

kafkaConfig = &KafkaConfig{
PartitionNum: cloned.Sink.KafkaConfig.PartitionNum,
ReplicationFactor: cloned.Sink.KafkaConfig.ReplicationFactor,
Expand Down Expand Up @@ -535,6 +555,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
Key: cloned.Sink.KafkaConfig.Key,
InsecureSkipVerify: cloned.Sink.KafkaConfig.InsecureSkipVerify,
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
}
}
var mysqlConfig *MySQLConfig
Expand Down Expand Up @@ -580,7 +601,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2,
OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: cloned.Sink.DeleteOnlyOutputHandleKeyColumns,
LargeMessageOnlyHandleKeyColumns: cloned.Sink.LargeMessageOnlyHandleKeyColumns,
KafkaConfig: kafkaConfig,
MySQLConfig: mysqlConfig,
CloudStorageConfig: cloudStorageConfig,
Expand Down Expand Up @@ -737,7 +757,6 @@ type SinkConfig struct {
EnableKafkaSinkV2 *bool `json:"enable_kafka_sink_v2,omitempty"`
OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns,omitempty"`
DeleteOnlyOutputHandleKeyColumns *bool `json:"delete_only_output_handle_key_columns"`
LargeMessageOnlyHandleKeyColumns *bool `json:"large_message_only_handle_key_columns"`
SafeMode *bool `json:"safe_mode,omitempty"`
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
Expand All @@ -753,6 +772,13 @@ type CSVConfig struct {
IncludeCommitTs bool `json:"include_commit_ts"`
}

// LargeMessageHandleConfig denotes the large message handling config
// This is the same as config.LargeMessageHandleConfig
type LargeMessageHandleConfig struct {
LargeMessageHandleOption string `json:"large_message_handle_option"`
ClaimCheckStorageURI string `json:"claim_check_storage_uri"`
}

// DispatchRule represents partition rule for a table
// This is a duplicate of config.DispatchRule
type DispatchRule struct {
Expand Down Expand Up @@ -928,40 +954,41 @@ type CodecConfig struct {

// KafkaConfig represents a kafka sink configuration
type KafkaConfig struct {
PartitionNum *int32 `json:"partition_num,omitempty"`
ReplicationFactor *int16 `json:"replication_factor,omitempty"`
KafkaVersion *string `json:"kafka_version,omitempty"`
MaxMessageBytes *int `json:"max_message_bytes,omitempty"`
Compression *string `json:"compression,omitempty"`
KafkaClientID *string `json:"kafka_client_id,omitempty"`
AutoCreateTopic *bool `json:"auto_create_topic,omitempty"`
DialTimeout *string `json:"dial_timeout,omitempty"`
WriteTimeout *string `json:"write_timeout,omitempty"`
ReadTimeout *string `json:"read_timeout,omitempty"`
RequiredAcks *int `json:"required_acks,omitempty"`
SASLUser *string `json:"sasl_user,omitempty"`
SASLPassword *string `json:"sasl_password,omitempty"`
SASLMechanism *string `json:"sasl_mechanism,omitempty"`
SASLGssAPIAuthType *string `json:"sasl_gssapi_auth_type,omitempty"`
SASLGssAPIKeytabPath *string `json:"sasl_gssapi_keytab_path,omitempty"`
SASLGssAPIKerberosConfigPath *string `json:"sasl_gssapi_kerberos_config_path,omitempty"`
SASLGssAPIServiceName *string `json:"sasl_gssapi_service_name,omitempty"`
SASLGssAPIUser *string `json:"sasl_gssapi_user,omitempty"`
SASLGssAPIPassword *string `json:"sasl_gssapi_password,omitempty"`
SASLGssAPIRealm *string `json:"sasl_gssapi_realm,omitempty"`
SASLGssAPIDisablePafxfast *bool `json:"sasl_gssapi_disable_pafxfast,omitempty"`
SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty"`
SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty"`
SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty"`
SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty"`
SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"`
SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"`
EnableTLS *bool `json:"enable_tls,omitempty"`
CA *string `json:"ca,omitempty"`
Cert *string `json:"cert,omitempty"`
Key *string `json:"key,omitempty"`
InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"`
CodecConfig *CodecConfig `json:"codec_config,omitempty"`
PartitionNum *int32 `json:"partition_num,omitempty"`
ReplicationFactor *int16 `json:"replication_factor,omitempty"`
KafkaVersion *string `json:"kafka_version,omitempty"`
MaxMessageBytes *int `json:"max_message_bytes,omitempty"`
Compression *string `json:"compression,omitempty"`
KafkaClientID *string `json:"kafka_client_id,omitempty"`
AutoCreateTopic *bool `json:"auto_create_topic,omitempty"`
DialTimeout *string `json:"dial_timeout,omitempty"`
WriteTimeout *string `json:"write_timeout,omitempty"`
ReadTimeout *string `json:"read_timeout,omitempty"`
RequiredAcks *int `json:"required_acks,omitempty"`
SASLUser *string `json:"sasl_user,omitempty"`
SASLPassword *string `json:"sasl_password,omitempty"`
SASLMechanism *string `json:"sasl_mechanism,omitempty"`
SASLGssAPIAuthType *string `json:"sasl_gssapi_auth_type,omitempty"`
SASLGssAPIKeytabPath *string `json:"sasl_gssapi_keytab_path,omitempty"`
SASLGssAPIKerberosConfigPath *string `json:"sasl_gssapi_kerberos_config_path,omitempty"`
SASLGssAPIServiceName *string `json:"sasl_gssapi_service_name,omitempty"`
SASLGssAPIUser *string `json:"sasl_gssapi_user,omitempty"`
SASLGssAPIPassword *string `json:"sasl_gssapi_password,omitempty"`
SASLGssAPIRealm *string `json:"sasl_gssapi_realm,omitempty"`
SASLGssAPIDisablePafxfast *bool `json:"sasl_gssapi_disable_pafxfast,omitempty"`
SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty"`
SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty"`
SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty"`
SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty"`
SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"`
SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"`
EnableTLS *bool `json:"enable_tls,omitempty"`
CA *string `json:"ca,omitempty"`
Cert *string `json:"cert,omitempty"`
Key *string `json:"key,omitempty"`
InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"`
CodecConfig *CodecConfig `json:"codec_config,omitempty"`
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
}

// MySQLConfig represents a MySQL sink configuration
Expand Down
1 change: 0 additions & 1 deletion cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ var defaultAPIConfig = &ReplicaConfig{
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
LargeMessageOnlyHandleKeyColumns: util.AddressOf(false),
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
1 change: 0 additions & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ func (info *ChangeFeedInfo) rmMQOnlyFields() {
info.Config.Sink.EnableKafkaSinkV2 = nil
info.Config.Sink.OnlyOutputUpdatedColumns = nil
info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil
info.Config.Sink.LargeMessageOnlyHandleKeyColumns = nil
info.Config.Sink.KafkaConfig = nil
}

Expand Down
3 changes: 3 additions & 0 deletions cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ func TestWriteCheckpointTsWhenCanalJsonTiDBExtensionIsDisable(t *testing.T) {
sinkURI, err := url.Parse(uri)
require.NoError(t, err)
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{
LargeMessageHandle: config.NewDefaultLargeMessageHandleConfig(),
}
require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI))

ctx = context.WithValue(ctx, "testing.T", t)
Expand Down
35 changes: 28 additions & 7 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,9 @@ var doc = `{
"key": {
"type": "string"
},
"large-message-handle": {
"$ref": "#/definitions/config.LargeMessageHandleConfig"
},
"max-message-bytes": {
"type": "integer"
},
Expand Down Expand Up @@ -1546,6 +1549,17 @@ var doc = `{
}
}
},
"config.LargeMessageHandleConfig": {
"type": "object",
"properties": {
"claim-check-storage-uri": {
"type": "string"
},
"large-message-handle-option": {
"type": "string"
}
}
},
"config.MySQLConfig": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1647,10 +1661,6 @@ var doc = `{
"kafka-config": {
"$ref": "#/definitions/config.KafkaConfig"
},
"large-message-only-handle-key-columns": {
"description": "LargeMessageOnlyHandleKeyColumns is only available when the downstream is MQ.",
"type": "boolean"
},
"mysql-config": {
"$ref": "#/definitions/config.MySQLConfig"
},
Expand Down Expand Up @@ -2370,6 +2380,9 @@ var doc = `{
"key": {
"type": "string"
},
"large_message_handle": {
"$ref": "#/definitions/v2.LargeMessageHandleConfig"
},
"max_message_bytes": {
"type": "integer"
},
Expand Down Expand Up @@ -2444,6 +2457,17 @@ var doc = `{
}
}
},
"v2.LargeMessageHandleConfig": {
"type": "object",
"properties": {
"claim_check_storage_uri": {
"type": "string"
},
"large_message_handle_option": {
"type": "string"
}
}
},
"v2.LogLevelReq": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2703,9 +2727,6 @@ var doc = `{
"kafka_config": {
"$ref": "#/definitions/v2.KafkaConfig"
},
"large_message_only_handle_key_columns": {
"type": "boolean"
},
"mysql_config": {
"$ref": "#/definitions/v2.MySQLConfig"
},
Expand Down
35 changes: 28 additions & 7 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,9 @@
"key": {
"type": "string"
},
"large-message-handle": {
"$ref": "#/definitions/config.LargeMessageHandleConfig"
},
"max-message-bytes": {
"type": "integer"
},
Expand Down Expand Up @@ -1527,6 +1530,17 @@
}
}
},
"config.LargeMessageHandleConfig": {
"type": "object",
"properties": {
"claim-check-storage-uri": {
"type": "string"
},
"large-message-handle-option": {
"type": "string"
}
}
},
"config.MySQLConfig": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -1628,10 +1642,6 @@
"kafka-config": {
"$ref": "#/definitions/config.KafkaConfig"
},
"large-message-only-handle-key-columns": {
"description": "LargeMessageOnlyHandleKeyColumns is only available when the downstream is MQ.",
"type": "boolean"
},
"mysql-config": {
"$ref": "#/definitions/config.MySQLConfig"
},
Expand Down Expand Up @@ -2351,6 +2361,9 @@
"key": {
"type": "string"
},
"large_message_handle": {
"$ref": "#/definitions/v2.LargeMessageHandleConfig"
},
"max_message_bytes": {
"type": "integer"
},
Expand Down Expand Up @@ -2425,6 +2438,17 @@
}
}
},
"v2.LargeMessageHandleConfig": {
"type": "object",
"properties": {
"claim_check_storage_uri": {
"type": "string"
},
"large_message_handle_option": {
"type": "string"
}
}
},
"v2.LogLevelReq": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2684,9 +2708,6 @@
"kafka_config": {
"$ref": "#/definitions/v2.KafkaConfig"
},
"large_message_only_handle_key_columns": {
"type": "boolean"
},
"mysql_config": {
"$ref": "#/definitions/v2.MySQLConfig"
},
Expand Down
Loading

0 comments on commit 5ac4bda

Please sign in to comment.