
Commit

This is an automated cherry-pick of #10351
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
CharlesCheung96 authored and ti-chi-bot committed Jan 22, 2024
1 parent b118c02 commit a406dbe
Showing 19 changed files with 973 additions and 32 deletions.
13 changes: 13 additions & 0 deletions cdc/api/v1/validator.go
@@ -169,8 +169,15 @@ func verifyCreateChangefeedConfig(
if err != nil {
return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
}
<<<<<<< HEAD
ctx = contextutil.PutTimezoneInCtx(ctx, tz)
if err := sink.Validate(ctx, info.SinkURI, info.Config); err != nil {
=======
if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID},
info.SinkURI, info.Config, up.PDClock,
); err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
return nil, err
}

@@ -228,7 +235,13 @@ func VerifyUpdateChangefeedConfig(ctx context.Context,
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

<<<<<<< HEAD
if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
=======
if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID},
newInfo.SinkURI, newInfo.Config, nil); err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
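One low-churn way to resolve the two conflicts in this file is to keep HEAD's timezone handling and the branch-local sink package while adopting the incoming argument list. This is a sketch only: it assumes cdc/sink.Validate on this branch gains the new signature (see the cdc/sink/validator.go sketch further down), that an *upstream.Upstream named up is in scope as the incoming hunk expects, and that ChangefeedConfig has a Namespace field here — otherwise model.DefaultChangeFeedID(changefeedConfig.ID) is the usual fallback.

	// Create path: keep the timezone handling from HEAD, add changefeed identity and PD clock.
	ctx = contextutil.PutTimezoneInCtx(ctx, tz)
	if err := sink.Validate(ctx,
		model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID},
		info.SinkURI, info.Config, up.PDClock,
	); err != nil {
		return nil, err
	}

	// Update path: no upstream handle is available, so the clock stays nil as in the incoming hunk.
	if err := sink.Validate(ctx,
		model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID},
		newInfo.SinkURI, newInfo.Config, nil,
	); err != nil {
		return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
	}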
27 changes: 27 additions & 0 deletions cdc/api/v2/api_helpers.go
@@ -212,7 +212,14 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
}

// verify sink
<<<<<<< HEAD
if err := sink.Validate(ctx, cfg.SinkURI, replicaCfg); err != nil {
=======
if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID},
cfg.SinkURI, replicaCfg, nil,
); err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
return nil, err
}

@@ -321,7 +328,27 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

<<<<<<< HEAD
if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
=======
// use the sinkURI to validate and adjust the new config
sinkURI := oldInfo.SinkURI
if sinkURIUpdated {
sinkURI = newInfo.SinkURI
}
sinkURIParsed, err := url.Parse(sinkURI)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
err = newInfo.Config.ValidateAndAdjust(sinkURIParsed)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID},
newInfo.SinkURI, newInfo.Config, nil); err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
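The same pattern resolves both conflicts here: route validation through the branch's sink.Validate with the new argument list (nil clock, since these helpers carry no upstream handle) and keep the incoming pre-adjustment of the effective sink URI. In this sketch, sinkURIUpdated stands for "the request carries a new sink URI" (upstream derives it from cfg.SinkURI != ""), net/url must be imported, and ReplicaConfig.ValidateAndAdjust is assumed to exist on this branch.

	// Create path.
	if err := sink.Validate(ctx,
		model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID},
		cfg.SinkURI, replicaCfg, nil,
	); err != nil {
		return nil, err
	}

	// Update path: validate and adjust the new config against the effective sink URI.
	sinkURI := oldInfo.SinkURI
	if sinkURIUpdated {
		sinkURI = newInfo.SinkURI
	}
	sinkURIParsed, err := url.Parse(sinkURI)
	if err != nil {
		return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
	}
	if err = newInfo.Config.ValidateAndAdjust(sinkURIParsed); err != nil {
		return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
	}
	if err := sink.Validate(ctx,
		model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID},
		newInfo.SinkURI, newInfo.Config, nil,
	); err != nil {
		return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
	}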
4 changes: 4 additions & 0 deletions cdc/processor/sinkmanager/manager.go
@@ -361,7 +361,11 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) {
return m.sinkFactory.errors, m.sinkFactory.version
}

<<<<<<< HEAD
m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors)
=======
m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors, m.up.PDClock)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351))
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
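The factory.go hunk at the bottom of this diff only appends a pdClock parameter to factory.New — it does not add a changefeed ID on this branch — so a consistent resolution keeps HEAD's argument list and threads the clock through. It assumes m.up exposes PDClock, as the incoming hunk does.

	// Same call as HEAD, with the upstream's PD clock appended to match the
	// branch's extended factory.New signature (see cdc/sinkv2/eventsink/factory below).
	m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors, m.up.PDClock)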
17 changes: 17 additions & 0 deletions cdc/sink/validator.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/util"
@@ -30,10 +31,20 @@
// Validate sink if given valid parameters.
// TODO: For now, we create a real sink instance and validate it.
// Maybe we should support the dry-run mode to validate sink.
<<<<<<< HEAD:cdc/sink/validator.go
func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) error {
var err error
var uri *url.URL
if uri, err = preCheckSinkURI(sinkURI); err != nil {
=======
func Validate(ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI string, cfg *config.ReplicaConfig,
pdClock pdutil.Clock,
) error {
uri, err := preCheckSinkURI(sinkURI)
if err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/validator/validator.go
return err
}

@@ -44,6 +55,7 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) er
}
}

<<<<<<< HEAD:cdc/sink/validator.go
errCh := make(chan error)
ctx, cancel := context.WithCancel(contextutil.PutRoleInCtx(ctx, util.RoleClient))
conf := config.GetGlobalServerConfig()
@@ -56,6 +68,11 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) er
}
// NOTICE: We have to cancel the context before we close it,
// otherwise we will write data to closed chan after sink closed.
=======
ctx, cancel := context.WithCancel(ctx)
s, err := factory.New(ctx, changefeedID, sinkURI, cfg, make(chan error), pdClock)
if err != nil {
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/validator/validator.go
cancel()
err = s.Close(ctx)
} else {
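If this file stays at cdc/sink/validator.go on the branch, a compressed sketch of the resolved function keeps the incoming signature but builds the probe sink through the branch's factory, which takes no changefeed ID. The scheme/BDR-mode/syncpoint checks between the URI pre-check and the factory call are elided, and the Close call mirrors the shared tail of the hunk (err = s.Close(ctx)); adjust if the factory's Close differs on this branch.

	func Validate(ctx context.Context,
		changefeedID model.ChangeFeedID,
		sinkURI string, cfg *config.ReplicaConfig,
		pdClock pdutil.Clock,
	) error {
		uri, err := preCheckSinkURI(sinkURI)
		if err != nil {
			return err
		}
		_ = uri // consumed by the scheme/BDR/syncpoint checks elided from this sketch

		// Build a short-lived sink purely to surface configuration errors.
		ctx, cancel := context.WithCancel(contextutil.PutRoleInCtx(ctx, util.RoleClient))
		s, err := factory.New(ctx, sinkURI, cfg, make(chan error), pdClock)
		if err != nil {
			cancel()
			return err
		}
		// Cancel before closing so nothing is written to a closed channel after the sink closes.
		cancel()
		return s.Close(ctx)
	}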
39 changes: 39 additions & 0 deletions cdc/sink/validator_test.go
@@ -87,3 +87,42 @@ func TestPreCheckSinkURI(t *testing.T) {
})
}
}
<<<<<<< HEAD:cdc/sink/validator_test.go
=======

func TestValidateSink(t *testing.T) {
t.Parallel()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
replicateConfig := config.GetDefaultReplicaConfig()

// test sink uri error
sinkURI := "mysql://root:111@127.0.0.1:3306/"
err := Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(t, err.Error(), "fail to open MySQL connection")

// test sink uri right
sinkURI = "blackhole://"
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
require.Nil(t, err)

// test bdr mode error
replicateConfig.BDRMode = util.AddressOf(true)
sinkURI = "blackhole://"
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(t, err.Error(), "sink uri scheme is not supported in BDR mode")

// test sink-scheme/syncpoint error
replicateConfig.EnableSyncPoint = util.AddressOf(true)
sinkURI = "kafka://"
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(
t, err.Error(),
"sink uri scheme is not supported with syncpoint enabled",
)
}
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/validator/validator_test.go
18 changes: 16 additions & 2 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
@@ -23,16 +23,24 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go

[GitHub Actions annotation — check failure on line 26 of cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go, reported by both the Mac OS Build and Arm Build (ARM64) jobs: expected 'STRING', found '<<']
"github.com/pingcap/tiflow/cdc/sink/codec/builder"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/cdc/sinkv2/util"
"github.com/pingcap/tiflow/engine/pkg/clock"
=======
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/cdc/sink/util"
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
putil "github.com/pingcap/tiflow/pkg/util"
@@ -91,9 +99,16 @@ type dmlSink struct {
dead chan struct{}
}

<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
// NewCloudStorageSink creates a cloud storage sink.
func NewCloudStorageSink(
ctx context.Context,
=======
// NewDMLSink creates a cloud storage sink.
func NewDMLSink(ctx context.Context,
changefeedID model.ChangeFeedID,
pdClock pdutil.Clock,
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
errCh chan error,
@@ -153,11 +168,10 @@ func NewCloudStorageSink(
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
// create a group of dml workers.
clock := clock.New()
for i := 0; i < cfg.WorkerCount; i++ {
inputCh := chann.NewDrainableChann[eventFragment]()
s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext,
inputCh, clock, s.statistics)
inputCh, pdClock, s.statistics)
workerChannels[i] = inputCh
}

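A low-churn resolution here keeps the sinkv2 import paths and the NewCloudStorageSink name from HEAD, drops the engine clock import (the workers now receive a pdutil.Clock), and adds only the clock parameter from the incoming side; the changefeed ID keeps coming from s.changefeedID, as the unconflicted worker-creation loop above already assumes. Upstream instead renames the constructor to NewDMLSink and adds an explicit changefeed ID parameter. Signature sketch only, return type assumed unchanged:

	// NewCloudStorageSink creates a cloud storage sink.
	func NewCloudStorageSink(
		ctx context.Context,
		pdClock pdutil.Clock, // PD-backed clock handed down to the DML workers
		sinkURI *url.URL,
		replicaConfig *config.ReplicaConfig,
		errCh chan error,
	) (*dmlSink, error)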
33 changes: 30 additions & 3 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
@@ -31,12 +31,17 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
=======
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/util"
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
"github.com/stretchr/testify/require"
)

func setClock(s *dmlSink, clock clock.Clock) {
for _, w := range s.workers {
w.filePathGenerator.SetClock(clock)
w.filePathGenerator.SetClock(pdutil.NewMonotonicClock(clock))
}
}

@@ -126,7 +131,14 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()
replicaConfig.Sink.FileIndexWidth = 6
errCh := make(chan error, 5)
<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
=======
s, err := NewDMLSink(ctx,
model.DefaultChangeFeedID("test"),
pdutil.NewMonotonicClock(clock.New()),
sinkURI, replicaConfig, errCh)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
require.Nil(t, err)
var cnt uint64 = 0
batch := 100
@@ -194,10 +206,17 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
=======
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
mockClock := clock.NewMock()
setClock(s, mockClock)
s, err := NewDMLSink(ctx,
model.DefaultChangeFeedID("test"),
pdutil.NewMonotonicClock(mockClock),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)

var cnt uint64 = 0
batch := 100
@@ -268,11 +287,19 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
// test table is scheduled from one node to another
cnt = 0
ctx, cancel = context.WithCancel(context.Background())
<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
s, err = NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
=======

>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
mockClock = clock.NewMock()
mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
setClock(s, mockClock)
s, err = NewDMLSink(ctx,
model.DefaultChangeFeedID("test"),
pdutil.NewMonotonicClock(mockClock),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)

err = s.WriteEvents(txns...)
require.Nil(t, err)
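With the constructor resolved as sketched above (branch name, no changefeed ID parameter), the test call sites reduce to passing a wrapped clock. Note that the incoming hunks deliberately build the sink after the mock clock exists, because the clock is now fixed at construction time rather than injected later through setClock:

	// TestCloudStorageWriteEventsWithoutDateSeparator: wrap the real clock.
	s, err := NewCloudStorageSink(ctx,
		pdutil.NewMonotonicClock(clock.New()),
		sinkURI, replicaConfig, errCh)
	require.Nil(t, err)

	// TestCloudStorageWriteEventsWithDateSeparator: create the mock clock first, then the sink.
	mockClock := clock.NewMock()
	s, err := NewCloudStorageSink(ctx,
		pdutil.NewMonotonicClock(mockClock),
		sinkURI, replicaConfig, errCh)
	require.Nil(t, err)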
10 changes: 8 additions & 2 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
@@ -22,12 +22,18 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
=======
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/dmlsink/cloudstorage/dml_worker.go
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@@ -103,7 +109,7 @@ func newDMLWorker(
config *cloudstorage.Config,
extension string,
inputCh *chann.DrainableChann[eventFragment],
clock clock.Clock,
pdClock pdutil.Clock,
statistics *metrics.Statistics,
) *dmlWorker {
d := &dmlWorker{
Expand All @@ -114,7 +120,7 @@ func newDMLWorker(
inputCh: inputCh,
flushNotifyCh: make(chan dmlTask, 64),
statistics: statistics,
filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock),
filePathGenerator: cloudstorage.NewFilePathGenerator(changefeedID, config, storage, extension, pdClock),
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.
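The only conflict in this file is the import block — the rest is already resolved, with newDMLWorker taking a pdutil.Clock and forwarding it, together with the changefeed ID, to NewFilePathGenerator. Keeping the branch's sinkv2 paths and dropping the engine clock import gives roughly this import set (every path below appears in the hunks above):

	import (
		"github.com/pingcap/log"
		"github.com/pingcap/tidb/br/pkg/storage"
		"github.com/pingcap/tiflow/cdc/model"
		"github.com/pingcap/tiflow/cdc/sink/codec/common"
		"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
		mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage"
		"github.com/pingcap/tiflow/pkg/chann"
		"github.com/pingcap/tiflow/pkg/errors"
		"github.com/pingcap/tiflow/pkg/pdutil"
		"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
		"github.com/prometheus/client_golang/prometheus"
		"go.uber.org/zap"
	)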
9 changes: 9 additions & 0 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/util"
@@ -48,9 +49,17 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
cfg.FileIndexWidth = 6
require.Nil(t, err)

<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
statistics := metrics.NewStatistics(ctx, sink.TxnSink)
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
cfg, ".json", chann.NewDrainableChann[eventFragment](), clock.New(), statistics)
=======
statistics := metrics.NewStatistics(ctx, model.DefaultChangeFeedID("dml-worker-test"),
sink.TxnSink)
pdlock := pdutil.NewMonotonicClock(clock.New())
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
cfg, ".json", chann.NewAutoDrainChann[eventFragment](), pdlock, statistics)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
return d
}

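Resolving the test helper the same way: keep HEAD's NewStatistics signature and NewDrainableChann, and take only the pdutil wrapper from the incoming side (its pdlock variable is renamed pdClock in this sketch):

	statistics := metrics.NewStatistics(ctx, sink.TxnSink)
	pdClock := pdutil.NewMonotonicClock(clock.New())
	d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
		cfg, ".json", chann.NewDrainableChann[eventFragment](), pdClock, statistics)
	return d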
6 changes: 6 additions & 0 deletions cdc/sinkv2/eventsink/factory/factory.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/prometheus/client_golang/prometheus"
@@ -63,6 +64,7 @@ func New(ctx context.Context,
sinkURIStr string,
cfg *config.ReplicaConfig,
errCh chan error,
pdClock pdutil.Clock,
) (*SinkFactory, error) {
sinkURI, err := config.GetSinkURIAndAdjustConfigWithSinkURI(sinkURIStr, cfg)
if err != nil {
@@ -90,7 +92,11 @@
s.sinkType = sink.TxnSink
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
storageSink, err := cloudstorage.NewCloudStorageSink(ctx, sinkURI, cfg, errCh)
=======
storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, pdClock, sinkURI, cfg, errCh)
>>>>>>> c8ea7d0a75 (sink(ticdc): use pd clock in storage sink (#10351)):cdc/sink/dmlsink/factory/factory.go
if err != nil {
return nil, err
}
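The factory conflict then follows from the constructor choice above: keep the branch's cloudstorage.NewCloudStorageSink name and forward the pdClock that New now receives (upstream calls cloudstorage.NewDMLSink with an explicit changefeed ID instead). Sketch:

	case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme,
		sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
		storageSink, err := cloudstorage.NewCloudStorageSink(ctx, pdClock, sinkURI, cfg, errCh)
		if err != nil {
			return nil, err
		}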