diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 99f0bd428a0..08ecb4c2e27 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -436,6 +436,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, } } @@ -719,6 +720,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, } } @@ -1205,6 +1207,7 @@ type CloudStorageConfig struct { OutputColumnID *bool `json:"output_column_id,omitempty"` FileExpirationDays *int `json:"file_expiration_days,omitempty"` FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index ee61cf9ef12..40de8505424 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -20,7 +20,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" @@ -28,6 +27,7 @@ import ( mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage" "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/prometheus/client_golang/prometheus" @@ -264,12 +264,35 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single if err := d.statistics.RecordBatchExecution(func() (int, int64, error) { start := time.Now() + defer d.metricFlushDuration.Observe(time.Since(start).Seconds()) - err := d.storage.WriteFile(ctx, path, buf.Bytes()) - if err != nil { - return 0, 0, err + if d.config.FlushConcurrency <= 1 { + return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes()) + } + + writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{ + Concurrency: d.config.FlushConcurrency, + }) + if inErr != nil { + return 0, 0, inErr + } + + defer func() { + closeErr := writer.Close(ctx) + if inErr != nil { + log.Error("failed to close writer", zap.Error(closeErr), + zap.Int("workerID", d.id), + zap.Any("table", task.tableInfo.TableName), + zap.String("namespace", d.changeFeedID.Namespace), + zap.String("changefeed", d.changeFeedID.ID)) + if inErr == nil { + inErr = closeErr + } + } + }() + if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil { + return 0, 0, inErr } - d.metricFlushDuration.Observe(time.Since(start).Seconds()) return rowsCnt, bytesCnt, nil }); err != nil { return err diff --git a/pkg/config/sink.go b/pkg/config/sink.go index b5db5e5b4ea..facddaa2e3b 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -597,6 +597,7 @@ type CloudStorageConfig struct { OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` + FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index 9ed6ca04c4b..628e0d77ecf 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -42,6 +42,12 @@ const ( minFlushInterval = 2 * time.Second // the upper limit of flush-interval. maxFlushInterval = 10 * time.Minute + // defaultFlushConcurrency is the default value of flush-concurrency. + defaultFlushConcurrency = 1 + // the lower limit of flush-concurrency. + minFlushConcurrency = 1 + // the upper limit of flush-concurrency. + maxFlushConcurrency = 512 // defaultFileSize is the default value of file-size. defaultFileSize = 64 * 1024 * 1024 // the lower limit of file size @@ -73,6 +79,7 @@ type Config struct { FileCleanupCronSpec string EnablePartitionSeparator bool OutputColumnID bool + FlushConcurrency int } // NewConfig returns the default cloud storage sink config. @@ -133,11 +140,15 @@ func (c *Config) Apply( if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil { c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec } + c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency) } if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth { c.FileIndexWidth = config.DefaultFileIndexWidth } + if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency { + c.FlushConcurrency = defaultFlushConcurrency + } return nil } diff --git a/pkg/sink/cloudstorage/config_test.go b/pkg/sink/cloudstorage/config_test.go index 863b5c648c4..5703ed0d4f7 100644 --- a/pkg/sink/cloudstorage/config_test.go +++ b/pkg/sink/cloudstorage/config_test.go @@ -32,6 +32,7 @@ func TestConfigApply(t *testing.T) { expected.FileIndexWidth = config.DefaultFileIndexWidth expected.DateSeparator = config.DateSeparatorDay.String() expected.EnablePartitionSeparator = true + expected.FlushConcurrency = 1 uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv" sinkURI, err := url.Parse(uri) require.Nil(t, err)