From 5ac4bda135ab5b4efb9c1601ded83f4db9dee0ab Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 11 Jul 2023 17:29:13 +0800 Subject: [PATCH] config(ticdc): add large message handle option config (#9347) close pingcap/tiflow#9346 --- cdc/api/v2/model.go | 101 ++++++++----- cdc/api/v2/model_test.go | 1 - cdc/model/changefeed.go | 1 - cdc/sink/ddlsink/mq/mq_ddl_sink_test.go | 3 + docs/swagger/docs.go | 35 ++++- docs/swagger/swagger.json | 35 ++++- docs/swagger/swagger.yaml | 24 +++- pkg/cmd/cli/cli_changefeed_create.go | 2 +- pkg/cmd/util/helper_test.go | 2 - pkg/config/config_test_data.go | 15 +- pkg/config/replica_config.go | 7 +- pkg/config/replica_config_test.go | 20 +-- pkg/config/sink.go | 135 +++++++++++++----- pkg/sink/codec/common/config.go | 21 +-- pkg/sink/codec/common/config_test.go | 111 +++++++++++++- .../codec/open/open_protocol_decoder_test.go | 12 +- pkg/sink/codec/open/open_protocol_encoder.go | 2 +- .../codec/open/open_protocol_encoder_test.go | 13 +- pkg/sink/kafka/options_test.go | 6 +- pkg/sink/kafka/v2/factory_test.go | 3 +- 20 files changed, 407 insertions(+), 142 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index cdc2ead49aa..3a2e879bec9 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -285,6 +285,16 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode, } } + + var largeMessageHandle *config.LargeMessageHandleConfig + if c.Sink.KafkaConfig.LargeMessageHandle != nil { + oldConfig := c.Sink.KafkaConfig.LargeMessageHandle + largeMessageHandle = &config.LargeMessageHandleConfig{ + LargeMessageHandleOption: oldConfig.LargeMessageHandleOption, + ClaimCheckStorageURI: oldConfig.ClaimCheckStorageURI, + } + } + kafkaConfig = &config.KafkaConfig{ PartitionNum: c.Sink.KafkaConfig.PartitionNum, ReplicationFactor: c.Sink.KafkaConfig.ReplicationFactor, @@ -320,6 +330,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( Key: c.Sink.KafkaConfig.Key, InsecureSkipVerify: c.Sink.KafkaConfig.InsecureSkipVerify, CodecConfig: codeConfig, + LargeMessageHandle: largeMessageHandle, } } var mysqlConfig *config.MySQLConfig @@ -365,7 +376,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( EnableKafkaSinkV2: c.Sink.EnableKafkaSinkV2, OnlyOutputUpdatedColumns: c.Sink.OnlyOutputUpdatedColumns, DeleteOnlyOutputHandleKeyColumns: c.Sink.DeleteOnlyOutputHandleKeyColumns, - LargeMessageOnlyHandleKeyColumns: c.Sink.LargeMessageOnlyHandleKeyColumns, KafkaConfig: kafkaConfig, MySQLConfig: mysqlConfig, CloudStorageConfig: cloudStorageConfig, @@ -500,6 +510,16 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode, } } + + var largeMessageHandle *LargeMessageHandleConfig + if cloned.Sink.KafkaConfig.LargeMessageHandle != nil { + oldConfig := cloned.Sink.KafkaConfig.LargeMessageHandle + largeMessageHandle = &LargeMessageHandleConfig{ + LargeMessageHandleOption: oldConfig.LargeMessageHandleOption, + ClaimCheckStorageURI: oldConfig.ClaimCheckStorageURI, + } + } + kafkaConfig = &KafkaConfig{ PartitionNum: cloned.Sink.KafkaConfig.PartitionNum, ReplicationFactor: cloned.Sink.KafkaConfig.ReplicationFactor, @@ -535,6 +555,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { Key: cloned.Sink.KafkaConfig.Key, InsecureSkipVerify: cloned.Sink.KafkaConfig.InsecureSkipVerify, CodecConfig: codeConfig, + 
LargeMessageHandle: largeMessageHandle, } } var mysqlConfig *MySQLConfig @@ -580,7 +601,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { EnableKafkaSinkV2: cloned.Sink.EnableKafkaSinkV2, OnlyOutputUpdatedColumns: cloned.Sink.OnlyOutputUpdatedColumns, DeleteOnlyOutputHandleKeyColumns: cloned.Sink.DeleteOnlyOutputHandleKeyColumns, - LargeMessageOnlyHandleKeyColumns: cloned.Sink.LargeMessageOnlyHandleKeyColumns, KafkaConfig: kafkaConfig, MySQLConfig: mysqlConfig, CloudStorageConfig: cloudStorageConfig, @@ -737,7 +757,6 @@ type SinkConfig struct { EnableKafkaSinkV2 *bool `json:"enable_kafka_sink_v2,omitempty"` OnlyOutputUpdatedColumns *bool `json:"only_output_updated_columns,omitempty"` DeleteOnlyOutputHandleKeyColumns *bool `json:"delete_only_output_handle_key_columns"` - LargeMessageOnlyHandleKeyColumns *bool `json:"large_message_only_handle_key_columns"` SafeMode *bool `json:"safe_mode,omitempty"` KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"` MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"` @@ -753,6 +772,13 @@ type CSVConfig struct { IncludeCommitTs bool `json:"include_commit_ts"` } +// LargeMessageHandleConfig denotes the large message handling config +// This is the same as config.LargeMessageHandleConfig +type LargeMessageHandleConfig struct { + LargeMessageHandleOption string `json:"large_message_handle_option"` + ClaimCheckStorageURI string `json:"claim_check_storage_uri"` +} + // DispatchRule represents partition rule for a table // This is a duplicate of config.DispatchRule type DispatchRule struct { @@ -928,40 +954,41 @@ type CodecConfig struct { // KafkaConfig represents a kafka sink configuration type KafkaConfig struct { - PartitionNum *int32 `json:"partition_num,omitempty"` - ReplicationFactor *int16 `json:"replication_factor,omitempty"` - KafkaVersion *string `json:"kafka_version,omitempty"` - MaxMessageBytes *int `json:"max_message_bytes,omitempty"` - Compression *string `json:"compression,omitempty"` - KafkaClientID *string `json:"kafka_client_id,omitempty"` - AutoCreateTopic *bool `json:"auto_create_topic,omitempty"` - DialTimeout *string `json:"dial_timeout,omitempty"` - WriteTimeout *string `json:"write_timeout,omitempty"` - ReadTimeout *string `json:"read_timeout,omitempty"` - RequiredAcks *int `json:"required_acks,omitempty"` - SASLUser *string `json:"sasl_user,omitempty"` - SASLPassword *string `json:"sasl_password,omitempty"` - SASLMechanism *string `json:"sasl_mechanism,omitempty"` - SASLGssAPIAuthType *string `json:"sasl_gssapi_auth_type,omitempty"` - SASLGssAPIKeytabPath *string `json:"sasl_gssapi_keytab_path,omitempty"` - SASLGssAPIKerberosConfigPath *string `json:"sasl_gssapi_kerberos_config_path,omitempty"` - SASLGssAPIServiceName *string `json:"sasl_gssapi_service_name,omitempty"` - SASLGssAPIUser *string `json:"sasl_gssapi_user,omitempty"` - SASLGssAPIPassword *string `json:"sasl_gssapi_password,omitempty"` - SASLGssAPIRealm *string `json:"sasl_gssapi_realm,omitempty"` - SASLGssAPIDisablePafxfast *bool `json:"sasl_gssapi_disable_pafxfast,omitempty"` - SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty"` - SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty"` - SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty"` - SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty"` - SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"` - SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"` - EnableTLS *bool `json:"enable_tls,omitempty"` - CA *string 
`json:"ca,omitempty"` - Cert *string `json:"cert,omitempty"` - Key *string `json:"key,omitempty"` - InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"` - CodecConfig *CodecConfig `json:"codec_config,omitempty"` + PartitionNum *int32 `json:"partition_num,omitempty"` + ReplicationFactor *int16 `json:"replication_factor,omitempty"` + KafkaVersion *string `json:"kafka_version,omitempty"` + MaxMessageBytes *int `json:"max_message_bytes,omitempty"` + Compression *string `json:"compression,omitempty"` + KafkaClientID *string `json:"kafka_client_id,omitempty"` + AutoCreateTopic *bool `json:"auto_create_topic,omitempty"` + DialTimeout *string `json:"dial_timeout,omitempty"` + WriteTimeout *string `json:"write_timeout,omitempty"` + ReadTimeout *string `json:"read_timeout,omitempty"` + RequiredAcks *int `json:"required_acks,omitempty"` + SASLUser *string `json:"sasl_user,omitempty"` + SASLPassword *string `json:"sasl_password,omitempty"` + SASLMechanism *string `json:"sasl_mechanism,omitempty"` + SASLGssAPIAuthType *string `json:"sasl_gssapi_auth_type,omitempty"` + SASLGssAPIKeytabPath *string `json:"sasl_gssapi_keytab_path,omitempty"` + SASLGssAPIKerberosConfigPath *string `json:"sasl_gssapi_kerberos_config_path,omitempty"` + SASLGssAPIServiceName *string `json:"sasl_gssapi_service_name,omitempty"` + SASLGssAPIUser *string `json:"sasl_gssapi_user,omitempty"` + SASLGssAPIPassword *string `json:"sasl_gssapi_password,omitempty"` + SASLGssAPIRealm *string `json:"sasl_gssapi_realm,omitempty"` + SASLGssAPIDisablePafxfast *bool `json:"sasl_gssapi_disable_pafxfast,omitempty"` + SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty"` + SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty"` + SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty"` + SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty"` + SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"` + SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"` + EnableTLS *bool `json:"enable_tls,omitempty"` + CA *string `json:"ca,omitempty"` + Cert *string `json:"cert,omitempty"` + Key *string `json:"key,omitempty"` + InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"` + CodecConfig *CodecConfig `json:"codec_config,omitempty"` + LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` } // MySQLConfig represents a MySQL sink configuration diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 57e10445e53..2ce07bcb49c 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -56,7 +56,6 @@ var defaultAPIConfig = &ReplicaConfig{ EnableKafkaSinkV2: util.AddressOf(false), OnlyOutputUpdatedColumns: util.AddressOf(false), DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false), - LargeMessageOnlyHandleKeyColumns: util.AddressOf(false), }, Consistent: &ConsistentConfig{ Level: "none", diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 8d5904be2ad..6254d6f0968 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -338,7 +338,6 @@ func (info *ChangeFeedInfo) rmMQOnlyFields() { info.Config.Sink.EnableKafkaSinkV2 = nil info.Config.Sink.OnlyOutputUpdatedColumns = nil info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil - info.Config.Sink.LargeMessageOnlyHandleKeyColumns = nil info.Config.Sink.KafkaConfig = nil } diff --git a/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go b/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go index 696a4873e9a..7d3c47c2b38 100644 --- 
a/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go +++ b/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go @@ -258,6 +258,9 @@ func TestWriteCheckpointTsWhenCanalJsonTiDBExtensionIsDisable(t *testing.T) { sinkURI, err := url.Parse(uri) require.NoError(t, err) replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{ + LargeMessageHandle: config.NewDefaultLargeMessageHandleConfig(), + } require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI)) ctx = context.WithValue(ctx, "testing.T", t) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index adde05ad75b..7f58c8fa33e 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1472,6 +1472,9 @@ var doc = `{ "key": { "type": "string" }, + "large-message-handle": { + "$ref": "#/definitions/config.LargeMessageHandleConfig" + }, "max-message-bytes": { "type": "integer" }, @@ -1546,6 +1549,17 @@ var doc = `{ } } }, + "config.LargeMessageHandleConfig": { + "type": "object", + "properties": { + "claim-check-storage-uri": { + "type": "string" + }, + "large-message-handle-option": { + "type": "string" + } + } + }, "config.MySQLConfig": { "type": "object", "properties": { @@ -1647,10 +1661,6 @@ var doc = `{ "kafka-config": { "$ref": "#/definitions/config.KafkaConfig" }, - "large-message-only-handle-key-columns": { - "description": "LargeMessageOnlyHandleKeyColumns is only available when the downstream is MQ.", - "type": "boolean" - }, "mysql-config": { "$ref": "#/definitions/config.MySQLConfig" }, @@ -2370,6 +2380,9 @@ var doc = `{ "key": { "type": "string" }, + "large_message_handle": { + "$ref": "#/definitions/v2.LargeMessageHandleConfig" + }, "max_message_bytes": { "type": "integer" }, @@ -2444,6 +2457,17 @@ var doc = `{ } } }, + "v2.LargeMessageHandleConfig": { + "type": "object", + "properties": { + "claim_check_storage_uri": { + "type": "string" + }, + "large_message_handle_option": { + "type": "string" + } + } + }, "v2.LogLevelReq": { "type": "object", "properties": { @@ -2703,9 +2727,6 @@ var doc = `{ "kafka_config": { "$ref": "#/definitions/v2.KafkaConfig" }, - "large_message_only_handle_key_columns": { - "type": "boolean" - }, "mysql_config": { "$ref": "#/definitions/v2.MySQLConfig" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 9af6f1734c7..1da9e6c2a17 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1453,6 +1453,9 @@ "key": { "type": "string" }, + "large-message-handle": { + "$ref": "#/definitions/config.LargeMessageHandleConfig" + }, "max-message-bytes": { "type": "integer" }, @@ -1527,6 +1530,17 @@ } } }, + "config.LargeMessageHandleConfig": { + "type": "object", + "properties": { + "claim-check-storage-uri": { + "type": "string" + }, + "large-message-handle-option": { + "type": "string" + } + } + }, "config.MySQLConfig": { "type": "object", "properties": { @@ -1628,10 +1642,6 @@ "kafka-config": { "$ref": "#/definitions/config.KafkaConfig" }, - "large-message-only-handle-key-columns": { - "description": "LargeMessageOnlyHandleKeyColumns is only available when the downstream is MQ.", - "type": "boolean" - }, "mysql-config": { "$ref": "#/definitions/config.MySQLConfig" }, @@ -2351,6 +2361,9 @@ "key": { "type": "string" }, + "large_message_handle": { + "$ref": "#/definitions/v2.LargeMessageHandleConfig" + }, "max_message_bytes": { "type": "integer" }, @@ -2425,6 +2438,17 @@ } } }, + "v2.LargeMessageHandleConfig": { + "type": "object", + "properties": { + "claim_check_storage_uri": { + "type": "string" + }, + "large_message_handle_option": { + 
"type": "string" + } + } + }, "v2.LogLevelReq": { "type": "object", "properties": { @@ -2684,9 +2708,6 @@ "kafka_config": { "$ref": "#/definitions/v2.KafkaConfig" }, - "large_message_only_handle_key_columns": { - "type": "boolean" - }, "mysql_config": { "$ref": "#/definitions/v2.MySQLConfig" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 676cbca88e4..9bdd0cecd89 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -88,6 +88,8 @@ definitions: type: string key: type: string + large-message-handle: + $ref: '#/definitions/config.LargeMessageHandleConfig' max-message-bytes: type: integer partition-num: @@ -137,6 +139,13 @@ definitions: write-timeout: type: string type: object + config.LargeMessageHandleConfig: + properties: + claim-check-storage-uri: + type: string + large-message-handle-option: + type: string + type: object config.MySQLConfig: properties: enable-batch-dml: @@ -211,10 +220,6 @@ definitions: type: integer kafka-config: $ref: '#/definitions/config.KafkaConfig' - large-message-only-handle-key-columns: - description: LargeMessageOnlyHandleKeyColumns is only available when the downstream - is MQ. - type: boolean mysql-config: $ref: '#/definitions/config.MySQLConfig' only-output-updated-columns: @@ -703,6 +708,8 @@ definitions: type: string key: type: string + large_message_handle: + $ref: '#/definitions/v2.LargeMessageHandleConfig' max_message_bytes: type: integer partition_num: @@ -752,6 +759,13 @@ definitions: write_timeout: type: string type: object + v2.LargeMessageHandleConfig: + properties: + claim_check_storage_uri: + type: string + large_message_handle_option: + type: string + type: object v2.LogLevelReq: properties: log_level: @@ -922,8 +936,6 @@ definitions: type: integer kafka_config: $ref: '#/definitions/v2.KafkaConfig' - large_message_only_handle_key_columns: - type: boolean mysql_config: $ref: '#/definitions/v2.MySQLConfig' only_output_updated_columns: diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index f5dd885ac4f..ff85586841a 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -158,7 +158,7 @@ func (o *createChangefeedOptions) completeReplicaCfg( return err } - err = cfg.AdjustEnableOldValueAndVerifyForceReplicate(uri) + err = cfg.ValidateAndAdjust(uri) if err != nil { return err } diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 9631f1016f2..2ba4f90cf8f 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -211,7 +211,6 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { EnableKafkaSinkV2: util.AddressOf(false), OnlyOutputUpdatedColumns: util.AddressOf(false), DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false), - LargeMessageOnlyHandleKeyColumns: util.AddressOf(false), Protocol: util.AddressOf("open-protocol"), }, cfg.Sink) } @@ -244,7 +243,6 @@ func TestAndWriteStorageSinkTOML(t *testing.T) { }, OnlyOutputUpdatedColumns: util.AddressOf(false), DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false), - LargeMessageOnlyHandleKeyColumns: util.AddressOf(false), }, cfg.Sink) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index e5e40f0af21..9d0e6eea3c3 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -60,7 +60,10 @@ const ( "enable-kafka-sink-v2": false, "only-output-updated-columns": false, "delete-only-output-handle-key-columns": false, - "large-message-only-handle-key-columns": false + "large-message-handle": 
{ + "large-message-handle-option": "none", + "claim-check-storage-uri": "" + } }, "consistent": { "level": "none", @@ -207,7 +210,6 @@ const ( "enable-kafka-sink-v2": true, "only-output-updated-columns": true, "delete-only-output-handle-key-columns": true, - "large-message-only-handle-key-columns": true, "safe-mode": true, "terminator": "\r\n", "transaction-atomicity": "", @@ -244,6 +246,10 @@ const ( "avro-enable-watermark": true, "avro-decimal-handling-mode": "string", "avro-bigint-unsigned-handling-mode": "string" + }, + "large-message-handle": { + "large-message-handle-option": "handle-key-only", + "claim-check-storage-uri": "" } }, "mysql-config": { @@ -337,7 +343,6 @@ const ( "enable-kafka-sink-v2": true, "only-output-updated-columns": true, "delete-only-output-handle-key-columns": true, - "large-message-only-handle-key-columns": true, "safe-mode": true, "kafka-config": { "partition-num": 1, @@ -372,6 +377,10 @@ const ( "avro-enable-watermark": true, "avro-decimal-handling-mode": "string", "avro-bigint-unsigned-handling-mode": "string" + }, + "large-message-handle": { + "large-message-handle-option": "handle-key-only", + "claim-check-storage-uri": "" } }, "mysql-config": { diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 8dc6ec98700..d09363e7fbf 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -66,7 +66,6 @@ var defaultReplicaConfig = &ReplicaConfig{ EnableKafkaSinkV2: util.AddressOf(false), OnlyOutputUpdatedColumns: util.AddressOf(false), DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false), - LargeMessageOnlyHandleKeyColumns: util.AddressOf(false), TiDBSourceID: 1, }, Consistent: &ConsistentConfig{ @@ -201,7 +200,7 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin return err } - err = c.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + err = c.adjustEnableOldValueAndVerifyForceReplicate(sinkURI) if err != nil { return err } @@ -311,9 +310,7 @@ func (c *ReplicaConfig) AdjustEnableOldValue(scheme, protocol string) { } } -// AdjustEnableOldValueAndVerifyForceReplicate adjust the old value configuration by the sink scheme and encoding protocol -// and then verify the force replicate. 
-func (c *ReplicaConfig) AdjustEnableOldValueAndVerifyForceReplicate(sinkURI *url.URL) error { +func (c *ReplicaConfig) adjustEnableOldValueAndVerifyForceReplicate(sinkURI *url.URL) error { scheme := strings.ToLower(sinkURI.Scheme) protocol := sinkURI.Query().Get(ProtocolKey) if protocol != "" { diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index aca011517a8..f1dbbee3613 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -64,7 +64,6 @@ func TestReplicaConfigMarshal(t *testing.T) { conf.Sink.OnlyOutputUpdatedColumns = aws.Bool(true) conf.Sink.DeleteOnlyOutputHandleKeyColumns = aws.Bool(true) - conf.Sink.LargeMessageOnlyHandleKeyColumns = aws.Bool(true) conf.Sink.SafeMode = aws.Bool(true) conf.Sink.KafkaConfig = &KafkaConfig{ PartitionNum: aws.Int32(1), @@ -100,6 +99,9 @@ func TestReplicaConfigMarshal(t *testing.T) { AvroDecimalHandlingMode: aws.String("string"), AvroBigintUnsignedHandlingMode: aws.String("string"), }, + LargeMessageHandle: &LargeMessageHandleConfig{ + LargeMessageHandleOption: LargeMessageHandleOptionHandleKeyOnly, + }, } conf.Sink.MySQLConfig = &MySQLConfig{ WorkerCount: aws.Int(8), @@ -321,13 +323,13 @@ func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) { // mysql sink, do not adjust enable-old-value sinkURI, err := url.Parse("mysql://") require.NoError(t, err) - err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + err = config.adjustEnableOldValueAndVerifyForceReplicate(sinkURI) require.NoError(t, err) require.False(t, config.EnableOldValue) // mysql sink, `enable-old-value` false, `force-replicate` true, should return error config.ForceReplicate = true - err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + err = config.adjustEnableOldValueAndVerifyForceReplicate(sinkURI) require.Error(t, cerror.ErrOldValueNotEnabled, err) // canal, `enable-old-value` false, `force-replicate` false, no error, `enable-old-value` adjust to true @@ -337,14 +339,14 @@ func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) { sinkURI, err = url.Parse("kafka://127.0.0.1:9092/test?protocol=canal") require.NoError(t, err) - err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + err = config.adjustEnableOldValueAndVerifyForceReplicate(sinkURI) require.NoError(t, err) require.True(t, config.EnableOldValue) // canal, `force-replicate` true, `enable-old-value` true, no error config.ForceReplicate = true config.EnableOldValue = true - err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + err = config.adjustEnableOldValueAndVerifyForceReplicate(sinkURI) require.NoError(t, err) require.True(t, config.ForceReplicate) require.True(t, config.EnableOldValue) @@ -355,14 +357,14 @@ func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) { sinkURI, err = url.Parse("kafka://127.0.0.1:9092/test?protocol=avro") require.NoError(t, err) - err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + err = config.adjustEnableOldValueAndVerifyForceReplicate(sinkURI) require.NoError(t, err) require.False(t, config.EnableOldValue) // avro, `enable-old-value` true, no error, set to false. 
no matter `force-replicate` config.EnableOldValue = true config.ForceReplicate = true - err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + err = config.adjustEnableOldValueAndVerifyForceReplicate(sinkURI) require.NoError(t, err) require.False(t, config.EnableOldValue) @@ -372,14 +374,14 @@ func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) { sinkURI, err = url.Parse("s3://xxx/yyy?protocol=csv") require.NoError(t, err) - err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + err = config.adjustEnableOldValueAndVerifyForceReplicate(sinkURI) require.NoError(t, err) require.False(t, config.EnableOldValue) // csv, `enable-old-value` true, no error, set to false. no matter `force-replicate` config.EnableOldValue = true config.ForceReplicate = true - err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + err = config.adjustEnableOldValueAndVerifyForceReplicate(sinkURI) require.NoError(t, err) require.False(t, config.EnableOldValue) } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index a258997515f..d6cc94d7472 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -141,8 +141,6 @@ type SinkConfig struct { // DeleteOnlyOutputHandleKeyColumns is only available when the downstream is MQ. DeleteOnlyOutputHandleKeyColumns *bool `toml:"delete-only-output-handle-key-columns" json:"delete-only-output-handle-key-columns,omitempty"` - // LargeMessageOnlyHandleKeyColumns is only available when the downstream is MQ. - LargeMessageOnlyHandleKeyColumns *bool `toml:"large-message-only-handle-key-columns" json:"large-message-only-handle-key-columns,omitempty"` // TiDBSourceID is the source ID of the upstream TiDB, // which is used to set the `tidb_cdc_write_source` session variable. @@ -276,40 +274,41 @@ type CodecConfig struct { // KafkaConfig represents a kafka sink configuration type KafkaConfig struct { - PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` - ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` - KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` - MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` - Compression *string `toml:"compression" json:"compression,omitempty"` - KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` - AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` - DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` - WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` - ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` - RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` - SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` - SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` - SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` - SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` - SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` - SASLGssAPIKerberosConfigPath *string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` - SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` - SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` - SASLGssAPIPassword *string 
`toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` - SASLGssAPIRealm *string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` - SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"` - SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` - SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` - SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` - SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` - SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` - SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` - EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` - CA *string `toml:"ca" json:"ca,omitempty"` - Cert *string `toml:"cert" json:"cert,omitempty"` - Key *string `toml:"key" json:"key,omitempty"` - InsecureSkipVerify *bool `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"` - CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` + PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` + ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` + KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` + MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` + Compression *string `toml:"compression" json:"compression,omitempty"` + KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` + AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` + DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` + WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` + ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` + RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` + SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` + SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` + SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` + SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` + SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` + SASLGssAPIKerberosConfigPath *string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` + SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` + SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` + SASLGssAPIPassword *string `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` + SASLGssAPIRealm *string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` + SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"` + SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` + SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` + SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` + SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` + 
SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` + SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` + EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` + CA *string `toml:"ca" json:"ca,omitempty"` + Cert *string `toml:"cert" json:"cert,omitempty"` + Key *string `toml:"key" json:"key,omitempty"` + InsecureSkipVerify *bool `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"` + CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` + LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` } // MySQLConfig represents a MySQL sink configuration @@ -527,3 +526,67 @@ func (s *SinkConfig) CheckCompatibilityWithSinkURI( } return compatibilityError } + +const ( + // LargeMessageHandleOptionNone means not handling large messages. + LargeMessageHandleOptionNone string = "none" + // LargeMessageHandleOptionClaimCheck means handling large messages by sending them to the claim check storage. + LargeMessageHandleOptionClaimCheck string = "claim-check" + // LargeMessageHandleOptionHandleKeyOnly means handling large messages by sending only the handle key columns. + LargeMessageHandleOptionHandleKeyOnly string = "handle-key-only" +) + +// LargeMessageHandleConfig is the configuration for handling large messages. +type LargeMessageHandleConfig struct { + LargeMessageHandleOption string `toml:"large-message-handle-option" json:"large-message-handle-option"` + ClaimCheckStorageURI string `toml:"claim-check-storage-uri" json:"claim-check-storage-uri"` +} + +// NewDefaultLargeMessageHandleConfig returns the default LargeMessageHandleConfig. +func NewDefaultLargeMessageHandleConfig() *LargeMessageHandleConfig { + return &LargeMessageHandleConfig{ + LargeMessageHandleOption: LargeMessageHandleOptionNone, + } +} + +// Validate the LargeMessageHandleConfig. +func (c *LargeMessageHandleConfig) Validate(protocol Protocol, enableTiDBExtension bool) error { + if c.LargeMessageHandleOption == LargeMessageHandleOptionNone { + return nil + } + + switch protocol { + case ProtocolOpen: + case ProtocolCanalJSON, ProtocolAvro: + if !enableTiDBExtension { + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle is set to %s, protocol is %s, but enable-tidb-extension is false", + c.LargeMessageHandleOption, protocol.String()) + } + default: + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle is set to %s, protocol is %s, it's not supported", + c.LargeMessageHandleOption, protocol.String()) + } + + if c.LargeMessageHandleOption == LargeMessageHandleOptionClaimCheck && c.ClaimCheckStorageURI == "" { + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle is set to claim-check, but the claim-check-storage-uri is empty") + } + return nil +} + +// HandleKeyOnly returns true if large messages are handled by encoding only the handle key columns. +func (c *LargeMessageHandleConfig) HandleKeyOnly() bool { + return c.LargeMessageHandleOption == LargeMessageHandleOptionHandleKeyOnly +} + +// EnableClaimCheck returns true if the claim check is enabled. +func (c *LargeMessageHandleConfig) EnableClaimCheck() bool { + return c.LargeMessageHandleOption == LargeMessageHandleOptionClaimCheck +} + +// Disabled returns true if large message handling is disabled. 
+func (c *LargeMessageHandleConfig) Disabled() bool { + return c.LargeMessageHandleOption == LargeMessageHandleOptionNone +} diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index c459a511376..42abcbe0de2 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -41,9 +41,7 @@ type Config struct { // DeleteOnlyHandleKeyColumns is true, for the delete event only output the handle key columns. DeleteOnlyHandleKeyColumns bool - // LargeMessageOnlyHandleKeyColumns is true, - // for message large then the MaxMessageBytes only output the handle key columns. - LargeMessageOnlyHandleKeyColumns bool + LargeMessageHandle *config.LargeMessageHandleConfig EnableTiDBExtension bool EnableRowChecksum bool @@ -177,6 +175,9 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er c.NullString = replicaConfig.Sink.CSVConfig.NullString c.IncludeCommitTs = replicaConfig.Sink.CSVConfig.IncludeCommitTs } + if replicaConfig.Sink.KafkaConfig != nil { + c.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle + } } if urlParameter.OnlyOutputUpdatedColumns != nil { c.OnlyOutputUpdatedColumns = *urlParameter.OnlyOutputUpdatedColumns @@ -193,13 +194,6 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er } c.DeleteOnlyHandleKeyColumns = util.GetOrZero(replicaConfig.Sink.DeleteOnlyOutputHandleKeyColumns) - c.LargeMessageOnlyHandleKeyColumns = util.GetOrZero(replicaConfig.Sink.LargeMessageOnlyHandleKeyColumns) - if c.LargeMessageOnlyHandleKeyColumns { - log.Warn("large message only handle key columns is enabled, "+ - "if the full message's size is larger than max-message-bytes, only send the handle key columns", - zap.Any("protocol", c.Protocol)) - } - return nil } @@ -298,5 +292,12 @@ func (c *Config) Validate() error { ) } + if c.LargeMessageHandle != nil { + err := c.LargeMessageHandle.Validate(c.Protocol, c.EnableTiDBExtension) + if err != nil { + return err + } + } + return nil } diff --git a/pkg/sink/codec/common/config_test.go b/pkg/sink/codec/common/config_test.go index af7ea4056d2..da78feeec75 100644 --- a/pkg/sink/codec/common/config_test.go +++ b/pkg/sink/codec/common/config_test.go @@ -112,6 +112,99 @@ func TestConfigApplyValidate4EnableRowChecksum(t *testing.T) { require.True(t, c.AvroEnableWatermark) } +func TestLargeMessageHandleConfig(t *testing.T) { + t.Parallel() + + // not set, should always succeed + uri := "kafka://127.0.0.1:9092/large-message-handle?protocol=open-protocol" + sinkURI, err := url.Parse(uri) + require.NoError(t, err) + + protocol := sinkURI.Query().Get("protocol") + p, err := config.ParseSinkProtocolFromString(protocol) + require.NoError(t, err) + + // open-protocol, should return no error + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{ + LargeMessageHandle: &config.LargeMessageHandleConfig{ + LargeMessageHandleOption: config.LargeMessageHandleOptionNone, + }, + } + + c := NewConfig(p) + err = c.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = c.Validate() + require.NoError(t, err) + require.True(t, c.LargeMessageHandle.Disabled()) + + replicaConfig.Sink.KafkaConfig.LargeMessageHandle.LargeMessageHandleOption = config.LargeMessageHandleOptionHandleKeyOnly + err = c.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = c.Validate() + require.NoError(t, err) + require.True(t, c.LargeMessageHandle.HandleKeyOnly()) + + // canal-json, `enable-tidb-extension` is false, return
error + uri = "kafka://127.0.0.1:9092/large-message-handle?protocol=canal-json" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + + p, err = config.ParseSinkProtocolFromString(sinkURI.Query().Get("protocol")) + require.NoError(t, err) + + c = NewConfig(p) + err = c.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = c.Validate() + require.Error(t, err) + + // canal-json, `enable-tidb-extension` is true, no error + uri = "kafka://127.0.0.1:9092/large-message-handle?protocol=canal-json&enable-tidb-extension=true" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + + p, err = config.ParseSinkProtocolFromString(sinkURI.Query().Get("protocol")) + require.NoError(t, err) + + c = NewConfig(p) + err = c.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = c.Validate() + require.NoError(t, err) + + // avro, `enable-tidb-extension` is false, return error + uri = "kafka://127.0.0.1:9092/large-message-handle?protocol=avro" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + + p, err = config.ParseSinkProtocolFromString(sinkURI.Query().Get("protocol")) + require.NoError(t, err) + + replicaConfig.Sink.SchemaRegistry = util.AddressOf("this-is-a-uri") + + c = NewConfig(p) + err = c.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = c.Validate() + require.Error(t, err) + + // avro, `enable-tidb-extension` is true, no error + uri = "kafka://127.0.0.1:9092/large-message-handle?protocol=avro&enable-tidb-extension=true" + sinkURI, err = url.Parse(uri) + require.NoError(t, err) + + p, err = config.ParseSinkProtocolFromString(sinkURI.Query().Get("protocol")) + require.NoError(t, err) + + c = NewConfig(p) + err = c.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + err = c.Validate() + require.NoError(t, err) +} + func TestConfigApplyValidate(t *testing.T) { t.Parallel() @@ -290,10 +383,15 @@ func TestConfigApplyValidate(t *testing.T) { c = NewConfig(config.ProtocolOpen) replicaConfig = config.GetDefaultReplicaConfig() replicaConfig.Sink.DeleteOnlyOutputHandleKeyColumns = util.AddressOf(true) - replicaConfig.Sink.LargeMessageOnlyHandleKeyColumns = util.AddressOf(true) + replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{ + LargeMessageHandle: &config.LargeMessageHandleConfig{ + LargeMessageHandleOption: config.LargeMessageHandleOptionHandleKeyOnly, + }, + } err = c.Apply(sinkURI, replicaConfig) require.NoError(t, err) require.True(t, c.DeleteOnlyHandleKeyColumns) + require.NotNil(t, c.LargeMessageHandle) } func TestMergeConfig(t *testing.T) { @@ -324,7 +422,6 @@ func TestMergeConfig(t *testing.T) { require.NoError(t, err) replicaConfig.Sink.OnlyOutputUpdatedColumns = aws.Bool(true) replicaConfig.Sink.DeleteOnlyOutputHandleKeyColumns = aws.Bool(true) - replicaConfig.Sink.LargeMessageOnlyHandleKeyColumns = aws.Bool(true) replicaConfig.Sink.SchemaRegistry = util.AddressOf("abc") replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{ MaxMessageBytes: aws.Int(123), @@ -335,6 +432,9 @@ func TestMergeConfig(t *testing.T) { AvroBigintUnsignedHandlingMode: aws.String("ab"), AvroDecimalHandlingMode: aws.String("cd"), }, + LargeMessageHandle: &config.LargeMessageHandleConfig{ + LargeMessageHandleOption: config.LargeMessageHandleOptionHandleKeyOnly, + }, } c = NewConfig(config.ProtocolAvro) err = c.Apply(sinkURI, replicaConfig) @@ -347,6 +447,7 @@ func TestMergeConfig(t *testing.T) { require.Equal(t, "cd", c.AvroDecimalHandlingMode) require.Equal(t, 123, c.MaxMessageBytes) require.Equal(t, 456, c.MaxBatchSize) + require.Equal(t, 
config.LargeMessageHandleOptionHandleKeyOnly, c.LargeMessageHandle.LargeMessageHandleOption) // test override uri = "kafka://127.0.0.1:9092/abc?" + "max-message-bytes=123&max-batch-size=456" sinkURI, err = url.Parse(uri) require.NoError(t, err) replicaConfig.Sink.OnlyOutputUpdatedColumns = aws.Bool(false) replicaConfig.Sink.DeleteOnlyOutputHandleKeyColumns = aws.Bool(true) - replicaConfig.Sink.LargeMessageOnlyHandleKeyColumns = aws.Bool(true) replicaConfig.Sink.SchemaRegistry = util.AddressOf("abcd") replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{ MaxMessageBytes: aws.Int(1233), @@ -369,6 +469,10 @@ func TestMergeConfig(t *testing.T) { AvroBigintUnsignedHandlingMode: aws.String("adb"), AvroDecimalHandlingMode: aws.String("cde"), }, + LargeMessageHandle: &config.LargeMessageHandleConfig{ + LargeMessageHandleOption: config.LargeMessageHandleOptionClaimCheck, + ClaimCheckStorageURI: "file:///claim-check", + }, } c = NewConfig(config.ProtocolAvro) err = c.Apply(sinkURI, replicaConfig) @@ -381,4 +485,5 @@ func TestMergeConfig(t *testing.T) { require.Equal(t, "cd", c.AvroDecimalHandlingMode) require.Equal(t, 123, c.MaxMessageBytes) require.Equal(t, 456, c.MaxBatchSize) + require.Equal(t, config.LargeMessageHandleOptionClaimCheck, c.LargeMessageHandle.LargeMessageHandleOption) } diff --git a/pkg/sink/codec/open/open_protocol_decoder_test.go b/pkg/sink/codec/open/open_protocol_decoder_test.go index 83bb3ffd52c..4952c0d5bd2 100644 --- a/pkg/sink/codec/open/open_protocol_decoder_test.go +++ b/pkg/sink/codec/open/open_protocol_decoder_test.go @@ -82,11 +82,14 @@ func TestDecodeEvent(t *testing.T) { } func TestDecodeEventOnlyHandleKeyColumns(t *testing.T) { - config := common.NewConfig(config.ProtocolOpen) - config.LargeMessageOnlyHandleKeyColumns = true - config.MaxMessageBytes = 185 + codecConfig := common.NewConfig(config.ProtocolOpen) + codecConfig.LargeMessageHandle = &config.LargeMessageHandleConfig{ + LargeMessageHandleOption: config.LargeMessageHandleOptionHandleKeyOnly, + } - encoder := NewBatchEncoderBuilder(config).Build() + codecConfig.MaxMessageBytes = 185 + + encoder := NewBatchEncoderBuilder(codecConfig).Build() ctx := context.Background() topic := "test" diff --git a/pkg/sink/codec/open/open_protocol_encoder.go b/pkg/sink/codec/open/open_protocol_encoder.go index 616b611d32a..723b55c86d1 100644 --- a/pkg/sink/codec/open/open_protocol_encoder.go +++ b/pkg/sink/codec/open/open_protocol_encoder.go @@ -97,7 +97,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( zap.Int("length", length), zap.Any("table", e.Table), zap.Any("key", key)) - if !d.config.LargeMessageOnlyHandleKeyColumns { + if d.config.LargeMessageHandle.Disabled() { return cerror.ErrMessageTooLarge.GenWithStackByArgs() } diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index 163933765bf..34595c3841d 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -52,20 +52,21 @@ func TestMaxMessageBytes(t *testing.T) { topic := "" // for a single message, the overhead is 36(maxRecordOverhead) + 8(versionHea) = 44, just can hold it. 
a := 88 + 44 - config := common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(a) - encoder := NewBatchEncoderBuilder(config).Build() + codecConfig := common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(a) + codecConfig.LargeMessageHandle = config.NewDefaultLargeMessageHandleConfig() + encoder := NewBatchEncoderBuilder(codecConfig).Build() err := encoder.AppendRowChangedEvent(ctx, topic, testEvent, nil) require.Nil(t, err) // cannot hold a single message - config = config.WithMaxMessageBytes(a - 1) - encoder = NewBatchEncoderBuilder(config).Build() + codecConfig = codecConfig.WithMaxMessageBytes(a - 1) + encoder = NewBatchEncoderBuilder(codecConfig).Build() err = encoder.AppendRowChangedEvent(ctx, topic, testEvent, nil) require.NotNil(t, err) // make sure each batch's `Length` not greater than `max-message-bytes` - config = config.WithMaxMessageBytes(256) - encoder = NewBatchEncoderBuilder(config).Build() + codecConfig = codecConfig.WithMaxMessageBytes(256) + encoder = NewBatchEncoderBuilder(codecConfig).Build() for i := 0; i < 10000; i++ { err := encoder.AppendRowChangedEvent(ctx, topic, testEvent, nil) require.Nil(t, err) diff --git a/pkg/sink/kafka/options_test.go b/pkg/sink/kafka/options_test.go index c7f4ac88134..5c7897ce3a1 100644 --- a/pkg/sink/kafka/options_test.go +++ b/pkg/sink/kafka/options_test.go @@ -610,7 +610,11 @@ func TestConfigurationCombinations(t *testing.T) { encoderConfig := common.NewConfig(config.ProtocolOpen) err = encoderConfig.Apply(sinkURI, &config.ReplicaConfig{ - Sink: &config.SinkConfig{}, + Sink: &config.SinkConfig{ + KafkaConfig: &config.KafkaConfig{ + LargeMessageHandle: config.NewDefaultLargeMessageHandleConfig(), + }, + }, }) require.Nil(t, err) encoderConfig.WithMaxMessageBytes(options.MaxMessageBytes) diff --git a/pkg/sink/kafka/v2/factory_test.go b/pkg/sink/kafka/v2/factory_test.go index b2620399cf0..1f65850bed4 100644 --- a/pkg/sink/kafka/v2/factory_test.go +++ b/pkg/sink/kafka/v2/factory_test.go @@ -261,8 +261,7 @@ func TestAsyncProducerErrorChan(t *testing.T) { o := newOptions4Test() factory := newFactory4Test(o, t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() asyncProducer, err := factory.AsyncProducer(ctx, make(chan error, 1)) require.NoError(t, err)
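
The patch above wires the new option from the API layer down to the open-protocol encoder. For reference, the following is a minimal sketch (not part of the patch) of how a caller might exercise the new configuration, assuming the `github.com/pingcap/tiflow/pkg/config` import path; the claim check storage URI is a hypothetical placeholder.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	// Attach the new large message handle option to a changefeed's Kafka sink.
	replicaConfig := config.GetDefaultReplicaConfig()
	replicaConfig.Sink.KafkaConfig = &config.KafkaConfig{
		LargeMessageHandle: &config.LargeMessageHandleConfig{
			// Options added by this patch: "none" (default),
			// "handle-key-only", and "claim-check".
			LargeMessageHandleOption: config.LargeMessageHandleOptionClaimCheck,
			// Required only for "claim-check"; hypothetical URI.
			ClaimCheckStorageURI: "s3://bucket/claim-check",
		},
	}

	// Validate enforces the rules added in pkg/config/sink.go:
	// open-protocol always passes; canal-json and avro additionally
	// require enable-tidb-extension=true; claim-check requires a
	// non-empty storage URI.
	lmh := replicaConfig.Sink.KafkaConfig.LargeMessageHandle
	if err := lmh.Validate(config.ProtocolCanalJSON, true); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("claim check enabled:", lmh.EnableClaimCheck()) // true
}
```

In a changefeed TOML file, the same settings would presumably live under the table introduced by the struct tags above, e.g. `[sink.kafka-config.large-message-handle]` with the `large-message-handle-option` and `claim-check-storage-uri` keys.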