Skip to content

Commit

Permalink
sink(ticdc): use multi part s3 uploader in storage sink (#9954)
Browse files Browse the repository at this point in the history
close #10098, close #10172
  • Loading branch information
CharlesCheung96 authored Nov 29, 2023
1 parent 13499f4 commit b6c1a94
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 8 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -719,6 +720,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -1205,6 +1207,7 @@ type CloudStorageConfig struct {
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
Expand Down
8 changes: 5 additions & 3 deletions cdc/redo/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,14 @@ func TestGCAndCleanup(t *testing.T) {
cancel()
require.ErrorIs(t, eg.Wait(), context.Canceled)

m.Cleanup(ctx)
ret, err := extStorage.FileExists(ctx, getDeletedChangefeedMarker(changefeedID))
cleanupCtx, cleanupCancel := context.WithCancel(context.Background())
defer cleanupCancel()
m.Cleanup(cleanupCtx)
ret, err := extStorage.FileExists(cleanupCtx, getDeletedChangefeedMarker(changefeedID))
require.NoError(t, err)
require.True(t, ret)
cnt := 0
extStorage.WalkDir(ctx, nil, func(path string, size int64) error {
extStorage.WalkDir(cleanupCtx, nil, func(path string, size int64) error {
cnt++
return nil
})
Expand Down
33 changes: 28 additions & 5 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -264,12 +264,35 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single

if err := d.statistics.RecordBatchExecution(func() (int, int64, error) {
start := time.Now()
defer d.metricFlushDuration.Observe(time.Since(start).Seconds())

err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
return 0, 0, err
if d.config.FlushConcurrency <= 1 {
return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: d.config.FlushConcurrency,
})
if inErr != nil {
return 0, 0, inErr
}

defer func() {
closeErr := writer.Close(ctx)
if inErr != nil {
log.Error("failed to close writer", zap.Error(closeErr),
zap.Int("workerID", d.id),
zap.Any("table", task.tableInfo.TableName),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID))
if inErr == nil {
inErr = closeErr
}
}
}()
if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, 0, inErr
}
d.metricFlushDuration.Observe(time.Since(start).Seconds())
return rowsCnt, bytesCnt, nil
}); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ type CloudStorageConfig struct {
OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"`
FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"`
FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"`
FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
}

func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
Expand Down
11 changes: 11 additions & 0 deletions pkg/sink/cloudstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ const (
minFlushInterval = 2 * time.Second
// the upper limit of flush-interval.
maxFlushInterval = 10 * time.Minute
// defaultFlushConcurrency is the default value of flush-concurrency.
defaultFlushConcurrency = 1
// the lower limit of flush-concurrency.
minFlushConcurrency = 1
// the upper limit of flush-concurrency.
maxFlushConcurrency = 512
// defaultFileSize is the default value of file-size.
defaultFileSize = 64 * 1024 * 1024
// the lower limit of file size
Expand Down Expand Up @@ -73,6 +79,7 @@ type Config struct {
FileCleanupCronSpec string
EnablePartitionSeparator bool
OutputColumnID bool
FlushConcurrency int
}

// NewConfig returns the default cloud storage sink config.
Expand Down Expand Up @@ -133,11 +140,15 @@ func (c *Config) Apply(
if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil {
c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec
}
c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency)
}

if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth {
c.FileIndexWidth = config.DefaultFileIndexWidth
}
if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency {
c.FlushConcurrency = defaultFlushConcurrency
}

return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sink/cloudstorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestConfigApply(t *testing.T) {
expected.FileIndexWidth = config.DefaultFileIndexWidth
expected.DateSeparator = config.DateSeparatorDay.String()
expected.EnablePartitionSeparator = true
expected.FlushConcurrency = 1
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv"
sinkURI, err := url.Parse(uri)
require.Nil(t, err)
Expand Down

0 comments on commit b6c1a94

Please sign in to comment.