@@ -56,6 +56,11 @@ public enum ColumnType {
 DATETIME_V2(18),
 TIME_V2(19),
 TYPED_ARRAY(20),
+// The TYPED_ARRAY enum value was changed from 244 to 20 in MySQL 8.0.18. Since the JSON array
+// cast syntax only arrived in MySQL 8.0.17 and the enum value was fixed in MySQL 8.0.18, the
+// only version that emits the old value is 8.0.17.
+// https://github.com/mysql/mysql-server/commit/9082b6a820f3948fd563cc32a050f5e8775f2855#diff-b9bac49e04a17ad0503e56a4c53d979c90eb64618387d20b9ea2cf1dbf47e5e7L25
+TYPED_ARRAY_OLD(244),
 JSON(245),
 NEWDECIMAL(246),
 ENUM(247),
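Since both numeric codes can now reach a consumer (8.0.17 servers emit 244, 8.0.18+ emit 20), any caller that branches on the raw type byte has to accept either value. A minimal sketch of such a check, assuming ColumnType exposes its numeric code via getCode() as in mysql-binlog-connector-java:

    // Sketch only: map both the pre-8.0.18 code (244) and the current code (20)
    // to the same logical typed-array handling.
    static boolean isTypedArray(int rawTypeByte) {
        int code = rawTypeByte & 0xFF; // binlog type codes are unsigned bytes
        return code == ColumnType.TYPED_ARRAY.getCode()
                || code == ColumnType.TYPED_ARRAY_OLD.getCode();
    }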
@@ -24,10 +24,14 @@
 import java.io.IOException;

 import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.TYPED_ARRAY;
+import static com.github.shyiko.mysql.binlog.event.deserialization.ColumnType.TYPED_ARRAY_OLD;

 /**
  * Copied from mysql-binlog-connector 0.25.3 to support MYSQL_TYPE_TYPED_ARRAY.
  *
+ * <p>Lines 51 ~ 53: read the column metadata bytes according to the length encoded just before
+ * them, instead of relying on readMetadata to consume the stream.
+ *
  * <p>Lines 93 ~ 98: process MYSQL_TYPE_TYPED_ARRAY metadata, imitating the code in canal <a
  * href="https://github.com/alibaba/canal/blob/master/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/TableMapLogEvent.java#L546">TableMapLogEvent#decodeFields</a>.
  *
@@ -50,8 +54,11 @@ public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IO
 eventData.setTable(inputStream.readZeroTerminatedString());
 int numberOfColumns = inputStream.readPackedInteger();
 eventData.setColumnTypes(inputStream.read(numberOfColumns));
-inputStream.readPackedInteger(); // metadata length
-eventData.setColumnMetadata(readMetadata(inputStream, eventData.getColumnTypes()));
+int columnMetadataLength = inputStream.readPackedInteger(); // column metadata length
+eventData.setColumnMetadata(
+        readMetadata(
+                new ByteArrayInputStream(inputStream.read(columnMetadataLength)),
+                eventData.getColumnTypes()));
 eventData.setColumnNullability(inputStream.readBitSet(numberOfColumns, true));
 int metadataLength = inputStream.available();
 TableMapEventMetadata metadata = null;
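Why the bounded read matters: on an affected server the per-column metadata block can carry a stray trailing 0 byte, and the old in-place parse left that byte in the stream, shifting the null-bitmap and optional-metadata fields that follow. Copying exactly columnMetadataLength bytes and parsing the copy keeps the outer stream aligned regardless of how many bytes readMetadata actually consumes. A stripped-down sketch of the pattern (method names follow the code above; local variable names are illustrative):

    // Read the length-prefixed block as an opaque byte range first...
    int len = inputStream.readPackedInteger();
    byte[] block = inputStream.read(len);
    // ...then parse it from its own bounded stream; any leftover bytes inside
    // the block are simply discarded instead of corrupting later fields.
    int[] parsed = readMetadata(new ByteArrayInputStream(block), columnTypes);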
@@ -93,7 +100,7 @@ private int[] readMetadata(ByteArrayInputStream inputStream, byte[] columnTypes)
 int[] metadata = new int[columnTypes.length];
 for (int i = 0; i < columnTypes.length; i++) {
     ColumnType columnType = ColumnType.byCode(columnTypes[i] & 0xFF);
-    if (columnType == TYPED_ARRAY) {
+    if (columnType == TYPED_ARRAY || columnType == TYPED_ARRAY_OLD) {
         byte[] arrayType = inputStream.read(1);
         columnType = ColumnType.byCode(arrayType[0] & 0xFF);
     }
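For reference, the typed-array entry this branch unwraps is laid out as the element type's code followed by the element type's ordinary metadata (mirroring the canal implementation linked in the Javadoc), roughly:

    metadata byte 0  : the element's type code (e.g. a string type for the
                       CHAR(32) ARRAY index column used in the test below)
    metadata bytes 1+: the element type's regular metadata

So after the one-byte read, the loop falls through to the normal per-type metadata handling for the element type.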
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.mysql.table;

import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;

import static org.apache.flink.api.common.JobStatus.RUNNING;

/** Integration tests for MySQL Table source. */
@RunWith(Parameterized.class)
public class MySqlJsonArrayAsKeyIndexITCase extends MySqlSourceTestBase {

private static final Logger LOG = LoggerFactory.getLogger(MySqlJsonArrayAsKeyIndexITCase.class);

private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";

private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());

@Parameterized.Parameters(name = "mysqlVersion: {0}")
public static Object[] parameters() {
// MySQL 8.0.17 introduced the `CAST(JSON_EXTRACT(...) AS ... ARRAY)` syntax, and with it the
// "extra 0 byte" bug.
// MySQL 8.0.18 changed the TYPED_ARRAY internal enum value from 244 to 20, but didn't fix
// the bug.
// MySQL 8.0.19 finally fixed this issue.
return new Object[][] {
new Object[] {MySqlVersion.V8_0_17},
new Object[] {MySqlVersion.V8_0_18},
new Object[] {MySqlVersion.V8_0_19}
};
}

private final MySqlVersion version;
private final MySqlContainer container;

public MySqlJsonArrayAsKeyIndexITCase(MySqlVersion version) {
this.version = version;
this.container = createMySqlContainer(version, "docker/server-gtids/expire-seconds/my.cnf");
}

@Before
public void before() {
LOG.info("Starting MySQL {} containers...", version);
Startables.deepStart(Stream.of(container)).join();
LOG.info("Container MySQL {} is started.", version);
}

@After
public void after() {
LOG.info("Stopping MySQL {} containers...", version);
container.stop();
LOG.info("Container MySQL {} is stopped.", version);
}

@Test
public void testJsonArrayAsKeyIndex() {
UniqueDatabase jaakiDatabase =
new UniqueDatabase(container, "json_array_as_key", TEST_USER, TEST_PASSWORD);
jaakiDatabase.createAndInitialize();

String sourceDDL =
String.format(
"CREATE TABLE json_array_as_key (\n"
+ " id BIGINT NOT NULL,\n"
+ " PRIMARY KEY(id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = 'earliest-offset',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = 'true'"
+ ")",
container.getHost(),
container.getDatabasePort(),
TEST_USER,
TEST_PASSWORD,
jaakiDatabase.getDatabaseName(),
"json_array_as_key",
getServerId());
tEnv.executeSql(sourceDDL);

try (Connection connection = jaakiDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("INSERT INTO json_array_as_key(id) VALUES (18),(19);");
statement.execute("DELETE FROM json_array_as_key WHERE id=19;");
} catch (Exception e) {
throw new RuntimeException(e);
}

// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM json_array_as_key");

try {
// wait for the source to start up; we don't have a better way to detect
// readiness, so poll with sleep for now
do {
Thread.sleep(5000L);
} while (result.getJobClient().get().getJobStatus().get() != RUNNING);
} catch (Exception e) {
throw new RuntimeException(e);
}

CloseableIterator<Row> iterator = result.collect();

String[] expected =
new String[] {
// with 'earliest-offset' startup, all records are read from the binlog
"+I[17]", "+I[18]", "+I[19]", "-D[19]",
};

assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));

try {
result.getJobClient().get().cancel().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}

private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + env.getParallelism());
}
}
@@ -22,6 +22,9 @@ public enum MySqlVersion {
 V5_5("5.5"),
 V5_6("5.6"),
 V5_7("5.7"),
+V8_0_17("8.0.17"),
+V8_0_18("8.0.18"),
+V8_0_19("8.0.19"),
 V8_0("8.0");

 private String version;
@@ -0,0 +1,53 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Create a table with complicated metadata to make sure the following
-- optional metadata parsing will not succeed unless the previous column
-- metadata has been correctly parsed.

CREATE TABLE `json_array_as_key` (
`id` bigint(20) unsigned NOT NULL,
`c2` bigint(20) NOT NULL DEFAULT '0',
`c3` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`c4` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`c5` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`c6` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
`c7` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`c8` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
`c9` int(11) DEFAULT '0',
`c10` int(11) DEFAULT '0',
`c11` int(11) NOT NULL DEFAULT '0',
`c12` int(11) DEFAULT '0',
`c13` json DEFAULT NULL,
`c14` json DEFAULT NULL,
`c15` json DEFAULT NULL,
`c16` json DEFAULT NULL,
`c17` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
`c18` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '',
`c19` tinyint(4) DEFAULT '0',
`c20` tinyint(4) NOT NULL DEFAULT '0',
`c21` tinyint(4) NOT NULL DEFAULT '0',
`c22` tinyint(4) NOT NULL DEFAULT '0',
`c23` tinyint(4) DEFAULT '0',
`c24` tinyint(3) unsigned NOT NULL DEFAULT '1',
`c25` int(10) unsigned NOT NULL DEFAULT '0',
`c26` int(10) unsigned NOT NULL DEFAULT '0',
`c27` int(10) unsigned NOT NULL DEFAULT '0',
`c28` int(10) unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (`id`),
KEY `k6` ((cast(json_extract(`c13`,_utf8mb4'$[*]') as CHAR(32) ARRAY)))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

INSERT INTO json_array_as_key(ID) VALUES (17);
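The functional index k6 is the essential ingredient here: a multi-valued index makes MySQL maintain a hidden typed-array column for the table, and that column's entry appears in the binlog TABLE_MAP metadata, where, per the version notes in the test above, 8.0.17 writes the stray extra byte. The long run of varchar columns (2 metadata bytes each) and json columns (1 byte each) placed before it ensures that a single-byte misalignment anywhere in the block reliably breaks the optional metadata parsing that follows, which is exactly what the bounded read in the deserializer guards against.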