From 3f4a8a731c4cc7524ee69b621be2dd517170d776 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Wed, 30 Nov 2022 14:12:00 +0800 Subject: [PATCH] changefeed,kvClient, sink (ticdc): support bidirectional replication (#7630) close pingcap/tiflow#7736 --- cdc/api/v2/model.go | 3 + cdc/kv/client.go | 6 + cdc/kv/client_bench_test.go | 4 +- cdc/kv/client_test.go | 52 +++---- cdc/owner/changefeed.go | 20 +++ cdc/processor/pipeline/puller.go | 3 +- cdc/processor/pipeline/table_actor.go | 2 +- cdc/processor/processor.go | 7 + .../sourcemanager/puller/puller_wrapper.go | 1 + cdc/puller/ddl_puller.go | 8 +- cdc/puller/puller.go | 3 +- cdc/puller/puller_test.go | 3 +- cdc/sink/mysql/mysql.go | 2 +- cdc/sink/sink_test.go | 7 + cdc/sink/validator.go | 57 +++++++- cdc/sink/validator_test.go | 2 +- cdc/sinkv2/eventsink/txn/mysql/mysql.go | 45 ++++++- cdc/sinkv2/eventsink/txn/mysql/mysql_test.go | 40 +++--- go.mod | 2 +- go.sum | 4 +- pkg/config/config_test_data.go | 2 + pkg/config/replica_config.go | 17 ++- pkg/config/sink.go | 4 + pkg/pdutil/utils.go | 46 +++++++ pkg/sink/mysql/config.go | 6 +- pkg/sink/mysql/db_helper.go | 127 ++++++++++++------ pkg/txnutil/gc/testing.go | 13 ++ tests/integration_tests/bdr_mode/conf/cf.toml | 1 + .../bdr_mode/conf/diff_config.toml | 29 ++++ .../integration_tests/bdr_mode/data/down.sql | 16 +++ .../bdr_mode/data/finished.sql | 4 + .../integration_tests/bdr_mode/data/start.sql | 5 + tests/integration_tests/bdr_mode/data/up.sql | 17 +++ tests/integration_tests/bdr_mode/run.sh | 59 ++++++++ 34 files changed, 506 insertions(+), 111 deletions(-) create mode 100644 pkg/pdutil/utils.go create mode 100644 tests/integration_tests/bdr_mode/conf/cf.toml create mode 100644 tests/integration_tests/bdr_mode/conf/diff_config.toml create mode 100644 tests/integration_tests/bdr_mode/data/down.sql create mode 100644 tests/integration_tests/bdr_mode/data/finished.sql create mode 100644 tests/integration_tests/bdr_mode/data/start.sql create mode 100644 tests/integration_tests/bdr_mode/data/up.sql create mode 100644 tests/integration_tests/bdr_mode/run.sh diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index ec32508805e..0150a61e152 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -95,6 +95,7 @@ type ReplicaConfig struct { IgnoreIneligibleTable bool `json:"ignore_ineligible_table"` CheckGCSafePoint bool `json:"check_gc_safe_point"` EnableSyncPoint bool `json:"enable_sync_point"` + BDRMode bool `json:"bdr_mode"` SyncPointInterval time.Duration `json:"sync_point_interval"` SyncPointRetention time.Duration `json:"sync_point_retention"` Filter *FilterConfig `json:"filter"` @@ -113,6 +114,7 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig { res.EnableSyncPoint = c.EnableSyncPoint res.SyncPointInterval = c.SyncPointInterval res.SyncPointRetention = c.SyncPointRetention + res.BDRMode = c.BDRMode if c.Filter != nil { var mySQLReplicationRules *filter.MySQLReplicationRules @@ -221,6 +223,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { EnableSyncPoint: cloned.EnableSyncPoint, SyncPointInterval: cloned.SyncPointInterval, SyncPointRetention: cloned.SyncPointRetention, + BDRMode: cloned.BDRMode, } if cloned.Filter != nil { diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 7d6f0a5407a..128b118b051 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -234,6 +234,9 @@ type CDCClient struct { } ingressCommitTs model.Ts ingressResolvedTs model.Ts + // filterLoop is used in BDR mode, when it is 
true, tikv cdc component + // will filter data that are written by another TiCDC. + filterLoop bool } // NewCDCClient creates a CDCClient instance @@ -247,6 +250,7 @@ func NewCDCClient( changefeed model.ChangeFeedID, tableID model.TableID, tableName string, + filterLoop bool, ) (c CDCKVClient) { clusterID := pd.GetClusterID(ctx) @@ -268,6 +272,7 @@ func NewCDCClient( }{ counts: list.New(), }, + filterLoop: filterLoop, } return } @@ -674,6 +679,7 @@ func (s *eventFeedSession) requestRegionToStore( StartKey: sri.span.Start, EndKey: sri.span.End, ExtraOp: extraOp, + FilterLoop: s.client.filterLoop, } failpoint.Inject("kvClientPendingRegionDelay", nil) diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index 30fece3afc5..3b31c125e0d 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -196,7 +196,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { @@ -290,7 +290,7 @@ func prepareBench(b *testing.B, regionNum int) ( defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 3185a10dbad..38e91cb2529 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -72,7 +72,7 @@ func TestNewClient(t *testing.T) { defer regionCache.Close() cli := NewCDCClient( context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "") + config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false) require.NotNil(t, cli) } @@ -320,7 +320,7 @@ func TestConnectOfflineTiKV(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) // Take care of the eventCh, it's used to output resolvedTs event or kv event // It will stuck the normal routine eventCh := make(chan model.RegionFeedEvent, 50) @@ -422,7 +422,7 @@ func TestRecvLargeMessageSize(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -522,7 +522,7 @@ func TestHandleError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -681,7 +681,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, 
grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup wg2.Add(1) @@ -748,7 +748,7 @@ func TestClusterIDMismatch(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup @@ -817,7 +817,7 @@ func testHandleFeedEvent(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1278,7 +1278,7 @@ func TestStreamSendWithError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1390,7 +1390,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1523,7 +1523,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1732,7 +1732,7 @@ func TestIncompatibleTiKV(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) // NOTICE: eventCh may block the main logic of EventFeed eventCh := make(chan model.RegionFeedEvent, 128) wg.Add(1) @@ -1809,7 +1809,7 @@ func TestNoPendingRegionError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) @@ -1888,7 +1888,7 @@ func TestDropStaleRequest(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2002,7 +2002,7 @@ func TestResolveLock(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, 
pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2107,7 +2107,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) var clientWg sync.WaitGroup clientWg.Add(1) @@ -2260,7 +2260,7 @@ func testEventAfterFeedStop(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2447,7 +2447,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2665,7 +2665,7 @@ func TestResolveLockNoCandidate(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2761,7 +2761,7 @@ func TestFailRegionReentrant(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2844,7 +2844,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2912,7 +2912,7 @@ func testClientErrNoPendingRegion(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2990,7 +2990,7 @@ func testKVClientForceReconnect(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3141,7 +3141,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, 
pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 100) wg.Add(1) go func() { @@ -3258,7 +3258,7 @@ func TestEvTimeUpdate(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3384,7 +3384,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3476,7 +3476,7 @@ func TestPrewriteNotMatchError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "") + config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) baseAllocatedID := currentRequestID() diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 3b6b36c73f8..af81600e0b0 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -500,6 +500,17 @@ LOOP: cancelCtx, cancel := cdcContext.WithCancel(ctx) c.cancel = cancel + sourceID, err := pdutil.GetSourceID(ctx, c.upstream.PDClient) + if err != nil { + return errors.Trace(err) + } + c.state.Info.Config.Sink.TiDBSourceID = sourceID + log.Info("set source id", + zap.Uint64("sourceID", sourceID), + zap.String("namespace", c.id.Namespace), + zap.String("changefeed", c.id.ID), + ) + c.sink = c.newSink(c.id, c.state.Info, ctx.Throw) c.sink.run(cancelCtx) @@ -895,6 +906,15 @@ func (c *changefeed) asyncExecDDLEvent(ctx cdcContext.Context, zap.String("changefeed", c.id.ID), zap.Any("event", ddlEvent)) return true, nil } + + // check whether in bdr mode, if so, we need to skip all DDLs + if c.state.Info.Config.BDRMode { + log.Info("ignore the DDL event in BDR mode", + zap.String("changefeed", c.id.ID), + zap.Any("ddl", ddlEvent.Query)) + return true, nil + } + done, err = c.sink.emitDDLEvent(ctx, ddlEvent) if err != nil { return false, err diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index c98be3a6aeb..a6f2ce031d3 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -62,7 +62,7 @@ func (n *pullerNode) tableSpan() []regionspan.Span { func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext, up *upstream.Upstream, wg *errgroup.Group, - sorter *sorterNode, + sorter *sorterNode, filterLoop bool, ) error { n.wg = wg ctxC, cancel := context.WithCancel(ctx) @@ -84,6 +84,7 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext, n.changefeed, n.tableID, n.tableName, + filterLoop, ) n.wg.Go(func() error { ctx.Throw(errors.Trace(n.plr.Run(ctxC))) diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index f55174d7a63..66597fc99de 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -552,7 +552,7 @@ func (t *tableActor) RemainEvents() int64 { // for ut var startPuller = func(t 
*tableActor, ctx *actorNodeContext) error { - return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode) + return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode, t.replicaConfig.BDRMode) } var startSorter = func(t *tableActor, ctx *actorNodeContext) error { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 0787b60563b..cee6550d8a7 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -44,6 +44,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/orchestrator" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" @@ -859,6 +860,12 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { p.sendError(p.mg.Run(ctx)) }() + sourceID, err := pdutil.GetSourceID(ctx, p.upstream.PDClient) + if err != nil { + return errors.Trace(err) + } + p.changefeed.Info.Config.Sink.TiDBSourceID = sourceID + start := time.Now() conf := config.GetGlobalServerConfig() p.pullBasedSinking = conf.Debug.EnablePullBasedSink diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index c56e1e1ba09..3e6449070df 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -96,6 +96,7 @@ func (n *Wrapper) Start( n.changefeed, n.tableID, n.tableName, + false, ) n.wg.Add(1) go func() { diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 45b8a0ae006..61b5b8d8571 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -50,6 +50,10 @@ const ( ddlPullerStuckWarnDuration = 30 * time.Second // DDLPullerTableName is the fake table name for ddl puller DDLPullerTableName = "DDL_PULLER" + // ddl puller should never filter any DDL jobs even if + // the changefeed is in BDR mode, because the DDL jobs should + // be filtered before they are sent to the sink + ddLPullerFilterLoop = false ) // DDLJobPuller is used to pull ddl job from TiKV. @@ -481,7 +485,9 @@ func NewDDLJobPuller( regionspan.GetAllDDLSpan(), cfg, changefeed, - -1, DDLPullerTableName), + -1, DDLPullerTableName, + ddLPullerFilterLoop, + ), kvStorage: kvStorage, outputCh: make(chan *model.DDLJobEntry, defaultPullerOutputChanSize), metricDiscardedDDLCounter: discardedDDLCounter. diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 17d92f7c03d..b147ebbbc4f 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -88,6 +88,7 @@ func New(ctx context.Context, changefeed model.ChangeFeedID, tableID model.TableID, tableName string, + filterLoop bool, ) Puller { tikvStorage, ok := kvStorage.(tikv.Storage) if !ok { @@ -108,7 +109,7 @@ func New(ctx context.Context, WithLabelValues(changefeed.Namespace, changefeed.ID, pullerType) tsTracker := frontier.NewFrontier(0, metricMissedRegionCollectCounter, comparableSpans...) 
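
For orientation: the filterLoop/BDRMode flag introduced above travels from the changefeed's replica config through the table pipeline and puller into the kv client, and finally onto each per-region request sent to TiKV, whose CDC component then filters events written by another TiCDC. Below is a minimal standalone sketch of that last step, assuming the kvproto revision this patch bumps to (which adds the FilterLoop field); the helper name and key range are illustrative, not code from the patch.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/cdcpb"
)

// buildChangeDataRequest is a hypothetical helper showing how a BDR-mode
// changefeed's flag ends up on the per-region request, mirroring the
// "FilterLoop: s.client.filterLoop" assignment in requestRegionToStore.
func buildChangeDataRequest(start, end []byte, bdrMode bool) *cdcpb.ChangeDataRequest {
	return &cdcpb.ChangeDataRequest{
		StartKey:   start,
		EndKey:     end,
		FilterLoop: bdrMode, // true only for bdr-mode changefeeds
	}
}

func main() {
	req := buildChangeDataRequest([]byte("t_100_"), []byte("t_101_"), true)
	fmt.Println(req.FilterLoop)
}
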
kvCli := kv.NewCDCKVClient( - ctx, pdCli, grpcPool, regionCache, pdClock, cfg, changefeed, tableID, tableName) + ctx, pdCli, grpcPool, regionCache, pdClock, cfg, changefeed, tableID, tableName, filterLoop) p := &pullerImpl{ kvCli: kvCli, kvStorage: tikvStorage, diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index a305324344d..60407042b94 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -66,6 +66,7 @@ func newMockCDCKVClient( changefeed model.ChangeFeedID, tableID model.TableID, tableName string, + filterloop bool, ) kv.CDCKVClient { return &mockCDCKVClient{ expectations: make(chan model.RegionFeedEvent, 1024), @@ -130,7 +131,7 @@ func newPullerForTest( plr := New( ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(), checkpointTs, spans, config.GetDefaultServerConfig().KVClient, - model.DefaultChangeFeedID("changefeed-id-test"), 0, "table-test") + model.DefaultChangeFeedID("changefeed-id-test"), 0, "table-test", false) wg.Add(1) go func() { defer wg.Done() diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 5266b3f97dd..33e3b2cabb9 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -132,7 +132,7 @@ func NewMySQLSink( dsn.Params["writeTimeout"] = params.writeTimeout dsn.Params["timeout"] = params.dialTimeout - testDB, err := pmysql.CheckAndAdjustPassword(ctx, dsn, GetDBConnImpl) + testDB, err := pmysql.GetTestDB(ctx, dsn, GetDBConnImpl) if err != nil { return nil, err } diff --git a/cdc/sink/sink_test.go b/cdc/sink/sink_test.go index 3fd024be6c5..1be87c50972 100644 --- a/cdc/sink/sink_test.go +++ b/cdc/sink/sink_test.go @@ -38,4 +38,11 @@ func TestValidateSink(t *testing.T) { sinkURI = "blackhole://" err = Validate(ctx, sinkURI, replicateConfig) require.Nil(t, err) + + // test bdr mode error + replicateConfig.BDRMode = true + sinkURI = "blackhole://" + err = Validate(ctx, sinkURI, replicateConfig) + require.NotNil(t, err) + require.Contains(t, err.Error(), "sink uri scheme is not supported in BDR mode") } diff --git a/cdc/sink/validator.go b/cdc/sink/validator.go index 0d57219920c..b0a4142febe 100644 --- a/cdc/sink/validator.go +++ b/cdc/sink/validator.go @@ -22,6 +22,8 @@ import ( "github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink" + pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/util" ) @@ -30,10 +32,18 @@ import ( // Maybe we should support the dry-run mode to validate sink. func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) error { var err error - if err = preCheckSinkURI(sinkURI); err != nil { + var uri *url.URL + if uri, err = preCheckSinkURI(sinkURI); err != nil { return err } + if cfg.BDRMode { + err = checkBDRMode(ctx, uri, cfg) + if err != nil { + return err + } + } + errCh := make(chan error) ctx, cancel := context.WithCancel(contextutil.PutRoleInCtx(ctx, util.RoleClient)) conf := config.GetGlobalServerConfig() @@ -74,14 +84,14 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) er // preCheckSinkURI do some pre-check for sink URI. // 1. Check if sink URI is empty. // 2. 
Check if we use correct IPv6 format in URI.(if needed) -func preCheckSinkURI(sinkURIStr string) error { +func preCheckSinkURI(sinkURIStr string) (*url.URL, error) { if sinkURIStr == "" { - return cerror.ErrSinkURIInvalid.GenWithStack("sink uri is empty") + return nil, cerror.ErrSinkURIInvalid.GenWithStack("sink uri is empty") } sinkURI, err := url.Parse(sinkURIStr) if err != nil { - return cerror.WrapError(cerror.ErrSinkURIInvalid, err) + return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) } // Check if we use the correct IPv6 address format. @@ -90,9 +100,46 @@ func preCheckSinkURI(sinkURIStr string) error { // Also notice the host name different from host(host+port). if util.IsIPv6Address(sinkURI.Hostname()) && !util.IsValidIPv6AddressFormatInURI(sinkURI.Host) { - return cerror.ErrSinkURIInvalid.GenWithStack("sink uri host is not valid IPv6 address, " + + return nil, cerror.ErrSinkURIInvalid.GenWithStack("sink uri host is not valid IPv6 address, " + "when using IPv6 address in URI, please use [ipv6-address]:port") } + return sinkURI, nil +} + +func checkBDRMode(ctx context.Context, sinkURI *url.URL, replicaConfig *config.ReplicaConfig) error { + maskSinkURI, err := util.MaskSinkURI(sinkURI.String()) + if err != nil { + return err + } + + if !sink.IsMySQLCompatibleScheme(sinkURI.Scheme) { + return cerror.ErrSinkURIInvalid. + GenWithStack("sink uri scheme is not supported in BDR mode, sink uri: %s", maskSinkURI) + } + cfg := pmysql.NewConfig() + id := model.DefaultChangeFeedID("sink-verify") + err = cfg.Apply(ctx, id, sinkURI, replicaConfig) + if err != nil { + return err + } + dsn, err := pmysql.GenBasicDSN(sinkURI, cfg) + if err != nil { + return err + } + testDB, err := pmysql.GetTestDB(ctx, dsn, pmysql.CreateMySQLDBConn) + if err != nil { + return err + } + defer testDB.Close() + supported, err := pmysql.CheckIfBDRModeIsSupported(ctx, testDB) + if err != nil { + return err + } + if !supported { + return cerror.ErrSinkURIInvalid. 
+ GenWithStack("downstream database does not support BDR mode, "+ + "please check your config, sink uri: %s", maskSinkURI) + } return nil } diff --git a/cdc/sink/validator_test.go b/cdc/sink/validator_test.go index 8e40578a9ec..a176d21b557 100644 --- a/cdc/sink/validator_test.go +++ b/cdc/sink/validator_test.go @@ -78,7 +78,7 @@ func TestPreCheckSinkURI(t *testing.T) { test := tt t.Run(test.name, func(t *testing.T) { t.Parallel() - err := preCheckSinkURI(test.uri) + _, err := preCheckSinkURI(test.uri) if test.err != "" { require.Contains(t, err.Error(), test.err) } else { diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql.go b/cdc/sinkv2/eventsink/txn/mysql/mysql.go index b9afa4aec39..87111bcec40 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql.go @@ -93,6 +93,12 @@ func NewMySQLBackends( if err != nil { return nil, err } + + cfg.IsTiDB, err = pmysql.CheckIsTiDB(ctx, db) + if err != nil { + return nil, err + } + db.SetMaxIdleConns(cfg.WorkerCount) db.SetMaxOpenConns(cfg.WorkerCount) @@ -541,12 +547,28 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare } } + // we set write source for each txn, + // so we can use it to trace the data source + if err = s.setWriteSource(ctx, tx); err != nil { + err := logDMLTxnErr( + cerror.WrapError(cerror.ErrMySQLTxnError, err), + start, s.changefeed, + fmt.Sprintf("SET SESSION %s = %d", "tidb_cdc_write_source", + s.cfg.SourceID), + dmls.rowCount, dmls.startTs) + if rbErr := tx.Rollback(); rbErr != nil { + if errors.Cause(rbErr) != context.Canceled { + log.Warn("failed to rollback txn", zap.Error(rbErr)) + } + } + return 0, err + } + if err = tx.Commit(); err != nil { return 0, logDMLTxnErr( cerror.WrapError(cerror.ErrMySQLTxnError, err), start, s.changefeed, "COMMIT", dmls.rowCount, dmls.startTs) } - return dmls.rowCount, nil }) if err != nil { @@ -612,3 +634,24 @@ func getSQLErrCode(err error) (errors.ErrCode, bool) { func (s *mysqlBackend) setDMLMaxRetry(maxRetry uint64) { s.dmlMaxRetry = maxRetry } + +// setWriteSource sets write source for the transaction. +func (s *mysqlBackend) setWriteSource(ctx context.Context, txn *sql.Tx) error { + // we only set write source when donwstream is TiDB + if !s.cfg.IsTiDB { + return nil + } + // downstream is TiDB, set system variables. + // We should always try to set this variable, and ignore the error if + // downstream does not support this variable, it is by design. 
+ query := fmt.Sprintf("SET SESSION %s = %d", "tidb_cdc_write_source", s.cfg.SourceID) + _, err := txn.ExecContext(ctx, query) + if err != nil { + if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok && + mysqlErr.Number == mysql.ErrUnknownSystemVariable { + return nil + } + return err + } + return nil +} diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go index 2ab3f0f8507..48fe38f1f97 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go @@ -73,6 +73,16 @@ func newMySQLBackend( return backends[0], nil } +func newTestMockDB(t *testing.T) (db *sql.DB, mock sqlmock.Sqlmock) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + require.Nil(t, err) + return +} + func TestPrepareDML(t *testing.T) { t.Parallel() testCases := []struct { @@ -170,8 +180,7 @@ func TestAdjustSQLMode(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.Nil(t, err) + db, mock := newTestMockDB(t) mock.ExpectClose() return db, nil } @@ -262,8 +271,7 @@ func TestNewMySQLBackendExecDML(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.Nil(t, err) + db, mock := newTestMockDB(t) mock.ExpectBegin() mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)"). WithArgs(1, "test", 2, "test"). @@ -385,8 +393,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.Nil(t, err) + db, mock := newTestMockDB(t) mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). WithArgs(1, 2). @@ -457,8 +464,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.Nil(t, err) + db, mock := newTestMockDB(t) mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). WithArgs(1, 2). @@ -529,8 +535,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.Nil(t, err) + db, mock := newTestMockDB(t) for i := 0; i < 2; i++ { mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). @@ -593,8 +598,7 @@ func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.Nil(t, err) + db, mock := newTestMockDB(t) mock.ExpectBegin() mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`) VALUES (?)"). WithArgs(1). 
@@ -645,9 +649,8 @@ func TestNewMySQLBackend(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + db, mock := newTestMockDB(t) mock.ExpectClose() - require.Nil(t, err) return db, nil } @@ -681,9 +684,8 @@ func TestNewMySQLBackendWithIPv6Address(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + db, mock := newTestMockDB(t) mock.ExpectClose() - require.Nil(t, err) return db, nil } @@ -713,9 +715,8 @@ func TestGBKSupported(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + db, mock := newTestMockDB(t) mock.ExpectClose() - require.Nil(t, err) return db, nil } @@ -774,8 +775,7 @@ func TestMySQLSinkExecDMLError(t *testing.T) { } // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.Nil(t, err) + db, mock := newTestMockDB(t) mock.ExpectBegin() mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`,`b`) VALUES (?,?)").WillDelayFor(1 * time.Second). WillReturnError(&dmysql.MySQLError{Number: mysql.ErrNoSuchTable}) diff --git a/go.mod b/go.mod index 5bdbd436496..07750f56db8 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 - github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 + github.com/pingcap/kvproto v0.0.0-20221121044741-fdbd9fa2b8f4 github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c github.com/pingcap/tidb v1.1.0-beta.0.20221125090958-5775995ea17c github.com/pingcap/tidb-tools v6.1.1-0.20220715000306-1d2f00da8c3e+incompatible diff --git a/go.sum b/go.sum index ab7067e1e47..436a2c674b4 100644 --- a/go.sum +++ b/go.sum @@ -998,8 +998,8 @@ github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27/go.mod h1:IOdRDPLy github.com/pingcap/kvproto v0.0.0-20220328072018-6e75c12dbd73/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20220429093005-2839fa5a1ed6/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM= -github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20221121044741-fdbd9fa2b8f4 h1:oqhwv/XLnghhs46+A/cFnRtICzY2drYsH1SZ51uxhZs= +github.com/pingcap/kvproto v0.0.0-20221121044741-fdbd9fa2b8f4/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 73930ebf83b..6e858051a7f 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -150,6 +150,7 @@ const ( "force-replicate": true, "check-gc-safe-point": true, "enable-sync-point": false, + "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, 
"filter": { @@ -204,6 +205,7 @@ const ( "force-replicate": true, "check-gc-safe-point": true, "enable-sync-point": false, + "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, "filter": { diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 90eaa8ec59b..2f3bfccc494 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -57,6 +57,7 @@ var defaultReplicaConfig = &ReplicaConfig{ Terminator: CRLF, DateSeparator: DateSeparatorNone.String(), EnablePartitionSeparator: false, + TiDBSourceID: 1, }, Consistent: &ConsistentConfig{ Level: "none", @@ -87,12 +88,16 @@ func (d *Duration) UnmarshalText(text []byte) error { type ReplicaConfig replicaConfig type replicaConfig struct { - MemoryQuota uint64 `toml:"memory-quota" json:"memory-quota"` - CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` - EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"` - ForceReplicate bool `toml:"force-replicate" json:"force-replicate"` - CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` - EnableSyncPoint bool `toml:"enable-sync-point" json:"enable-sync-point"` + MemoryQuota uint64 `toml:"memory-quota" json:"memory-quota"` + CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` + EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"` + ForceReplicate bool `toml:"force-replicate" json:"force-replicate"` + CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` + EnableSyncPoint bool `toml:"enable-sync-point" json:"enable-sync-point"` + // BDR(Bidirectional Replication) is a feature that allows users to + // replicate data of same tables from TiDB-1 to TiDB-2 and vice versa. + // This feature is only available for TiDB. + BDRMode bool `toml:"bdr-mode" json:"bdr-mode"` SyncPointInterval time.Duration `toml:"sync-point-interval" json:"sync-point-interval"` SyncPointRetention time.Duration `toml:"sync-point-retention" json:"sync-point-retention"` Filter *FilterConfig `toml:"filter" json:"filter"` diff --git a/pkg/config/sink.go b/pkg/config/sink.go index a53afde4c41..5a26a8c29bd 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -90,6 +90,10 @@ type SinkConfig struct { Terminator string `toml:"terminator" json:"terminator"` DateSeparator string `toml:"date-separator" json:"date-separator"` EnablePartitionSeparator bool `toml:"enable-partition-separator" json:"enable-partition-separator"` + // TiDBSourceID is the source ID of the upstream TiDB, + // which is used to set the `tidb_cdc_write_source` session variable. + // Note: This field is only used internally and only used in the MySQL sink. + TiDBSourceID uint64 `toml:"-" json:"-"` } // CSVConfig defines a series of configuration items for csv codec. diff --git a/pkg/pdutil/utils.go b/pkg/pdutil/utils.go new file mode 100644 index 00000000000..412707d5b79 --- /dev/null +++ b/pkg/pdutil/utils.go @@ -0,0 +1,46 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package pdutil + +import ( + "context" + "strconv" + + cerror "github.com/pingcap/tiflow/pkg/errors" + pd "github.com/tikv/pd/client" +) + +const sourceIDName = "source_id" + +// GetSourceID returns the source ID of the TiDB cluster that PD is belonged to. +func GetSourceID(ctx context.Context, pdClient pd.Client) (uint64, error) { + // only nil in test case + if pdClient == nil { + return 1, nil + } + // The default value of sourceID is 1, + // which means the sourceID is not changed by user. + sourceID := uint64(1) + sourceIDConfig, err := pdClient.LoadGlobalConfig(ctx, []string{sourceIDName}) + if err != nil { + return 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + if len(sourceIDConfig) != 0 && sourceIDConfig[0].Value != "" { + sourceID, err = strconv.ParseUint(sourceIDConfig[0].Value, 10, 64) + if err != nil { + return 0, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) + } + } + return sourceID, nil +} diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index b7bb17f0261..1e91b4cf546 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -80,7 +80,10 @@ type Config struct { TLS string ForceReplicate bool EnableOldValue bool - BatchDMLEnable bool + + IsTiDB bool // IsTiDB is true if the downstream is TiDB + SourceID uint64 + BatchDMLEnable bool } // NewConfig returns the default mysql backend config. @@ -150,6 +153,7 @@ func (c *Config) Apply( } c.EnableOldValue = replicaConfig.EnableOldValue c.ForceReplicate = replicaConfig.ForceReplicate + c.SourceID = replicaConfig.Sink.TiDBSourceID return nil } diff --git a/pkg/sink/mysql/db_helper.go b/pkg/sink/mysql/db_helper.go index bdb5ad95fd1..9ccc70af824 100644 --- a/pkg/sink/mysql/db_helper.go +++ b/pkg/sink/mysql/db_helper.go @@ -55,40 +55,13 @@ func CreateMySQLDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) { func GenerateDSN(ctx context.Context, sinkURI *url.URL, cfg *Config, dbConnFactory Factory) (dsnStr string, err error) { // dsn format of the driver: // [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN] - username := sinkURI.User.Username() - if username == "" { - username = "root" - } - password, _ := sinkURI.User.Password() - - hostName := sinkURI.Hostname() - port := sinkURI.Port() - if port == "" { - port = "4000" - } - - // This will handle the IPv6 address format. - var dsn *dmysql.Config - host := net.JoinHostPort(hostName, port) - dsnStr = fmt.Sprintf("%s:%s@tcp(%s)/%s", username, password, host, cfg.TLS) - if dsn, err = dmysql.ParseDSN(dsnStr); err != nil { - return - } - - // create test db used for parameter detection - // Refer https://github.com/go-sql-driver/mysql#parameters - if dsn.Params == nil { - dsn.Params = make(map[string]string, 1) - } - if cfg.Timezone != "" { - dsn.Params["time_zone"] = cfg.Timezone + dsn, err := GenBasicDSN(sinkURI, cfg) + if err != nil { + return "", err } - dsn.Params["readTimeout"] = cfg.ReadTimeout - dsn.Params["writeTimeout"] = cfg.WriteTimeout - dsn.Params["timeout"] = cfg.DialTimeout var testDB *sql.DB - testDB, err = CheckAndAdjustPassword(ctx, dsn, dbConnFactory) + testDB, err = GetTestDB(ctx, dsn, dbConnFactory) if err != nil { return } @@ -120,7 +93,7 @@ func GenerateDSN(ctx context.Context, sinkURI *url.URL, cfg *Config, dbConnFacto if !gbkSupported { log.Warn("GBK charset is not supported by the downstream. 
"+ "Some types of DDLs may fail to execute", - zap.String("hostname", hostName), zap.String("port", port)) + zap.String("host", dsn.Addr)) } return @@ -242,9 +215,9 @@ func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultVal return "", nil } -// CheckAndAdjustPassword checks and adjusts the password of the given DSN, +// GetTestDB checks and adjusts the password of the given DSN, // it will return a DB instance opened with the adjusted password. -func CheckAndAdjustPassword(ctx context.Context, dbConfig *dmysql.Config, dbConnFactory Factory) (*sql.DB, error) { +func GetTestDB(ctx context.Context, dbConfig *dmysql.Config, dbConnFactory Factory) (*sql.DB, error) { password := dbConfig.Passwd if dbConnFactory == nil { dbConnFactory = CreateMySQLDBConn @@ -256,13 +229,87 @@ func CheckAndAdjustPassword(ctx context.Context, dbConfig *dmysql.Config, dbConn if dePassword, decodeErr := base64.StdEncoding.DecodeString(password); decodeErr == nil && string(dePassword) != password { dbConfig.Passwd = string(dePassword) testDB, err = dbConnFactory(ctx, dbConfig.FormatDSN()) - if err != nil { - return testDB, err - } } - } else { - return nil, err } } - return testDB, nil + return testDB, err +} + +// GenBasicDSN generates a basic DSN from the given config. +func GenBasicDSN(sinkURI *url.URL, cfg *Config) (*dmysql.Config, error) { + // dsn format of the driver: + // [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN] + username := sinkURI.User.Username() + if username == "" { + username = "root" + } + password, _ := sinkURI.User.Password() + + hostName := sinkURI.Hostname() + port := sinkURI.Port() + if port == "" { + port = "4000" + } + + // This will handle the IPv6 address format. + var dsn *dmysql.Config + var err error + host := net.JoinHostPort(hostName, port) + dsnStr := fmt.Sprintf("%s:%s@tcp(%s)/%s", username, password, host, cfg.TLS) + if dsn, err = dmysql.ParseDSN(dsnStr); err != nil { + return nil, errors.Trace(err) + } + + // create test db used for parameter detection + // Refer https://github.com/go-sql-driver/mysql#parameters + if dsn.Params == nil { + dsn.Params = make(map[string]string, 1) + } + if cfg.Timezone != "" { + dsn.Params["time_zone"] = cfg.Timezone + } + dsn.Params["readTimeout"] = cfg.ReadTimeout + dsn.Params["writeTimeout"] = cfg.WriteTimeout + dsn.Params["timeout"] = cfg.DialTimeout + return dsn, nil +} + +// CheckIfBDRModeIsSupported checks if the downstream supports BDR mode. +func CheckIfBDRModeIsSupported(ctx context.Context, db *sql.DB) (bool, error) { + isTiDB, err := CheckIsTiDB(ctx, db) + if err != nil || !isTiDB { + return false, err + } + testSourceID := 1 + // downstream is TiDB, set system variables. + // We should always try to set this variable, and ignore the error if + // downstream does not support this variable, it is by design. + query := fmt.Sprintf("SET SESSION %s = %d", "tidb_cdc_write_source", testSourceID) + _, err = db.ExecContext(ctx, query) + if err != nil { + if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok && + mysqlErr.Number == tmysql.ErrUnknownSystemVariable { + return false, nil + } + return false, err + } + return true, nil +} + +// CheckIsTiDB checks if the downstream is TiDB. 
+func CheckIsTiDB(ctx context.Context, db *sql.DB) (bool, error) { + var tidbVer string + // check if downstream is TiDB + row := db.QueryRowContext(ctx, "select tidb_version()") + err := row.Scan(&tidbVer) + if err != nil { + log.Error("check tidb version error", zap.Error(err)) + // downstream is not TiDB, do nothing + if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok && (mysqlErr.Number == tmysql.ErrNoDB || + mysqlErr.Number == tmysql.ErrSpDoesNotExist) { + return false, nil + } + return false, errors.Trace(err) + } + return true, nil } diff --git a/pkg/txnutil/gc/testing.go b/pkg/txnutil/gc/testing.go index 8eb9c92cfc7..86cc6ff84f7 100644 --- a/pkg/txnutil/gc/testing.go +++ b/pkg/txnutil/gc/testing.go @@ -56,3 +56,16 @@ func (m *MockPDClient) GetAllStores( ) ([]*metapb.Store, error) { return m.GetAllStoresFunc(ctx, opts...) } + +// LoadGlobalConfig loads global config from PD. +func (m *MockPDClient) LoadGlobalConfig( + ctx context.Context, names []string, +) ([]pd.GlobalConfigItem, error) { + return []pd.GlobalConfigItem{ + { + Name: "source_id", + Value: "1", + Error: nil, + }, + }, nil +} diff --git a/tests/integration_tests/bdr_mode/conf/cf.toml b/tests/integration_tests/bdr_mode/conf/cf.toml new file mode 100644 index 00000000000..b6929758e72 --- /dev/null +++ b/tests/integration_tests/bdr_mode/conf/cf.toml @@ -0,0 +1 @@ +bdr-mode = true diff --git a/tests/integration_tests/bdr_mode/conf/diff_config.toml b/tests/integration_tests/bdr_mode/conf/diff_config.toml new file mode 100644 index 00000000000..9fd677fd534 --- /dev/null +++ b/tests/integration_tests/bdr_mode/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/big_txn/sync_diff/output" + +source-instances = ["tidb"] + +target-instance = "mysql" + +target-check-tables = ["bdr_mode.*"] + +[data-sources] +[data-sources.tidb] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.mysql] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/bdr_mode/data/down.sql b/tests/integration_tests/bdr_mode/data/down.sql new file mode 100644 index 00000000000..1bd6e36fa97 --- /dev/null +++ b/tests/integration_tests/bdr_mode/data/down.sql @@ -0,0 +1,16 @@ +use `bdr_mode`; + +begin; +insert into `t1` values (2, '2'), (4, '4'), (6, '6'), (8, '8'), (10, '10'); +commit; + +begin; +update `t1` set `name` = '22' where `id` = 2; +delete from `t1` where `id` = 4; +update `t1` set `name` = '66' where `id` = 6; +delete from `t1` where `id` = 8; +commit; + +begin; +insert into `t1` values (1, '1'), (3, '3'), (5, '5'), (7, '7'), (9, '9'); +rollback; diff --git a/tests/integration_tests/bdr_mode/data/finished.sql b/tests/integration_tests/bdr_mode/data/finished.sql new file mode 100644 index 00000000000..25ef0810392 --- /dev/null +++ b/tests/integration_tests/bdr_mode/data/finished.sql @@ -0,0 +1,4 @@ +use `bdr_mode`; + +create table `finish_mark` (id int primary key, name varchar(20)); + diff --git a/tests/integration_tests/bdr_mode/data/start.sql b/tests/integration_tests/bdr_mode/data/start.sql new file mode 100644 index 00000000000..2d5007efbad --- /dev/null +++ b/tests/integration_tests/bdr_mode/data/start.sql @@ -0,0 +1,5 @@ +drop database if exists `bdr_mode`; +create database `bdr_mode`; +use `bdr_mode`; + +create table `t1` (id int primary key, name varchar(20)); diff --git a/tests/integration_tests/bdr_mode/data/up.sql 
b/tests/integration_tests/bdr_mode/data/up.sql new file mode 100644 index 00000000000..9f5e36c9db8 --- /dev/null +++ b/tests/integration_tests/bdr_mode/data/up.sql @@ -0,0 +1,17 @@ +use `bdr_mode`; + +begin; +insert into `t1` values (1, '1'), (3, '3'), (5, '5'), (7, '7'), (9, '9'); +commit; + +begin; +update `t1` set `name` = '11' where `id` = 1; +delete from `t1` where `id` = 3; +update `t1` set `name` = '55' where `id` = 5; +delete from `t1` where `id` = 7; +commit; + +begin; +insert into `t1` values (2, '2'), (4, '4'), (6, '6'), (8, '8'), (10, '10'); +rollback; + diff --git a/tests/integration_tests/bdr_mode/run.sh b/tests/integration_tests/bdr_mode/run.sh new file mode 100644 index 00000000000..2b4066b4a04 --- /dev/null +++ b/tests/integration_tests/bdr_mode/run.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + # BDR mode only supports mysql sink + if [ "$SINK_TYPE" == "kafka" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # cdc server 1 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + # cdc server 2 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8400" --pd "http://${DOWN_PD_HOST}:${DOWN_PD_PORT}" + + SINK_URI_1="mysql://root@127.0.0.1:3306" + SINK_URI_2="mysql://root@127.0.0.1:4000" + + # down -> up + run_cdc_cli changefeed create --sink-uri="$SINK_URI_1" -c "test-1" --config="$CUR/conf/cf.toml" + # up -> down + run_cdc_cli changefeed create --sink-uri="$SINK_URI_2" -c "test-2" --server "http://127.0.0.1:8400" --config="$CUR/conf/cf.toml" + + run_sql_file $CUR/data/start.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/start.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + check_table_exists "bdr_mode.t1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "bdr_mode.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_sql_file $CUR/data/up.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/down.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_sql_file $CUR/data/finished.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/finished.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + check_table_exists "bdr_mode.finish_mark" ${UP_TIDB_HOST} ${UP_TIDB_PORT} 60 + check_table_exists "bdr_mode.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
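
Taken together, the loop-prevention path added by this patch is: each cluster keeps a source_id in PD's global config (defaulting to 1), the changefeed owner and processor read it with pdutil.GetSourceID and store it in Sink.TiDBSourceID, the MySQL backend tags every transaction with tidb_cdc_write_source, TiKV CDC drops events carrying another TiCDC's write source when the request has FilterLoop set, and bdr-mode changefeeds additionally skip DDLs. Below is a minimal sketch of the source-id lookup and wiring, using the MockPDClient from this patch (whose LoadGlobalConfig answers source_id = "1"); the program layout is illustrative only.

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
	"github.com/pingcap/tiflow/pkg/pdutil"
	"github.com/pingcap/tiflow/pkg/txnutil/gc"
)

func main() {
	ctx := context.Background()

	// In production this is the real PD client of the upstream cluster;
	// the mock stands in for it here.
	pdClient := &gc.MockPDClient{}

	sourceID, err := pdutil.GetSourceID(ctx, pdClient)
	if err != nil {
		panic(err)
	}

	// Same order as changefeed.go / processor.go above: stash the id on the
	// sink config so the MySQL backend can set tidb_cdc_write_source.
	replicaConfig := config.GetDefaultReplicaConfig()
	replicaConfig.BDRMode = true
	replicaConfig.Sink.TiDBSourceID = sourceID
	fmt.Println("source id:", sourceID) // 1 unless the operator changed it in PD
}
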