Skip to content

Commit

Permalink
Reducing MySQL SchemaDiscovery error timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle committed Oct 1, 2024
1 parent e122b9e commit 866f0d9
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public ImmutableList<String> discoverTables(
} catch (SQLException e) {
logger.error(
String.format(
"Sql exception while discovering table list for datasource=%s", dataSource, e));
"Sql exception while discovering table list for datasource=%s cause=%s",
dataSource, e));
schemaDiscoveryErrors.inc();
throw new SchemaDiscoveryException(e);
}
Expand Down Expand Up @@ -184,7 +185,7 @@ public ImmutableMap<String, ImmutableMap<String, SourceColumnType>> discoverTabl
throws SchemaDiscoveryException, RetriableSchemaDiscoveryException {
logger.info(
String.format(
"Discovering tale schema for Datasource: %s, SourceSchemaReference: %s, tables: %s",
"Discovering table schema for Datasource: %s, SourceSchemaReference: %s, tables: %s",
dataSource, sourceSchemaReference, tables));

String discoveryQuery = getSchemaDiscoveryQuery(sourceSchemaReference);
Expand Down Expand Up @@ -224,7 +225,7 @@ public ImmutableMap<String, ImmutableMap<String, SourceColumnType>> discoverTabl
tableSchemaBuilder.build();
logger.info(
String.format(
"Discovered tale schema for Datasource: %s, SourceSchemaReference: %s, tables: %s, schema: %s",
"Discovered table schema for Datasource: %s, SourceSchemaReference: %s, tables: %s, schema: %s",
dataSource, sourceSchemaReference, tables, tableSchema));

return tableSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public ImmutableList<String> discoverTables(
schemaDiscoveryErrors.inc();
throw new SchemaDiscoveryException(e);
} catch (SQLException e) {
logger.error("Sql exception while discovering table list for datasource={}", dataSource, e);
logger.error(
"Sql exception while discovering table list for datasource={} cause={}", dataSource, e);
schemaDiscoveryErrors.inc();
throw new SchemaDiscoveryException(e);
}
Expand Down Expand Up @@ -233,7 +234,7 @@ public ImmutableMap<String, ImmutableMap<String, SourceColumnType>> discoverTabl
throw new SchemaDiscoveryException(e);
} catch (SQLException e) {
logger.error(
"Sql exception while discovering table schema for datasource={} db={} tables={}",
"Sql exception while discovering table schema for datasource={} db={} tables={} cause={}",
dataSource,
sourceSchemaReference,
tables,
Expand All @@ -248,7 +249,7 @@ public ImmutableMap<String, ImmutableMap<String, SourceColumnType>> discoverTabl
ImmutableMap<String, ImmutableMap<String, SourceColumnType>> tableSchema =
tableSchemaBuilder.build();
logger.info(
"Discovered tale schema for Datasource: {}, SourceSchemaReference: {}, tables: {}, schema: {}",
"Discovered table schema for Datasource: {}, SourceSchemaReference: {}, tables: {}, schema: {}",
dataSource,
sourceSchemaReference,
tables,
Expand Down Expand Up @@ -368,7 +369,7 @@ public ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> discoverTableI
throw new SchemaDiscoveryException(e);
} catch (SQLException e) {
logger.error(
"Sql exception while discovering table schema for datasource={} db={} tables={}",
"Sql exception while discovering table schema for datasource={} db={} tables={} cause={}",
dataSource,
sourceSchemaReference,
tables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
// Call initializeSuper after deserialization
initializeSuper();
}

/**
 * Returns a compact, JSON-like description of this data source made of its source database URL,
 * its init SQL and its maximum connection count.
 */
@Override
public String toString() {
  // NOTE(review): the separator after "maxConnections" is a comma where a colon looks intended;
  // the text is kept exactly as it is so the rendered output does not change.
  final String template =
      "JdbcDataSource: {\"sourceDbURL\":\"%s\", \"initSql\":\"%s\", \"maxConnections\",\"%s\" }";
  final Object[] renderedFields = {sourceDbURL, initSql, maxConnections};
  return String.format(template, renderedFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.dbcp2.BasicDataSource;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
Expand Down Expand Up @@ -80,6 +81,7 @@ public static JdbcIoWrapper of(JdbcIOWrapperConfig config) throws SuitableIndexN
DataSourceConfiguration dataSourceConfiguration = getDataSourceConfiguration(config);

DataSource dataSource = dataSourceConfiguration.buildDatasource();
setDataSourceLoginTimeout((BasicDataSource) dataSource, config);

SchemaDiscovery schemaDiscovery =
new SchemaDiscoveryImpl(config.dialectAdapter(), config.schemaDiscoveryBackOff());
Expand All @@ -92,6 +94,34 @@ public static JdbcIoWrapper of(JdbcIOWrapperConfig config) throws SuitableIndexN
return new JdbcIoWrapper(tableReaders, sourceSchema);
}

/**
 * Sets the login timeout for the DataSource used for schema and index discoveries. This helps in
 * early error reporting to the customer in case of an unreachable or unavailable source database.
 * The default login timeout for the {@link BasicDataSource} is infinite. Unfortunately, {@link
 * BasicDataSource} does not directly support {@link DataSource#setLoginTimeout(int)}. The same
 * effect is achieved by setting {@link BasicDataSource#setMaxWaitMillis} and the connect and
 * socket timeouts at the driver layer.
 *
 * <p>A {@code connectTimeout} or {@code socketTimeout} that already appears in the data source
 * URL is left untouched; only a missing one is added as a connection property. A data source
 * that reports no URL at all is treated as having neither property.
 *
 * <p>NOTE(review): the value is passed to the driver as-is, i.e. in milliseconds. Confirm that
 * every driver this is used with interprets {@code connectTimeout}/{@code socketTimeout} in
 * milliseconds.
 *
 * @param dataSource the data source to configure; its URL is inspected for existing timeouts.
 * @param config the configuration supplying {@code
 *     schemaDiscoveryConnectivityTimeoutMilliSeconds()}.
 */
@VisibleForTesting
protected static void setDataSourceLoginTimeout(
    BasicDataSource dataSource, JdbcIOWrapperConfig config) {
  final int timeoutMillis = config.schemaDiscoveryConnectivityTimeoutMilliSeconds();

  dataSource.setMaxWaitMillis(timeoutMillis);

  if (urlLacksProperty(dataSource, "connectTimeout")) {
    dataSource.addConnectionProperty("connectTimeout", String.valueOf(timeoutMillis));
  }
  if (urlLacksProperty(dataSource, "socketTimeout")) {
    dataSource.addConnectionProperty("socketTimeout", String.valueOf(timeoutMillis));
  }
}

/**
 * Tells whether the URL currently reported by the data source does not mention the given property.
 * The URL is read afresh on every call; a missing (null) URL counts as lacking the property.
 */
private static boolean urlLacksProperty(BasicDataSource dataSource, String propertyName) {
  final String url = dataSource.getUrl();
  return url == null || !url.contains(propertyName);
}

/**
* Return a read transforms for the tables to migrate.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ public abstract class JdbcIOWrapperConfig {

private static final String DEFAULT_VALIDATEION_QUERY = "SELECT 1";

/** Returns the connectivity timeout, in milliseconds, to apply during schema discovery. */
public abstract Integer schemaDiscoveryConnectivityTimeoutMilliSeconds();

/** Default schema discovery connectivity timeout: 30 seconds, expressed in milliseconds. */
private static final Integer DEFAULT_SCHEMA_DISCOVERY_CONNECTIVITY_TIMEOUT_MILLISECONDS =
30 * 1000;

/**
* The timeout in seconds before an abandoned connection can be removed.
*
Expand Down Expand Up @@ -252,7 +258,9 @@ public static Builder builderWithMySqlDefaults() {
.setTestWhileIdle(DEFAULT_TEST_WILE_IDLE)
.setValidationQuery(DEFAULT_VALIDATEION_QUERY)
.setRemoveAbandonedTimeout(DEFAULT_REMOVE_ABANDONED_TIMEOUT)
.setMinEvictableIdleTimeMillis(DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS);
.setMinEvictableIdleTimeMillis(DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS)
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(
DEFAULT_SCHEMA_DISCOVERY_CONNECTIVITY_TIMEOUT_MILLISECONDS);
}

public static Builder builderWithPostgreSQLDefaults() {
Expand Down Expand Up @@ -280,7 +288,9 @@ public static Builder builderWithPostgreSQLDefaults() {
.setTestWhileIdle(DEFAULT_TEST_WILE_IDLE)
.setValidationQuery(DEFAULT_VALIDATEION_QUERY)
.setRemoveAbandonedTimeout(DEFAULT_REMOVE_ABANDONED_TIMEOUT)
.setMinEvictableIdleTimeMillis(DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS);
.setMinEvictableIdleTimeMillis(DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS)
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(
DEFAULT_SCHEMA_DISCOVERY_CONNECTIVITY_TIMEOUT_MILLISECONDS);
}

@AutoValue.Builder
Expand Down Expand Up @@ -339,6 +349,8 @@ public abstract Builder setAdditionalOperationsOnRanges(

public abstract Builder setValidationQuery(String value);

public abstract Builder setSchemaDiscoveryConnectivityTimeoutMilliSeconds(Integer value);

public abstract Builder setRemoveAbandonedTimeout(Integer value);

public abstract Builder setMinEvictableIdleTimeMillis(Integer value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import java.util.Calendar;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;

// TODO: Fine-tune the defaults based on benchmarking.

Expand Down Expand Up @@ -62,7 +63,8 @@ public class MySqlConfigDefaults {
"autoReconnect", "true",
"maxReconnects", "10");

public static final FluentBackoff DEFAULT_MYSQL_SCHEMA_DISCOVERY_BACKOFF = FluentBackoff.DEFAULT;
public static final FluentBackoff DEFAULT_MYSQL_SCHEMA_DISCOVERY_BACKOFF =
FluentBackoff.DEFAULT.withMaxCumulativeBackoff(Duration.standardMinutes(5L));

/**
* Default Initialization Sequence for the JDBC connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public void testJdbcDataSourceBasic() throws IOException, ClassNotFoundException
.isEqualTo(jdbcIOWrapperConfig.minEvictableIdleTimeMillis());
assertThat(testJdbcDataSource.getValidationQuery())
.isEqualTo(jdbcIOWrapperConfig.validationQuery());
assertThat(testJdbcDataSource.toString())
.isEqualTo(
"JdbcDataSource: {\"sourceDbURL\":\"jdbc:derby://myhost/memory:TestingDB;create=true\", \"initSql\":\"[SET TIME_ZONE = '+00:00', SET SESSION NET_WRITE_TIMEOUT=1200, SET SESSION NET_READ_TIMEOUT=1200]\", \"maxConnections\",\"160\" }");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.cloud.teleport.v2.source.reader.auth.dbauth.LocalCredentialsProvider;
Expand All @@ -44,6 +47,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.dbcp2.BasicDataSource;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -56,6 +60,8 @@
public class JdbcIoWrapperTest {
@Mock DialectAdapter mockDialectAdapter;

@Mock BasicDataSource mockBasicDataSource;

@BeforeClass
public static void beforeClass() {
// by default, derby uses a lock timeout of 60 seconds. In order to speed up the test
Expand Down Expand Up @@ -311,6 +317,66 @@ public void testReadWithUniformPartitionFeatureFlag() throws RetriableSchemaDisc
.isInstanceOf(ReadWithUniformPartitions.class);
}

/**
 * Checks {@code JdbcIoWrapper.setDataSourceLoginTimeout} against a mocked {@link BasicDataSource}:
 * the max wait is set on every call, while {@code connectTimeout} and {@code socketTimeout} are
 * added as connection properties only when the URL reported by the data source does not already
 * mention them.
 */
@Test
public void testLoginTimeout() throws RetriableSchemaDiscoveryException {

int testLoginTimeout = 1000;

doNothing().when(mockBasicDataSource).setMaxWaitMillis(testLoginTimeout);
// setDataSourceLoginTimeout reads getUrl() twice per call (once per property), so the four
// answers below are consumed in order by the two calls made further down:
//   call 1 -> two URLs without any timeout, so both properties get added;
//   call 2 -> a URL carrying connectTimeout, then one carrying socketTimeout, so neither
//             property gets added.
when(mockBasicDataSource.getUrl())
.thenReturn("jdbc://testIp:3306/testDB")
.thenReturn("jdbc://testIp:3306/testDB")
.thenReturn("jdbc://testIp:3306/testDB?connectTimeout=2000")
.thenReturn("jdbc://testIp:3306/testDB?socketTimeout=2000");
doNothing()
.when(mockBasicDataSource)
.addConnectionProperty("connectTimeout", String.valueOf(testLoginTimeout));
doNothing()
.when(mockBasicDataSource)
.addConnectionProperty("socketTimeout", String.valueOf(testLoginTimeout));

SourceSchemaReference testSourceSchemaReference =
SourceSchemaReference.builder().setDbName("testDB").build();

// Base configuration; the two variants below are derived from it via toBuilder().
JdbcIOWrapperConfig config =
JdbcIOWrapperConfig.builderWithMySqlDefaults()
.setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true")
.setSourceSchemaReference(testSourceSchemaReference)
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(testLoginTimeout)
.setShardID("test")
.setTableVsPartitionColumns(ImmutableMap.of("testTable", ImmutableList.of("ID")))
.setDbAuth(
LocalCredentialsProvider.builder()
.setUserName("testUser")
.setPassword("testPassword")
.build())
.setJdbcDriverJars("")
.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver")
.setDialectAdapter(mockDialectAdapter)
.build();
JdbcIOWrapperConfig configWithTimeoutSet =
config.toBuilder()
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(testLoginTimeout)
.build();
// The URL set here is not what the method under test inspects: it checks the URL returned by
// the (mocked) data source, which is driven by the getUrl() stubbing above.
JdbcIOWrapperConfig configWithUrlTimeout =
config.toBuilder()
.setSourceDbURL(
"jdbc:derby://myhost/memory:TestingDB;create=true?socketTimeout=10&connectTimeout=10")
.setSchemaDiscoveryConnectivityTimeoutMilliSeconds(testLoginTimeout)
.build();

JdbcIoWrapper.setDataSourceLoginTimeout(mockBasicDataSource, configWithTimeoutSet);
JdbcIoWrapper.setDataSourceLoginTimeout(mockBasicDataSource, configWithUrlTimeout);

assertThat(configWithTimeoutSet.schemaDiscoveryConnectivityTimeoutMilliSeconds())
.isEqualTo(testLoginTimeout);
// Max wait is applied on both calls; each driver property is added only by the first call.
verify(mockBasicDataSource, times(2)).setMaxWaitMillis(testLoginTimeout);
verify(mockBasicDataSource, times(1))
.addConnectionProperty("connectTimeout", String.valueOf(testLoginTimeout));
verify(mockBasicDataSource, times(1))
.addConnectionProperty("socketTimeout", String.valueOf(testLoginTimeout));
}

@Test
public void testIndexTypeToColumnClass() {

Expand Down

0 comments on commit 866f0d9

Please sign in to comment.