diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 2ee838cb178..a152d2382a4 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -15,13 +15,15 @@ package owner import ( "context" + "strings" "sync" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/format" timodel "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo" cdcContext "github.com/pingcap/tiflow/pkg/context" @@ -442,7 +444,11 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don if err != nil { return false, errors.Trace(err) } - ddlEvent.Query = binloginfo.AddSpecialComment(ddlEvent.Query) + ddlEvent.Query, err = addSpecialComment(ddlEvent.Query) + if err != nil { + return false, errors.Trace(err) + } + c.ddlEventCache = ddlEvent if c.redoManager.Enabled() { err = c.redoManager.EmitDDLEvent(ctx, ddlEvent) @@ -506,3 +512,27 @@ func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) { func (c *changefeed) Close(ctx context.Context) { c.releaseResources(ctx) } + +// addSpecialComment translate tidb feature to comment +func addSpecialComment(ddlQuery string) (string, error) { + stms, _, err := parser.New().ParseSQL(ddlQuery) + if err != nil { + return "", errors.Trace(err) + } + if len(stms) != 1 { + log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery)) + } + var sb strings.Builder + // translate TiDB feature to special comment + restoreFlags := format.RestoreTiDBSpecialComment + // escape the keyword + restoreFlags |= format.RestoreNameBackQuotes + // upper case keyword + restoreFlags |= format.RestoreKeyWordUppercase + // wrap string with single quote + restoreFlags |= format.RestoreStringSingleQuotes + if err = stms[0].Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil { + return "", errors.Trace(err) + } + return sb.String(), nil +} diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 2657ce5fa8f..ec927e94a45 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -270,7 +270,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job) tickThreeTime() c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) - c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create database test1") + c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE DATABASE `test1`") // executing the ddl finished mockAsyncSink.ddlDone = true @@ -285,7 +285,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job) tickThreeTime() c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) - c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)") + c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE TABLE `test1`.`test1` (`id` INT PRIMARY KEY)") // executing the ddl finished mockAsyncSink.ddlDone = true @@ -352,3 +352,144 @@ func (s *changefeedSuite) TestFinished(c *check.C) { c.Assert(state.Status.CheckpointTs, check.Equals, state.Info.TargetTs) c.Assert(state.Info.State, check.Equals, model.StateFinished) } + +func (s *changefeedSuite) TestAddSpecialComment(c *check.C) { + defer testleak.AfterTest(c)() + testCase := []struct { + input string + result string + }{ + { + "create table t1 (id int ) shard_row_id_bits=2;", + "CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */", + }, + { + "create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;", + "CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */", + }, + { + "create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;", + "CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */", + }, + { + "create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;", + "CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */", + }, + { + "create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;", + "CREATE TABLE `t1` (`id` INT) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */", + }, + { + "create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;", + "CREATE TABLE `t6` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */", + }, + { + "create table t1 (id int primary key auto_random(2));", + "CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)", + }, + { + "create table t1 (id int primary key auto_random);", + "CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */)", + }, + { + "create table t1 (id int auto_random ( 4 ) primary key);", + "CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)", + }, + { + "create table t1 (id int auto_random ( 4 ) primary key);", + "CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)", + }, + { + "create table t1 (id int auto_random ( 3 ) primary key) auto_random_base = 100;", + "CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */", + }, + { + "create table t1 (id int auto_random primary key) auto_random_base = 50;", + "CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */", + }, + { + "create table t1 (id int auto_increment key) auto_id_cache 100;", + "CREATE TABLE `t1` (`id` INT AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */", + }, + { + "create table t1 (id int auto_increment unique) auto_id_cache 10;", + "CREATE TABLE `t1` (`id` INT AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */", + }, + { + "create table t1 (id int) auto_id_cache = 5;", + "CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", + }, + { + "create table t1 (id int) auto_id_cache=5;", + "CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", + }, + { + "create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;", + "CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */", + }, + { + "create table t1 (id int, a varchar(255), primary key (a, b) clustered);", + "CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] CLUSTERED */)", + }, + { + "create table t1(id int, v int, primary key(a) clustered);", + "CREATE TABLE `t1` (`id` INT,`v` INT,PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */)", + }, + { + "create table t1(id int primary key clustered, v int);", + "CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![clustered_index] CLUSTERED */,`v` INT)", + }, + { + "alter table t add primary key(a) clustered;", + "ALTER TABLE `t` ADD PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */", + }, + { + "create table t1 (id int, a varchar(255), primary key (a, b) nonclustered);", + "CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)", + }, + { + "create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);", + "CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)", + }, + { + "create table clustered_test(id int)", + "CREATE TABLE `clustered_test` (`id` INT)", + }, + { + "create database clustered_test", + "CREATE DATABASE `clustered_test`", + }, + { + "create database clustered", + "CREATE DATABASE `clustered`", + }, + { + "create table clustered (id int)", + "CREATE TABLE `clustered` (`id` INT)", + }, + { + "create table t1 (id int, a varchar(255) key clustered);", + "CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */)", + }, + { + "alter table t force auto_increment = 12;", + "ALTER TABLE `t` /*T![force_inc] FORCE */ AUTO_INCREMENT = 12", + }, + { + "alter table t force, auto_increment = 12;", + "ALTER TABLE `t` FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12", + }, + { + "create table cdc_test (id varchar(10) primary key ,c1 varchar(10)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */", + "CREATE TABLE `cdc_test` (`id` VARCHAR(10) PRIMARY KEY,`c1` VARCHAR(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_BIN /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */", + }, + } + for _, ca := range testCase { + re, err := addSpecialComment(ca.input) + c.Check(err, check.IsNil) + c.Check(re, check.Equals, ca.result) + } + c.Assert(func() { + _, _ = addSpecialComment("alter table t force, auto_increment = 12;alter table t force, auto_increment = 12;") + }, check.Panics, "invalid ddlQuery statement size") +} diff --git a/tests/integration_tests/ddl_reentrant/run.sh b/tests/integration_tests/ddl_reentrant/run.sh index 6e788d9dcf6..7867179066d 100644 --- a/tests/integration_tests/ddl_reentrant/run.sh +++ b/tests/integration_tests/ddl_reentrant/run.sh @@ -8,26 +8,27 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 -ddls=("create database ddl_reentrant" false - "create table ddl_reentrant.t1 (id int primary key, id2 int not null, a varchar(10) not null, unique a(a), unique id2(id2))" false - "alter table ddl_reentrant.t1 add column b int" false - "alter table ddl_reentrant.t1 drop column b" false - "alter table ddl_reentrant.t1 add key index_a(a)" false - "alter table ddl_reentrant.t1 drop index index_a" false - "truncate table ddl_reentrant.t1" true - "alter table ddl_reentrant.t1 modify a varchar(20)" true - "rename table ddl_reentrant.t1 to ddl_reentrant.t2" false - "alter table ddl_reentrant.t2 alter a set default 'hello'" true - "alter table ddl_reentrant.t2 comment='modify comment'" true - "alter table ddl_reentrant.t2 rename index a to idx_a" false - "create table ddl_reentrant.t3 (a int primary key, b int) partition by range(a) (partition p0 values less than (1000), partition p1 values less than (2000))" false - "alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false - "alter table ddl_reentrant.t3 drop partition p2" false - "alter table ddl_reentrant.t3 truncate partition p0" true - "create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false - "drop view ddl_reentrant.t3_view" false - "alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true - "alter schema ddl_reentrant default character set utf8mb4 default collate utf8mb4_unicode_ci" true +# cdc parse and restore ddl with flags format.RestoreStringSingleQuotes|format.RestoreNameBackQuotes|format.RestoreKeyWordUppercase|format.RestoreTiDBSpecialComment +ddls=("create database ddl_reentrant" false 'CREATE DATABASE `ddl_reentrant`' + "create table ddl_reentrant.t1 (id int primary key, id2 int not null, a varchar(10) not null, unique a(a), unique id2(id2))" false 'CREATE TABLE `ddl_reentrant`.`t1` (`id` INT PRIMARY KEY,`id2` INT NOT NULL,`a` VARCHAR(10) NOT NULL,UNIQUE `a`(`a`),UNIQUE `id2`(`id2`))' + "alter table ddl_reentrant.t1 add column b int" false 'ALTER TABLE `ddl_reentrant`.`t1` ADD COLUMN `b` INT' + "alter table ddl_reentrant.t1 drop column b" false 'ALTER TABLE `ddl_reentrant`.`t1` DROP COLUMN `b`' + "alter table ddl_reentrant.t1 add key index_a(a)" false 'ALTER TABLE `ddl_reentrant`.`t1` ADD INDEX `index_a`(`a`)' + "alter table ddl_reentrant.t1 drop index index_a" false 'ALTER TABLE `ddl_reentrant`.`t1` DROP INDEX `index_a`' + "truncate table ddl_reentrant.t1" true 'TRUNCATE TABLE `ddl_reentrant`.`t1`' + "alter table ddl_reentrant.t1 modify a varchar(20)" true 'ALTER TABLE `ddl_reentrant`.`t1` MODIFY COLUMN `a` VARCHAR(20)' + "rename table ddl_reentrant.t1 to ddl_reentrant.t2" false 'RENAME TABLE `ddl_reentrant`.`t1` TO `ddl_reentrant`.`t2`' + "alter table ddl_reentrant.t2 alter a set default 'hello'" true 'ALTER TABLE `ddl_reentrant`.`t2` ALTER COLUMN `a` SET DEFAULT _UTF8MB4'"'hello'" + "alter table ddl_reentrant.t2 comment='modify comment'" true 'ALTER TABLE `ddl_reentrant`.`t2` COMMENT = '"'modify comment'" + "alter table ddl_reentrant.t2 rename index a to idx_a" false 'ALTER TABLE `ddl_reentrant`.`t2` RENAME INDEX `a` TO `idx_a`' + "create table ddl_reentrant.t3 (a int primary key, b int) partition by range(a) (partition p0 values less than (1000), partition p1 values less than (2000))" false 'CREATE TABLE `ddl_reentrant`.`t3` (`a` INT PRIMARY KEY,`b` INT) PARTITION BY RANGE (`a`) (PARTITION `p0` VALUES LESS THAN (1000),PARTITION `p1` VALUES LESS THAN (2000))' + "alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false 'ALTER TABLE `ddl_reentrant`.`t3` ADD PARTITION (PARTITION `p2` VALUES LESS THAN (3000))' + "alter table ddl_reentrant.t3 drop partition p2" false 'ALTER TABLE `ddl_reentrant`.`t3` DROP PARTITION `p2`' + "alter table ddl_reentrant.t3 truncate partition p0" true 'ALTER TABLE `ddl_reentrant`.`t3` TRUNCATE PARTITION `p0`' + "create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false 'CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `ddl_reentrant`.`t3_view` AS SELECT `a`,`b` FROM `ddl_reentrant`.`t3`' + "drop view ddl_reentrant.t3_view" false 'DROP VIEW `ddl_reentrant`.`t3_view`' + "alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER TABLE `ddl_reentrant`.`t3` CHARACTER SET UTF8MB4 COLLATE UTF8MB4_UNICODE_CI' + "alter schema ddl_reentrant default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER DATABASE `ddl_reentrant` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci' ) function complete_ddls() { @@ -36,14 +37,14 @@ function complete_ddls() { echo "skip some DDLs in tidb v4.0.x" else # DDLs that are supportted since 5.0 - ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false) - ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false) + ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD COLUMN `c1` INT, ADD COLUMN `c2` INT, ADD COLUMN `c3` INT') + ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP COLUMN `c1`, DROP COLUMN `c2`, DROP COLUMN `c3`') fi - ddls+=("alter table ddl_reentrant.t2 drop primary key" false) - ddls+=("alter table ddl_reentrant.t2 add primary key pk(id)" false) - ddls+=("drop table ddl_reentrant.t2" false) - ddls+=("recover table ddl_reentrant.t2" false) - ddls+=("drop database ddl_reentrant" false) + ddls+=("alter table ddl_reentrant.t2 drop primary key" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP PRIMARY KEY') + ddls+=("alter table ddl_reentrant.t2 add primary key pk(id)" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD PRIMARY KEY `pk`(`id`)') + ddls+=("drop table ddl_reentrant.t2" false 'DROP TABLE `ddl_reentrant`.`t2`') + ddls+=("recover table ddl_reentrant.t2" false 'RECOVER TABLE `ddl_reentrant`.`t2`') + ddls+=("drop database ddl_reentrant" false 'DROP DATABASE `ddl_reentrant`') } changefeedid="" @@ -94,14 +95,15 @@ tidb_build_branch=$(mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} -e \ function ddl_test() { ddl=$1 is_reentrant=$2 + restored_sql=$3 echo "------------------------------------------" - echo "test ddl $ddl, is_reentrant: $is_reentrant" + echo "test ddl $ddl, is_reentrant: $is_reentrant restored_sql: $restored_sql" run_sql $ddl ${UP_TIDB_HOST} ${UP_TIDB_PORT} ensure 10 check_ts_forward $changefeedid - echo $ddl >${WORK_DIR}/ddl_temp.sql + echo $restored_sql >${WORK_DIR}/ddl_temp.sql ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" true ddl_finished_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log | tail -n 1 | grep -oE '"CommitTs\\":[0-9]{18}' | awk -F: '{print $(NF)}') cdc cli changefeed remove --changefeed-id=${changefeedid} @@ -146,7 +148,9 @@ function run() { idx=$((idx + 1)) idxs_reentrant=${ddls[$idx]} idx=$((idx + 1)) - ddl_test $ddl $idxs_reentrant + restored_sql=${ddls[$idx]} + idx=$((idx + 1)) + ddl_test $ddl $idxs_reentrant $restored_sql done IFS=$OLDIFS