
Commit

changefeed, kvClient, sink (ticdc): support bidirectional replication (#7630)

close #7736
asddongmen authored Nov 30, 2022
1 parent 197bc0e commit 3f4a8a7
Showing 34 changed files with 506 additions and 111 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
@@ -95,6 +95,7 @@ type ReplicaConfig struct {
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
EnableSyncPoint bool `json:"enable_sync_point"`
+BDRMode bool `json:"bdr_mode"`
SyncPointInterval time.Duration `json:"sync_point_interval"`
SyncPointRetention time.Duration `json:"sync_point_retention"`
Filter *FilterConfig `json:"filter"`
@@ -113,6 +114,7 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
res.EnableSyncPoint = c.EnableSyncPoint
res.SyncPointInterval = c.SyncPointInterval
res.SyncPointRetention = c.SyncPointRetention
+res.BDRMode = c.BDRMode

if c.Filter != nil {
var mySQLReplicationRules *filter.MySQLReplicationRules
@@ -221,6 +223,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
EnableSyncPoint: cloned.EnableSyncPoint,
SyncPointInterval: cloned.SyncPointInterval,
SyncPointRetention: cloned.SyncPointRetention,
+BDRMode: cloned.BDRMode,
}

if cloned.Filter != nil {
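
For context on how the new field travels through the v2 API, here is a minimal, hypothetical sketch (not part of the diff): a changefeed created with "bdr_mode": true ends up with BDRMode set on the internal replica config via ToInternalReplicaConfig. The struct below is a trimmed stand-in for ReplicaConfig; only the bdr_mode key comes from this commit.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// trimmedReplicaConfig is a hypothetical, cut-down stand-in for the v2
// ReplicaConfig above; only the fields relevant here are kept.
type trimmedReplicaConfig struct {
	EnableSyncPoint bool `json:"enable_sync_point"`
	BDRMode         bool `json:"bdr_mode"`
}

func main() {
	// A changefeed created with "bdr_mode": true eventually carries
	// BDRMode == true on its internal replica config.
	payload := []byte(`{"enable_sync_point": false, "bdr_mode": true}`)

	var cfg trimmedReplicaConfig
	if err := json.Unmarshal(payload, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.BDRMode) // true
}
```
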
6 changes: 6 additions & 0 deletions cdc/kv/client.go
@@ -234,6 +234,9 @@ type CDCClient struct {
}
ingressCommitTs model.Ts
ingressResolvedTs model.Ts
// filterLoop is used in BDR mode. When it is true, the TiKV CDC component
// will filter out data written by another TiCDC.
filterLoop bool
}

// NewCDCClient creates a CDCClient instance
@@ -247,6 +250,7 @@ func NewCDCClient(
changefeed model.ChangeFeedID,
tableID model.TableID,
tableName string,
filterLoop bool,
) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

@@ -268,6 +272,7 @@
}{
counts: list.New(),
},
filterLoop: filterLoop,
}
return
}
@@ -674,6 +679,7 @@ func (s *eventFeedSession) requestRegionToStore(
StartKey: sri.span.Start,
EndKey: sri.span.End,
ExtraOp: extraOp,
FilterLoop: s.client.filterLoop,
}

failpoint.Inject("kvClientPendingRegionDelay", nil)
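
To illustrate what the new flag does at the request level, here is a hedged sketch with a simplified stand-in for cdcpb.ChangeDataRequest (only the relevant field is modeled): requestRegionToStore copies the per-changefeed filterLoop flag into every region request, so the TiKV CDC component can filter out data written by another TiCDC.

```go
package main

import "fmt"

// changeDataRequest is a simplified stand-in for cdcpb.ChangeDataRequest;
// only the field added by this commit is modeled.
type changeDataRequest struct {
	RegionId   uint64
	FilterLoop bool
}

// newRegionRequest mirrors the idea in requestRegionToStore: the flag set on
// the CDCClient (true only for BDR-mode changefeeds) is attached to every
// region subscription sent to TiKV.
func newRegionRequest(regionID uint64, filterLoop bool) changeDataRequest {
	return changeDataRequest{
		RegionId:   regionID,
		FilterLoop: filterLoop,
	}
}

func main() {
	req := newRegionRequest(42, true) // a changefeed running in BDR mode
	fmt.Printf("%+v\n", req)          // {RegionId:42 FilterLoop:true}
}
```

The test updates in the following files are the mechanical consequence of this signature change: every NewCDCClient call site gains a trailing false, since none of the test changefeeds run in BDR mode.
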
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
@@ -196,7 +196,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
@@ -290,7 +290,7 @@ func prepareBench(b *testing.B, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
52 changes: 26 additions & 26 deletions cdc/kv/client_test.go
@@ -72,7 +72,7 @@ func TestNewClient(t *testing.T) {
defer regionCache.Close()
cli := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "")
+config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false)
require.NotNil(t, cli)
}

@@ -320,7 +320,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
// Take care of the eventCh, it's used to output resolvedTs event or kv event
// It will stuck the normal routine
eventCh := make(chan model.RegionFeedEvent, 50)
@@ -422,7 +422,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -522,7 +522,7 @@ func TestHandleError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -681,7 +681,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var wg2 sync.WaitGroup
wg2.Add(1)
@@ -748,7 +748,7 @@ func TestClusterIDMismatch(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

var wg2 sync.WaitGroup
@@ -817,7 +817,7 @@ func testHandleFeedEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1278,7 +1278,7 @@ func TestStreamSendWithError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1390,7 +1390,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1523,7 +1523,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1732,7 +1732,7 @@ func TestIncompatibleTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
// NOTICE: eventCh may block the main logic of EventFeed
eventCh := make(chan model.RegionFeedEvent, 128)
wg.Add(1)
@@ -1809,7 +1809,7 @@ func TestNoPendingRegionError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

wg.Add(1)
@@ -1888,7 +1888,7 @@ func TestDropStaleRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2002,7 +2002,7 @@ func TestResolveLock(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2107,7 +2107,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var clientWg sync.WaitGroup
clientWg.Add(1)
@@ -2260,7 +2260,7 @@ func testEventAfterFeedStop(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2447,7 +2447,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2665,7 +2665,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2761,7 +2761,7 @@ func TestFailRegionReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2844,7 +2844,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2912,7 +2912,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2990,7 +2990,7 @@ func testKVClientForceReconnect(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3141,7 +3141,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 100)
wg.Add(1)
go func() {
@@ -3258,7 +3258,7 @@ func TestEvTimeUpdate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3384,7 +3384,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3476,7 +3476,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-config.GetDefaultServerConfig().KVClient, changefeed, 0, "")
+config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
baseAllocatedID := currentRequestID()

20 changes: 20 additions & 0 deletions cdc/owner/changefeed.go
@@ -500,6 +500,17 @@ LOOP:
cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel

sourceID, err := pdutil.GetSourceID(ctx, c.upstream.PDClient)
if err != nil {
return errors.Trace(err)
}
c.state.Info.Config.Sink.TiDBSourceID = sourceID
log.Info("set source id",
zap.Uint64("sourceID", sourceID),
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
)

c.sink = c.newSink(c.id, c.state.Info, ctx.Throw)
c.sink.run(cancelCtx)

@@ -895,6 +906,15 @@ func (c *changefeed) asyncExecDDLEvent(ctx cdcContext.Context,
zap.String("changefeed", c.id.ID), zap.Any("event", ddlEvent))
return true, nil
}

// check whether the changefeed is in BDR mode; if so, we need to skip all DDLs
if c.state.Info.Config.BDRMode {
log.Info("ignore the DDL event in BDR mode",
zap.String("changefeed", c.id.ID),
zap.Any("ddl", ddlEvent.Query))
return true, nil
}

done, err = c.sink.emitDDLEvent(ctx, ddlEvent)
if err != nil {
return false, err
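
A hedged sketch of the DDL guard added above (the helper name is invented for illustration): when the changefeed runs in BDR mode, asyncExecDDLEvent reports the DDL event as done without emitting it to the sink, so DDL statements are skipped rather than replicated.

```go
package main

import (
	"fmt"
	"log"
)

// skipDDLInBDRMode condenses the new guard in asyncExecDDLEvent: in BDR mode
// the DDL event is treated as done immediately and never reaches the sink.
func skipDDLInBDRMode(bdrMode bool, query string) (done bool) {
	if bdrMode {
		log.Printf("ignore the DDL event in BDR mode, ddl: %s", query)
		return true
	}
	return false
}

func main() {
	fmt.Println(skipDDLInBDRMode(true, "CREATE TABLE t (id INT PRIMARY KEY)"))  // true
	fmt.Println(skipDDLInBDRMode(false, "CREATE TABLE t (id INT PRIMARY KEY)")) // false
}
```
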
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/puller.go
@@ -62,7 +62,7 @@ func (n *pullerNode) tableSpan() []regionspan.Span {

func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
up *upstream.Upstream, wg *errgroup.Group,
-sorter *sorterNode,
+sorter *sorterNode, filterLoop bool,
) error {
n.wg = wg
ctxC, cancel := context.WithCancel(ctx)
@@ -84,6 +84,7 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
n.changefeed,
n.tableID,
n.tableName,
filterLoop,
)
n.wg.Go(func() error {
ctx.Throw(errors.Trace(n.plr.Run(ctxC)))
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table_actor.go
@@ -552,7 +552,7 @@ func (t *tableActor) RemainEvents() int64 {

// for ut
var startPuller = func(t *tableActor, ctx *actorNodeContext) error {
-return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode)
+return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode, t.replicaConfig.BDRMode)
}

var startSorter = func(t *tableActor, ctx *actorNodeContext) error {
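
The two files above are the plumbing that turns the changefeed-level setting into the kv client flag: the table actor reads replicaConfig.BDRMode and the puller node forwards it to NewCDCClient as filterLoop. A rough sketch of that chain, using stand-in types rather than the real ones:

```go
package main

import "fmt"

// replicaConfig and pullerNode are stand-ins for the real types; only what is
// needed to show how BDRMode becomes filterLoop is kept.
type replicaConfig struct{ BDRMode bool }

type pullerNode struct{ filterLoop bool }

// startWithSorterNode mirrors the extended signature in puller.go: the value
// received here is what the node later passes to kv.NewCDCClient.
func (n *pullerNode) startWithSorterNode(filterLoop bool) {
	n.filterLoop = filterLoop
}

func main() {
	cfg := replicaConfig{BDRMode: true}
	n := &pullerNode{}
	// Equivalent to the startPuller change in table_actor.go.
	n.startWithSorterNode(cfg.BDRMode)
	fmt.Println(n.filterLoop) // true
}
```
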
7 changes: 7 additions & 0 deletions cdc/processor/processor.go
@@ -44,6 +44,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
@@ -859,6 +860,12 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
p.sendError(p.mg.Run(ctx))
}()

sourceID, err := pdutil.GetSourceID(ctx, p.upstream.PDClient)
if err != nil {
return errors.Trace(err)
}
p.changefeed.Info.Config.Sink.TiDBSourceID = sourceID

start := time.Now()
conf := config.GetGlobalServerConfig()
p.pullBasedSinking = conf.Debug.EnablePullBasedSink
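
Both the owner (cdc/owner/changefeed.go above) and the processor now fetch a source ID from PD via pdutil.GetSourceID and record it in the sink configuration before the sink starts. The sketch below shows only that pattern; the sinkConfig type and the fake lookup function are stand-ins for illustration, not the real APIs.

```go
package main

import (
	"context"
	"fmt"
)

// sinkConfig stands in for the sink section of the replica config; only the
// field introduced by this commit is shown.
type sinkConfig struct {
	TiDBSourceID uint64
}

// setSourceID mirrors the additions in changefeed initialization and
// processor.lazyInitImpl: the ID is looked up once (pdutil.GetSourceID against
// the upstream PD client in the real code) and stamped on the sink config.
func setSourceID(ctx context.Context,
	getSourceID func(context.Context) (uint64, error), sink *sinkConfig) error {
	sourceID, err := getSourceID(ctx)
	if err != nil {
		return fmt.Errorf("get source id: %w", err)
	}
	sink.TiDBSourceID = sourceID
	return nil
}

func main() {
	sink := &sinkConfig{}
	fakePD := func(context.Context) (uint64, error) { return 1, nil } // fake PD lookup
	if err := setSourceID(context.Background(), fakePD, sink); err != nil {
		panic(err)
	}
	fmt.Println(sink.TiDBSourceID) // 1
}
```
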
1 change: 1 addition & 0 deletions cdc/processor/sourcemanager/puller/puller_wrapper.go
@@ -96,6 +96,7 @@ func (n *Wrapper) Start(
n.changefeed,
n.tableID,
n.tableName,
false,
)
n.wg.Add(1)
go func() {
(Diffs for the remaining 25 changed files are not shown.)
