Skip to content

Commit

Permalink
Reducing MySQL SchemaDiscovery error timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle committed Oct 3, 2024
1 parent 0ac9251 commit 609d14a
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public ImmutableList<String> discoverTables(
} catch (SQLException e) {
logger.error(
String.format(
"Sql exception while discovering table list for datasource=%s", dataSource, e));
"Sql exception while discovering table list for datasource=%s cause=%s",
dataSource, e));
schemaDiscoveryErrors.inc();
throw new SchemaDiscoveryException(e);
}
Expand Down Expand Up @@ -184,7 +185,7 @@ public ImmutableMap<String, ImmutableMap<String, SourceColumnType>> discoverTabl
throws SchemaDiscoveryException, RetriableSchemaDiscoveryException {
logger.info(
String.format(
"Discovering tale schema for Datasource: %s, SourceSchemaReference: %s, tables: %s",
"Discovering table schema for Datasource: %s, SourceSchemaReference: %s, tables: %s",
dataSource, sourceSchemaReference, tables));

String discoveryQuery = getSchemaDiscoveryQuery(sourceSchemaReference);
Expand Down Expand Up @@ -224,7 +225,7 @@ public ImmutableMap<String, ImmutableMap<String, SourceColumnType>> discoverTabl
tableSchemaBuilder.build();
logger.info(
String.format(
"Discovered tale schema for Datasource: %s, SourceSchemaReference: %s, tables: %s, schema: %s",
"Discovered table schema for Datasource: %s, SourceSchemaReference: %s, tables: %s, schema: %s",
dataSource, sourceSchemaReference, tables, tableSchema));

return tableSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public ImmutableMap<String, ImmutableMap<String, SourceColumnType>> discoverTabl
ImmutableMap<String, ImmutableMap<String, SourceColumnType>> tableSchema =
tableSchemaBuilder.build();
logger.info(
"Discovered tale schema for Datasource: {}, SourceSchemaReference: {}, tables: {}, schema: {}",
"Discovered table schema for Datasource: {}, SourceSchemaReference: {}, tables: {}, schema: {}",
dataSource,
sourceSchemaReference,
tables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
// Call initializeSuper after deserialization
initializeSuper();
}

@Override
public String toString() {
return String.format(
"JdbcDataSource: {\"sourceDbURL\":\"%s\", \"initSql\":\"%s\", \"maxConnections\",\"%s\" }",
sourceDbURL, initSql, maxConnections);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.dbcp2.BasicDataSource;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
Expand Down Expand Up @@ -80,6 +81,7 @@ public static JdbcIoWrapper of(JdbcIOWrapperConfig config) throws SuitableIndexN
DataSourceConfiguration dataSourceConfiguration = getDataSourceConfiguration(config);

DataSource dataSource = dataSourceConfiguration.buildDatasource();
setDataSourceLoginTimeout((BasicDataSource) dataSource, config);

SchemaDiscovery schemaDiscovery =
new SchemaDiscoveryImpl(config.dialectAdapter(), config.schemaDiscoveryBackOff());
Expand All @@ -92,6 +94,60 @@ public static JdbcIoWrapper of(JdbcIOWrapperConfig config) throws SuitableIndexN
return new JdbcIoWrapper(tableReaders, sourceSchema);
}

/**
* Set's the login timeout for the DataSource used for schema and index discoveries. This helps in
* early error reporting to the customer in case of unreachable or unavailable source database.
* The default login timeout for the {@link BasicDataSource} is infinite. Unfortunately, {@link
* BasicDataSource} does not directly support {@link DataSource#setLoginTimeout(int)}. This can be
* achieved by setting {@link BasicDataSource#setMaxWaitMillis} and connect timeout at the driver
* layer.
*
* @param dataSource
* @param config
*/
@VisibleForTesting
protected static void setDataSourceLoginTimeout(
BasicDataSource dataSource, JdbcIOWrapperConfig config) {

dataSource.setMaxWaitMillis(config.schemaDiscoveryConnectivityTimeoutMilliSeconds());

String connectivityTimeout;
switch (config.sourceDbDialect()) {
case MYSQL:
connectivityTimeout =
String.valueOf(config.schemaDiscoveryConnectivityTimeoutMilliSeconds());
setConnectionProperty(dataSource, "connectTimeout", connectivityTimeout);
setConnectionProperty(dataSource, "socketTimeout", connectivityTimeout);
break;
case POSTGRESQL:
connectivityTimeout =
String.valueOf(config.schemaDiscoveryConnectivityTimeoutMilliSeconds() / 1000);
setConnectionProperty(dataSource, "loginTimeout", connectivityTimeout);
setConnectionProperty(dataSource, "connectTimeout", connectivityTimeout);
setConnectionProperty(dataSource, "socketTimeout", connectivityTimeout);
break;
default:
logger.error(

Check warning on line 130 in v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java

View check run for this annotation

Codecov / codecov/patch

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java#L130

Added line #L130 was not covered by tests
"No connectivity timeout overrides implemented for dialect {}. In case of misconfigured network connectivity, schema discovery could timeout without correct error reporting.");
}
}

private static void setConnectionProperty(
BasicDataSource dataSource, String property, String value) {

String url = dataSource.getUrl();
if (!url.contains(property)) {
dataSource.addConnectionProperty(property, value);
logger.info("Set {} = {} for schema discovery of {}", property, value, dataSource);
} else {
logger.warn(
"Property {} already set in URL {}. Not overriding with {} for schema discovery. The default over-ride helps in failing fast in case of misconfigured network connectivity.",
property,
url,
value);
}
}

/**
* Return a read transforms for the tables to migrate.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ public abstract class JdbcIOWrapperConfig {

private static final String DEFAULT_VALIDATEION_QUERY = "SELECT 1";

/** Sets the connectivity timeout in seconds during schema discovery. * */
public abstract Integer schemaDiscoveryConnectivityTimeoutMilliSeconds();

private static final Integer DEFAULT_SCHEMA_DISCOVERY_CONNECTIVITY_TIMEOUT_MILLISECONDS =
30 * 1000;

/**
* The timeout in seconds before an abandoned connection can be removed.
*
Expand Down Expand Up @@ -252,7 +258,9 @@ public static Builder builderWithMySqlDefaults() {
.setTestWhileIdle(DEFAULT_TEST_WILE_IDLE)
.setValidationQuery(DEFAULT_VALIDATEION_QUERY)
.setRemoveAbandonedTimeout(DEFAULT_REMOVE_ABANDONED_TIMEOUT)
.setMinEvictableIdleTimeMillis(DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS);
.setMinEvictableIdleTimeMillis(DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS)
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(
DEFAULT_SCHEMA_DISCOVERY_CONNECTIVITY_TIMEOUT_MILLISECONDS);
}

public static Builder builderWithPostgreSQLDefaults() {
Expand Down Expand Up @@ -280,7 +288,9 @@ public static Builder builderWithPostgreSQLDefaults() {
.setTestWhileIdle(DEFAULT_TEST_WILE_IDLE)
.setValidationQuery(DEFAULT_VALIDATEION_QUERY)
.setRemoveAbandonedTimeout(DEFAULT_REMOVE_ABANDONED_TIMEOUT)
.setMinEvictableIdleTimeMillis(DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS);
.setMinEvictableIdleTimeMillis(DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS)
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(
DEFAULT_SCHEMA_DISCOVERY_CONNECTIVITY_TIMEOUT_MILLISECONDS);
}

@AutoValue.Builder
Expand Down Expand Up @@ -339,6 +349,8 @@ public abstract Builder setAdditionalOperationsOnRanges(

public abstract Builder setValidationQuery(String value);

public abstract Builder setSchemaDiscoveryConnectivityTimeoutMilliSeconds(Integer value);

public abstract Builder setRemoveAbandonedTimeout(Integer value);

public abstract Builder setMinEvictableIdleTimeMillis(Integer value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import java.util.Calendar;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;

// TODO: Fine-tune the defaults based on benchmarking.

Expand Down Expand Up @@ -62,7 +63,8 @@ public class MySqlConfigDefaults {
"autoReconnect", "true",
"maxReconnects", "10");

public static final FluentBackoff DEFAULT_MYSQL_SCHEMA_DISCOVERY_BACKOFF = FluentBackoff.DEFAULT;
public static final FluentBackoff DEFAULT_MYSQL_SCHEMA_DISCOVERY_BACKOFF =
FluentBackoff.DEFAULT.withMaxCumulativeBackoff(Duration.standardMinutes(5L));

/**
* Default Initialization Sequence for the JDBC connection.
Expand All @@ -83,6 +85,8 @@ public class MySqlConfigDefaults {
// TODO: Add innodb_parallel_read_threads for better performance tuning.
public static final ImmutableList<String> DEFAULT_MYSQL_INIT_SEQ =
ImmutableList.of(
// Using an offset of 0 instead of UTC, as it's possible for a customer's database to not
// have named timezone information pre-installed.
"SET TIME_ZONE = '+00:00'",
"SET SESSION NET_WRITE_TIMEOUT=1200",
"SET SESSION NET_READ_TIMEOUT=1200");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapper;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;

// TODO(thiagotnunes): Fine-tune the defaults based on benchmarking.

Expand All @@ -41,11 +42,14 @@ public class PostgreSQLConfigDefaults {
public static final Long DEFAULT_POSTGRESQL_MAX_CONNECTIONS = 160L;

public static final FluentBackoff DEFAULT_POSTGRESQL_SCHEMA_DISCOVERY_BACKOFF =
FluentBackoff.DEFAULT;
FluentBackoff.DEFAULT.withMaxCumulativeBackoff(Duration.standardMinutes(5L));

/** Default Initialization Sequence for the JDBC connection. */
public static final ImmutableList<String> DEFAULT_POSTGRESQL_INIT_SEQ =
ImmutableList.of("SET TIME ZONE 'UTC'");
ImmutableList.of(
// Using an offset of 0 instead of UTC, as it's possible for a customer's database to not
// have named timezone information pre-installed.
"SET TIME ZONE '+00:00'");

private PostgreSQLConfigDefaults() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public void testJdbcDataSourceBasic() throws IOException, ClassNotFoundException
.isEqualTo(jdbcIOWrapperConfig.minEvictableIdleTimeMillis());
assertThat(testJdbcDataSource.getValidationQuery())
.isEqualTo(jdbcIOWrapperConfig.validationQuery());
assertThat(testJdbcDataSource.toString())
.isEqualTo(
"JdbcDataSource: {\"sourceDbURL\":\"jdbc:derby://myhost/memory:TestingDB;create=true\", \"initSql\":\"[SET TIME_ZONE = '+00:00', SET SESSION NET_WRITE_TIMEOUT=1200, SET SESSION NET_READ_TIMEOUT=1200]\", \"maxConnections\",\"160\" }");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.cloud.teleport.v2.source.reader.auth.dbauth.LocalCredentialsProvider;
import com.google.cloud.teleport.v2.source.reader.io.exception.RetriableSchemaDiscoveryException;
import com.google.cloud.teleport.v2.source.reader.io.exception.SuitableIndexNotFoundException;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.DialectAdapter;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.JdbcIOWrapperConfig;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.transforms.ReadWithUniformPartitions;
import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo;
Expand All @@ -44,6 +48,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.dbcp2.BasicDataSource;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -56,6 +61,8 @@
public class JdbcIoWrapperTest {
@Mock DialectAdapter mockDialectAdapter;

@Mock BasicDataSource mockBasicDataSource;

@BeforeClass
public static void beforeClass() {
// by default, derby uses a lock timeout of 60 seconds. In order to speed up the test
Expand Down Expand Up @@ -311,6 +318,75 @@ public void testReadWithUniformPartitionFeatureFlag() throws RetriableSchemaDisc
.isInstanceOf(ReadWithUniformPartitions.class);
}

@Test
public void testLoginTimeout() throws RetriableSchemaDiscoveryException {

int testLoginTimeoutMilliseconds = 1000;
int testLoginTimeoutSeconds = 1;

doNothing().when(mockBasicDataSource).setMaxWaitMillis(testLoginTimeoutMilliseconds);
when(mockBasicDataSource.getUrl())
.thenReturn("jdbc://testIp:3306/testDB")
.thenReturn("jdbc://testIp:3306/testDB")
.thenReturn("jdbc://testIp:3306/testDB?connectTimeout=2000&socketTimeout=2000")
.thenReturn("jdbc://testIp:3306/testDB?connectTimeout=2000&socketTimeout=2000")
.thenReturn("jdbc://testIp:3306/testDB?connectTimeout=2000&socketTimeout=2000");
doNothing()
.when(mockBasicDataSource)
.addConnectionProperty("connectTimeout", String.valueOf(testLoginTimeoutMilliseconds));
doNothing()
.when(mockBasicDataSource)
.addConnectionProperty("socketTimeout", String.valueOf(testLoginTimeoutMilliseconds));
doNothing()
.when(mockBasicDataSource)
.addConnectionProperty("loginTimeout", String.valueOf(testLoginTimeoutSeconds));

SourceSchemaReference testSourceSchemaReference =
SourceSchemaReference.builder().setDbName("testDB").build();

JdbcIOWrapperConfig config =
JdbcIOWrapperConfig.builderWithMySqlDefaults()
.setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true")
.setSourceSchemaReference(testSourceSchemaReference)
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(testLoginTimeoutMilliseconds)
.setShardID("test")
.setTableVsPartitionColumns(ImmutableMap.of("testTable", ImmutableList.of("ID")))
.setDbAuth(
LocalCredentialsProvider.builder()
.setUserName("testUser")
.setPassword("testPassword")
.build())
.setJdbcDriverJars("")
.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver")
.setDialectAdapter(mockDialectAdapter)
.build();
JdbcIOWrapperConfig configWithTimeoutSet =
config.toBuilder()
.setSourceDbDialect(SQLDialect.MYSQL)
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(testLoginTimeoutMilliseconds)
.build();
JdbcIOWrapperConfig configWithUrlTimeout =
config.toBuilder()
.setSourceDbDialect(SQLDialect.POSTGRESQL)
.setSourceDbURL(
"jdbc:derby://myhost/memory:TestingDB;create=true?socketTimeout=10&connectTimeout=10")
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(testLoginTimeoutMilliseconds)
.build();

JdbcIoWrapper.setDataSourceLoginTimeout(mockBasicDataSource, configWithTimeoutSet);
JdbcIoWrapper.setDataSourceLoginTimeout(mockBasicDataSource, configWithUrlTimeout);

assertThat(configWithTimeoutSet.schemaDiscoveryConnectivityTimeoutMilliSeconds())
.isEqualTo(testLoginTimeoutMilliseconds);
verify(mockBasicDataSource, times(2)).setMaxWaitMillis(testLoginTimeoutMilliseconds);
verify(mockBasicDataSource, times(1))
.addConnectionProperty("connectTimeout", String.valueOf(testLoginTimeoutMilliseconds));
verify(mockBasicDataSource, times(1))
.addConnectionProperty("socketTimeout", String.valueOf(testLoginTimeoutMilliseconds));
verify(mockBasicDataSource, times(1))
.addConnectionProperty("loginTimeout", String.valueOf(testLoginTimeoutSeconds));
}

@Test
public void testIndexTypeToColumnClass() {

Expand Down

0 comments on commit 609d14a

Please sign in to comment.