Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): use multi part s3 uploader in storage sink (#9954) #10179

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -613,6 +614,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
}
}

Expand Down Expand Up @@ -1053,6 +1055,7 @@ type CloudStorageConfig struct {
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
Expand Down
31 changes: 27 additions & 4 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -238,9 +238,32 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
}

if err := d.statistics.RecordBatchExecution(func() (int, error) {
err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
return 0, err
if d.config.FlushConcurrency <= 1 {
return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: d.config.FlushConcurrency,
})
if inErr != nil {
return 0, inErr
}

defer func() {
closeErr := writer.Close(ctx)
if inErr != nil {
log.Error("failed to close writer", zap.Error(closeErr),
zap.Int("workerID", d.id),
zap.Any("table", task.tableInfo.TableName),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID))
if inErr == nil {
inErr = closeErr
}
}
}()
if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, inErr
}
return rowsCnt, nil
}); err != nil {
Expand Down
17 changes: 16 additions & 1 deletion dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
"github.com/pingcap/tiflow/dm/unit"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -444,7 +445,20 @@ func (c *Checker) Init(ctx context.Context) (err error) {
if err != nil {
return err
}
targetInfoGetter, err := importer.NewTargetInfoGetterImpl(lCfg, targetDB)

pdClient, err := pd.NewClientWithContext(
ctx, []string{lCfg.TiDB.PdAddr}, pd.SecurityOption{
CAPath: lCfg.Security.CAPath,
CertPath: lCfg.Security.CertPath,
KeyPath: lCfg.Security.KeyPath,
SSLCABytes: lCfg.Security.CABytes,
SSLCertBytes: lCfg.Security.CertBytes,
SSLKEYBytes: lCfg.Security.KeyBytes,
})
if err != nil {
return err
}
targetInfoGetter, err := importer.NewTargetInfoGetterImpl(lCfg, targetDB, pdClient)
if err != nil {
return err
}
Expand All @@ -471,6 +485,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
dbMetas,
newLightningPrecheckAdaptor(targetInfoGetter, info),
cpdb,
pdClient,
)

if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok {
Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/example/worker_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (w *exampleWorker) Tick(ctx context.Context) error {
return err
}

file, err := storage.BrExternalStorage().Create(ctx, strconv.Itoa(count)+".txt")
file, err := storage.BrExternalStorage().Create(ctx, strconv.Itoa(count)+".txt", nil)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/broker/broker_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func createS3ResourceForWorker(
defer cancel()
for index, fileName := range newTestFiles {
filePath := fmt.Sprintf("%s/%s/%s", creator, resName, fileName)
f, err := resStorage.Create(ctx, fileName)
f, err := resStorage.Create(ctx, fileName, nil)
require.NoError(t, err)
content := filePath + fmt.Sprintf("_index-%d", index)
// FIXME(CharlesCheung): If nothing is written, f.Close will report an error.
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestIntegrationBrokerOpenNewS3Storage(t *testing.T) {
cli.AssertExpectations(t)
cli.ExpectedCalls = nil

f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt")
f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)

err = f.Close(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestBrokerOpenNewStorage(t *testing.T) {
cli.AssertExpectations(t)
cli.ExpectedCalls = nil

f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt")
f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)

err = f.Close(context.Background())
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestBrokerOpenExistingStorage(t *testing.T) {

cli.AssertExpectations(t)

f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt")
f, err := hdl.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)

err = f.Close(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions engine/pkg/externalresource/integration_test/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestLocalFileTriggeredByJobRemoval(t *testing.T) {
resID)
require.NoError(t, err)

_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt")
_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = handle.Persist(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestLocalFileRecordRemovedTriggeredByExecutorOffline(t *testing.T) {
"job-1",
"/local/resource-1")
require.NoError(t, err)
_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt")
_, err = handle.BrExternalStorage().Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = handle.Persist(ctx)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func createPlaceholderFile(ctx context.Context, storage brStorage.ExternalStorag
return errors.ErrExternalStorageAPI.GenWithStackByArgs("resource already exists")
}

writer, err := storage.Create(ctx, placeholderFileName)
writer, err := storage.Create(ctx, placeholderFileName, nil)
if err != nil {
return errors.ErrExternalStorageAPI.Wrap(err).GenWithStackByArgs("creating placeholder file")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestFileManagerBasics(t *testing.T) {

storage, err := newBrStorageForLocalFile(res1.AbsolutePath())
require.NoError(t, err)
fwriter, err := storage.Create(context.Background(), "1.txt")
fwriter, err := storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand All @@ -100,7 +100,7 @@ func TestFileManagerBasics(t *testing.T) {

storage, err = newBrStorageForLocalFile(res2.AbsolutePath())
require.NoError(t, err)
fwriter, err = storage.Create(context.Background(), "1.txt")
fwriter, err = storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestFileManagerManyWorkers(t *testing.T) {

storage, err := newBrStorageForLocalFile(res1.AbsolutePath())
require.NoError(t, err)
fwriter, err := storage.Create(context.Background(), "1.txt")
fwriter, err := storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand All @@ -166,7 +166,7 @@ func TestFileManagerManyWorkers(t *testing.T) {

storage, err = newBrStorageForLocalFile(res2.AbsolutePath())
require.NoError(t, err)
fwriter, err = storage.Create(context.Background(), "1.txt")
fwriter, err = storage.Create(context.Background(), "1.txt", nil)
require.NoError(t, err)
err = fwriter.Close(context.Background())
require.NoError(t, err)
Expand Down
51 changes: 33 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/KimMachineGun/automemlimit v0.2.4
github.com/Shopify/sarama v1.38.1
github.com/VividCortex/mysqlerr v1.0.0
github.com/aws/aws-sdk-go v1.44.48
github.com/aws/aws-sdk-go v1.44.259
github.com/benbjohnson/clock v1.3.0
github.com/bradleyjkemp/grpc-tools v0.2.5
github.com/cenkalti/backoff/v4 v4.0.2
Expand All @@ -30,7 +30,7 @@ require (
github.com/glebarez/sqlite v1.4.6
github.com/go-mysql-org/go-mysql v1.6.1-0.20221223014230-81966e15b9c5
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
github.com/go-sql-driver/mysql v1.7.0
github.com/go-sql-driver/mysql v1.7.1
github.com/goccy/go-json v0.9.11
github.com/gogo/gateway v1.1.0
github.com/gogo/protobuf v1.3.2
Expand Down Expand Up @@ -62,12 +62,12 @@ require (
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20230419072653-dc3cd8784a19
github.com/pingcap/kvproto v0.0.0-20231011074246-fa00d2b03372
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22
github.com/pingcap/tidb v1.1.0-beta.0.20230420065519-eb77d3928398
github.com/pingcap/tidb v1.1.0-beta.0.20231207141451-a53a963a4a50
github.com/pingcap/tidb-tools v7.0.0+incompatible
github.com/pingcap/tidb/parser v0.0.0-20230420065519-eb77d3928398
github.com/prometheus/client_golang v1.15.0
github.com/pingcap/tidb/parser v0.0.0-20231207141451-a53a963a4a50
github.com/prometheus/client_golang v1.15.1
github.com/prometheus/client_model v0.3.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand All @@ -83,9 +83,9 @@ require (
github.com/swaggo/gin-swagger v1.2.0
github.com/swaggo/swag v1.8.3
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/tikv/client-go/v2 v2.0.8-0.20230419123920-35c1ee47c4f9
github.com/tikv/client-go/v2 v2.0.8-0.20231018094816-44449c0526f2
github.com/tikv/pd v1.1.0-beta.0.20230203015356-248b3f0be132
github.com/tikv/pd/client v0.0.0-20230419153320-f1d1a80feb95
github.com/tikv/pd/client v0.0.0-20230905092614-113cdedbebb6
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -97,7 +97,7 @@ require (
go.etcd.io/etcd/raft/v3 v3.5.2
go.etcd.io/etcd/server/v3 v3.5.2
go.etcd.io/etcd/tests/v3 v3.5.2
go.uber.org/atomic v1.10.0
go.uber.org/atomic v1.11.0
go.uber.org/dig v1.13.0
go.uber.org/goleak v1.2.1
go.uber.org/multierr v1.11.0
Expand All @@ -106,7 +106,7 @@ require (
golang.org/x/exp v0.0.0-20221023144134-a1e5550cf13e
golang.org/x/net v0.10.0
golang.org/x/oauth2 v0.8.0
golang.org/x/sync v0.1.0
golang.org/x/sync v0.2.0
golang.org/x/sys v0.8.0
golang.org/x/text v0.9.0
golang.org/x/time v0.3.0
Expand All @@ -127,6 +127,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/DataDog/zstd v1.4.6-0.20210211175136-c6db21d202f4 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
Expand All @@ -136,7 +137,7 @@ require (
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/apache/thrift v0.13.1-0.20201008052519-daf620915714 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blacktear23/go-proxyprotocol v1.0.5 // indirect
github.com/blacktear23/go-proxyprotocol v1.0.6 // indirect
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 // indirect
github.com/carlmjohnson/flagext v0.21.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand Down Expand Up @@ -167,18 +168,22 @@ require (
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.4 // indirect
github.com/go-ldap/ldap/v3 v3.4.4 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
github.com/go-openapi/spec v0.20.6 // indirect
github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
Expand All @@ -189,7 +194,8 @@ require (
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/improbable-eng/grpc-web v0.12.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/influxdata/tdigest v0.0.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
Expand Down Expand Up @@ -243,7 +249,7 @@ require (
github.com/prometheus/procfs v0.9.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20220927061507-ef77025ab5aa // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/sasha-s/go-deadlock v0.2.0 // indirect
github.com/shoenig/go-m1cpu v0.1.5 // indirect
Expand Down Expand Up @@ -288,17 +294,24 @@ require (
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/tools v0.8.0 // indirect
golang.org/x/tools v0.9.1 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.114.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.27.2 // indirect
k8s.io/apimachinery v0.27.2 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
modernc.org/libc v1.16.8 // indirect
modernc.org/mathutil v1.4.1 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.1.1 // indirect
modernc.org/sqlite v1.17.3 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 // indirect
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 // indirect
Expand All @@ -312,3 +325,5 @@ replace github.com/benbjohnson/clock v1.3.0 => github.com/benbjohnson/clock v1.1

// copy from TiDB
replace go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac

replace github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
Loading
Loading