Commit c2f423f
This is an automated cherry-pick of pingcap#9954
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
CharlesCheung96 authored and ti-chi-bot committed Nov 29, 2023
1 parent 8fb72da commit c2f423f
Showing 5 changed files with 373 additions and 1 deletion.
115 changes: 115 additions & 0 deletions cdc/api/v2/model.go
@@ -346,6 +346,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
}
}
}
@@ -487,6 +488,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
}
}
}
@@ -821,6 +823,119 @@ type Capture struct {
ClusterID string `json:"cluster_id"`
}

<<<<<<< HEAD
=======
// CodecConfig represents a MQ codec configuration
type CodecConfig struct {
EnableTiDBExtension *bool `json:"enable_tidb_extension,omitempty"`
MaxBatchSize *int `json:"max_batch_size,omitempty"`
AvroEnableWatermark *bool `json:"avro_enable_watermark"`
AvroDecimalHandlingMode *string `json:"avro_decimal_handling_mode,omitempty"`
AvroBigintUnsignedHandlingMode *string `json:"avro_bigint_unsigned_handling_mode,omitempty"`
}

// PulsarConfig represents a pulsar sink configuration
type PulsarConfig struct {
TLSCertificateFile *string `json:"tls-certificate-path,omitempty"`
TLSKeyFilePath *string `json:"tls-private-key-path,omitempty"`
TLSTrustCertsFilePath *string `json:"tls-trust-certs-file-path,omitempty"`
PulsarProducerCacheSize *int32 `json:"pulsar-producer-cache-size,omitempty"`
PulsarVersion *string `json:"pulsar-version,omitempty"`
CompressionType *string `json:"compression-type,omitempty"`
AuthenticationToken *string `json:"authentication-token,omitempty"`
ConnectionTimeout *int `json:"connection-timeout,omitempty"`
OperationTimeout *int `json:"operation-timeout,omitempty"`
BatchingMaxMessages *uint `json:"batching-max-messages,omitempty"`
BatchingMaxPublishDelay *int `json:"batching-max-publish-delay,omitempty"`
SendTimeout *int `json:"send-timeout,omitempty"`
TokenFromFile *string `json:"token-from-file,omitempty"`
BasicUserName *string `json:"basic-user-name,omitempty"`
BasicPassword *string `json:"basic-password,omitempty"`
AuthTLSCertificatePath *string `json:"auth-tls-certificate-path,omitempty"`
AuthTLSPrivateKeyPath *string `json:"auth-tls-private-key-path,omitempty"`
OAuth2 *PulsarOAuth2 `json:"oauth2,omitempty"`
}

// PulsarOAuth2 is the configuration for OAuth2
type PulsarOAuth2 struct {
OAuth2IssuerURL string `json:"oauth2-issuer-url,omitempty"`
OAuth2Audience string `json:"oauth2-audience,omitempty"`
OAuth2PrivateKey string `json:"oauth2-private-key,omitempty"`
OAuth2ClientID string `json:"oauth2-client-id,omitempty"`
OAuth2Scope string `json:"oauth2-scope,omitempty"`
}

// KafkaConfig represents a kafka sink configuration
type KafkaConfig struct {
PartitionNum *int32 `json:"partition_num,omitempty"`
ReplicationFactor *int16 `json:"replication_factor,omitempty"`
KafkaVersion *string `json:"kafka_version,omitempty"`
MaxMessageBytes *int `json:"max_message_bytes,omitempty"`
Compression *string `json:"compression,omitempty"`
KafkaClientID *string `json:"kafka_client_id,omitempty"`
AutoCreateTopic *bool `json:"auto_create_topic,omitempty"`
DialTimeout *string `json:"dial_timeout,omitempty"`
WriteTimeout *string `json:"write_timeout,omitempty"`
ReadTimeout *string `json:"read_timeout,omitempty"`
RequiredAcks *int `json:"required_acks,omitempty"`
SASLUser *string `json:"sasl_user,omitempty"`
SASLPassword *string `json:"sasl_password,omitempty"`
SASLMechanism *string `json:"sasl_mechanism,omitempty"`
SASLGssAPIAuthType *string `json:"sasl_gssapi_auth_type,omitempty"`
SASLGssAPIKeytabPath *string `json:"sasl_gssapi_keytab_path,omitempty"`
SASLGssAPIKerberosConfigPath *string `json:"sasl_gssapi_kerberos_config_path,omitempty"`
SASLGssAPIServiceName *string `json:"sasl_gssapi_service_name,omitempty"`
SASLGssAPIUser *string `json:"sasl_gssapi_user,omitempty"`
SASLGssAPIPassword *string `json:"sasl_gssapi_password,omitempty"`
SASLGssAPIRealm *string `json:"sasl_gssapi_realm,omitempty"`
SASLGssAPIDisablePafxfast *bool `json:"sasl_gssapi_disable_pafxfast,omitempty"`
SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty"`
SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty"`
SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty"`
SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty"`
SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"`
SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"`
EnableTLS *bool `json:"enable_tls,omitempty"`
CA *string `json:"ca,omitempty"`
Cert *string `json:"cert,omitempty"`
Key *string `json:"key,omitempty"`
InsecureSkipVerify *bool `json:"insecure_skip_verify,omitempty"`
CodecConfig *CodecConfig `json:"codec_config,omitempty"`
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"`
}

// MySQLConfig represents a MySQL sink configuration
type MySQLConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
MaxTxnRow *int `json:"max_txn_row,omitempty"`
MaxMultiUpdateRowSize *int `json:"max_multi_update_row_size,omitempty"`
MaxMultiUpdateRowCount *int `json:"max_multi_update_row_count,omitempty"`
TiDBTxnMode *string `json:"tidb_txn_mode,omitempty"`
SSLCa *string `json:"ssl_ca,omitempty"`
SSLCert *string `json:"ssl_cert,omitempty"`
SSLKey *string `json:"ssl_key,omitempty"`
TimeZone *string `json:"time_zone,omitempty"`
WriteTimeout *string `json:"write_timeout,omitempty"`
ReadTimeout *string `json:"read_timeout,omitempty"`
Timeout *string `json:"timeout,omitempty"`
EnableBatchDML *bool `json:"enable_batch_dml,omitempty"`
EnableMultiStatement *bool `json:"enable_multi_statement,omitempty"`
EnableCachePreparedStatement *bool `json:"enable_cache_prepared_statement,omitempty"`
}

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
}

>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954))
// ChangefeedStatus holds common information of a changefeed in cdc
type ChangefeedStatus struct {
State string `json:"state,omitempty"`
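
The new `flush_concurrency` field is threaded through both conversion directions above (internal replica config to the API model and back), so it can be set through the Open API. Below is a minimal sketch of how the field serializes on the wire; the struct re-declares only a few of the `CloudStorageConfig` fields from this diff so the snippet compiles on its own, and all values are hypothetical.

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed re-declaration of the CloudStorageConfig shown above,
// kept just large enough for the example to compile.
type CloudStorageConfig struct {
	WorkerCount      *int    `json:"worker_count,omitempty"`
	FlushInterval    *string `json:"flush_interval,omitempty"`
	FlushConcurrency *int    `json:"flush_concurrency,omitempty"`
}

func main() {
	workers := 16
	interval := "5s"
	concurrency := 4 // values above 1 enable the concurrent upload path

	cfg := CloudStorageConfig{
		WorkerCount:      &workers,
		FlushInterval:    &interval,
		FlushConcurrency: &concurrency,
	}

	out, err := json.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	// Prints: {"worker_count":16,"flush_interval":"5s","flush_concurrency":4}
	fmt.Println(string(out))
}

Because every field is a pointer tagged omitempty, unset options simply drop out of the JSON, which is how the API distinguishes "not provided" from a zero value.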
11 changes: 11 additions & 0 deletions cdc/redo/meta_manager_test.go
@@ -326,6 +326,7 @@ func TestGCAndCleanup(t *testing.T) {
cancel()
require.ErrorIs(t, eg.Wait(), context.Canceled)

<<<<<<< HEAD
clenupCtx, clenupCancel := context.WithCancel(context.Background())
defer clenupCancel()
m.Cleanup(clenupCtx)
@@ -334,6 +335,16 @@ func TestGCAndCleanup(t *testing.T) {
require.True(t, ret)
cnt := 0
extStorage.WalkDir(clenupCtx, nil, func(path string, size int64) error {
=======
cleanupCtx, cleanupCancel := context.WithCancel(context.Background())
defer cleanupCancel()
m.Cleanup(cleanupCtx)
ret, err := extStorage.FileExists(cleanupCtx, getDeletedChangefeedMarker(changefeedID))
require.NoError(t, err)
require.True(t, ret)
cnt := 0
extStorage.WalkDir(cleanupCtx, nil, func(path string, size int64) error {
>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954))
cnt++
return nil
})
37 changes: 36 additions & 1 deletion cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
@@ -19,7 +19,6 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
@@ -28,6 +27,7 @@ import (
mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@@ -237,12 +237,47 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
callbacks = append(callbacks, msg.Callback)
}

<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
if err := d.statistics.RecordBatchExecution(func() (int, error) {
err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
return 0, err
}
return rowsCnt, nil
=======
if err := d.statistics.RecordBatchExecution(func() (int, int64, error) {
start := time.Now()
defer d.metricFlushDuration.Observe(time.Since(start).Seconds())

if d.config.FlushConcurrency <= 1 {
return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: d.config.FlushConcurrency,
})
if inErr != nil {
return 0, 0, inErr
}

defer func() {
closeErr := writer.Close(ctx)
if closeErr != nil {
log.Error("failed to close writer", zap.Error(closeErr),
zap.Int("workerID", d.id),
zap.Any("table", task.tableInfo.TableName),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID))
if inErr == nil {
inErr = closeErr
}
}
}()
if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, 0, inErr
}
return rowsCnt, bytesCnt, nil
>>>>>>> b6c1a9404a (sink(ticdc): use multi part s3 uploader in storage sink (#9954)):cdc/sink/dmlsink/cloudstorage/dml_worker.go
}); err != nil {
return err
}
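
When FlushConcurrency is greater than 1, the hunk above creates the file through storage.Create with a WriterOption carrying that concurrency, letting the external-storage abstraction (github.com/pingcap/tidb/br/pkg/storage) run a concurrent multipart upload instead of the single-shot WriteFile call; at 1 or below it keeps the old path. The deferred Close with error propagation matters because, for a multipart writer, Close is where the upload is finalized. Here is a stdlib-only sketch of the same write/deferred-close idiom, with a plain local file standing in for the external-storage writer; the path and payload are made up.

package main

import (
	"fmt"
	"log"
	"os"
)

// writeAll mirrors the flush logic above: if the writes succeed but
// Close fails, the Close error becomes the function's result. A named
// return is used so the deferred function can overwrite it.
func writeAll(path string, data []byte) (err error) {
	f, cerr := os.Create(path)
	if cerr != nil {
		return cerr
	}
	defer func() {
		closeErr := f.Close()
		if closeErr != nil && err == nil {
			// For buffered or multipart writers, Close performs the
			// final flush, so its error must not be dropped.
			err = closeErr
		}
	}()
	_, err = f.Write(data)
	return err
}

func main() {
	if err := writeAll("/tmp/demo.txt", []byte("hello")); err != nil {
		log.Fatal(err)
	}
	fmt.Println("flushed")
}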
(2 more changed files not loaded in this view)
