Skip to content

Commit dfbaf61

Browse files
authored
[FLINK-36558][source-connector/mysql] Fix column metadata parsing compatibility with MySQL 8.0.17&8.0.18
This closes #3647.
1 parent 0b0bb02 commit dfbaf61

File tree

5 files changed

+259
-3
lines changed

5 files changed

+259
-3
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/ColumnType.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ public enum ColumnType {
5656
DATETIME_V2(18),
5757
TIME_V2(19),
5858
TYPED_ARRAY(20),
59+
// The TYPED_ARRAY enum value was changed from 244 to 20 in MySQL 8.0.18. Because the JSON array
60+
// cast syntax does not exist in MySQL 8.0.16 and earlier, and the enum value has been 20 since
61+
// MySQL 8.0.18, the only version that still emits the old value 244 is 8.0.17.
62+
// https://github.com/mysql/mysql-server/commit/9082b6a820f3948fd563cc32a050f5e8775f2855#diff-b9bac49e04a17ad0503e56a4c53d979c90eb64618387d20b9ea2cf1dbf47e5e7L25
63+
TYPED_ARRAY_OLD(244),
5964
JSON(245),
6065
NEWDECIMAL(246),
6166
ENUM(247),

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TableMapEventDataDeserializer.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@
2424
import java.io.IOException;
2525

2626
import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.TYPED_ARRAY;
27+
import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.TYPED_ARRAY_OLD;
2728

2829
/**
2930
* Copied from mysql-binlog-connector 0.25.3 to support MYSQL_TYPE_TYPED_ARRAY.
3031
*
32+
* <p>Line 57 ~ 61: read exactly as many column metadata bytes as the preceding length field
33+
* declares and parse only those bytes, instead of letting readMetadata consume the event stream.
34+
*
3135
* <p>Line 103 ~ 106: process MYSQL_TYPE_TYPED_ARRAY metadata, imitating the code in canal <a
3236
* href="https://github.com/alibaba/canal/blob/master/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java#L546">TableMapLogEvent#decodeFields</a>.
3337
*
@@ -50,8 +54,11 @@ public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IO
5054
eventData.setTable(inputStream.readZeroTerminatedString());
5155
int numberOfColumns = inputStream.readPackedInteger();
5256
eventData.setColumnTypes(inputStream.read(numberOfColumns));
53-
inputStream.readPackedInteger(); // metadata length
54-
eventData.setColumnMetadata(readMetadata(inputStream, eventData.getColumnTypes()));
57+
int columnMetadataLength = inputStream.readPackedInteger(); // column metadata length
58+
eventData.setColumnMetadata(
59+
readMetadata(
60+
new ByteArrayInputStream(inputStream.read(columnMetadataLength)),
61+
eventData.getColumnTypes()));
5562
eventData.setColumnNullability(inputStream.readBitSet(numberOfColumns, true));
5663
int metadataLength = inputStream.available();
5764
TableMapEventMetadata metadata = null;
@@ -93,7 +100,7 @@ private int[] readMetadata(ByteArrayInputStream inputStream, byte[] columnTypes)
93100
int[] metadata = new int[columnTypes.length];
94101
for (int i = 0; i < columnTypes.length; i++) {
95102
ColumnType columnType = ColumnType.byCode(columnTypes[i] & 0xFF);
96-
if (columnType == TYPED_ARRAY) {
103+
if (columnType == TYPED_ARRAY || columnType == TYPED_ARRAY_OLD) {
97104
byte[] arrayType = inputStream.read(1);
98105
columnType = ColumnType.byCode(arrayType[0] & 0xFF);
99106
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.mysql.table;
19+
20+
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
21+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
22+
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
23+
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
24+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
25+
import org.apache.flink.table.api.EnvironmentSettings;
26+
import org.apache.flink.table.api.TableResult;
27+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
28+
import org.apache.flink.types.Row;
29+
import org.apache.flink.util.CloseableIterator;
30+
31+
import org.junit.After;
32+
import org.junit.Before;
33+
import org.junit.Test;
34+
import org.junit.runner.RunWith;
35+
import org.junit.runners.Parameterized;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
import org.testcontainers.lifecycle.Startables;
39+
40+
import java.sql.Connection;
41+
import java.sql.Statement;
42+
import java.util.ArrayList;
43+
import java.util.Arrays;
44+
import java.util.Iterator;
45+
import java.util.List;
46+
import java.util.Random;
47+
import java.util.stream.Stream;
48+
49+
import static org.apache.flink.api.common.JobStatus.RUNNING;
50+
51+
/** Integration tests for MySQL Table source. */
52+
@RunWith(Parameterized.class)
53+
public class MySqlJsonArrayAsKeyIndexITCase extends MySqlSourceTestBase {
54+
55+
private static final Logger LOG = LoggerFactory.getLogger(MySqlJsonArrayAsKeyIndexITCase.class);
56+
57+
private static final String TEST_USER = "mysqluser";
58+
private static final String TEST_PASSWORD = "mysqlpw";
59+
60+
private final StreamExecutionEnvironment env =
61+
StreamExecutionEnvironment.getExecutionEnvironment();
62+
private final StreamTableEnvironment tEnv =
63+
StreamTableEnvironment.create(
64+
env, EnvironmentSettings.newInstance().inStreamingMode().build());
65+
66+
@Parameterized.Parameters(name = "incrementalSnapshot: {0}")
67+
public static Object[] parameters() {
68+
// MySQL 8.0.17 brought the `CAST(JSON_EXTRACT AS ARRAY)` syntax firstly, and originates the
69+
// "extra 0 byte" bug.
70+
// MySQL 8.0.18 changed the TYPED_ARRAY internal enum value from 244 to 20, but didn't fix
71+
// the bug.
72+
// MySQL 8.0.19 fixed this issue (eventually).
73+
return new Object[][] {
74+
new Object[] {MySqlVersion.V8_0_17},
75+
new Object[] {MySqlVersion.V8_0_18},
76+
new Object[] {MySqlVersion.V8_0_19}
77+
};
78+
}
79+
80+
private final MySqlVersion version;
81+
private final MySqlContainer container;
82+
83+
public MySqlJsonArrayAsKeyIndexITCase(MySqlVersion version) {
84+
this.version = version;
85+
this.container = createMySqlContainer(version, "docker/server-gtids/expire-seconds/my.cnf");
86+
}
87+
88+
@Before
89+
public void before() {
90+
LOG.info("Starting MySQL {} containers...", version);
91+
Startables.deepStart(Stream.of(container)).join();
92+
LOG.info("Container MySQL {} is started.", version);
93+
}
94+
95+
@After
96+
public void after() {
97+
LOG.info("Stopping MySQL {} containers...", version);
98+
container.stop();
99+
LOG.info("Container MySQL {} is stopped.", version);
100+
}
101+
102+
@Test
103+
public void testJsonArrayAsKeyIndex() {
104+
UniqueDatabase jaakiDatabase =
105+
new UniqueDatabase(container, "json_array_as_key", TEST_USER, TEST_PASSWORD);
106+
jaakiDatabase.createAndInitialize();
107+
108+
String sourceDDL =
109+
String.format(
110+
"CREATE TABLE json_array_as_key (\n"
111+
+ " id BIGINT NOT NULL,\n"
112+
+ " PRIMARY KEY(id) NOT ENFORCED"
113+
+ ") WITH ("
114+
+ " 'connector' = 'mysql-cdc',"
115+
+ " 'hostname' = '%s',"
116+
+ " 'port' = '%s',"
117+
+ " 'username' = '%s',"
118+
+ " 'password' = '%s',"
119+
+ " 'database-name' = '%s',"
120+
+ " 'table-name' = '%s',"
121+
+ " 'scan.startup.mode' = 'earliest-offset',"
122+
+ " 'server-time-zone' = 'UTC',"
123+
+ " 'server-id' = '%s',"
124+
+ " 'scan.incremental.snapshot.enabled' = 'true'"
125+
+ ")",
126+
container.getHost(),
127+
container.getDatabasePort(),
128+
TEST_USER,
129+
TEST_PASSWORD,
130+
jaakiDatabase.getDatabaseName(),
131+
"json_array_as_key",
132+
getServerId());
133+
tEnv.executeSql(sourceDDL);
134+
135+
try (Connection connection = jaakiDatabase.getJdbcConnection();
136+
Statement statement = connection.createStatement()) {
137+
statement.execute("INSERT INTO json_array_as_key(id) VALUES (18),(19);");
138+
statement.execute("DELETE FROM json_array_as_key WHERE id=19;");
139+
} catch (Exception e) {
140+
throw new RuntimeException(e);
141+
}
142+
143+
// async submit job
144+
TableResult result = tEnv.executeSql("SELECT * FROM json_array_as_key");
145+
146+
try {
147+
// wait for the source startup, we don't have a better way to wait it, use sleep for
148+
// now
149+
do {
150+
Thread.sleep(5000L);
151+
} while (result.getJobClient().get().getJobStatus().get() != RUNNING);
152+
} catch (Exception e) {
153+
throw new RuntimeException(e);
154+
}
155+
156+
CloseableIterator<Row> iterator = result.collect();
157+
158+
String[] expected =
159+
new String[] {
160+
// snapshot records
161+
"+I[17]", "+I[18]", "+I[19]", "-D[19]",
162+
};
163+
164+
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
165+
166+
try {
167+
result.getJobClient().get().cancel().get();
168+
} catch (Exception e) {
169+
throw new RuntimeException(e);
170+
}
171+
}
172+
173+
private static List<String> fetchRows(Iterator<Row> iter, int size) {
174+
List<String> rows = new ArrayList<>(size);
175+
while (size > 0 && iter.hasNext()) {
176+
Row row = iter.next();
177+
rows.add(row.toString());
178+
size--;
179+
}
180+
return rows;
181+
}
182+
183+
private String getServerId() {
184+
final Random random = new Random();
185+
int serverId = random.nextInt(100) + 5400;
186+
return serverId + "-" + (serverId + env.getParallelism());
187+
}
188+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ public enum MySqlVersion {
2222
V5_5("5.5"),
2323
V5_6("5.6"),
2424
V5_7("5.7"),
25+
V8_0_17("8.0.17"),
26+
V8_0_18("8.0.18"),
27+
V8_0_19("8.0.19"),
2528
V8_0("8.0");
2629

2730
private String version;
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one or more
2+
-- contributor license agreements. See the NOTICE file distributed with
3+
-- this work for additional information regarding copyright ownership.
4+
-- The ASF licenses this file to You under the Apache License, Version 2.0
5+
-- (the `License`); you may not use this file except in compliance with
6+
-- the License. You may obtain a copy of the License at
7+
--
8+
-- http://www.apache.org/licenses/LICENSE-2.0
9+
--
10+
-- Unless required by applicable law or agreed to in writing, software
11+
-- distributed under the License is distributed on an `AS IS` BASIS,
12+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
-- See the License for the specific language governing permissions and
14+
-- limitations under the License.
15+
16+
-- Create a table with complicated column metadata, so that parsing of the
17+
-- optional metadata that follows can only succeed if the preceding column
18+
-- metadata has been parsed correctly.
19+
20+
CREATE TABLE `json_array_as_key` (
21+
`id` bigint(20) unsigned NOT NULL,
22+
`c2` bigint(20) NOT NULL DEFAULT '0',
23+
`c3` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
24+
`c4` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
25+
`c5` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
26+
`c6` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
27+
`c7` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
28+
`c8` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
29+
`c9` int(11) DEFAULT '0',
30+
`c10` int(11) DEFAULT '0',
31+
`c11` int(11) NOT NULL DEFAULT '0',
32+
`c12` int(11) DEFAULT '0',
33+
`c13` json DEFAULT NULL,
34+
`c14` json DEFAULT NULL,
35+
`c15` json DEFAULT NULL,
36+
`c16` json DEFAULT NULL,
37+
`c17` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
38+
`c18` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
39+
`c19` tinyint(4) DEFAULT '0',
40+
`c20` tinyint(4) NOT NULL DEFAULT '0',
41+
`c21` tinyint(4) NOT NULL DEFAULT '0',
42+
`c22` tinyint(4) NOT NULL DEFAULT '0',
43+
`c23` tinyint(4) DEFAULT '0',
44+
`c24` tinyint(3) unsigned NOT NULL DEFAULT '1',
45+
`c25` int(10) unsigned NOT NULL DEFAULT '0',
46+
`c26` int(10) unsigned NOT NULL DEFAULT '0',
47+
`c27` int(10) unsigned NOT NULL DEFAULT '0',
48+
`c28` int(10) unsigned NOT NULL DEFAULT '0',
49+
PRIMARY KEY (`id`),
50+
KEY `k6` ((cast(json_extract(`c13`,_utf8mb4'$[*]') as CHAR(32) ARRAY)))
51+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
52+
53+
INSERT INTO json_array_as_key(ID) VALUES (17);

0 commit comments

Comments
 (0)