From c2f423f42c8da1dc136c4f571276382b5830c4b5 Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Wed, 29 Nov 2023 14:58:18 +0800
Subject: [PATCH] This is an automated cherry-pick of #9954

Signed-off-by: ti-chi-bot
---
 cdc/api/v2/model.go                      | 115 ++++++++++
 cdc/redo/meta_manager_test.go            |  11 +
 .../eventsink/cloudstorage/dml_worker.go |  37 ++-
 pkg/config/sink.go                       | 210 ++++++++++++++++++
 pkg/sink/cloudstorage/config.go          |   1 +
 5 files changed, 373 insertions(+), 1 deletion(-)

diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go
index 83dff1266d3..844905bacbb 100644
--- a/cdc/api/v2/model.go
+++ b/cdc/api/v2/model.go
@@ -346,6 +346,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
 			OutputColumnID:      c.Sink.CloudStorageConfig.OutputColumnID,
 			FileExpirationDays:  c.Sink.CloudStorageConfig.FileExpirationDays,
 			FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
+			FlushConcurrency:    c.Sink.CloudStorageConfig.FlushConcurrency,
 		}
 	}
 }
@@ -487,6 +488,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
 			OutputColumnID:      cloned.Sink.CloudStorageConfig.OutputColumnID,
 			FileExpirationDays:  cloned.Sink.CloudStorageConfig.FileExpirationDays,
 			FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
+			FlushConcurrency:    cloned.Sink.CloudStorageConfig.FlushConcurrency,
 		}
 	}
 }
@@ -821,6 +823,119 @@ type Capture struct {
 	ClusterID string `json:"cluster_id"`
 }
 
+<<<<<<< HEAD
+=======
+// CodecConfig represents a MQ codec configuration
+type CodecConfig struct {
+	EnableTiDBExtension            *bool   `json:"enable_tidb_extension,omitempty"`
+	MaxBatchSize                   *int    `json:"max_batch_size,omitempty"`
+	AvroEnableWatermark            *bool   `json:"avro_enable_watermark"`
+	AvroDecimalHandlingMode        *string `json:"avro_decimal_handling_mode,omitempty"`
+	AvroBigintUnsignedHandlingMode *string `json:"avro_bigint_unsigned_handling_mode,omitempty"`
+}
+
+// PulsarConfig represents a pulsar sink configuration
+type PulsarConfig struct {
+	TLSKeyFilePath          *string       `json:"tls-certificate-path,omitempty"`
+	TLSCertificateFile      *string       `json:"tls-private-key-path,omitempty"`
+	TLSTrustCertsFilePath   *string       `json:"tls-trust-certs-file-path,omitempty"`
+	PulsarProducerCacheSize *int32        `json:"pulsar-producer-cache-size,omitempty"`
+	PulsarVersion           *string       `json:"pulsar-version,omitempty"`
+	CompressionType         *string       `json:"compression-type,omitempty"`
+	AuthenticationToken     *string       `json:"authentication-token,omitempty"`
+	ConnectionTimeout       *int          `json:"connection-timeout,omitempty"`
+	OperationTimeout        *int          `json:"operation-timeout,omitempty"`
+	BatchingMaxMessages     *uint         `json:"batching-max-messages,omitempty"`
+	BatchingMaxPublishDelay *int          `json:"batching-max-publish-delay,omitempty"`
+	SendTimeout             *int          `json:"send-timeout,omitempty"`
+	TokenFromFile           *string       `json:"token-from-file,omitempty"`
+	BasicUserName           *string       `json:"basic-user-name,omitempty"`
+	BasicPassword           *string       `json:"basic-password,omitempty"`
+	AuthTLSCertificatePath  *string       `json:"auth-tls-certificate-path,omitempty"`
+	AuthTLSPrivateKeyPath   *string       `json:"auth-tls-private-key-path,omitempty"`
+	OAuth2                  *PulsarOAuth2 `json:"oauth2,omitempty"`
+}
+
+// PulsarOAuth2 is the configuration for OAuth2
+type PulsarOAuth2 struct {
+	OAuth2IssuerURL  string `json:"oauth2-issuer-url,omitempty"`
+	OAuth2Audience   string `json:"oauth2-audience,omitempty"`
+	OAuth2PrivateKey string `json:"oauth2-private-key,omitempty"`
+	OAuth2ClientID   string `json:"oauth2-client-id,omitempty"`
+	OAuth2Scope      string `json:"oauth2-scope,omitempty"`
+}
+
+// KafkaConfig represents a kafka sink configuration
+type KafkaConfig struct {
+	PartitionNum                 *int32    `json:"partition_num,omitempty"`
+	ReplicationFactor            *int16    `json:"replication_factor,omitempty"`
+	KafkaVersion                 *string   `json:"kafka_version,omitempty"`
+	MaxMessageBytes              *int      `json:"max_message_bytes,omitempty"`
+	Compression                  *string   `json:"compression,omitempty"`
+	KafkaClientID                *string   `json:"kafka_client_id,omitempty"`
+	AutoCreateTopic              *bool     `json:"auto_create_topic,omitempty"`
+	DialTimeout                  *string   `json:"dial_timeout,omitempty"`
+	WriteTimeout                 *string   `json:"write_timeout,omitempty"`
+	ReadTimeout                  *string   `json:"read_timeout,omitempty"`
+	RequiredAcks                 *int      `json:"required_acks,omitempty"`
+	SASLUser                     *string   `json:"sasl_user,omitempty"`
+	SASLPassword                 *string   `json:"sasl_password,omitempty"`
+	SASLMechanism                *string   `json:"sasl_mechanism,omitempty"`
+	SASLGssAPIAuthType           *string   `json:"sasl_gssapi_auth_type,omitempty"`
+	SASLGssAPIKeytabPath         *string   `json:"sasl_gssapi_keytab_path,omitempty"`
+	SASLGssAPIKerberosConfigPath *string   `json:"sasl_gssapi_kerberos_config_path,omitempty"`
+	SASLGssAPIServiceName        *string   `json:"sasl_gssapi_service_name,omitempty"`
+	SASLGssAPIUser               *string   `json:"sasl_gssapi_user,omitempty"`
+	SASLGssAPIPassword           *string   `json:"sasl_gssapi_password,omitempty"`
+	SASLGssAPIRealm              *string   `json:"sasl_gssapi_realm,omitempty"`
+	SASLGssAPIDisablePafxfast    *bool     `json:"sasl_gssapi_disable_pafxfast,omitempty"`
+	SASLOAuthClientID            *string   `json:"sasl_oauth_client_id,omitempty"`
+	SASLOAuthClientSecret        *string   `json:"sasl_oauth_client_secret,omitempty"`
+	SASLOAuthTokenURL            *string   `json:"sasl_oauth_token_url,omitempty"`
+	SASLOAuthScopes              []string  `json:"sasl_oauth_scopes,omitempty"`
+	SASLOAuthGrantType           *string   `json:"sasl_oauth_grant_type,omitempty"`
+	SASLOAuthAudience            *string   `json:"sasl_oauth_audience,omitempty"`
+	EnableTLS                    *bool     `json:"enable_tls,omitempty"`
+	CA                           *string   `json:"ca,omitempty"`
+	Cert                         *string   `json:"cert,omitempty"`
+	Key                          *string   `json:"key,omitempty"`
+	InsecureSkipVerify           *bool     `json:"insecure_skip_verify,omitempty"`
+	CodecConfig                  *CodecConfig              `json:"codec_config,omitempty"`
+	LargeMessageHandle           *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
+	GlueSchemaRegistryConfig     *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"`
+}
+
+// MySQLConfig represents a MySQL sink configuration
+type MySQLConfig struct {
+	WorkerCount                  *int    `json:"worker_count,omitempty"`
+	MaxTxnRow                    *int    `json:"max_txn_row,omitempty"`
+	MaxMultiUpdateRowSize        *int    `json:"max_multi_update_row_size,omitempty"`
+	MaxMultiUpdateRowCount       *int    `json:"max_multi_update_row_count,omitempty"`
+	TiDBTxnMode                  *string `json:"tidb_txn_mode,omitempty"`
+	SSLCa                        *string `json:"ssl_ca,omitempty"`
+	SSLCert                      *string `json:"ssl_cert,omitempty"`
+	SSLKey                       *string `json:"ssl_key,omitempty"`
+	TimeZone                     *string `json:"time_zone,omitempty"`
+	WriteTimeout                 *string `json:"write_timeout,omitempty"`
+	ReadTimeout                  *string `json:"read_timeout,omitempty"`
+	Timeout                      *string `json:"timeout,omitempty"`
+	EnableBatchDML               *bool   `json:"enable_batch_dml,omitempty"`
+	EnableMultiStatement         *bool   `json:"enable_multi_statement,omitempty"`
+	EnableCachePreparedStatement *bool   `json:"enable_cache_prepared_statement,omitempty"`
+}
+
+// CloudStorageConfig represents a cloud storage sink configuration
+type CloudStorageConfig struct {
+	WorkerCount   *int    `json:"worker_count,omitempty"`
+	FlushInterval *string `json:"flush_interval,omitempty"`
+	FileSize      *int    `json:"file_size,omitempty"`
`json:"file_size,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` +} + +>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)) // ChangefeedStatus holds common information of a changefeed in cdc type ChangefeedStatus struct { State string `json:"state,omitempty"` diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index b359d26d1b0..d82a1eb7b54 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -326,6 +326,7 @@ func TestGCAndCleanup(t *testing.T) { cancel() require.ErrorIs(t, eg.Wait(), context.Canceled) +<<<<<<< HEAD clenupCtx, clenupCancel := context.WithCancel(context.Background()) defer clenupCancel() m.Cleanup(clenupCtx) @@ -334,6 +335,16 @@ func TestGCAndCleanup(t *testing.T) { require.True(t, ret) cnt := 0 extStorage.WalkDir(clenupCtx, nil, func(path string, size int64) error { +======= + cleanupCtx, cleanupCancel := context.WithCancel(context.Background()) + defer cleanupCancel() + m.Cleanup(cleanupCtx) + ret, err := extStorage.FileExists(cleanupCtx, getDeletedChangefeedMarker(changefeedID)) + require.NoError(t, err) + require.True(t, ret) + cnt := 0 + extStorage.WalkDir(cleanupCtx, nil, func(path string, size int64) error { +>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)) cnt++ return nil }) diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go index 5620f67a814..f9d786bb03a 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go +++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go @@ -19,7 +19,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" @@ -28,6 +27,7 @@ import ( mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -237,12 +237,47 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single callbacks = append(callbacks, msg.Callback) } +<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/dml_worker.go if err := d.statistics.RecordBatchExecution(func() (int, error) { err := d.storage.WriteFile(ctx, path, buf.Bytes()) if err != nil { return 0, err } return rowsCnt, nil +======= + if err := d.statistics.RecordBatchExecution(func() (int, int64, error) { + start := time.Now() + defer d.metricFlushDuration.Observe(time.Since(start).Seconds()) + + if d.config.FlushConcurrency <= 1 { + return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes()) + } + + writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{ + Concurrency: d.config.FlushConcurrency, + }) + if inErr != nil { + return 0, 0, inErr + } + + defer func() { + closeErr := writer.Close(ctx) + if inErr != nil { + log.Error("failed to close writer", zap.Error(closeErr), + zap.Int("workerID", d.id), + zap.Any("table", task.tableInfo.TableName), + zap.String("namespace", d.changeFeedID.Namespace), + zap.String("changefeed", d.changeFeedID.ID)) + if inErr == nil { + inErr = closeErr + } + } + }() + if _, inErr = 
writer.Write(ctx, buf.Bytes()); inErr != nil { + return 0, 0, inErr + } + return rowsCnt, bytesCnt, nil +>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)):cdc/sink/dmlsink/cloudstorage/dml_worker.go }); err != nil { return err } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 9f73988dd32..25c517a2c72 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -319,6 +319,216 @@ func (k *KafkaConfig) MaskSensitiveData() { } } +<<<<<<< HEAD +======= +// PulsarCompressionType is the compression type for pulsar +type PulsarCompressionType string + +// Value returns the pulsar compression type +func (p *PulsarCompressionType) Value() pulsar.CompressionType { + if p == nil { + return 0 + } + switch strings.ToLower(string(*p)) { + case "lz4": + return pulsar.LZ4 + case "zlib": + return pulsar.ZLib + case "zstd": + return pulsar.ZSTD + default: + return 0 // default is no compression + } +} + +// TimeMill is the time in milliseconds +type TimeMill int + +// Duration returns the time in seconds as a duration +func (t *TimeMill) Duration() time.Duration { + if t == nil { + return 0 + } + return time.Duration(*t) * time.Millisecond +} + +// NewTimeMill returns a new time in milliseconds +func NewTimeMill(x int) *TimeMill { + t := TimeMill(x) + return &t +} + +// TimeSec is the time in seconds +type TimeSec int + +// Duration returns the time in seconds as a duration +func (t *TimeSec) Duration() time.Duration { + if t == nil { + return 0 + } + return time.Duration(*t) * time.Second +} + +// NewTimeSec returns a new time in seconds +func NewTimeSec(x int) *TimeSec { + t := TimeSec(x) + return &t +} + +// OAuth2 is the configuration for OAuth2 +type OAuth2 struct { + // OAuth2IssuerURL the URL of the authorization server. + OAuth2IssuerURL string `toml:"oauth2-issuer-url" json:"oauth2-issuer-url,omitempty"` + // OAuth2Audience the URL of the resource server. + OAuth2Audience string `toml:"oauth2-audience" json:"oauth2-audience,omitempty"` + // OAuth2PrivateKey the private key used to sign the server. + OAuth2PrivateKey string `toml:"oauth2-private-key" json:"oauth2-private-key,omitempty"` + // OAuth2ClientID the client ID of the application. 
+ OAuth2ClientID string `toml:"oauth2-client-id" json:"oauth2-client-id,omitempty"` + // OAuth2Scope scope + OAuth2Scope string `toml:"oauth2-scope" json:"oauth2-scope,omitempty"` +} + +func (o *OAuth2) validate() (err error) { + if o == nil { + return nil + } + if len(o.OAuth2IssuerURL) == 0 || len(o.OAuth2ClientID) == 0 || len(o.OAuth2PrivateKey) == 0 || + len(o.OAuth2Audience) == 0 { + return fmt.Errorf("issuer-url and audience and private-key and client-id not be empty") + } + return nil +} + +// PulsarConfig pulsar sink configuration +type PulsarConfig struct { + TLSKeyFilePath *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"` + TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"` + TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"` + + // PulsarProducerCacheSize is the size of the cache of pulsar producers + PulsarProducerCacheSize *int32 `toml:"pulsar-producer-cache-size" json:"pulsar-producer-cache-size,omitempty"` + + // PulsarVersion print the version of pulsar + PulsarVersion *string `toml:"pulsar-version" json:"pulsar-version,omitempty"` + + // pulsar client compression + CompressionType *PulsarCompressionType `toml:"compression-type" json:"compression-type,omitempty"` + + // AuthenticationToken the token for the Pulsar server + AuthenticationToken *string `toml:"authentication-token" json:"authentication-token,omitempty"` + + // ConnectionTimeout Timeout for the establishment of a TCP connection (default: 5 seconds) + ConnectionTimeout *TimeSec `toml:"connection-timeout" json:"connection-timeout,omitempty"` + + // Set the operation timeout (default: 30 seconds) + // Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the + // operation will be marked as failed + OperationTimeout *TimeSec `toml:"operation-timeout" json:"operation-timeout,omitempty"` + + // BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000) + BatchingMaxMessages *uint `toml:"batching-max-messages" json:"batching-max-messages,omitempty"` + + // BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms) + // if batch messages are enabled. If set to a non zero value, messages will be queued until this time + // interval or until + BatchingMaxPublishDelay *TimeMill `toml:"batching-max-publish-delay" json:"batching-max-publish-delay,omitempty"` + + // SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent. + // Send and SendAsync returns an error after timeout. 
+ // default: 30s + SendTimeout *TimeSec `toml:"send-timeout" json:"send-timeout,omitempty"` + + // TokenFromFile Authentication from the file token, + // the path name of the file (the third priority authentication method) + TokenFromFile *string `toml:"token-from-file" json:"token-from-file,omitempty"` + + // BasicUserName Account name for pulsar basic authentication (the second priority authentication method) + BasicUserName *string `toml:"basic-user-name" json:"basic-user-name,omitempty"` + // BasicPassword with account + BasicPassword *string `toml:"basic-password" json:"basic-password,omitempty"` + + // AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key + AuthTLSCertificatePath *string `toml:"auth-tls-certificate-path" json:"auth-tls-certificate-path,omitempty"` + // AuthTLSPrivateKeyPath private key + AuthTLSPrivateKeyPath *string `toml:"auth-tls-private-key-path" json:"auth-tls-private-key-path,omitempty"` + + // Oauth2 include oauth2-issuer-url oauth2-audience oauth2-private-key oauth2-client-id + // and 'type' always use 'client_credentials' + OAuth2 *OAuth2 `toml:"oauth2" json:"oauth2,omitempty"` + + // BrokerURL is used to configure service brokerUrl for the Pulsar service. + // This parameter is a part of the `sink-uri`. Internal use only. + BrokerURL string `toml:"-" json:"-"` + // SinkURI is the parsed sinkURI. Internal use only. + SinkURI *url.URL `toml:"-" json:"-"` +} + +// MaskSensitiveData masks sensitive data in PulsarConfig +func (c *PulsarConfig) MaskSensitiveData() { + if c.AuthenticationToken != nil { + c.AuthenticationToken = aws.String("******") + } + if c.BasicPassword != nil { + c.BasicPassword = aws.String("******") + } + if c.OAuth2 != nil { + c.OAuth2.OAuth2PrivateKey = "******" + } +} + +// Check get broker url +func (c *PulsarConfig) validate() (err error) { + if c.OAuth2 != nil { + if err = c.OAuth2.validate(); err != nil { + return err + } + if c.TLSTrustCertsFilePath == nil { + return fmt.Errorf("oauth2 is not empty but tls-trust-certs-file-path is empty") + } + } + + return nil +} + +// GetDefaultTopicName get default topic name +func (c *PulsarConfig) GetDefaultTopicName() string { + topicName := c.SinkURI.Path + return topicName[1:] +} + +// MySQLConfig represents a MySQL sink configuration +type MySQLConfig struct { + WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` + MaxTxnRow *int `toml:"max-txn-row" json:"max-txn-row,omitempty"` + MaxMultiUpdateRowSize *int `toml:"max-multi-update-row-size" json:"max-multi-update-row-size,omitempty"` + MaxMultiUpdateRowCount *int `toml:"max-multi-update-row" json:"max-multi-update-row,omitempty"` + TiDBTxnMode *string `toml:"tidb-txn-mode" json:"tidb-txn-mode,omitempty"` + SSLCa *string `toml:"ssl-ca" json:"ssl-ca,omitempty"` + SSLCert *string `toml:"ssl-cert" json:"ssl-cert,omitempty"` + SSLKey *string `toml:"ssl-key" json:"ssl-key,omitempty"` + TimeZone *string `toml:"time-zone" json:"time-zone,omitempty"` + WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` + ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` + Timeout *string `toml:"timeout" json:"timeout,omitempty"` + EnableBatchDML *bool `toml:"enable-batch-dml" json:"enable-batch-dml,omitempty"` + EnableMultiStatement *bool `toml:"enable-multi-statement" json:"enable-multi-statement,omitempty"` + EnableCachePreparedStatement *bool `toml:"enable-cache-prepared-statement" json:"enable-cache-prepared-statement,omitempty"` +} + +// 
+type CloudStorageConfig struct {
+	WorkerCount   *int    `toml:"worker-count" json:"worker-count,omitempty"`
+	FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"`
+	FileSize      *int    `toml:"file-size" json:"file-size,omitempty"`
+
+	OutputColumnID      *bool   `toml:"output-column-id" json:"output-column-id,omitempty"`
+	FileExpirationDays  *int    `toml:"file-expiration-days" json:"file-expiration-days,omitempty"`
+	FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"`
+	FlushConcurrency    *int    `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
+}
+
+>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954))
 func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
 	if err := s.validateAndAdjustSinkURI(sinkURI); err != nil {
 		return err
diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go
index 918058bdc91..93f1f6d8a28 100644
--- a/pkg/sink/cloudstorage/config.go
+++ b/pkg/sink/cloudstorage/config.go
@@ -126,6 +126,7 @@ func (c *Config) Apply(
 		if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil {
 			c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec
 		}
+		c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency)
 	}
 
 	if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth {
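
Note: the functional core of this cherry-pick is the new flush path in writeDataFile above. When flush-concurrency is greater than 1, the worker switches from a single WriteFile call to the external storage's concurrent writer, which br/pkg/storage backs with a multi-part upload on S3. A minimal, self-contained sketch of that control flow follows; flushFile, extStorage, data and flushConcurrency are illustrative names, while Create/Write/Close and storage.WriterOption are the APIs the diff itself calls.

```go
package sketch

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/storage"
)

// flushFile uploads data to path. With flushConcurrency <= 1 it keeps the
// pre-patch single-shot WriteFile behavior; otherwise it opens a concurrent
// writer so large files are uploaded in parallel parts.
func flushFile(
	ctx context.Context,
	extStorage storage.ExternalStorage,
	path string,
	data []byte,
	flushConcurrency int,
) (err error) {
	if flushConcurrency <= 1 {
		return extStorage.WriteFile(ctx, path, data)
	}

	writer, err := extStorage.Create(ctx, path, &storage.WriterOption{
		Concurrency: flushConcurrency,
	})
	if err != nil {
		return err
	}
	defer func() {
		// A multi-part upload is only finalized by Close, so a Close error
		// must surface even when every Write succeeded.
		if closeErr := writer.Close(ctx); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	_, err = writer.Write(ctx, data)
	return err
}
```

The deferred close mirrors the patch's handling: dropping a Close error would silently leave an unfinished multi-part upload behind.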
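The TimeSec/TimeMill wrappers added to pkg/config/sink.go let integer Pulsar timeouts deserialize directly from TOML/JSON while still converting to time.Duration at the call site, with a nil pointer meaning "unset". A hypothetical usage, assuming the new side of the conflict is kept and the package is imported as config; the values are the defaults named in the field comments:

```go
package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	connectionTimeout := config.NewTimeSec(5) // 5 s TCP connect timeout
	publishDelay := config.NewTimeMill(10)    // 10 ms batching delay

	fmt.Println(connectionTimeout.Duration() == 5*time.Second)  // true
	fmt.Println(publishDelay.Duration() == 10*time.Millisecond) // true

	// A nil receiver is safe and means "unset".
	var unset *config.TimeSec
	fmt.Println(unset.Duration()) // 0s
}
```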
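Finally, the wiring in pkg/sink/cloudstorage/config.go reads the optional field with util.GetOrZero, so an unset flush-concurrency collapses to 0 and the worker falls back to the plain WriteFile branch (0 <= 1). A sketch of the semantics assumed for a GetOrZero-style generic accessor, inferred from the call site rather than copied from pkg/util:

```go
// getOrZero dereferences an optional, pointer-typed config field, yielding
// the type's zero value when the field was never set.
func getOrZero[T any](p *T) T {
	if p == nil {
		var zero T
		return zero
	}
	return *p
}
```

On the user side this maps to the flush-concurrency key defined by the toml tags above, presumably under the changefeed's cloud storage sink section.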