From 5730a8f34fc9dab5d54eea4b81b5db59fbdd592a Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Tue, 14 Nov 2023 09:44:14 +0800 Subject: [PATCH] This is an automated cherry-pick of #10080 Signed-off-by: ti-chi-bot --- cdc/owner/ddl_manager.go | 35 +++++++++++++++++++ .../integration_tests/bdr_mode/data/start.sql | 1 + tests/integration_tests/bdr_mode/data/up.sql | 5 +++ 3 files changed, 41 insertions(+) diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index f825465cc55..a61c6f58838 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -219,6 +219,7 @@ func (m *ddlManager) tick( } for _, event := range events { +<<<<<<< HEAD // If changefeed is in BDRMode, skip ddl. if m.BDRMode { log.Info("changefeed is in BDRMode, skip a ddl event", @@ -233,6 +234,22 @@ func (m *ddlManager) tick( log.Warn("ignore the DDL event of ineligible table", zap.String("changefeed", m.changfeedID.ID), zap.Any("ddl", event)) continue +======= + // TODO: find a better place to do this check + // check if the ddl event is belong to an ineligible table. + // If so, we should ignore it. + if !filter.IsSchemaDDL(event.Type) { + ignore, err := m.schema. + IsIneligibleTable(ctx, event.TableInfo.TableName.TableID, event.CommitTs) + if err != nil { + return nil, nil, errors.Trace(err) + } + if ignore { + log.Warn("ignore the DDL event of ineligible table", + zap.String("changefeed", m.changfeedID.ID), zap.Any("ddl", event)) + continue + } +>>>>>>> c0c6e94fb6 (ddl_manager (ticdc): fix in bdr mode cdc can not replicate a table's dmls after drop and re-create it (#10080)) } tableName := event.TableInfo.TableName // Add all valid DDL events to the pendingDDLs. @@ -337,6 +354,24 @@ func (m *ddlManager) executeDDL(ctx context.Context) error { if m.executingDDL == nil { return nil } + + // If changefeed is in BDRMode, skip ddl. + if m.BDRMode { + log.Info("changefeed is in BDRMode, skip a ddl event", + zap.String("namespace", m.changfeedID.Namespace), + zap.String("ID", m.changfeedID.ID), + zap.Any("ddlEvent", m.executingDDL)) + tableName := m.executingDDL.TableInfo.TableName + // Set it to nil first to accelerate GC. + m.pendingDDLs[tableName][0] = nil + m.pendingDDLs[tableName] = m.pendingDDLs[tableName][1:] + m.schema.DoGC(m.executingDDL.CommitTs - 1) + m.justSentDDL = m.executingDDL + m.executingDDL = nil + m.cleanCache() + return nil + } + failpoint.Inject("ExecuteNotDone", func() { // This ddl will never finish executing. // It is used to test the logic that a ddl only block the related table diff --git a/tests/integration_tests/bdr_mode/data/start.sql b/tests/integration_tests/bdr_mode/data/start.sql index 2d5007efbad..33e6cc75cba 100644 --- a/tests/integration_tests/bdr_mode/data/start.sql +++ b/tests/integration_tests/bdr_mode/data/start.sql @@ -3,3 +3,4 @@ create database `bdr_mode`; use `bdr_mode`; create table `t1` (id int primary key, name varchar(20)); +create table `t2` (id int primary key, name varchar(20)); \ No newline at end of file diff --git a/tests/integration_tests/bdr_mode/data/up.sql b/tests/integration_tests/bdr_mode/data/up.sql index dd926b15515..4e36df6157b 100644 --- a/tests/integration_tests/bdr_mode/data/up.sql +++ b/tests/integration_tests/bdr_mode/data/up.sql @@ -16,3 +16,8 @@ insert into `t1` values (22, '22'), (44, '44'), (66, '66'), (88, '88'), (108, '1 rollback; insert into `t1` values (100, '100'), (300, '300'), (500, '500'), (700, '700'), (900, '900'); + +drop table `t2`; +create table `t2` (id int primary key, name varchar(20)); +insert into `t2` values (1, '1'), (3, '3'), (5, '5'), (7, '7'), (9, '9'); +insert into `t2` values (2, '2'), (4, '4'), (6, '6'), (8, '8'), (10, '10'); \ No newline at end of file