Skip to content

Commit

Permalink
sink(ticdc): convert values of pre-columns properly (#8421) (#8437)
Browse files Browse the repository at this point in the history
close #8420
  • Loading branch information
ti-chi-bot authored Mar 7, 2023
1 parent 7831ce2 commit cc2a701
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 59 deletions.
9 changes: 5 additions & 4 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,15 +803,15 @@ func convert2RowChanges(
return res
}

func convertBinaryToString(row *model.RowChangedEvent) {
for i, col := range row.Columns {
func convertBinaryToString(cols []*model.Column) {
for i, col := range cols {
if col == nil {
continue
}
if col.Charset != "" && col.Charset != charset.CharsetBin {
colValBytes, ok := col.Value.([]byte)
if ok {
row.Columns[i].Value = string(colValBytes)
cols[i].Value = string(colValBytes)
}
}
}
Expand All @@ -833,7 +833,8 @@ func (s *mysqlSink) groupRowsByType(
deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize)

for _, row := range singleTxnDMLs {
convertBinaryToString(row)
convertBinaryToString(row.Columns)
convertBinaryToString(row.PreColumns)

if row.IsInsert() {
insertRow = append(
Expand Down
126 changes: 77 additions & 49 deletions cdc/sink/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2577,10 +2577,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("你好"),
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -2594,17 +2596,19 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("世界"),
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))"},
values: [][]interface{}{{1, 1, 2, 2}},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
},
},
Expand All @@ -2620,10 +2624,11 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
Value: "你好",
}},
IndexColumns: [][]int{{1, 1}},
},
Expand All @@ -2637,17 +2642,19 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.HandleKeyFlag,
Value: "世界",
}},
IndexColumns: [][]int{{2, 2}},
},
},
expected: &preparedDMLs{
sqls: []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?),(?,?)"},
values: [][]interface{}{{1, 1, 2, 2}},
values: [][]interface{}{{1, "你好", 2, "世界"}},
rowCount: 2,
},
},
Expand All @@ -2664,21 +2671,25 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("开发"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("测试"),
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -2692,28 +2703,37 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 3,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 3,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("纽约"),
}},
Columns: []*model.Column{nil, {
Name: "a1",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 4,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 4,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("北京"),
}},
IndexColumns: [][]int{{1, 2}},
},
},
expected: &preparedDMLs{
sqls: []string{"UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, `a3` = ? WHERE `a1` = ? AND `a3` = ? LIMIT 1", "UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, `a3` = ? WHERE `a1` = ? AND `a3` = ? LIMIT 1"},
values: [][]interface{}{{2, 2, 1, 1}, {4, 4, 3, 3}},
sqls: []string{
"UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, `a3` = ? " +
"WHERE `a1` = ? AND `a3` = ? LIMIT 1",
"UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, `a3` = ? " +
"WHERE `a1` = ? AND `a3` = ? LIMIT 1",
},
values: [][]interface{}{{2, "测试", 1, "开发"}, {4, "北京", 3, "纽约"}},
rowCount: 2,
},
},
Expand All @@ -2730,10 +2750,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("你好"),
}},

IndexColumns: [][]int{{1, 2}},
Expand All @@ -2748,10 +2770,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 1,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: []byte("世界"),
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -2765,10 +2789,12 @@ func TestPrepareBatchDMLs(t *testing.T) {
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
}, {
Name: "a3",
Type: mysql.TypeLong,
Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag,
Value: 2,
Name: "a3",
Type: mysql.TypeVarchar,
Charset: charset.CharsetGBK,
Flag: model.BinaryFlag | model.MultipleKeyFlag |
model.HandleKeyFlag | model.UniqueKeyFlag,
Value: "你好",
}},
IndexColumns: [][]int{{1, 2}},
},
Expand All @@ -2778,7 +2804,7 @@ func TestPrepareBatchDMLs(t *testing.T) {
"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))",
"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)",
},
values: [][]interface{}{{1, 1, 2, 2}, {2, 2}},
values: [][]interface{}{{1, "世界", 2, "你好"}, {2, "你好"}},
rowCount: 3,
},
},
Expand All @@ -2794,6 +2820,8 @@ func TestPrepareBatchDMLs(t *testing.T) {
txns := []*model.SingleTableTxn{{Rows: tc.input}}
dmls := ms.prepareDMLs(txns, 1, 1)
require.Equal(t, tc.expected, dmls)
fmt.Println(dmls.sqls)
fmt.Println(dmls.values)
}
}

Expand Down
38 changes: 32 additions & 6 deletions tests/integration_tests/charset_gbk/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE DATABASE `charset_gbk_test0` CHARACTER SET utf8mb4;

USE `charset_gbk_test0`;

/* this is a test for columns which charset is gbk*/
/* this is a test for columns which charset is gbk, with pk*/
CREATE TABLE t0 (
id INT,
name varchar(128) CHARACTER SET gbk,
Expand All @@ -30,8 +30,34 @@ WHERE name = '测试';
DELETE FROM t0
WHERE name = '部署';

/* this is a test for table which charset is gbk*/
/* this is a test for table which charset is gbk, without pk but with uk */
CREATE TABLE t1 (
id INT NOT NULL,
name varchar(128) CHARACTER SET gbk NOT NULL,
country char(32) CHARACTER SET gbk,
city varchar(64),
description text CHARACTER SET gbk,
image tinyblob,
UNIQUE KEY (id, name)
) ENGINE = InnoDB CHARSET = utf8mb4;

INSERT INTO t1
VALUES (1, '测试', "中国", "上海", "你好,世界"
, 0xC4E3BAC3CAC0BDE7);

INSERT INTO t1
VALUES (2, '部署', "美国", "纽约", "世界,你好"
, 0xCAC0BDE7C4E3BAC3);

UPDATE t1
SET name = '开发'
WHERE name = '测试';

DELETE FROM t1
WHERE name = '部署';

/* this is a test for table which charset is gbk*/
CREATE TABLE t2 (
id INT,
name varchar(128),
country char(32),
Expand All @@ -41,19 +67,19 @@ CREATE TABLE t1 (
PRIMARY KEY (id)
) ENGINE = InnoDB CHARSET = gbk;

INSERT INTO t1
INSERT INTO t2
VALUES (1, '测试', "中国", "上海", "你好,世界"
, 0xC4E3BAC3CAC0BDE7);

INSERT INTO t1
INSERT INTO t2
VALUES (2, '部署', "美国", "纽约", "世界,你好"
, 0xCAC0BDE7C4E3BAC3);

UPDATE t1
UPDATE t2
SET name = '开发'
WHERE name = '测试';

DELETE FROM t1
DELETE FROM t2
WHERE name = '部署';

/* this is a test for db which charset is gbk*/
Expand Down

0 comments on commit cc2a701

Please sign in to comment.