fixed extra table in session file (#1928)
* fixed extra table in session file

* fixed extra table handling in identity mapper

* addressed review comments

* added test

---------

Co-authored-by: Aditya Bharadwaj <adibh@google.com>
bharadwaj-aditya and Aditya Bharadwaj authored Oct 15, 2024
1 parent 0fa3bbf commit d0ae3ec
Showing 5 changed files with 415 additions and 226 deletions.
@@ -258,6 +258,10 @@ private static ImmutableList<TableConfig> autoInferTableConfigs(
     ImmutableList<String> discoveredTables =
         schemaDiscovery.discoverTables(dataSource, config.sourceSchemaReference());
     ImmutableList<String> tables = getTablesToMigrate(config.tables(), discoveredTables);
+    if (tables.isEmpty()) {
+      logger.info("source does not contain matching tables: {}", config.tables());
+      return ImmutableList.of();
+    }
     ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> indexes =
         schemaDiscovery.discoverTableIndexes(dataSource, config.sourceSchemaReference(), tables);
     ImmutableList.Builder<TableConfig> tableConfigsBuilder = ImmutableList.builder();
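
Editor's note: the early return above keys off getTablesToMigrate(), which, per the tests further down in this commit, yields the overlap between the configured table list and the tables discovery actually found at the source. A minimal standalone sketch of that intersection; the method body here is an assumption for illustration, not the template's actual implementation:

import com.google.common.collect.ImmutableList;

public class TablesToMigrateSketch {

  // Assumed semantics: keep only configured tables that discovery found at the
  // source; an empty result is what triggers the early return in the hunk above.
  static ImmutableList<String> getTablesToMigrate(
      ImmutableList<String> configured, ImmutableList<String> discovered) {
    return configured.stream()
        .filter(discovered::contains)
        .collect(ImmutableList.toImmutableList());
  }

  public static void main(String[] args) {
    // Mirrors the new test expectation: configured {a} vs discovered {x, y} -> [].
    System.out.println(getTablesToMigrate(ImmutableList.of("a"), ImmutableList.of("x", "y")));
  }
}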
@@ -50,6 +50,7 @@
 import org.apache.beam.sdk.transforms.Wait;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,35 +67,14 @@ static PipelineResult executeSingleInstanceMigration(
     Ddl ddl = SpannerSchema.getInformationSchemaAsDdl(spannerConfig);
     ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(options, ddl);
 
-    List<String> tablesToMigrate =
-        PipelineController.listTablesToMigrate(options.getTables(), schemaMapper, ddl);
-    Set<String> tablesToMigrateSet = new HashSet<>(tablesToMigrate);
+    List<String> srcTablesToMigrate = listTablesToMigrate(options.getTables(), schemaMapper, ddl);
+    Set<String> tablesToMigrateSet = new HashSet<>(srcTablesToMigrate);
 
-    // This list is all Spanner tables topologically ordered.
-    List<String> orderedSpTables = ddl.getTablesOrderedByReference();
+    List<String> spannerTablesToMigrate =
+        listSpannerTablesToMigrate(ddl, schemaMapper, tablesToMigrateSet);
 
     Map<String, PCollection<Void>> outputs = new HashMap<>();
-    // This list will contain the final list of tables that actually get migrated, which will be
-    // the intersection of Spanner and source tables.
-    List<String> finalTablesToMigrate = new ArrayList<>();
-    for (String spTable : orderedSpTables) {
-      String srcTable = schemaMapper.getSourceTableName("", spTable);
-      if (!tablesToMigrateSet.contains(srcTable)) {
-        continue;
-      }
-      finalTablesToMigrate.add(spTable);
-    }
-    LOG.info(
-        "{} Spanner tables in final selection for migration: {}",
-        finalTablesToMigrate.size(),
-        finalTablesToMigrate);
-    if (finalTablesToMigrate.size() > MAX_RECOMMENDED_TABLES_PER_JOB) {
-      LOG.warn(
-          "Migrating {} tables in a single job (max recommended: {}). Consider splitting tables across jobs to avoid launch issues.",
-          finalTablesToMigrate.size(),
-          MAX_RECOMMENDED_TABLES_PER_JOB);
-    }
-    for (String spTable : finalTablesToMigrate) {
+    for (String spTable : spannerTablesToMigrate) {
       String srcTable = schemaMapper.getSourceTableName("", spTable);
       List<PCollection<?>> parentOutputs = new ArrayList<>();
       for (String parentSpTable : ddl.tablesReferenced(spTable)) {
@@ -134,11 +114,15 @@ static PipelineResult executeSingleInstanceMigration(
             "Output PCollection for parent table should not be null.");
         parentOutputs.add(parentOutputPcollection);
       }
-      ReaderImpl reader =
-          ReaderImpl.of(
-              JdbcIoWrapper.of(
-                  OptionsToConfigBuilder.getJdbcIOWrapperConfigWithDefaults(
-                      options, List.of(srcTable), null, Wait.on(parentOutputs))));
+      JdbcIoWrapper jdbcIoWrapper =
+          JdbcIoWrapper.of(
+              OptionsToConfigBuilder.getJdbcIOWrapperConfigWithDefaults(
+                  options, List.of(srcTable), null, Wait.on(parentOutputs)));
+      if (jdbcIoWrapper.getTableReaders().isEmpty()) {
+        LOG.info("not creating reader as table is not found at source: {}", srcTable);
+        continue;
+      }
+      ReaderImpl reader = ReaderImpl.of(jdbcIoWrapper);
       String suffix = generateSuffix("", srcTable);
       String shardIdColumn = "";
      PCollection<Void> output =
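
Editor's note: the hunk above swaps an unconditional ReaderImpl construction for a guard that skips any table whose JdbcIoWrapper exposes no table readers, so one table missing at the source no longer fails the whole pipeline construction. A self-contained sketch of the same skip-and-log pattern, with a plain Map standing in for the template's getTableReaders() result:

import java.util.List;
import java.util.Map;

public class SkipMissingTableSketch {

  public static void main(String[] args) {
    // Stand-in for per-table reader maps returned by getTableReaders():
    // "missing_table" yields an empty map because discovery found nothing.
    Map<String, Map<String, Runnable>> readersByTable =
        Map.of(
            "present_table",
                Map.of("present_table", () -> System.out.println("reading present_table")),
            "missing_table", Map.of());
    for (String srcTable : List.of("present_table", "missing_table")) {
      Map<String, Runnable> tableReaders = readersByTable.get(srcTable);
      if (tableReaders.isEmpty()) {
        System.out.println("not creating reader as table is not found at source: " + srcTable);
        continue; // Skip this table instead of failing the whole job.
      }
      tableReaders.values().forEach(Runnable::run);
    }
  }
}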
@@ -158,6 +142,44 @@ static PipelineResult executeSingleInstanceMigration(
     return pipeline.run();
   }
 
+  /**
+   * Returns the final list of Spanner tables that actually get migrated: the intersection of the
+   * Spanner and source tables, in topological order.
+   */
+  @NotNull
+  static List<String> listSpannerTablesToMigrate(
+      Ddl ddl, ISchemaMapper schemaMapper, Set<String> tablesToMigrateSet) {
+    // This list is all Spanner tables topologically ordered.
+    List<String> orderedSpTables = ddl.getTablesOrderedByReference();
+
+    // This list will contain the final list of tables that actually get migrated, which will be
+    // the intersection of Spanner and source tables.
+    List<String> finalTablesToMigrate = new ArrayList<>();
+    for (String spTable : orderedSpTables) {
+      try {
+        String srcTable = schemaMapper.getSourceTableName("", spTable);
+        if (!tablesToMigrateSet.contains(srcTable)) {
+          LOG.info("ignoring table as no source maps to this spanner table: {}", spTable);
+          continue;
+        }
+        finalTablesToMigrate.add(spTable);
+      } catch (NoSuchElementException e) {
+        LOG.info("ignoring table not identified by schema mapper: {}", spTable);
+      }
+    }
+    LOG.info(
+        "{} Spanner tables in final selection for migration: {}",
+        finalTablesToMigrate.size(),
+        finalTablesToMigrate);
+    if (finalTablesToMigrate.size() > MAX_RECOMMENDED_TABLES_PER_JOB) {
+      LOG.warn(
+          "Migrating {} tables in a single job (max recommended: {}). Consider splitting tables across jobs to avoid launch issues.",
+          finalTablesToMigrate.size(),
+          MAX_RECOMMENDED_TABLES_PER_JOB);
+    }
+    return finalTablesToMigrate;
+  }
+
   private static String generateSuffix(String shardId, String tableName) {
     String suffix = "";
     if (!StringUtils.isEmpty(shardId)) {
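
Editor's note: to see the new helper's selection rules in isolation — intersection with the source set, topological order preserved, unmapped tables skipped via NoSuchElementException — here is a runnable sketch with a plain Map standing in for ISchemaMapper and a hard-coded list standing in for ddl.getTablesOrderedByReference(); all table names are illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

public class SpannerTableSelectionSketch {

  public static void main(String[] args) {
    // Stand-in for ddl.getTablesOrderedByReference(): parents before children.
    List<String> orderedSpTables = List.of("Singers", "Albums", "Songs", "Orphan");
    // Stand-in for schemaMapper.getSourceTableName(); "Orphan" is deliberately
    // unmapped to exercise the NoSuchElementException branch.
    Map<String, String> spannerToSource =
        Map.of("Singers", "singers", "Albums", "albums", "Songs", "songs");
    // Source tables selected for migration; "albums" is intentionally absent.
    Set<String> tablesToMigrateSet = Set.of("singers", "songs");

    List<String> finalTablesToMigrate = new ArrayList<>();
    for (String spTable : orderedSpTables) {
      try {
        String srcTable = spannerToSource.get(spTable);
        if (srcTable == null) {
          throw new NoSuchElementException(spTable);
        }
        if (!tablesToMigrateSet.contains(srcTable)) {
          System.out.println("ignoring table as no source maps to this spanner table: " + spTable);
          continue;
        }
        finalTablesToMigrate.add(spTable);
      } catch (NoSuchElementException e) {
        System.out.println("ignoring table not identified by schema mapper: " + spTable);
      }
    }
    // Prints [Singers, Songs]: the intersection, still in topological order.
    System.out.println(finalTablesToMigrate);
  }
}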
@@ -184,36 +206,11 @@ static PipelineResult executeShardedMigration(
     Ddl ddl = SpannerSchema.getInformationSchemaAsDdl(spannerConfig);
     ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(options, ddl);
 
-    List<String> tablesToMigrate =
-        PipelineController.listTablesToMigrate(options.getTables(), schemaMapper, ddl);
-    Set<String> tablesToMigrateSet = new HashSet<>(tablesToMigrate);
-    // This list is all Spanner tables topologically ordered.
-    List<String> orderedSpTables = ddl.getTablesOrderedByReference();
-    // This list will contain the final list of tables that actually get migrated, which will be
-    // the intersection of Spanner and source tables.
-    List<String> finalTablesToMigrate = new ArrayList<>();
-    for (String spTable : orderedSpTables) {
-      String srcTable = schemaMapper.getSourceTableName("", spTable);
-      if (!tablesToMigrateSet.contains(srcTable)) {
-        continue;
-      }
-      finalTablesToMigrate.add(spTable);
-    }
-    LOG.info(
-        "{} Spanner tables in final selection for migration: {}",
-        finalTablesToMigrate.size(),
-        finalTablesToMigrate);
-    long totalTablesAcrossShards = findNumLogicalshards(shards) * finalTablesToMigrate.size();
-    if (totalTablesAcrossShards > MAX_RECOMMENDED_TABLES_PER_JOB) {
-      LOG.warn(
-          "Migrating {} tables ({} shards x {} tables/shard) in a single job. "
-              + "This exceeds the recommended maximum of {} tables per job. "
-              + "Consider splitting shards across multiple jobs to avoid launch issues.",
-          totalTablesAcrossShards,
-          findNumLogicalshards(shards),
-          finalTablesToMigrate.size(),
-          MAX_RECOMMENDED_TABLES_PER_JOB);
-    }
+    List<String> srcTablesToMigrate = listTablesToMigrate(options.getTables(), schemaMapper, ddl);
+    Set<String> tablesToMigrateSet = new HashSet<>(srcTablesToMigrate);
+
+    List<String> spannerTablesToMigrate =
+        listSpannerTablesToMigrate(ddl, schemaMapper, tablesToMigrateSet);
 
     LOG.info(
         "running migration for shards: {}",
@@ -223,7 +220,7 @@ static PipelineResult executeShardedMigration(
           // Read data from source
           String shardId = entry.getValue();
           Map<String, PCollection<Void>> outputs = new HashMap<>();
-          for (String spTable : finalTablesToMigrate) {
+          for (String spTable : spannerTablesToMigrate) {
             String srcTable = schemaMapper.getSourceTableName("", spTable);
             List<PCollection<?>> parentOutputs = new ArrayList<>();
             for (String parentSpTable : ddl.tablesReferenced(spTable)) {
@@ -247,25 +244,32 @@ static PipelineResult executeShardedMigration(
                   "Output PCollection for parent table should not be null.");
              parentOutputs.add(parentOutputPcollection);
            }
-            ReaderImpl reader =
-                ReaderImpl.of(
-                    JdbcIoWrapper.of(
-                        OptionsToConfigBuilder.getJdbcIOWrapperConfig(
-                            sqlDialect,
-                            List.of(srcTable),
-                            null,
-                            shard.getHost(),
-                            shard.getConnectionProperties(),
-                            Integer.parseInt(shard.getPort()),
-                            shard.getUserName(),
-                            shard.getPassword(),
-                            entry.getKey(),
-                            shardId,
-                            options.getJdbcDriverClassName(),
-                            options.getJdbcDriverJars(),
-                            options.getMaxConnections(),
-                            options.getNumPartitions(),
-                            Wait.on(parentOutputs))));
+            JdbcIoWrapper jdbcIoWrapper =
+                JdbcIoWrapper.of(
+                    OptionsToConfigBuilder.getJdbcIOWrapperConfig(
+                        sqlDialect,
+                        List.of(srcTable),
+                        null,
+                        shard.getHost(),
+                        shard.getConnectionProperties(),
+                        Integer.parseInt(shard.getPort()),
+                        shard.getUserName(),
+                        shard.getPassword(),
+                        entry.getKey(),
+                        shardId,
+                        options.getJdbcDriverClassName(),
+                        options.getJdbcDriverJars(),
+                        options.getMaxConnections(),
+                        options.getNumPartitions(),
+                        Wait.on(parentOutputs)));
+            if (jdbcIoWrapper.getTableReaders().isEmpty()) {
+              LOG.info(
+                  "not creating reader as table is not found at source: {} shard: {}",
+                  srcTable,
+                  shard.getLogicalShardId());
+              continue;
+            }
+            ReaderImpl reader = ReaderImpl.of(jdbcIoWrapper);
             String suffix = generateSuffix(shardId, srcTable);
             String shardIdColumn =
                 schemaMapper.getShardIdColumnName(
@@ -240,6 +240,51 @@ public void testJdbcIoWrapperNoIndexException() throws RetriableSchemaDiscoveryException {
                 .build()));
   }
 
+  @Test
+  public void testJdbcIoWrapperDifferentTables() throws RetriableSchemaDiscoveryException {
+    // Test to check what happens if the config passes tables not present in the source.
+    SourceSchemaReference testSourceSchemaReference =
+        SourceSchemaReference.builder().setDbName("testDB").build();
+    String testCol = "ID";
+    SourceColumnType testColType = new SourceColumnType("INTEGER", new Long[] {}, null);
+    when(mockDialectAdapter.discoverTables(any(), any())).thenReturn(ImmutableList.of("testTable"));
+    when(mockDialectAdapter.discoverTableIndexes(any(), any(), any()))
+        .thenReturn(
+            ImmutableMap.of(
+                "testTable",
+                ImmutableList.of(
+                    SourceColumnIndexInfo.builder()
+                        .setIndexType(IndexType.NUMERIC)
+                        .setIndexName("PRIMARY")
+                        .setIsPrimary(true)
+                        .setCardinality(42L)
+                        .setColumnName(testCol)
+                        .setIsUnique(true)
+                        .setOrdinalPosition(1)
+                        .build())));
+    when(mockDialectAdapter.discoverTableSchema(any(), any(), any()))
+        .thenReturn(ImmutableMap.of("testTable", ImmutableMap.of(testCol, testColType)));
+    JdbcIoWrapper jdbcIoWrapper =
+        JdbcIoWrapper.of(
+            JdbcIOWrapperConfig.builderWithMySqlDefaults()
+                .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true")
+                .setSourceSchemaReference(testSourceSchemaReference)
+                .setShardID("test")
+                .setDbAuth(
+                    LocalCredentialsProvider.builder()
+                        .setUserName("testUser")
+                        .setPassword("testPassword")
+                        .build())
+                .setJdbcDriverJars("")
+                .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver")
+                .setDialectAdapter(mockDialectAdapter)
+                .setTables(ImmutableList.of("spanner_table"))
+                .build());
+    ImmutableMap<SourceTableReference, PTransform<PBegin, PCollection<SourceRow>>> tableReaders =
+        jdbcIoWrapper.getTableReaders();
+    assertThat(tableReaders.size()).isEqualTo(0);
+  }
+
   @Test
   public void testGetTablesToMigrate() {
     ImmutableList<String> tablesToMigrate =
@@ -260,6 +305,11 @@ public void testGetTablesToMigrate() {
     assertTrue(tablesToMigrate3.contains("p"));
     assertTrue(tablesToMigrate3.contains("q"));
     assertTrue(tablesToMigrate3.contains("r"));
+
+    // To handle scenarios where the configured tables are different from the tables at source.
+    ImmutableList<String> tablesToMigrateDiffTables =
+        JdbcIoWrapper.getTablesToMigrate(ImmutableList.of("a"), ImmutableList.of("x", "y"));
+    assertEquals(0, tablesToMigrateDiffTables.size());
   }
 
   @Test