From 782c9077465148346883d6f72b4f155cd6e1c296 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Tue, 17 May 2022 15:12:59 +0800 Subject: [PATCH] [mysql] Use the right column charset in the snapshot phase (#1166) --- .../mysql/debezium/DebeziumUtils.java | 53 ++++++ .../task/MySqlSnapshotSplitReadTask.java | 9 + .../mysql/table/MySqlConnectorITCase.java | 73 ++++++++ .../src/test/resources/ddl/charset_test.sql | 172 ++++++++++++++++++ 4 files changed, 307 insertions(+) create mode 100644 flink-connector-mysql-cdc/src/test/resources/ddl/charset_test.sql diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java index 143fca2cd12..5b3c487f388 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.util.FlinkRuntimeException; import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.mysql.cj.CharsetMapping; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; @@ -34,12 +35,15 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.Column; import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.SchemaNameAdjuster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; import java.sql.SQLException; import java.util.HashMap; import java.util.List; @@ -52,6 +56,9 @@ public class DebeziumUtils { private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class); + private static final CharsetMappingWrapper CHARSET_MAPPING_WRAPPER = + new CharsetMappingWrapper(); + /** Creates and opens a new {@link JdbcConnection} backing connection pool. */ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) { JdbcConnection jdbc = @@ -204,4 +211,50 @@ private static Map querySystemVariables( return variables; } + + /** + * Return the {@link Charset} instance with the MySQL-specific character set name used by the + * given column. Copy of Debezium {@link MySqlValueConverters#charsetFor(Column)}. + * + * @param column the column in which the character set is used; never null + * @return the Java {@link Charset}, or null if there is no mapping + */ + public static Charset charsetFor(Column column) { + String mySqlCharsetName = column.charsetName(); + if (mySqlCharsetName == null) { + LOG.warn("Column is missing a character set: {}", column); + return null; + } + String encoding = CHARSET_MAPPING_WRAPPER.getJavaEncodingForMysqlCharSet(mySqlCharsetName); + if (encoding == null) { + LOG.debug( + "Column uses MySQL character set '{}', which has no mapping to a Java character set, will try it in lowercase", + mySqlCharsetName); + encoding = + CHARSET_MAPPING_WRAPPER.getJavaEncodingForMysqlCharSet( + mySqlCharsetName.toLowerCase()); + } + if (encoding == null) { + LOG.warn( + "Column uses MySQL character set '{}', which has no mapping to a Java character set", + mySqlCharsetName); + } else { + try { + return Charset.forName(encoding); + } catch (IllegalCharsetNameException e) { + LOG.error( + "Unable to load Java charset '{}' for column with MySQL character set '{}'", + encoding, + mySqlCharsetName); + } + } + return null; + } + + /** Helper to gain access to protected method. */ + private static final class CharsetMappingWrapper extends CharsetMapping { + String getJavaEncodingForMysqlCharSet(String mySqlCharsetName) { + return CharsetMapping.getStaticJavaEncodingForMysqlCharset(mySqlCharsetName); + } + } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index be6ba32d26d..7923bad6a7c 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.debezium.task; +import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; import com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl; import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher; import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader; @@ -50,6 +51,8 @@ import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.sql.Blob; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -313,6 +316,12 @@ else if (actualColumn.jdbcType() == Types.TINYINT else if ("CHAR".equals(actualColumn.typeName()) || "VARCHAR".equals(actualColumn.typeName()) || "TEXT".equals(actualColumn.typeName())) { + Charset columnCharset = DebeziumUtils.charsetFor(actualColumn); + if (columnCharset != null) { + // UTF_8 for the jdbc connection charset encoding + return new String(rs.getBytes(fieldNo), StandardCharsets.UTF_8) + .getBytes(columnCharset); + } return rs.getBytes(fieldNo); } else { return rs.getObject(fieldNo); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 5ec59108d91..23b30fa665f 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -87,6 +87,9 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { private final UniqueDatabase userDatabase2 = new UniqueDatabase(MYSQL_CONTAINER, "user_2", TEST_USER, TEST_PASSWORD); + private final UniqueDatabase charsetTestDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "charset_test", TEST_USER, TEST_PASSWORD); + private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private final StreamTableEnvironment tEnv = @@ -1423,6 +1426,76 @@ public void testColumnOptionalWithDefaultValue() throws Exception { result.getJobClient().get().cancel().get(); } + @Test + public void testUcs2Charset() throws Exception { + testCharset( + "ucs2_test", + new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}); + } + + @Test + public void testUtf8Charset() throws Exception { + testCharset( + "utf8_test", + new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}); + } + + @Test + public void testAsciiCharset() throws Exception { + testCharset( + "ascii_test", + new String[] {"+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"}); + } + + @Test + public void testBig5Charset() throws Exception { + testCharset("big5_test", new String[] {"+I[1, 大五]", "+I[2, Craig Marshall]", "+I[3, 丹店]"}); + } + + private void testCharset(String tableName, String[] expected) throws Exception { + if (useLegacyDezMySql || !incrementalSnapshot) { + return; + } + charsetTestDatabase.createAndInitialize(); + String sourceDDL = + String.format( + "CREATE TABLE %s (\n" + + " table_id BIGINT,\n" + + " table_name STRING,\n" + + " primary key(table_id) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-id' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '%s'\n" + + ")", + tableName, + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + charsetTestDatabase.getUsername(), + charsetTestDatabase.getPassword(), + charsetTestDatabase.getDatabaseName(), + tableName, + incrementalSnapshot, + getServerId(), + getSplitSize()); + tEnv.executeSql(sourceDDL); + // async submit job + TableResult result = + tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", tableName)); + + CloseableIterator iterator = result.collect(); + waitForSnapshotStarted(iterator); + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + result.getJobClient().get().cancel().get(); + } + // ------------------------------------------------------------------------------------ private String getDezImplementation() { diff --git a/flink-connector-mysql-cdc/src/test/resources/ddl/charset_test.sql b/flink-connector-mysql-cdc/src/test/resources/ddl/charset_test.sql new file mode 100644 index 00000000000..8c5cc7a9ced --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/resources/ddl/charset_test.sql @@ -0,0 +1,172 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: charset_test +-- ---------------------------------------------------------------------------------------------------------------- + +CREATE TABLE `ascii_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET ascii DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ascii; + +CREATE TABLE `big5_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET big5 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=big5; + +CREATE TABLE `gbk_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET gbk DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=gbk; + +CREATE TABLE `sjis_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET sjis DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=sjis; + +CREATE TABLE `cp932_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET cp932 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp932; + +CREATE TABLE `gb2312_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET gb2312 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=gb2312; + +CREATE TABLE `ujis_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET ujis DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ujis; + +CREATE TABLE `euckr_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET euckr DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=euckr; + +CREATE TABLE `latin1_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET latin1 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=latin1; + + +CREATE TABLE `latin2_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET latin2 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=latin2; + + +CREATE TABLE `greek_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET greek DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=greek; + + +CREATE TABLE `hebrew_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET hebrew DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=hebrew; + + +CREATE TABLE `cp866_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET cp866 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp866; + + +CREATE TABLE `tis620_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET tis620 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=tis620; + + +CREATE TABLE `cp1250_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET cp1250 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp1250; + +CREATE TABLE `cp1251_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET cp1251 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp1251; + +CREATE TABLE `cp1257_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET cp1257 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp1257; + +CREATE TABLE `macroman_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET macroman DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=macroman; + +CREATE TABLE `macce_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET macce DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=macce; + +CREATE TABLE `utf8_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET utf8 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=utf8; + +CREATE TABLE `ucs2_test` ( + `table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号', + `table_name` varchar(200) CHARACTER SET ucs2 DEFAULT '' COMMENT '表名称', + PRIMARY KEY (`table_id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ucs2; + +INSERT into `ascii_test` values (1, 'ascii test!?'), (2, 'Craig Marshall'), (3, '{test}'); +INSERT into `big5_test` values (1, '大五'), (2, 'Craig Marshall'), (3, '丹店'); +--INSERT into `gbk_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +---- INSERT into `sjis_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +---- INSERT into `cp932_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +----INSERT into `gb2312_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `ujis_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `euckr_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `latin1_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `latin2_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `greek_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `hebrew_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `cp866_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `tis620_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `cp1250_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `cp1251_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `cp1257_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `macroman_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +--INSERT into `macce_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +INSERT into `utf8_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); +INSERT into `ucs2_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据'); \ No newline at end of file