sink(ticdc): use multi part s3 uploader in storage sink (#9954) #10180

Merged
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
@@ -454,6 +454,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
+ FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
}
}

@@ -742,6 +743,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
+ FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
}
}

@@ -1261,6 +1263,7 @@ type CloudStorageConfig struct {
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
+ FlushConcurrency *int `json:"flush_concurrency,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
8 changes: 5 additions & 3 deletions cdc/redo/meta_manager_test.go
@@ -319,12 +319,14 @@ func TestGCAndCleanup(t *testing.T) {
cancel()
require.ErrorIs(t, eg.Wait(), context.Canceled)

- m.Cleanup(ctx)
- ret, err := extStorage.FileExists(ctx, getDeletedChangefeedMarker(changefeedID))
+ cleanupCtx, cleanupCancel := context.WithCancel(context.Background())
+ defer cleanupCancel()
+ m.Cleanup(cleanupCtx)
+ ret, err := extStorage.FileExists(cleanupCtx, getDeletedChangefeedMarker(changefeedID))
require.NoError(t, err)
require.True(t, ret)
cnt := 0
- extStorage.WalkDir(ctx, nil, func(path string, size int64) error {
+ extStorage.WalkDir(cleanupCtx, nil, func(path string, size int64) error {
cnt++
return nil
})
29 changes: 26 additions & 3 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
@@ -239,9 +239,32 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
}

if err := d.statistics.RecordBatchExecution(func() (int, int64, error) {
- err := d.storage.WriteFile(ctx, path, buf.Bytes())
- if err != nil {
- return 0, 0, err
+ if d.config.FlushConcurrency <= 1 {
+ return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

+ writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
+ Concurrency: d.config.FlushConcurrency,
+ })
+ if inErr != nil {
+ return 0, 0, inErr
+ }

+ defer func() {
+ closeErr := writer.Close(ctx)
+ if closeErr != nil {
+ log.Error("failed to close writer", zap.Error(closeErr),
+ zap.Int("workerID", d.id),
+ zap.Any("table", task.tableInfo.TableName),
+ zap.String("namespace", d.changeFeedID.Namespace),
+ zap.String("changefeed", d.changeFeedID.ID))
+ if inErr == nil {
+ inErr = closeErr
+ }
+ }
+ }()
+ if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
+ return 0, 0, inErr
+ }
return rowsCnt, bytesCnt, nil
}); err != nil {
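
For readers who have not used the br external storage writer that the hunk above relies on, here is a minimal, self-contained sketch of the same pattern. The package path, the wrapper function, and its name are illustrative assumptions, not code from this PR: the point is only that a flush concurrency of 1 keeps the old single WriteFile call, while anything larger streams the buffer through a writer created with WriterOption.Concurrency, which performs the multipart S3 upload with that many parts in flight.

package cloudstoragesketch

import (
    "context"

    "github.com/pingcap/tidb/br/pkg/storage"
)

// writeWithConcurrency mirrors the flush path above: a plain WriteFile for
// the single-part case, otherwise a concurrent multipart upload via Create.
func writeWithConcurrency(
    ctx context.Context,
    extStorage storage.ExternalStorage,
    path string,
    data []byte,
    concurrency int,
) (err error) {
    if concurrency <= 1 {
        // Single-part upload, identical to the pre-PR behavior.
        return extStorage.WriteFile(ctx, path, data)
    }
    writer, err := extStorage.Create(ctx, path, &storage.WriterOption{
        Concurrency: concurrency, // number of parts uploaded in parallel
    })
    if err != nil {
        return err
    }
    defer func() {
        // Close completes the multipart upload; surface its error only if
        // the write itself succeeded, so the first failure wins.
        if closeErr := writer.Close(ctx); err == nil {
            err = closeErr
        }
    }()
    _, err = writer.Write(ctx, data)
    return err
}
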
1 change: 1 addition & 0 deletions pkg/config/sink.go
@@ -593,6 +593,7 @@ type CloudStorageConfig struct {
OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"`
FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"`
FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"`
+ FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"`
}

func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
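
As a usage illustration that is not part of this diff: with the field above in place, a changefeed could opt into multipart flushing through the replica config roughly as below. Here config is this repo's pkg/config package; GetDefaultReplicaConfig is assumed as the starting point, and the exact TOML table that the flush-concurrency key lives under is likewise an assumption.

// Hypothetical wiring: request 8 concurrent part uploads per flushed file.
flushConcurrency := 8
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{
    FlushConcurrency: &flushConcurrency, // toml key: flush-concurrency
}
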
11 changes: 11 additions & 0 deletions pkg/sink/cloudstorage/config.go
@@ -42,6 +42,12 @@ const (
minFlushInterval = 2 * time.Second
// the upper limit of flush-interval.
maxFlushInterval = 10 * time.Minute
+ // defaultFlushConcurrency is the default value of flush-concurrency.
+ defaultFlushConcurrency = 1
+ // the lower limit of flush-concurrency.
+ minFlushConcurrency = 1
+ // the upper limit of flush-concurrency.
+ maxFlushConcurrency = 512
// defaultFileSize is the default value of file-size.
defaultFileSize = 64 * 1024 * 1024
// the lower limit of file size
@@ -73,6 +79,7 @@ type Config struct {
FileCleanupCronSpec string
EnablePartitionSeparator bool
OutputColumnID bool
+ FlushConcurrency int
}

// NewConfig returns the default cloud storage sink config.
@@ -133,11 +140,15 @@ func (c *Config) Apply(
if replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec != nil {
c.FileCleanupCronSpec = *replicaConfig.Sink.CloudStorageConfig.FileCleanupCronSpec
}
+ c.FlushConcurrency = util.GetOrZero(replicaConfig.Sink.CloudStorageConfig.FlushConcurrency)
}

if c.FileIndexWidth < config.MinFileIndexWidth || c.FileIndexWidth > config.MaxFileIndexWidth {
c.FileIndexWidth = config.DefaultFileIndexWidth
}
+ if c.FlushConcurrency < minFlushConcurrency || c.FlushConcurrency > maxFlushConcurrency {
+ c.FlushConcurrency = defaultFlushConcurrency
+ }

return nil
}
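
To make the range check above concrete, here is a small sketch; the Apply call shape (context, sink URI, replica config) is an assumption, since the signature is truncated in this hunk. Any value below minFlushConcurrency (1) or above maxFlushConcurrency (512) is silently reset to defaultFlushConcurrency (1).

// Hypothetical: an out-of-range value is normalized back to the default.
tooHigh := 1024 // above maxFlushConcurrency
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.CloudStorageConfig = &config.CloudStorageConfig{
    FlushConcurrency: &tooHigh,
}
sinkURI, _ := url.Parse("s3://bucket/prefix?protocol=csv")
cfg := cloudstorage.NewConfig()
if err := cfg.Apply(context.Background(), sinkURI, replicaConfig); err == nil {
    // cfg.FlushConcurrency is now 1 (defaultFlushConcurrency), not 1024.
}
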
1 change: 1 addition & 0 deletions pkg/sink/cloudstorage/config_test.go
@@ -32,6 +32,7 @@ func TestConfigApply(t *testing.T) {
expected.FileIndexWidth = config.DefaultFileIndexWidth
expected.DateSeparator = config.DateSeparatorDay.String()
expected.EnablePartitionSeparator = true
+ expected.FlushConcurrency = 1
uri := "s3://bucket/prefix?worker-count=32&flush-interval=10s&file-size=16777216&protocol=csv"
sinkURI, err := url.Parse(uri)
require.Nil(t, err)