Skip to content

Commit

Permalink
[mysql] add custom jdbc properties to debezium mysql connection (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanhang1993 committed Jul 29, 2022
1 parent 74f0a65 commit 3640a5b
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public void validate() {
connection = DebeziumUtils.openJdbcConnection(sourceConfig);
} else {
// for the legacy source
connection = DebeziumUtils.createMySqlConnection(from(dbzProperties));
connection =
DebeziumUtils.createMySqlConnection(from(dbzProperties), new Properties());
}
checkVersion(connection);
checkBinlogFormat(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory;
import com.ververica.cdc.connectors.mysql.source.connection.MySqlConnectionWithJdbcProperties;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
Expand All @@ -42,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables;

Expand All @@ -66,9 +68,11 @@ public static JdbcConnection openJdbcConnection(MySqlSourceConfig sourceConfig)
}

/** Creates a new {@link MySqlConnection}, but not open the connection. */
public static MySqlConnection createMySqlConnection(Configuration dbzConfiguration) {
return new MySqlConnection(
new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration));
public static MySqlConnection createMySqlConnection(
Configuration dbzConfiguration, Properties jdbcProperties) {
return new MySqlConnectionWithJdbcProperties(
new MySqlConnectionWithJdbcProperties.MySqlConnectionConfigurationWithCustomUrl(
dbzConfiguration, jdbcProperties));
}

/** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.mysql.source.connection;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.jdbc.JdbcConnection;

import java.util.Properties;

/** {@link MySqlConnection} extension to be used with MySQL Server. */
public class MySqlConnectionWithJdbcProperties extends MySqlConnection {
private final String urlPattern;
/**
* Creates a new connection using the supplied configuration.
*
* @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null.
*/
public MySqlConnectionWithJdbcProperties(
MySqlConnectionConfigurationWithCustomUrl connectionConfig) {
super(connectionConfig);
this.urlPattern = connectionConfig.getUrlPattern();
}

@Override
public String connectionString() {
return connectionString(urlPattern);
}

/**
* {@link MySqlConnectionConfiguration} extension to be used with {@link
* MySqlConnectionWithJdbcProperties}.
*/
public static class MySqlConnectionConfigurationWithCustomUrl
extends io.debezium.connector.mysql.MySqlConnection.MySqlConnectionConfiguration {
private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties();
private static final String JDBC_URL_PATTERN =
"jdbc:mysql://${hostname}:${port}/?useSSL=${useSSL}&connectTimeout=${connectTimeout}";
private static final String JDBC_URL_PATTERN_WITH_CUSTOM_USE_SSL =
"jdbc:mysql://${hostname}:${port}/?connectTimeout=${connectTimeout}";

private final ConnectionFactory customFactory;
private final String urlPattern;

public MySqlConnectionConfigurationWithCustomUrl(
Configuration config, Properties jdbcProperties) {
// Set up the JDBC connection without actually connecting, with extra MySQL-specific
// properties
// to give us better JDBC database metadata behavior, including using UTF-8 for the
// client-side character encoding
// per https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-charsets.html
super(config);
this.urlPattern = formatJdbcUrl(jdbcProperties);
String driverClassName = config().getString(MySqlConnectorConfig.JDBC_DRIVER);
customFactory =
JdbcConnection.patternBasedFactory(
urlPattern, driverClassName, getClass().getClassLoader());
}

public String getUrlPattern() {
return urlPattern;
}

private String formatJdbcUrl(Properties jdbcProperties) {
Properties combinedProperties = new Properties();
combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES);
combinedProperties.putAll(jdbcProperties);

StringBuilder jdbcUrlStringBuilder =
jdbcProperties.getProperty("useSSL") == null
? new StringBuilder(JDBC_URL_PATTERN)
: new StringBuilder(JDBC_URL_PATTERN_WITH_CUSTOM_USE_SSL);
combinedProperties.forEach(
(key, value) -> {
jdbcUrlStringBuilder.append("&").append(key).append("=").append(value);
});

return jdbcUrlStringBuilder.toString();
}

private static Properties initializeDefaultJdbcProperties() {
Properties defaultJdbcProperties = new Properties();
defaultJdbcProperties.setProperty("useInformationSchema", "true");
defaultJdbcProperties.setProperty("nullCatalogMeansCurrent", "false");
defaultJdbcProperties.setProperty("useUnicode", "true");
defaultJdbcProperties.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL");
defaultJdbcProperties.setProperty("characterEncoding", "UTF-8");
defaultJdbcProperties.setProperty("characterSetResults", "UTF-8");
defaultJdbcProperties.setProperty("useSSL", "false");
return defaultJdbcProperties;
}

@Override
public ConnectionFactory factory() {
return customFactory;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit spl
final String splitId = split.splitId();
if (split.getTableSchemas().isEmpty()) {
try (MySqlConnection jdbc =
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties())) {
Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
LOG.info("The table schema discovery for binlog split {} success", splitId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ private void checkSplitOrStartNext() throws IOException {
}
if (currentReader == null) {
final MySqlConnection jdbcConnection =
createMySqlConnection(sourceConfig.getDbzConfiguration());
createMySqlConnection(
sourceConfig.getDbzConfiguration(),
sourceConfig.getJdbcProperties());
final BinaryLogClient binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
Expand All @@ -150,7 +152,9 @@ private void checkSplitOrStartNext() throws IOException {
currentReader.close();
}
final MySqlConnection jdbcConnection =
createMySqlConnection(sourceConfig.getDbzConfiguration());
createMySqlConnection(
sourceConfig.getDbzConfiguration(),
sourceConfig.getJdbcProperties());
final BinaryLogClient binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ public void testReadSingleBinlogSplit() throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
mySqlConnection =
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties());
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
Expand Down Expand Up @@ -121,7 +123,9 @@ public void testReadAllBinlogSplitsForOneTable() throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
mySqlConnection =
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties());
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
Expand Down Expand Up @@ -169,7 +173,9 @@ public void testReadAllBinlogForTableWithSingleLine() throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
mySqlConnection =
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties());

final DataType dataType =
DataTypes.ROW(
Expand Down Expand Up @@ -205,7 +211,9 @@ public void testReadAllBinlogSplitsForTables() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(new String[] {"customer_card", "customer_card_single_line"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
mySqlConnection =
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties());
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
Expand Down Expand Up @@ -262,7 +270,9 @@ public void testReadBinlogFromLatestOffset() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(StartupOptions.latest(), new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
mySqlConnection =
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties());

final DataType dataType =
DataTypes.ROW(
Expand Down Expand Up @@ -314,7 +324,9 @@ public void testHeartbeatEvent() throws Exception {
.debeziumProperties(dbzProps)
.createConfig(0);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
mySqlConnection =
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties());

// Create binlog reader and submit split
BinlogSplitReader binlogReader = createBinlogReader(sourceConfig);
Expand Down Expand Up @@ -350,7 +362,8 @@ private MySqlBinlogSplit createBinlogSplitFromLatestOffset(MySqlSourceConfig sou
MySqlBinlogSplitAssigner binlogSplitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
binlogSplitAssigner.open();
try (MySqlConnection jdbc =
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties())) {
Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
return MySqlBinlogSplit.fillTableSchemas(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public static void init() {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}, 10);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
mySqlConnection =
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -753,7 +754,7 @@ private MySqlConnection getConnection() {
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
io.debezium.config.Configuration configuration =
io.debezium.config.Configuration.from(properties);
return DebeziumUtils.createMySqlConnection(configuration);
return DebeziumUtils.createMySqlConnection(configuration, new Properties());
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public void testBinlogReadFailoverCrossTransaction() throws Exception {
DataTypes.FIELD("phone_number", DataTypes.STRING()));
MySqlSplit binlogSplit;
try (MySqlConnection jdbc =
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
DebeziumUtils.createMySqlConnection(
sourceConfig.getDbzConfiguration(), sourceConfig.getJdbcProperties())) {
Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
binlogSplit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ public void before() {

@Test
public void testConsumingAllEvents() throws Exception {
runConsumingAllEventsTest("");
}

@Test
public void testConsumingAllEventsUseSSL() throws Exception {
runConsumingAllEventsTest(
", 'jdbc.properties.useSSL'= 'true',"
+ " 'jdbc.properties.requireSSL'= 'true',"
+ " 'jdbc.properties.verifyServerCerticate'= 'false'");
}

private void runConsumingAllEventsTest(String otherTableOptions) throws Exception {
inventoryDatabase.createAndInitialize();
String sourceDDL =
String.format(
Expand All @@ -154,6 +166,7 @@ public void testConsumingAllEvents() throws Exception {
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ " %s"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
Expand All @@ -164,7 +177,8 @@ public void testConsumingAllEvents() throws Exception {
getDezImplementation(),
incrementalSnapshot,
getServerId(),
getSplitSize());
getSplitSize(),
otherTableOptions);
String sinkDDL =
"CREATE TABLE sink ("
+ " name STRING,"
Expand Down

0 comments on commit 3640a5b

Please sign in to comment.