Skip to content

Commit

Permalink
[mysql] Use the right column charset in the snapshot phase (#1166)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanhang1993 committed May 17, 2022
1 parent 9c905f6 commit 782c907
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.util.FlinkRuntimeException;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.mysql.cj.CharsetMapping;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
Expand All @@ -34,12 +35,15 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
Expand All @@ -52,6 +56,9 @@ public class DebeziumUtils {

private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class);

private static final CharsetMappingWrapper CHARSET_MAPPING_WRAPPER =
new CharsetMappingWrapper();

/** Creates and opens a new {@link JdbcConnection} backing connection pool. */
public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig) {
JdbcConnection jdbc =
Expand Down Expand Up @@ -204,4 +211,50 @@ private static Map<String, String> querySystemVariables(

return variables;
}

/**
* Return the {@link Charset} instance with the MySQL-specific character set name used by the
* given column. Copy of Debezium {@link MySqlValueConverters#charsetFor(Column)}.
*
* @param column the column in which the character set is used; never null
* @return the Java {@link Charset}, or null if there is no mapping
*/
public static Charset charsetFor(Column column) {
String mySqlCharsetName = column.charsetName();
if (mySqlCharsetName == null) {
LOG.warn("Column is missing a character set: {}", column);
return null;
}
String encoding = CHARSET_MAPPING_WRAPPER.getJavaEncodingForMysqlCharSet(mySqlCharsetName);
if (encoding == null) {
LOG.debug(
"Column uses MySQL character set '{}', which has no mapping to a Java character set, will try it in lowercase",
mySqlCharsetName);
encoding =
CHARSET_MAPPING_WRAPPER.getJavaEncodingForMysqlCharSet(
mySqlCharsetName.toLowerCase());
}
if (encoding == null) {
LOG.warn(
"Column uses MySQL character set '{}', which has no mapping to a Java character set",
mySqlCharsetName);
} else {
try {
return Charset.forName(encoding);
} catch (IllegalCharsetNameException e) {
LOG.error(
"Unable to load Java charset '{}' for column with MySQL character set '{}'",
encoding,
mySqlCharsetName);
}
}
return null;
}

/** Helper to gain access to protected method. */
private static final class CharsetMappingWrapper extends CharsetMapping {
String getJavaEncodingForMysqlCharSet(String mySqlCharsetName) {
return CharsetMapping.getStaticJavaEncodingForMysqlCharset(mySqlCharsetName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.ververica.cdc.connectors.mysql.debezium.task;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
Expand Down Expand Up @@ -50,6 +51,8 @@
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -313,6 +316,12 @@ else if (actualColumn.jdbcType() == Types.TINYINT
else if ("CHAR".equals(actualColumn.typeName())
|| "VARCHAR".equals(actualColumn.typeName())
|| "TEXT".equals(actualColumn.typeName())) {
Charset columnCharset = DebeziumUtils.charsetFor(actualColumn);
if (columnCharset != null) {
// UTF_8 for the jdbc connection charset encoding
return new String(rs.getBytes(fieldNo), StandardCharsets.UTF_8)
.getBytes(columnCharset);
}
return rs.getBytes(fieldNo);
} else {
return rs.getObject(fieldNo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
private final UniqueDatabase userDatabase2 =
new UniqueDatabase(MYSQL_CONTAINER, "user_2", TEST_USER, TEST_PASSWORD);

private final UniqueDatabase charsetTestDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "charset_test", TEST_USER, TEST_PASSWORD);

private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
Expand Down Expand Up @@ -1423,6 +1426,76 @@ public void testColumnOptionalWithDefaultValue() throws Exception {
result.getJobClient().get().cancel().get();
}

@Test
public void testUcs2Charset() throws Exception {
testCharset(
"ucs2_test",
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"});
}

@Test
public void testUtf8Charset() throws Exception {
testCharset(
"utf8_test",
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"});
}

@Test
public void testAsciiCharset() throws Exception {
testCharset(
"ascii_test",
new String[] {"+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"});
}

@Test
public void testBig5Charset() throws Exception {
testCharset("big5_test", new String[] {"+I[1, 大五]", "+I[2, Craig Marshall]", "+I[3, 丹店]"});
}

private void testCharset(String tableName, String[] expected) throws Exception {
if (useLegacyDezMySql || !incrementalSnapshot) {
return;
}
charsetTestDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE %s (\n"
+ " table_id BIGINT,\n"
+ " table_name STRING,\n"
+ " primary key(table_id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'\n"
+ ")",
tableName,
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
charsetTestDatabase.getUsername(),
charsetTestDatabase.getPassword(),
charsetTestDatabase.getDatabaseName(),
tableName,
incrementalSnapshot,
getServerId(),
getSplitSize());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result =
tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", tableName));

CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}

// ------------------------------------------------------------------------------------

private String getDezImplementation() {
Expand Down
172 changes: 172 additions & 0 deletions flink-connector-mysql-cdc/src/test/resources/ddl/charset_test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: charset_test
-- ----------------------------------------------------------------------------------------------------------------

CREATE TABLE `ascii_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET ascii DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ascii;

CREATE TABLE `big5_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET big5 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=big5;

CREATE TABLE `gbk_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET gbk DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=gbk;

CREATE TABLE `sjis_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET sjis DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=sjis;

CREATE TABLE `cp932_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp932 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp932;

CREATE TABLE `gb2312_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET gb2312 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=gb2312;

CREATE TABLE `ujis_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET ujis DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ujis;

CREATE TABLE `euckr_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET euckr DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=euckr;

CREATE TABLE `latin1_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET latin1 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=latin1;


CREATE TABLE `latin2_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET latin2 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=latin2;


CREATE TABLE `greek_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET greek DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=greek;


CREATE TABLE `hebrew_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET hebrew DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=hebrew;


CREATE TABLE `cp866_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp866 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp866;


CREATE TABLE `tis620_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET tis620 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=tis620;


CREATE TABLE `cp1250_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp1250 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp1250;

CREATE TABLE `cp1251_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp1251 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp1251;

CREATE TABLE `cp1257_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp1257 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp1257;

CREATE TABLE `macroman_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET macroman DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=macroman;

CREATE TABLE `macce_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET macce DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=macce;

CREATE TABLE `utf8_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET utf8 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=utf8;

CREATE TABLE `ucs2_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET ucs2 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ucs2;

INSERT into `ascii_test` values (1, 'ascii test!?'), (2, 'Craig Marshall'), (3, '{test}');
INSERT into `big5_test` values (1, '大五'), (2, 'Craig Marshall'), (3, '丹店');
--INSERT into `gbk_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
---- INSERT into `sjis_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
---- INSERT into `cp932_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
----INSERT into `gb2312_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `ujis_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `euckr_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `latin1_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `latin2_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `greek_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `hebrew_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `cp866_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `tis620_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `cp1250_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `cp1251_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `cp1257_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `macroman_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
--INSERT into `macce_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
INSERT into `utf8_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
INSERT into `ucs2_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');

0 comments on commit 782c907

Please sign in to comment.