diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java index ef40913443..b6c2e1eb33 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java @@ -258,6 +258,10 @@ private static ImmutableList autoInferTableConfigs( ImmutableList discoveredTables = schemaDiscovery.discoverTables(dataSource, config.sourceSchemaReference()); ImmutableList tables = getTablesToMigrate(config.tables(), discoveredTables); + if (tables.isEmpty()) { + logger.info("source does not contain matching tables: {}", config.tables()); + return ImmutableList.of(); + } ImmutableMap> indexes = schemaDiscovery.discoverTableIndexes(dataSource, config.sourceSchemaReference(), tables); ImmutableList.Builder tableConfigsBuilder = ImmutableList.builder(); diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java index 6d19bc4ef5..7d883cecb5 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,35 +67,14 @@ static PipelineResult executeSingleInstanceMigration( Ddl ddl = SpannerSchema.getInformationSchemaAsDdl(spannerConfig); ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(options, ddl); - List tablesToMigrate = - PipelineController.listTablesToMigrate(options.getTables(), schemaMapper, ddl); - Set tablesToMigrateSet = new HashSet<>(tablesToMigrate); + List srcTablesToMigrate = listTablesToMigrate(options.getTables(), schemaMapper, ddl); + Set tablesToMigrateSet = new HashSet<>(srcTablesToMigrate); - // This list is all Spanner tables topologically ordered. - List orderedSpTables = ddl.getTablesOrderedByReference(); + List spannerTablesToMigrate = + listSpannerTablesToMigrate(ddl, schemaMapper, tablesToMigrateSet); Map> outputs = new HashMap<>(); - // This list will contain the final list of tables that actually get migrated, which will be the - // intersection of Spanner and source tables. - List finalTablesToMigrate = new ArrayList<>(); - for (String spTable : orderedSpTables) { - String srcTable = schemaMapper.getSourceTableName("", spTable); - if (!tablesToMigrateSet.contains(srcTable)) { - continue; - } - finalTablesToMigrate.add(spTable); - } - LOG.info( - "{} Spanner tables in final selection for migration: {}", - finalTablesToMigrate.size(), - finalTablesToMigrate); - if (finalTablesToMigrate.size() > MAX_RECOMMENDED_TABLES_PER_JOB) { - LOG.warn( - "Migrating {} tables in a single job (max recommended: {}). Consider splitting tables across jobs to avoid launch issues.", - finalTablesToMigrate.size(), - MAX_RECOMMENDED_TABLES_PER_JOB); - } - for (String spTable : finalTablesToMigrate) { + for (String spTable : spannerTablesToMigrate) { String srcTable = schemaMapper.getSourceTableName("", spTable); List> parentOutputs = new ArrayList<>(); for (String parentSpTable : ddl.tablesReferenced(spTable)) { @@ -134,11 +114,15 @@ static PipelineResult executeSingleInstanceMigration( "Output PCollection for parent table should not be null."); parentOutputs.add(parentOutputPcollection); } - ReaderImpl reader = - ReaderImpl.of( - JdbcIoWrapper.of( - OptionsToConfigBuilder.getJdbcIOWrapperConfigWithDefaults( - options, List.of(srcTable), null, Wait.on(parentOutputs)))); + JdbcIoWrapper jdbcIoWrapper = + JdbcIoWrapper.of( + OptionsToConfigBuilder.getJdbcIOWrapperConfigWithDefaults( + options, List.of(srcTable), null, Wait.on(parentOutputs))); + if (jdbcIoWrapper.getTableReaders().isEmpty()) { + LOG.info("not creating reader as table is not found at source: {}", srcTable); + continue; + } + ReaderImpl reader = ReaderImpl.of(jdbcIoWrapper); String suffix = generateSuffix("", srcTable); String shardIdColumn = ""; PCollection output = @@ -158,6 +142,44 @@ static PipelineResult executeSingleInstanceMigration( return pipeline.run(); } + @NotNull + /** + * This list will contain the final list of tables that actually get migrated, which will be the + * intersection of Spanner and source tables. + */ + static List listSpannerTablesToMigrate( + Ddl ddl, ISchemaMapper schemaMapper, Set tablesToMigrateSet) { + // This list is all Spanner tables topologically ordered. + List orderedSpTables = ddl.getTablesOrderedByReference(); + + // This list will contain the final list of tables that actually get migrated, which will be the + // intersection of Spanner and source tables. + List finalTablesToMigrate = new ArrayList<>(); + for (String spTable : orderedSpTables) { + try { + String srcTable = schemaMapper.getSourceTableName("", spTable); + if (!tablesToMigrateSet.contains(srcTable)) { + LOG.info("ignoring table as no source maps to this spanner table: {}", spTable); + continue; + } + finalTablesToMigrate.add(spTable); + } catch (NoSuchElementException e) { + LOG.info("ignoring table not identified by schema mapper: {}", spTable); + } + } + LOG.info( + "{} Spanner tables in final selection for migration: {}", + finalTablesToMigrate.size(), + finalTablesToMigrate); + if (finalTablesToMigrate.size() > MAX_RECOMMENDED_TABLES_PER_JOB) { + LOG.warn( + "Migrating {} tables in a single job (max recommended: {}). Consider splitting tables across jobs to avoid launch issues.", + finalTablesToMigrate.size(), + MAX_RECOMMENDED_TABLES_PER_JOB); + } + return finalTablesToMigrate; + } + private static String generateSuffix(String shardId, String tableName) { String suffix = ""; if (!StringUtils.isEmpty(shardId)) { @@ -184,36 +206,11 @@ static PipelineResult executeShardedMigration( Ddl ddl = SpannerSchema.getInformationSchemaAsDdl(spannerConfig); ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(options, ddl); - List tablesToMigrate = - PipelineController.listTablesToMigrate(options.getTables(), schemaMapper, ddl); - Set tablesToMigrateSet = new HashSet<>(tablesToMigrate); - // This list is all Spanner tables topologically ordered. - List orderedSpTables = ddl.getTablesOrderedByReference(); - // This list will contain the final list of tables that actually get migrated, which will be the - // intersection of Spanner and source tables. - List finalTablesToMigrate = new ArrayList<>(); - for (String spTable : orderedSpTables) { - String srcTable = schemaMapper.getSourceTableName("", spTable); - if (!tablesToMigrateSet.contains(srcTable)) { - continue; - } - finalTablesToMigrate.add(spTable); - } - LOG.info( - "{} Spanner tables in final selection for migration: {}", - finalTablesToMigrate.size(), - finalTablesToMigrate); - long totalTablesAcrossShards = findNumLogicalshards(shards) * finalTablesToMigrate.size(); - if (totalTablesAcrossShards > MAX_RECOMMENDED_TABLES_PER_JOB) { - LOG.warn( - "Migrating {} tables ({} shards x {} tables/shard) in a single job. " - + "This exceeds the recommended maximum of {} tables per job. " - + "Consider splitting shards across multiple jobs to avoid launch issues.", - totalTablesAcrossShards, - findNumLogicalshards(shards), - finalTablesToMigrate.size(), - MAX_RECOMMENDED_TABLES_PER_JOB); - } + List srcTablesToMigrate = listTablesToMigrate(options.getTables(), schemaMapper, ddl); + Set tablesToMigrateSet = new HashSet<>(srcTablesToMigrate); + + List spannerTablesToMigrate = + listSpannerTablesToMigrate(ddl, schemaMapper, tablesToMigrateSet); LOG.info( "running migration for shards: {}", @@ -223,7 +220,7 @@ static PipelineResult executeShardedMigration( // Read data from source String shardId = entry.getValue(); Map> outputs = new HashMap<>(); - for (String spTable : finalTablesToMigrate) { + for (String spTable : spannerTablesToMigrate) { String srcTable = schemaMapper.getSourceTableName("", spTable); List> parentOutputs = new ArrayList<>(); for (String parentSpTable : ddl.tablesReferenced(spTable)) { @@ -247,25 +244,32 @@ static PipelineResult executeShardedMigration( "Output PCollection for parent table should not be null."); parentOutputs.add(parentOutputPcollection); } - ReaderImpl reader = - ReaderImpl.of( - JdbcIoWrapper.of( - OptionsToConfigBuilder.getJdbcIOWrapperConfig( - sqlDialect, - List.of(srcTable), - null, - shard.getHost(), - shard.getConnectionProperties(), - Integer.parseInt(shard.getPort()), - shard.getUserName(), - shard.getPassword(), - entry.getKey(), - shardId, - options.getJdbcDriverClassName(), - options.getJdbcDriverJars(), - options.getMaxConnections(), - options.getNumPartitions(), - Wait.on(parentOutputs)))); + JdbcIoWrapper jdbcIoWrapper = + JdbcIoWrapper.of( + OptionsToConfigBuilder.getJdbcIOWrapperConfig( + sqlDialect, + List.of(srcTable), + null, + shard.getHost(), + shard.getConnectionProperties(), + Integer.parseInt(shard.getPort()), + shard.getUserName(), + shard.getPassword(), + entry.getKey(), + shardId, + options.getJdbcDriverClassName(), + options.getJdbcDriverJars(), + options.getMaxConnections(), + options.getNumPartitions(), + Wait.on(parentOutputs))); + if (jdbcIoWrapper.getTableReaders().isEmpty()) { + LOG.info( + "not creating reader as table is not found at source: {} shard: {}", + srcTable, + shard.getLogicalShardId()); + continue; + } + ReaderImpl reader = ReaderImpl.of(jdbcIoWrapper); String suffix = generateSuffix(shardId, srcTable); String shardIdColumn = schemaMapper.getShardIdColumnName( diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java index eca70d9a53..27696af8e6 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java @@ -240,6 +240,51 @@ public void testJdbcIoWrapperNoIndexException() throws RetriableSchemaDiscoveryE .build())); } + @Test + public void testJdbcIoWrapperDifferentTables() throws RetriableSchemaDiscoveryException { + // Test to check what happens if config passes tables not present in source + SourceSchemaReference testSourceSchemaReference = + SourceSchemaReference.builder().setDbName("testDB").build(); + String testCol = "ID"; + SourceColumnType testColType = new SourceColumnType("INTEGER", new Long[] {}, null); + when(mockDialectAdapter.discoverTables(any(), any())).thenReturn(ImmutableList.of("testTable")); + when(mockDialectAdapter.discoverTableIndexes(any(), any(), any())) + .thenReturn( + ImmutableMap.of( + "testTable", + ImmutableList.of( + SourceColumnIndexInfo.builder() + .setIndexType(IndexType.NUMERIC) + .setIndexName("PRIMARY") + .setIsPrimary(true) + .setCardinality(42L) + .setColumnName(testCol) + .setIsUnique(true) + .setOrdinalPosition(1) + .build()))); + when(mockDialectAdapter.discoverTableSchema(any(), any(), any())) + .thenReturn(ImmutableMap.of("testTable", ImmutableMap.of(testCol, testColType))); + JdbcIoWrapper jdbcIoWrapper = + JdbcIoWrapper.of( + JdbcIOWrapperConfig.builderWithMySqlDefaults() + .setSourceDbURL("jdbc:derby://myhost/memory:TestingDB;create=true") + .setSourceSchemaReference(testSourceSchemaReference) + .setShardID("test") + .setDbAuth( + LocalCredentialsProvider.builder() + .setUserName("testUser") + .setPassword("testPassword") + .build()) + .setJdbcDriverJars("") + .setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver") + .setDialectAdapter(mockDialectAdapter) + .setTables(ImmutableList.of("spanner_table")) + .build()); + ImmutableMap>> tableReaders = + jdbcIoWrapper.getTableReaders(); + assertThat(tableReaders.size()).isEqualTo(0); + } + @Test public void testGetTablesToMigrate() { ImmutableList tablesToMigrate = @@ -260,6 +305,11 @@ public void testGetTablesToMigrate() { assertTrue(tablesToMigrate3.contains("p")); assertTrue(tablesToMigrate3.contains("q")); assertTrue(tablesToMigrate3.contains("r")); + + // To handle scenarios where the configured tables are different from the tables at source + ImmutableList tablesToMigrateDiffTables = + JdbcIoWrapper.getTablesToMigrate(ImmutableList.of("a"), ImmutableList.of("x", "y")); + assertEquals(0, tablesToMigrateDiffTables.size()); } @Test diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PipelineControllerTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PipelineControllerTest.java new file mode 100644 index 0000000000..f8f28e5e20 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PipelineControllerTest.java @@ -0,0 +1,276 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions; +import com.google.cloud.teleport.v2.spanner.ddl.Ddl; +import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidOptionsException; +import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaMapper; +import com.google.cloud.teleport.v2.spanner.migrations.schema.IdentityMapper; +import com.google.cloud.teleport.v2.spanner.migrations.schema.SessionBasedMapper; +import com.google.common.io.Resources; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class PipelineControllerTest { + + private Ddl spannerDdl; + + private Ddl spannerDdlWithExtraTable; + + @Before + public void setup() { + spannerDdl = + Ddl.builder() + .createTable("new_cart") + .column("new_quantity") + .int64() + .notNull() + .endColumn() + .column("new_user_id") + .string() + .size(10) + .endColumn() + .primaryKey() + .asc("new_user_id") + .asc("new_quantity") + .end() + .endTable() + .createTable("new_people") + .column("synth_id") + .int64() + .notNull() + .endColumn() + .column("new_name") + .string() + .size(10) + .endColumn() + .primaryKey() + .asc("synth_id") + .end() + .endTable() + .build(); + + spannerDdlWithExtraTable = + Ddl.builder() + .createTable("new_cart") + .column("new_quantity") + .int64() + .notNull() + .endColumn() + .column("new_user_id") + .string() + .size(10) + .endColumn() + .primaryKey() + .asc("new_user_id") + .asc("new_quantity") + .end() + .endTable() + .createTable("new_people") + .column("synth_id") + .int64() + .notNull() + .endColumn() + .column("new_name") + .string() + .size(10) + .endColumn() + .primaryKey() + .asc("synth_id") + .end() + .endTable() + .createTable("extra_table") + .column("synth_id") + .int64() + .notNull() + .endColumn() + .column("new_name") + .string() + .size(10) + .endColumn() + .primaryKey() + .asc("synth_id") + .end() + .endTable() + .build(); + } + + @Test + public void createIdentitySchemaMapper() { + SourceDbToSpannerOptions mockOptions = createOptionsHelper("", ""); + ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); + assertTrue(schemaMapper instanceof IdentityMapper); + } + + @Test + public void createSessionSchemaMapper() { + SourceDbToSpannerOptions mockOptions = + createOptionsHelper( + Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) + .toString(), + null); + ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); + assertTrue(schemaMapper instanceof SessionBasedMapper); + } + + @Test(expected = Exception.class) + public void createInvalidSchemaMapper_withException() { + SourceDbToSpannerOptions mockOptions = createOptionsHelper("invalid-file", ""); + PipelineController.getSchemaMapper(mockOptions, spannerDdl); + } + + private SourceDbToSpannerOptions createOptionsHelper(String sessionFile, String tables) { + SourceDbToSpannerOptions mockOptions = + mock(SourceDbToSpannerOptions.class, Mockito.withSettings().serializable()); + when(mockOptions.getSessionFilePath()).thenReturn(sessionFile); + when(mockOptions.getTables()).thenReturn(tables); + return mockOptions; + } + + @Test + public void listTablesToMigrateIdentity() { + SourceDbToSpannerOptions mockOptions = createOptionsHelper("", ""); + ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); + List tables = PipelineController.listTablesToMigrate("", schemaMapper, spannerDdl); + List ddlTables = + spannerDdl.allTables().stream().map(t -> t.name()).collect(Collectors.toList()); + assertEquals(2, tables.size()); + assertTrue(ddlTables.containsAll(tables)); + } + + @Test + public void listTablesToMigrateIdentityOverride() { + SourceDbToSpannerOptions mockOptions = createOptionsHelper("", "new_cart"); + ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); + List tables = + PipelineController.listTablesToMigrate(mockOptions.getTables(), schemaMapper, spannerDdl); + List ddlTables = + spannerDdl.allTables().stream().map(t -> t.name()).collect(Collectors.toList()); + assertEquals(1, tables.size()); + assertTrue(ddlTables.containsAll(tables)); + } + + @Test + public void listTablesToMigrateSession() { + SourceDbToSpannerOptions mockOptions = + createOptionsHelper( + Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) + .toString(), + "cart,people"); + ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); + List tables = + PipelineController.listTablesToMigrate(mockOptions.getTables(), schemaMapper, spannerDdl); + + assertEquals(2, tables.size()); + assertTrue(tables.contains("cart")); + assertTrue(tables.contains("people")); + } + + @Test + public void listTablesToMigrateSessionOverride() { + SourceDbToSpannerOptions mockOptions = + createOptionsHelper( + Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) + .toString(), + "cart"); + ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); + List tables = + PipelineController.listTablesToMigrate(mockOptions.getTables(), schemaMapper, spannerDdl); + + assertEquals(1, tables.size()); + assertTrue(tables.contains("cart")); + } + + @Test(expected = InvalidOptionsException.class) + public void listTablesToMigrateSessionOverrideInvalid() { + SourceDbToSpannerOptions mockOptions = + createOptionsHelper( + Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) + .toString(), + "asd"); + ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); + List tables = + PipelineController.listTablesToMigrate(mockOptions.getTables(), schemaMapper, spannerDdl); + } + + @Test + public void spannerTablesToMigrateSession() { + SourceDbToSpannerOptions mockOptions = + createOptionsHelper( + Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) + .toString(), + "cart,people"); + ISchemaMapper schemaMapper = + PipelineController.getSchemaMapper(mockOptions, spannerDdlWithExtraTable); + List tables = + PipelineController.listSpannerTablesToMigrate( + spannerDdl, schemaMapper, new HashSet<>(schemaMapper.getSourceTablesToMigrate(""))); + + assertEquals(2, tables.size()); + assertTrue(tables.contains("new_cart")); + assertTrue(tables.contains("new_people")); + } + + @Test + public void spannerTablesToMigrateSessionWithExtraTable() { + SourceDbToSpannerOptions mockOptions = + createOptionsHelper( + Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) + .toString(), + "cart,people"); + ISchemaMapper schemaMapper = + PipelineController.getSchemaMapper(mockOptions, spannerDdlWithExtraTable); + List tables = + PipelineController.listSpannerTablesToMigrate( + spannerDdlWithExtraTable, + schemaMapper, + new HashSet<>(schemaMapper.getSourceTablesToMigrate(""))); + + assertEquals(2, tables.size()); + assertTrue(tables.contains("new_cart")); + assertTrue(tables.contains("new_people")); + } + + @Test + public void spannerTablesToMigrateSessionWithLessTables() { + SourceDbToSpannerOptions mockOptions = + createOptionsHelper( + Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) + .toString(), + "cart,people"); + ISchemaMapper schemaMapper = + PipelineController.getSchemaMapper(mockOptions, spannerDdlWithExtraTable); + Set srcTables = new HashSet(); + srcTables.add("cart"); + List tables = + PipelineController.listSpannerTablesToMigrate(spannerDdl, schemaMapper, srcTables); + + assertEquals(1, tables.size()); + assertTrue(tables.contains("new_cart")); + } +} diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerTest.java index 35b7accd6b..d74272c8fe 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerTest.java @@ -16,63 +16,16 @@ package com.google.cloud.teleport.v2.templates; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions; -import com.google.cloud.teleport.v2.spanner.ddl.Ddl; -import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidOptionsException; -import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaMapper; -import com.google.cloud.teleport.v2.spanner.migrations.schema.IdentityMapper; -import com.google.cloud.teleport.v2.spanner.migrations.schema.SessionBasedMapper; -import com.google.common.io.Resources; -import java.nio.file.Paths; -import java.util.List; -import java.util.stream.Collectors; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; -import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; public class SourceDbToSpannerTest { - private Ddl spannerDdl; - - @Before - public void setup() { - spannerDdl = - Ddl.builder() - .createTable("new_cart") - .column("new_quantity") - .int64() - .notNull() - .endColumn() - .column("new_user_id") - .string() - .size(10) - .endColumn() - .primaryKey() - .asc("new_user_id") - .asc("new_quantity") - .end() - .endTable() - .createTable("new_people") - .column("synth_id") - .int64() - .notNull() - .endColumn() - .column("new_name") - .string() - .size(10) - .endColumn() - .primaryKey() - .asc("synth_id") - .end() - .endTable() - .build(); - } - @Test public void testCreateSpannerConfig() { SourceDbToSpannerOptions mockOptions = @@ -87,102 +40,4 @@ public void testCreateSpannerConfig() { assertEquals(config.getInstanceId().get(), "testInstance"); assertEquals(config.getDatabaseId().get(), "testDatabaseId"); } - - @Test - public void createIdentitySchemaMapper() { - SourceDbToSpannerOptions mockOptions = createOptionsHelper("", ""); - ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); - assertTrue(schemaMapper instanceof IdentityMapper); - } - - @Test - public void createSessionSchemaMapper() { - SourceDbToSpannerOptions mockOptions = - createOptionsHelper( - Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) - .toString(), - null); - ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); - assertTrue(schemaMapper instanceof SessionBasedMapper); - } - - @Test(expected = Exception.class) - public void createInvalidSchemaMapper_withException() { - SourceDbToSpannerOptions mockOptions = createOptionsHelper("invalid-file", ""); - PipelineController.getSchemaMapper(mockOptions, spannerDdl); - } - - private SourceDbToSpannerOptions createOptionsHelper(String sessionFile, String tables) { - SourceDbToSpannerOptions mockOptions = - mock(SourceDbToSpannerOptions.class, Mockito.withSettings().serializable()); - when(mockOptions.getSessionFilePath()).thenReturn(sessionFile); - when(mockOptions.getTables()).thenReturn(tables); - return mockOptions; - } - - @Test - public void listTablesToMigrateIdentity() { - SourceDbToSpannerOptions mockOptions = createOptionsHelper("", ""); - ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); - List tables = PipelineController.listTablesToMigrate("", schemaMapper, spannerDdl); - List ddlTables = - spannerDdl.allTables().stream().map(t -> t.name()).collect(Collectors.toList()); - assertEquals(2, tables.size()); - assertTrue(ddlTables.containsAll(tables)); - } - - @Test - public void listTablesToMigrateIdentityOverride() { - SourceDbToSpannerOptions mockOptions = createOptionsHelper("", "new_cart"); - ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); - List tables = - PipelineController.listTablesToMigrate(mockOptions.getTables(), schemaMapper, spannerDdl); - List ddlTables = - spannerDdl.allTables().stream().map(t -> t.name()).collect(Collectors.toList()); - assertEquals(1, tables.size()); - assertTrue(ddlTables.containsAll(tables)); - } - - @Test - public void listTablesToMigrateSession() { - SourceDbToSpannerOptions mockOptions = - createOptionsHelper( - Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) - .toString(), - "cart,people"); - ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); - List tables = - PipelineController.listTablesToMigrate(mockOptions.getTables(), schemaMapper, spannerDdl); - - assertEquals(2, tables.size()); - assertTrue(tables.contains("cart")); - assertTrue(tables.contains("people")); - } - - @Test - public void listTablesToMigrateSessionOverride() { - SourceDbToSpannerOptions mockOptions = - createOptionsHelper( - Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) - .toString(), - "cart"); - ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); - List tables = - PipelineController.listTablesToMigrate(mockOptions.getTables(), schemaMapper, spannerDdl); - - assertEquals(1, tables.size()); - assertTrue(tables.contains("cart")); - } - - @Test(expected = InvalidOptionsException.class) - public void listTablesToMigrateSessionOverrideInvalid() { - SourceDbToSpannerOptions mockOptions = - createOptionsHelper( - Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) - .toString(), - "asd"); - ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); - List tables = - PipelineController.listTablesToMigrate(mockOptions.getTables(), schemaMapper, spannerDdl); - } }