Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc/owner: Fix ddl special comment syntax error #3845

Merged
merged 14 commits into from
Dec 20, 2021
Merged
26 changes: 24 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package owner

import (
"context"
"strings"
"sync"

"github.com/pingcap/errors"
Expand All @@ -28,8 +29,9 @@ import (
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down Expand Up @@ -449,7 +451,11 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
if err != nil {
return false, errors.Trace(err)
}
ddlEvent.Query = binloginfo.AddSpecialComment(ddlEvent.Query)
ddlEvent.Query, err = addSpecialComment(ddlEvent.Query)
if err != nil {
return false, errors.Trace(err)
}

c.ddlEventCache = ddlEvent
if c.redoManager.Enabled() {
err = c.redoManager.EmitDDLEvent(ctx, ddlEvent)
Expand Down Expand Up @@ -494,3 +500,19 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode
func (c *changefeed) Close(ctx context.Context) {
c.releaseResources(ctx)
}

// addSpecialComment translate tidb feature to comment
func addSpecialComment(ddlQuery string) (string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are special comments compatible with older version of TiDB (v4.0.0)?

stms, _, err := parser.New().ParseSQL(ddlQuery)
if err != nil {
return "", errors.Trace(err)
}
if len(stms) != 1 {
log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery))
}
var sb strings.Builder
if err = stms[0].Restore(format.NewRestoreCtx(format.RestoreTiDBSpecialComment, &sb)); err != nil {
return "", errors.Trace(err)
}
return sb.String(), nil
}
143 changes: 143 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package owner
import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/check"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

Expand Down Expand Up @@ -352,3 +354,144 @@ func (s *changefeedSuite) TestFinished(c *check.C) {
c.Assert(state.Status.CheckpointTs, check.Equals, state.Info.TargetTs)
c.Assert(state.Info.State, check.Equals, model.StateFinished)
}

func TestAddSpecialComment(t *testing.T) {
testCase := []struct {
input string
result string
}{
{
"create table t1 (id int ) shard_row_id_bits=2;",
"CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},

{
"create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;",
"CREATE TABLE t1 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;",
"CREATE TABLE t1 (id int) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;",
"CREATE TABLE t6 (id int) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int primary key auto_random(2));",
"CREATE TABLE t1 (id int PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)",
},
{
"create table t1 (id int primary key auto_random);",
"CREATE TABLE t1 (id int PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 3 ) primary key) auto_random_base = 100;",
"CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */",
},
{
"create table t1 (id int auto_random primary key) auto_random_base = 50;",
"CREATE TABLE t1 (id int /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */",
},
{
"create table t1 (id int auto_increment key) auto_id_cache 100;",
"CREATE TABLE t1 (id int AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */",
},
{
"create table t1 (id int auto_increment unique) auto_id_cache 10;",
"CREATE TABLE t1 (id int AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */",
},
{
"create table t1 (id int) auto_id_cache = 5;",
"CREATE TABLE t1 (id int) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) auto_id_cache=5;",
"CREATE TABLE t1 (id int) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;",
"CREATE TABLE t1 (id int) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) clustered);",
"CREATE TABLE t1 (id int,a varchar(255),PRIMARY KEY(a, b) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int, v int, primary key(a) clustered);",
"CREATE TABLE t1 (id int,v int,PRIMARY KEY(a) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int primary key clustered, v int);",
"CREATE TABLE t1 (id int PRIMARY KEY /*T![clustered_index] CLUSTERED */,v int)",
},
{
"alter table t add primary key(a) clustered;",
"ALTER TABLE t ADD PRIMARY KEY(a) /*T![clustered_index] CLUSTERED */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) nonclustered);",
"CREATE TABLE t1 (id int,a varchar(255),PRIMARY KEY(a, b) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);",
"CREATE TABLE t1 (id int,a varchar(255),PRIMARY KEY(a, b) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table clustered_test(id int)",
"CREATE TABLE clustered_test (id int)",
},
{
"create database clustered_test",
"CREATE DATABASE clustered_test",
},
{
"create database clustered",
"CREATE DATABASE clustered",
},
{
"create table clustered (id int)",
"CREATE TABLE clustered (id int)",
},
{
"create table t1 (id int, a varchar(255) key clustered);",
"CREATE TABLE t1 (id int,a varchar(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */)",
},
{
"alter table t force auto_increment = 12;",
"ALTER TABLE t /*T![force_inc] FORCE */ AUTO_INCREMENT = 12",
},
{
"alter table t force, auto_increment = 12;",
"ALTER TABLE t FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12",
},
{
"create table cdc_test (id varchar(10) primary key ,c1 varchar(10)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */",
"CREATE TABLE cdc_test (id varchar(10) PRIMARY KEY,c1 varchar(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 DEFAULT COLLATE = utf8mb4_bin /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */",
},
}
for _, ca := range testCase {
re, err := addSpecialComment(ca.input)
require.Nil(t, err)
require.Equal(t, ca.result, re)
}
require.Panics(t, func() {
_, _ = addSpecialComment("alter table t force, auto_increment = 12;alter table t force, auto_increment = 12;")
})
}