Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.JobState;
Expand Down Expand Up @@ -420,6 +421,23 @@
};
}

public GcsResourceManager setUpSpannerITGcsResourceManager() {
GcsResourceManager spannerTestsGcsClient;
if (TestProperties.project().equals("cloud-teleport-testing")) {
List<String> bucketList = TestConstants.SPANNER_TEST_BUCKETS;
Random random = new Random();
int randomIndex = random.nextInt(bucketList.size());
String randomBucketName = bucketList.get(randomIndex);
spannerTestsGcsClient =
GcsResourceManager.builder(randomBucketName, getClass().getSimpleName(), credentials)
.build();

Check warning on line 433 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java#L427-L433

Added lines #L427 - L433 were not covered by tests

} else {
spannerTestsGcsClient = gcsClient;

Check warning on line 436 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java#L435-L436

Added lines #L435 - L436 were not covered by tests
}
return spannerTestsGcsClient;

Check warning on line 438 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java#L438

Added line #L438 was not covered by tests
}

private List<String> getModulesBuild(String pomPath) {
List<String> modules = new ArrayList<>();
modules.add("metadata");
Expand Down Expand Up @@ -589,7 +607,10 @@

protected String getGcsPath(String artifactId, GcsResourceManager gcsResourceManager) {
return ArtifactUtils.getFullGcsPath(
artifactBucketName, getClass().getSimpleName(), gcsResourceManager.runId(), artifactId);
gcsResourceManager.getBucket(),
getClass().getSimpleName(),
gcsResourceManager.runId(),

Check warning on line 612 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java#L610-L612

Added lines #L610 - L612 were not covered by tests
artifactId);
}

/** Create the default configuration {@link PipelineOperator.Config} for a specific job info. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.apache.beam.it.gcp;

import java.util.List;

/**
 * Shared constants for GCP integration tests.
 *
 * <p>Utility holder — not instantiable.
 */
public final class TestConstants {

  /**
   * GCS buckets dedicated to Spanner integration tests; callers pick one at random to
   * distribute artifact load across buckets. The list is immutable ({@link List#of}).
   */
  public static final List<String> SPANNER_TEST_BUCKETS =
      List.of(
          "cloud-teleport-spanner-it-0",
          "cloud-teleport-spanner-it-1",
          "cloud-teleport-spanner-it-2",
          "cloud-teleport-spanner-it-3",
          "cloud-teleport-spanner-it-4",
          "cloud-teleport-spanner-it-5",
          "cloud-teleport-spanner-it-6",
          "cloud-teleport-spanner-it-7",
          "cloud-teleport-spanner-it-8",
          "cloud-teleport-spanner-it-9");

  // Prevent instantiation of this constants-only class.
  private TestConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@
notificationList.clear();
}

public String getBucket() {
return bucket;

Check warning on line 351 in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java

View check run for this annotation

Codecov / codecov/patch

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/storage/GcsResourceManager.java#L351

Added line #L351 was not covered by tests
}

private void consumePages(Page<Blob> firstPage, Consumer<Iterable<Blob>> consumeBlobs) {
Page<Blob> currentPage = firstPage;
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
import org.apache.beam.it.gcp.spanner.matchers.SpannerAsserts;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class DataStreamToSpannerDDLIT extends DataStreamToSpannerITBase {

public static PubsubResourceManager pubsubResourceManager;
public static SpannerResourceManager spannerResourceManager;
public static GcsResourceManager gcsResourceManager;

/**
* Setup resource managers and Launch dataflow job once during the execution of this test class.
Expand All @@ -84,8 +86,9 @@ public void setUp() throws IOException, InterruptedException {
if (jobInfo == null) {
spannerResourceManager = setUpSpannerResourceManager();
pubsubResourceManager = setUpPubSubResourceManager();
gcsResourceManager = setUpSpannerITGcsResourceManager();
createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE);
createAndUploadJarToGcs("DatatypeIT");
createAndUploadJarToGcs("DatatypeIT", gcsResourceManager);
CustomTransformation customTransformation =
CustomTransformation.builder(
"customTransformation.jar", "com.custom.CustomTransformationWithShardForLiveIT")
Expand All @@ -104,7 +107,8 @@ public void setUp() throws IOException, InterruptedException {
}
},
customTransformation,
null);
null,
gcsResourceManager);
}
}
}
Expand All @@ -119,7 +123,8 @@ public static void cleanUp() throws IOException {
for (DataStreamToSpannerDDLIT instance : testInstances) {
instance.tearDownBase();
}
ResourceManagerUtils.cleanResources(spannerResourceManager, pubsubResourceManager);
ResourceManagerUtils.cleanResources(
spannerResourceManager, pubsubResourceManager, gcsResourceManager);
}

@Test
Expand All @@ -134,7 +139,8 @@ public void migrationTestWithAllDatatypeConversionMapping() {
jobInfo,
TABLE1,
"backfill.avro",
"DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeColumns.avro"),
"DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeColumns.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE1)
.setMinRows(2)
.setMaxRows(2)
Expand All @@ -158,12 +164,14 @@ public void migrationTestWithAllDatatypeConversionMapping() {
jobInfo,
TABLE1,
"cdc1.avro",
"DataStreamToSpannerDDLIT/mysql-cdc1-AllDatatypeColumns.avro"),
"DataStreamToSpannerDDLIT/mysql-cdc1-AllDatatypeColumns.avro",
gcsResourceManager),
uploadDataStreamFile(
jobInfo,
TABLE1,
"cdc2.avro",
"DataStreamToSpannerDDLIT/mysql-cdc2-AllDatatypeColumns.avro"),
"DataStreamToSpannerDDLIT/mysql-cdc2-AllDatatypeColumns.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE1)
.setMinRows(1)
.setMaxRows(1)
Expand Down Expand Up @@ -198,7 +206,8 @@ public void migrationTestWithAllDatatypeDefaultMapping() {
jobInfo,
TABLE2,
"backfill.avro",
"DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeColumns2.avro"),
"DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeColumns2.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE2)
.setMinRows(2)
.setMaxRows(2)
Expand All @@ -222,7 +231,8 @@ public void migrationTestWithAllDatatypeDefaultMapping() {
jobInfo,
TABLE2,
"cdc1.avro",
"DataStreamToSpannerDDLIT/mysql-cdc-AllDatatypeColumns2.avro"),
"DataStreamToSpannerDDLIT/mysql-cdc-AllDatatypeColumns2.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE2)
.setMinRows(1)
.setMaxRows(1)
Expand Down Expand Up @@ -256,7 +266,8 @@ public void migrationTestWithAllDatatypeTransformation() {
jobInfo,
TRANSFORMATION_TABLE,
"backfill.avro",
"DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeTransformation.avro"),
"DataStreamToSpannerDDLIT/mysql-backfill-AllDatatypeTransformation.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TRANSFORMATION_TABLE)
.setMinRows(3)
.setMaxRows(3)
Expand All @@ -280,7 +291,8 @@ public void migrationTestWithAllDatatypeTransformation() {
jobInfo,
TRANSFORMATION_TABLE,
"cdc.avro",
"DataStreamToSpannerDDLIT/mysql-cdc-AllDatatypeTransformation.avro"),
"DataStreamToSpannerDDLIT/mysql-cdc-AllDatatypeTransformation.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TRANSFORMATION_TABLE)
.setMinRows(2)
.setMaxRows(2)
Expand Down Expand Up @@ -314,7 +326,8 @@ public void migrationTestWithDatatypeSizeConversion() {
jobInfo,
TABLE3,
"datatypesizes-backfill.avro",
"DataStreamToSpannerDDLIT/DatatypeColumnsWithSizes-backfill.avro"),
"DataStreamToSpannerDDLIT/DatatypeColumnsWithSizes-backfill.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE3)
.setMinRows(2)
.setMaxRows(2)
Expand Down Expand Up @@ -343,7 +356,8 @@ public void migrationTestWithDatatypeSizeReducedConversion() throws IOException
jobInfo,
TABLE4,
"datatypesizes-reduced-backfill.avro",
"DataStreamToSpannerDDLIT/DatatypeColumnsReducedSizes-backfill.avro"),
"DataStreamToSpannerDDLIT/DatatypeColumnsReducedSizes-backfill.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE4)
.setMinRows(1)
.setMaxRows(1)
Expand All @@ -369,7 +383,11 @@ public void migrationTestWithGeneratedColumns() {
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo, TABLE5, "gencols.avro", "DataStreamToSpannerDDLIT/Users.avro"),
jobInfo,
TABLE5,
"gencols.avro",
"DataStreamToSpannerDDLIT/Users.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE5)
.setMinRows(3)
.setMaxRows(3)
Expand All @@ -395,7 +413,11 @@ public void migrationTestWithCharsetConversion() {
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo, TABLE7, "charsets.avro", "DataStreamToSpannerDDLIT/Authors.avro"),
jobInfo,
TABLE7,
"charsets.avro",
"DataStreamToSpannerDDLIT/Authors.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE7)
.setMinRows(3)
.setMaxRows(3)
Expand All @@ -421,7 +443,11 @@ public void migrationTestWithSequenceColumns() {
ChainedConditionCheck.builder(
List.of(
uploadDataStreamFile(
jobInfo, TABLE8, "sequence.avro", "DataStreamToSpannerDDLIT/Singers.avro"),
jobInfo,
TABLE8,
"sequence.avro",
"DataStreamToSpannerDDLIT/Singers.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE8)
.setMinRows(2)
.setMaxRows(2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
import org.apache.beam.it.gcp.spanner.matchers.SpannerAsserts;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class DataStreamToSpannerEventsIT extends DataStreamToSpannerITBase {

public static PubsubResourceManager pubsubResourceManager;
public static SpannerResourceManager spannerResourceManager;
public static GcsResourceManager gcsResourceManager;

/**
* Setup resource managers and Launch dataflow job once during the execution of this test class.
Expand All @@ -85,6 +87,7 @@ public void setUp() throws IOException {
if (jobInfo == null) {
spannerResourceManager = setUpSpannerResourceManager();
pubsubResourceManager = setUpPubSubResourceManager();
gcsResourceManager = setUpSpannerITGcsResourceManager();
createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE);
jobInfo =
launchDataflowJob(
Expand All @@ -100,7 +103,8 @@ public void setUp() throws IOException {
}
},
null,
null);
null,
gcsResourceManager);
}
}
}
Expand All @@ -115,7 +119,8 @@ public static void cleanUp() throws IOException {
for (DataStreamToSpannerEventsIT instance : testInstances) {
instance.tearDownBase();
}
ResourceManagerUtils.cleanResources(spannerResourceManager, pubsubResourceManager);
ResourceManagerUtils.cleanResources(
spannerResourceManager, pubsubResourceManager, gcsResourceManager);
}

@Test
Expand All @@ -132,7 +137,8 @@ public void migrationTestWithUpdatesAndDeletes() {
jobInfo,
TABLE1,
"backfill_users.avro",
"DataStreamToSpannerEventsIT/mysql-backfill-Users.avro"),
"DataStreamToSpannerEventsIT/mysql-backfill-Users.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE1)
.setMinRows(2)
.setMaxRows(2)
Expand All @@ -141,7 +147,8 @@ public void migrationTestWithUpdatesAndDeletes() {
jobInfo,
TABLE1,
"cdc_users.avro",
"DataStreamToSpannerEventsIT/mysql-cdc-Users.avro"),
"DataStreamToSpannerEventsIT/mysql-cdc-Users.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE1)
.setMinRows(3)
.setMaxRows(3)
Expand Down Expand Up @@ -178,7 +185,8 @@ public void migrationTestWithInsertsOnly() {
jobInfo,
TABLE2,
"backfill_movie.avro",
"DataStreamToSpannerEventsIT/mysql-backfill-Movie.avro"),
"DataStreamToSpannerEventsIT/mysql-backfill-Movie.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, TABLE2)
.setMinRows(2)
.setMaxRows(2)
Expand Down Expand Up @@ -206,17 +214,20 @@ public void interleavedAndFKAndIndexTest() {
jobInfo,
"Articles",
"mysql_articles.avro",
"DataStreamToSpannerEventsIT/mysql-Articles.avro"),
"DataStreamToSpannerEventsIT/mysql-Articles.avro",
gcsResourceManager),
uploadDataStreamFile(
jobInfo,
"Authors",
"mysql_authors.avro",
"DataStreamToSpannerEventsIT/mysql-Authors.avro"),
"DataStreamToSpannerEventsIT/mysql-Authors.avro",
gcsResourceManager),
uploadDataStreamFile(
jobInfo,
"Books",
"mysql_books.avro",
"DataStreamToSpannerEventsIT/mysql-Books.avro"),
"DataStreamToSpannerEventsIT/mysql-Books.avro",
gcsResourceManager),
SpannerRowsCheck.builder(spannerResourceManager, "Articles")
.setMinRows(4)
.setMaxRows(4)
Expand Down
Loading
Loading