Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: add changefeed epoch to prevent unexpected state (#8268) #8298

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
465 changes: 465 additions & 0 deletions cdc/api/v2/api_helpers.go

Large diffs are not rendered by default.

147 changes: 147 additions & 0 deletions cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"context"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
)

// TestVerifyCreateChangefeedConfig walks verifyCreateChangefeedConfig through a
// sequence of invalid and valid ChangefeedConfig inputs, asserting an error (or
// a populated ChangeFeedInfo) after each step. The steps are order-dependent:
// each one mutates cfg and relies on the state left by the previous step.
func TestVerifyCreateChangefeedConfig(t *testing.T) {
	ctx := context.Background()
	pdClient := &mockPDClient{}
	helper := entry.NewSchemaTestHelper(t)
	helper.Tk().MustExec("use test;")
	storage := helper.Storage()
	provider := &mockStatusProvider{}
	cfg := &ChangefeedConfig{}
	h := &APIV2HelpersImpl{}
	// Empty config (no sink URI) must be rejected.
	cfInfo, err := h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.Nil(t, cfInfo)
	require.NotNil(t, err)
	cfg.SinkURI = "blackhole://"
	// repliconfig is nil
	require.Panics(t, func() {
		_, _ = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	})
	cfg.ReplicaConfig = GetDefaultReplicaConfig()
	cfg.ReplicaConfig.ForceReplicate = true
	cfg.ReplicaConfig.EnableOldValue = false
	// disable old value but force replicate
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
	require.Nil(t, cfInfo)
	cfg.ReplicaConfig.ForceReplicate = false
	cfg.ReplicaConfig.IgnoreIneligibleTable = true
	// First valid config: verify the generated info has an auto-assigned ID,
	// the default namespace, and a non-zero epoch (the feature added by this PR).
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.Nil(t, err)
	require.NotNil(t, cfInfo)
	require.NotEqual(t, "", cfInfo.ID)
	require.Equal(t, model.DefaultNamespace, cfInfo.Namespace)
	require.NotEqual(t, 0, cfInfo.Epoch)

	// A '/' is not allowed in a changefeed ID.
	cfg.ID = "abdc/sss"
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
	cfg.ID = ""
	// A '/' is not allowed in a namespace either.
	cfg.Namespace = "abdc/sss"
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
	cfg.ID = ""
	cfg.Namespace = ""
	// changefeed already exists
	provider.changefeedStatus = &model.ChangeFeedStatus{}
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
	provider.changefeedStatus = nil
	// "not exists" from the status provider is the success path for creation;
	// the resulting info must carry the mock upstream's ID (123).
	provider.err = cerror.ErrChangeFeedNotExists.GenWithStackByArgs("aaa")
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.Nil(t, err)
	require.Equal(t, uint64(123), cfInfo.UpstreamID)
	// TargetTs earlier than StartTs must be rejected.
	cfg.TargetTs = 3
	cfg.StartTs = 4
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
	cfg.TargetTs = 6
	cfg.ReplicaConfig.EnableOldValue = false
	// Unknown sink scheme must be rejected.
	cfg.SinkURI = "aaab://"
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
	// A sink URI that cannot even be parsed (control character) must be rejected.
	cfg.SinkURI = string([]byte{0x7f, ' '})
	cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
	require.NotNil(t, err)
}

// TestVerifyUpdateChangefeedConfig exercises verifyUpdateChangefeedConfig:
// rejected updates (empty config, attempts to change namespace/ID, TargetTs
// before the existing StartTs) and one accepted update, for which it checks
// that each updatable field is copied into the new ChangeFeedInfo /
// UpstreamInfo while StartTs stays unchanged.
func TestVerifyUpdateChangefeedConfig(t *testing.T) {
	ctx := context.Background()
	cfg := &ChangefeedConfig{}
	oldInfo := &model.ChangeFeedInfo{
		Config: config.GetDefaultReplicaConfig(),
	}
	oldUpInfo := &model.UpstreamInfo{}
	helper := entry.NewSchemaTestHelper(t)
	helper.Tk().MustExec("use test;")
	storage := helper.Storage()
	h := &APIV2HelpersImpl{}
	// An empty update config must be rejected.
	newCfInfo, newUpInfo, err := h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
	require.NotNil(t, err)
	require.Nil(t, newCfInfo)
	require.Nil(t, newUpInfo)
	// namespace and id can not be updated
	cfg.Namespace = "abc"
	cfg.ID = "1234"
	newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
	require.NotNil(t, err)
	require.Nil(t, newCfInfo)
	require.Nil(t, newUpInfo)
	// Build a full, valid update touching every updatable field.
	cfg.StartTs = 2
	cfg.TargetTs = 10
	cfg.Engine = model.SortInMemory
	cfg.ReplicaConfig = ToAPIReplicaConfig(config.GetDefaultReplicaConfig())
	cfg.ReplicaConfig.EnableSyncPoint = true
	cfg.ReplicaConfig.SyncPointInterval = 30 * time.Second
	cfg.PDAddrs = []string{"a", "b"}
	cfg.CertPath = "p1"
	cfg.CAPath = "p2"
	cfg.KeyPath = "p3"
	cfg.SinkURI = "blackhole://"
	cfg.CertAllowedCN = []string{"c", "d"}
	newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
	require.Nil(t, err)
	// startTs can not be updated
	// Clear TxnAtomicity before the deep-equality check below so the
	// comparison against cfg's converted config succeeds.
	newCfInfo.Config.Sink.TxnAtomicity = ""
	require.Equal(t, uint64(0), newCfInfo.StartTs)
	require.Equal(t, uint64(10), newCfInfo.TargetTs)
	require.Equal(t, model.SortInMemory, newCfInfo.Engine)
	require.Equal(t, true, newCfInfo.Config.EnableSyncPoint)
	require.Equal(t, 30*time.Second, newCfInfo.Config.SyncPointInterval)
	require.Equal(t, cfg.ReplicaConfig.ToInternalReplicaConfig(), newCfInfo.Config)
	// PD addresses are joined into a comma-separated endpoint string.
	require.Equal(t, "a,b", newUpInfo.PDEndpoints)
	require.Equal(t, "p1", newUpInfo.CertPath)
	require.Equal(t, "p2", newUpInfo.CAPath)
	require.Equal(t, "p3", newUpInfo.KeyPath)
	require.Equal(t, []string{"c", "d"}, newUpInfo.CertAllowedCN)
	require.Equal(t, "blackhole://", newCfInfo.SinkURI)
	// TargetTs earlier than the changefeed's StartTs must be rejected.
	oldInfo.StartTs = 10
	cfg.TargetTs = 9
	newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
	require.NotNil(t, err)
}
33 changes: 33 additions & 0 deletions cdc/api/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import (
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
<<<<<<< HEAD:cdc/api/validator.go
"github.com/pingcap/tiflow/cdc/sink"
=======
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/sinkv2/validator"
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268)):cdc/api/v1/validator.go
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -130,6 +135,7 @@ func verifyCreateChangefeedConfig(

// init ChangefeedInfo
info := &model.ChangeFeedInfo{
<<<<<<< HEAD:cdc/api/validator.go
SinkURI: changefeedConfig.SinkURI,
Opts: make(map[string]string),
CreateTime: time.Now(),
Expand All @@ -141,6 +147,33 @@ func verifyCreateChangefeedConfig(
SyncPointEnabled: false,
SyncPointInterval: 10 * time.Minute,
CreatorVersion: version.ReleaseVersion,
=======
Namespace: model.DefaultNamespace,
ID: changefeedConfig.ID,
UpstreamID: up.ID,
SinkURI: changefeedConfig.SinkURI,
CreateTime: time.Now(),
StartTs: changefeedConfig.StartTS,
TargetTs: changefeedConfig.TargetTS,
Config: replicaConfig,
Engine: sortEngine,
State: model.StateNormal,
CreatorVersion: version.ReleaseVersion,
Epoch: owner.GenerateChangefeedEpoch(ctx, up.PDClient),
}
f, err := filter.NewFilter(replicaConfig, "")
if err != nil {
return nil, err
}
tableInfos, ineligibleTables, _, err := entry.VerifyTables(f,
up.KVStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
err = f.Verify(tableInfos)
if err != nil {
return nil, err
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268)):cdc/api/v1/validator.go
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
Expand Down
1 change: 1 addition & 0 deletions cdc/api/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.Nil(t, err)
require.NotNil(t, newInfo)
require.NotEqual(t, 0, newInfo.Epoch)
}
6 changes: 6 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,15 @@ type ChangeFeedInfo struct {
State FeedState `json:"state"`
Error *RunningError `json:"error"`

<<<<<<< HEAD
SyncPointEnabled bool `json:"sync-point-enabled"`
SyncPointInterval time.Duration `json:"sync-point-interval"`
CreatorVersion string `json:"creator-version"`
=======
CreatorVersion string `json:"creator-version"`
// Epoch is the epoch of a changefeed, changes on every restart.
Epoch uint64 `json:"epoch"`
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
}

const changeFeedIDMaxLen = 128
Expand Down
100 changes: 100 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,20 @@ import (

// newSchedulerV2FromCtx creates a new schedulerV2 from context.
// This function is factored out to facilitate unit testing.
<<<<<<< HEAD
func newSchedulerV2FromCtx(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error) {
=======
func newSchedulerFromCtx(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (ret scheduler.Scheduler, err error) {
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
ownerRev := ctx.GlobalVars().OwnerRevision
<<<<<<< HEAD
ret, err := scheduler.NewScheduler(
ctx, changeFeedID, startTs, messageServer, messageRouter, ownerRev)
if err != nil {
Expand All @@ -59,6 +66,19 @@ func newSchedulerV2FromCtx(

func newScheduler(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) {
return newSchedulerV2FromCtx(ctx, startTs)
=======
captureID := ctx.GlobalVars().CaptureInfo.ID
ret, err = scheduler.NewScheduler(
ctx, captureID, changeFeedID,
messageServer, messageRouter, ownerRev, epoch, up.RegionCache, up.PDClock, cfg)
return ret, errors.Trace(err)
}

func newScheduler(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error) {
return newSchedulerFromCtx(ctx, up, epoch, cfg)
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
}

type changefeed struct {
Expand Down Expand Up @@ -109,9 +129,39 @@ type changefeed struct {
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricsChangefeedTickDuration prometheus.Observer

<<<<<<< HEAD
newDDLPuller func(ctx cdcContext.Context, upStream *upstream.Upstream, startTs uint64) (DDLPuller, error)
newSink func() DDLSink
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error)
=======
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsLagDuration prometheus.Observer
metricsCurrentPDTsGauge prometheus.Gauge

metricsChangefeedBarrierTsGauge prometheus.Gauge
metricsChangefeedTickDuration prometheus.Observer

downstreamObserver observer.Observer
observerLastTick *atomic.Time

newDDLPuller func(ctx context.Context,
replicaConfig *config.ReplicaConfig,
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error)

newDownstreamObserver func(
ctx context.Context, sinkURIStr string, replCfg *config.ReplicaConfig,
opts ...observer.NewObserverOption,
) (observer.Observer, error)
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
}
Expand All @@ -122,8 +172,13 @@ func newChangefeed(id model.ChangeFeedID, upStream *upstream.Upstream) *changefe
// The scheduler will be created lazily.
scheduler: nil,
barriers: newBarriers(),
<<<<<<< HEAD
feedStateManager: newFeedStateManager(),
upStream: upStream,
=======
feedStateManager: newFeedStateManager(up),
upstream: up,
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))

errCh: make(chan error, defaultErrChSize),
cancel: func() {},
Expand All @@ -136,9 +191,27 @@ func newChangefeed(id model.ChangeFeedID, upStream *upstream.Upstream) *changefe
}

func newChangefeed4Test(
<<<<<<< HEAD
id model.ChangeFeedID, upStream *upstream.Upstream,
newDDLPuller func(ctx cdcContext.Context, upStream *upstream.Upstream, startTs uint64) (DDLPuller, error),
newSink func() DDLSink,
=======
id model.ChangeFeedID, state *orchestrator.ChangefeedReactorState, up *upstream.Upstream,
newDDLPuller func(ctx context.Context,
replicaConfig *config.ReplicaConfig,
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error),
newDownstreamObserver func(
ctx context.Context, sinkURIStr string, replCfg *config.ReplicaConfig,
opts ...observer.NewObserverOption,
) (observer.Observer, error),
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
) *changefeed {
c := newChangefeed(id, upStream)
c.newDDLPuller = newDDLPuller
Expand Down Expand Up @@ -470,9 +543,36 @@ LOOP:
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID))

<<<<<<< HEAD
// init metrics
c.metricsChangefeedBarrierTsGauge = changefeedBarrierTsGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
=======
// create scheduler
cfg := *c.cfg
cfg.ChangefeedSettings = c.state.Info.Config.Scheduler
epoch := c.state.Info.Epoch
c.scheduler, err = c.newScheduler(ctx, c.upstream, epoch, &cfg)
if err != nil {
return errors.Trace(err)
}

c.initMetrics()

c.initialized = true
log.Info("changefeed initialized",
zap.String("namespace", c.state.ID.Namespace),
zap.String("changefeed", c.state.ID.ID),
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs),
zap.Stringer("info", c.state.Info))

return nil
}

func (c *changefeed) initMetrics() {
>>>>>>> 0867f80e5f (cdc: add changefeed epoch to prevent unexpected state (#8268))
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.
Expand Down
Loading