Skip to content

Commit

Permalink
sink(ticdc): use multi part s3 uploader in storage sink (pingcap#9954)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Dec 8, 2023
1 parent dc68c76 commit 8cb3218
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 57 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -613,6 +614,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -1053,6 +1055,7 @@ type CloudStorageConfig struct {
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
Expand Down
31 changes: 27 additions & 4 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -238,9 +238,32 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
}

if err := d.statistics.RecordBatchExecution(func() (int, error) {
err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
return 0, err
if d.config.FlushConcurrency <= 1 {
return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: d.config.FlushConcurrency,
})
if inErr != nil {
return 0, inErr
}

defer func() {
closeErr := writer.Close(ctx)
if inErr != nil {
log.Error("failed to close writer", zap.Error(closeErr),
zap.Int("workerID", d.id),
zap.Any("table", task.tableInfo.TableName),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID))
if inErr == nil {
inErr = closeErr
}
}
}()
if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, inErr
}
return rowsCnt, nil
}); err != nil {
Expand Down
51 changes: 33 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/KimMachineGun/automemlimit v0.2.4
github.com/Shopify/sarama v1.38.1
github.com/VividCortex/mysqlerr v1.0.0
github.com/aws/aws-sdk-go v1.44.48
github.com/aws/aws-sdk-go v1.44.259
github.com/benbjohnson/clock v1.3.0
github.com/bradleyjkemp/grpc-tools v0.2.5
github.com/cenkalti/backoff/v4 v4.0.2
Expand All @@ -30,7 +30,7 @@ require (
github.com/glebarez/sqlite v1.4.6
github.com/go-mysql-org/go-mysql v1.6.1-0.20221223014230-81966e15b9c5
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
github.com/go-sql-driver/mysql v1.7.0
github.com/go-sql-driver/mysql v1.7.1
github.com/goccy/go-json v0.9.11
github.com/gogo/gateway v1.1.0
github.com/gogo/protobuf v1.3.2
Expand Down Expand Up @@ -62,12 +62,12 @@ require (
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20230419072653-dc3cd8784a19
github.com/pingcap/kvproto v0.0.0-20231011074246-fa00d2b03372
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22
github.com/pingcap/tidb v1.1.0-beta.0.20230420065519-eb77d3928398
github.com/pingcap/tidb v1.1.0-beta.0.20231207141451-a53a963a4a50
github.com/pingcap/tidb-tools v7.0.0+incompatible
github.com/pingcap/tidb/parser v0.0.0-20230420065519-eb77d3928398
github.com/prometheus/client_golang v1.15.0
github.com/pingcap/tidb/parser v0.0.0-20231207141451-a53a963a4a50
github.com/prometheus/client_golang v1.15.1
github.com/prometheus/client_model v0.3.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand All @@ -83,9 +83,9 @@ require (
github.com/swaggo/gin-swagger v1.2.0
github.com/swaggo/swag v1.8.3
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/tikv/client-go/v2 v2.0.8-0.20230419123920-35c1ee47c4f9
github.com/tikv/client-go/v2 v2.0.8-0.20231018094816-44449c0526f2
github.com/tikv/pd v1.1.0-beta.0.20230203015356-248b3f0be132
github.com/tikv/pd/client v0.0.0-20230419153320-f1d1a80feb95
github.com/tikv/pd/client v0.0.0-20230905092614-113cdedbebb6
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -97,7 +97,7 @@ require (
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.2
go.etcd.io/etcd/tests/v3 v3.5.2
go.uber.org/atomic v1.10.0
go.uber.org/atomic v1.11.0
go.uber.org/dig v1.13.0
go.uber.org/goleak v1.2.1
go.uber.org/multierr v1.11.0
Expand All @@ -106,7 +106,7 @@ require (
golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e
golang.org/x/net v0.10.0
golang.org/x/oauth2 v0.8.0
golang.org/x/sync v0.1.0
golang.org/x/sync v0.2.0
golang.org/x/sys v0.8.0
golang.org/x/text v0.9.0
golang.org/x/time v0.3.0
Expand All @@ -127,6 +127,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/DataDog/zstd v1.4.6-0.20210211175136-c6db21d202f4 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
Expand All @@ -136,7 +137,7 @@ require (
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/apache/thrift v0.13.1-0.20201008052519-daf620915714 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blacktear23/go-proxyprotocol v1.0.5 // indirect
github.com/blacktear23/go-proxyprotocol v1.0.6 // indirect
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 // indirect
github.com/carlmjohnson/flagext v0.21.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down Expand Up @@ -167,18 +168,22 @@ require (
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
github.com/go-ldap/ldap/v3 v3.4.4 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
github.com/go-openapi/spec v0.20.6 // indirect
github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
Expand All @@ -189,7 +194,8 @@ require (
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/improbable-eng/grpc-web v0.12.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/influxdata/tdigest v0.0.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
Expand Down Expand Up @@ -243,7 +249,7 @@ require (
github.com/prometheus/procfs v0.9.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20220927061507-ef77025ab5aa // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/sasha-s/go-deadlock v0.2.0 // indirect
github.com/shoenig/go-m1cpu v0.1.5 // indirect
Expand Down Expand Up @@ -288,17 +294,24 @@ require (
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/tools v0.8.0 // indirect
golang.org/x/tools v0.9.1 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.114.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.27.2 // indirect
k8s.io/apimachinery v0.27.2 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
modernc.org/libc v1.16.8 // indirect
modernc.org/mathutil v1.4.1 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.1.1 // indirect
modernc.org/sqlite v1.17.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 // indirect
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 // indirect
Expand All @@ -312,3 +325,5 @@ replace github.com/benbjohnson/clock v1.3.0 => github.com/benbjohnson/clock v1.1

// copy from TiDB
replace go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac

replace github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
Loading

0 comments on commit 8cb3218

Please sign in to comment.