@@ -62,7 +62,11 @@ public class FlexTemplateDataflowJobResourceManager implements ResourceManager {
   private FlexTemplateDataflowJobResourceManager(Builder builder) {
     this.parameters = builder.parameters;
     this.templateName = builder.templateName;
-    pipelineLauncher = FlexTemplateClient.builder(CREDENTIALS).build();
+    if (builder.pipelineLauncher != null) {
+      pipelineLauncher = builder.pipelineLauncher;
+    } else {
+      pipelineLauncher = FlexTemplateClient.builder(CREDENTIALS).build();
+    }
     synchronized (specPaths) {
       if (!specPaths.containsKey(builder.templateName)) {
         buildAndStageTemplate(
@@ -113,6 +117,10 @@ public LaunchInfo getJobInfo() {
     return jobInfo;
   }
 
+  public LaunchConfig getLaunchConfig() {
+    return launchConfig;
+  }
+
   public String getTemplateName() {
     return templateName;
   }
@@ -125,6 +133,7 @@ public static final class Builder {
     private String templateName;
     private String templateModulePath;
     private String additionalMavenProfile;
+    private PipelineLauncher pipelineLauncher;
 
     private Builder(String jobName) {
       this.jobName = jobName;
@@ -168,6 +177,12 @@ public FlexTemplateDataflowJobResourceManager.Builder withAdditionalMavenProfile
       return this;
     }
 
+    public FlexTemplateDataflowJobResourceManager.Builder withPipelineLauncher(
+        PipelineLauncher pipelineLauncher) {
+      this.pipelineLauncher = pipelineLauncher;
+      return this;
+    }
+
     public FlexTemplateDataflowJobResourceManager build() {
       if (templateName == null) {
         throw new IllegalArgumentException(
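For reference, a minimal sketch of how the new injection point can be exercised. The class, method, and job name below are hypothetical, and the Mockito mock is an assumption (this PR does not introduce Mockito); any PipelineLauncher implementation would do. When withPipelineLauncher is omitted, the manager falls back to FlexTemplateClient.builder(CREDENTIALS).build() as before, so existing callers are unaffected.

  import java.io.IOException;
  import org.apache.beam.it.common.PipelineLauncher;
  import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
  import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
  import org.apache.beam.it.gcp.dataflow.FlexTemplateDataflowJobResourceManager;
  import org.mockito.Mockito;

  class InjectedLauncherSketch {
    LaunchInfo launchWithStub() throws IOException {
      // Inject a test double instead of the default FlexTemplateClient.
      PipelineLauncher launcher = Mockito.mock(PipelineLauncher.class);
      FlexTemplateDataflowJobResourceManager manager =
          FlexTemplateDataflowJobResourceManager.builder("hypothetical-job-name")
              .withTemplateName("Cloud_Datastream_to_Spanner")
              .withTemplateModulePath("v2/datastream-to-spanner")
              .withPipelineLauncher(launcher)
              .build(); // still builds and stages the template, per the constructor above
      LaunchInfo jobInfo = manager.launchJob(); // routed through the injected launcher
      LaunchConfig config = manager.getLaunchConfig(); // new getter, usable for assertions
      return jobInfo;
    }
  }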
@@ -38,13 +38,13 @@
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.it.common.PipelineLauncher;
-import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
 import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
 import org.apache.beam.it.common.PipelineOperator;
 import org.apache.beam.it.common.TestProperties;
 import org.apache.beam.it.common.utils.ResourceManagerUtils;
 import org.apache.beam.it.conditions.ConditionCheck;
 import org.apache.beam.it.gcp.TemplateLoadTestBase;
+import org.apache.beam.it.gcp.dataflow.FlexTemplateDataflowJobResourceManager;
 import org.apache.beam.it.gcp.datastream.DatastreamResourceManager;
 import org.apache.beam.it.gcp.datastream.JDBCSource;
 import org.apache.beam.it.gcp.datastream.MySQLSource;
@@ -149,51 +149,47 @@ public void runLoadTest(
     Stream stream =
         createDatastreamResources(
             artifactBucket, gcsPrefix, mySQLSource, datastreamResourceManager);
 
-    // Setup Parameters
-    Map<String, String> params =
-        new HashMap<>() {
-          {
-            put("inputFilePattern", getGcsPath(artifactBucket, gcsPrefix));
-            put("streamName", stream.getName());
-            put("instanceId", spannerResourceManager.getInstanceId());
-            put("databaseId", spannerResourceManager.getDatabaseId());
-            put("projectId", project);
-            put("deadLetterQueueDirectory", getGcsPath(artifactBucket, dlqGcsPrefix));
-            put("gcsPubSubSubscription", subscription.toString());
-            put("dlqGcsPubSubSubscription", dlqSubscription.toString());
-            put("datastreamSourceType", "mysql");
-            put("inputFileFormat", "avro");
-            put("workerMachineType", "n2-standard-4");
-          }
-        };
+    FlexTemplateDataflowJobResourceManager.Builder flexTemplateBuilder =
+        FlexTemplateDataflowJobResourceManager.builder(testName)
+            .withTemplateName("Cloud_Datastream_to_Spanner")
+            .withTemplateModulePath("v2/datastream-to-spanner")
+            .withPipelineLauncher(pipelineLauncher)
+            .addParameter("inputFilePattern", getGcsPath(artifactBucket, gcsPrefix))
+            .addParameter("streamName", stream.getName())
+            .addParameter("instanceId", spannerResourceManager.getInstanceId())
+            .addParameter("databaseId", spannerResourceManager.getDatabaseId())
+            .addParameter("projectId", project)
+            .addParameter("deadLetterQueueDirectory", getGcsPath(artifactBucket, dlqGcsPrefix))
+            .addParameter("gcsPubSubSubscription", subscription.toString())
+            .addParameter("dlqGcsPubSubSubscription", dlqSubscription.toString())
+            .addParameter("datastreamSourceType", "mysql")
+            .addParameter("inputFileFormat", "avro")
+            .addParameter("workerMachineType", "n2-standard-4");
 
     // Add shadow table parameters if shadowTableSpannerResourceManager is not null
     if (shadowTableSpannerResourceManager != null) {
-      params.putAll(
-          new HashMap<>() {
-            {
-              put(
-                  "shadowTableSpannerInstanceId",
-                  shadowTableSpannerResourceManager.getInstanceId());
-              put(
-                  "shadowTableSpannerDatabaseId",
-                  shadowTableSpannerResourceManager.getDatabaseId());
-            }
-          });
+      flexTemplateBuilder
+          .addParameter(
+              "shadowTableSpannerInstanceId", shadowTableSpannerResourceManager.getInstanceId())
+          .addParameter(
+              "shadowTableSpannerDatabaseId", shadowTableSpannerResourceManager.getDatabaseId());
     }
     // Add all parameters for the template
-    params.putAll(templateParameters);
+    for (Map.Entry<String, String> entry : templateParameters.entrySet()) {
+      flexTemplateBuilder.addParameter(entry.getKey(), entry.getValue());
+    }
 
-    LaunchConfig.Builder options = LaunchConfig.builder(getClass().getSimpleName(), SPEC_PATH);
-    options.addEnvironment("maxWorkers", maxWorkers).addEnvironment("numWorkers", numWorkers);
+    flexTemplateBuilder
+        .addEnvironmentVariable("maxWorkers", maxWorkers)
+        .addEnvironmentVariable("numWorkers", numWorkers);
 
     // Set all environment options
-    environmentOptions.forEach((key, value) -> options.addEnvironment(key, value));
-    options.setParameters(params);
+    environmentOptions.forEach(
+        (key, value) -> flexTemplateBuilder.addEnvironmentVariable(key, value));
 
     // Act
-    PipelineLauncher.LaunchInfo jobInfo = pipelineLauncher.launch(project, region, options.build());
+    PipelineLauncher.LaunchInfo jobInfo = flexTemplateBuilder.build().launchJob();
     assertThatPipeline(jobInfo).isRunning();
 
     ConditionCheck[] checks = new ConditionCheck[tables.size()];
@@ -29,12 +29,12 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.beam.it.common.PipelineLauncher;
-import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
 import org.apache.beam.it.common.PipelineOperator.Result;
 import org.apache.beam.it.common.TestProperties;
 import org.apache.beam.it.common.utils.ResourceManagerUtils;
 import org.apache.beam.it.conditions.ConditionCheck;
 import org.apache.beam.it.gcp.TemplateLoadTestBase;
+import org.apache.beam.it.gcp.dataflow.FlexTemplateDataflowJobResourceManager;
 import org.apache.beam.it.gcp.secretmanager.SecretManagerResourceManager;
 import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
 import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
@@ -72,7 +72,6 @@ public class SourceDbToSpannerLTBase extends TemplateLoadTestBase {
   protected static final String VPC_NAME = "spanner-wide-row-pr-test-vpc";
   protected static final String VPC_REGION = "us-central1";
   protected static final String SUBNET_NAME = "regions/" + VPC_REGION + "/subnetworks/" + VPC_NAME;
-  protected static final Map<String, String> ADDITIONAL_JOB_PARAMS = new HashMap<>();
 
   public SourceDbToSpannerLTBase() {
     try {
@@ -140,34 +139,34 @@ public void runLoadTest(
     String outputDirectory =
         String.join(
             "/", new String[] {testRootDir, gcsResourceManager.runId(), testName, "output"});
-    Map<String, String> params =
-        new HashMap<>() {
-          {
-            put("projectId", project);
-            put("instanceId", spannerResourceManager.getInstanceId());
-            put("databaseId", spannerResourceManager.getDatabaseId());
-            put("sourceDbDialect", dialect.name());
-            put("sourceConfigURL", sourceDatabaseResource.getconnectionURL());
-            put("username", sourceDatabaseResource.username());
-            put("password", sourceDatabaseResource.password());
-            put("outputDirectory", "gs://" + artifactBucket + "/" + outputDirectory);
-            put("jdbcDriverClassName", driverClassName());
-            put("workerMachineType", "n2-standard-4");
-          }
-        };
-    params.putAll(ADDITIONAL_JOB_PARAMS);
-    params.putAll(templateParameters);
+    FlexTemplateDataflowJobResourceManager.Builder flexTemplateBuilder =
+        FlexTemplateDataflowJobResourceManager.builder(getClass().getSimpleName())
+            .withTemplateName("Sourcedb_to_Spanner_Flex")
+            .withTemplateModulePath("v2/sourcedb-to-spanner")
+            .withPipelineLauncher(pipelineLauncher)
+            .addParameter("projectId", project)
+            .addParameter("instanceId", spannerResourceManager.getInstanceId())
+            .addParameter("databaseId", spannerResourceManager.getDatabaseId())
+            .addParameter("sourceDbDialect", dialect.name())
+            .addParameter("sourceConfigURL", sourceDatabaseResource.getconnectionURL())
+            .addParameter("username", sourceDatabaseResource.username())
+            .addParameter("password", sourceDatabaseResource.password())
+            .addParameter("outputDirectory", "gs://" + artifactBucket + "/" + outputDirectory)
+            .addParameter("jdbcDriverClassName", driverClassName())
+            .addParameter("workerMachineType", "n2-standard-4");
+
+    for (Map.Entry<String, String> entry : templateParameters.entrySet()) {
+      flexTemplateBuilder.addParameter(entry.getKey(), entry.getValue());
+    }
 
     // Configure job
-    LaunchConfig.Builder options =
-        LaunchConfig.builder(getClass().getSimpleName(), SPEC_PATH)
-            .addEnvironment("maxWorkers", MAX_WORKERS)
-            .addEnvironment("numWorkers", NUM_WORKERS)
-            .setParameters(params);
-    environmentOptions.forEach(options::addEnvironment);
+    flexTemplateBuilder
+        .addEnvironmentVariable("maxWorkers", MAX_WORKERS)
+        .addEnvironmentVariable("numWorkers", NUM_WORKERS);
+    environmentOptions.forEach(flexTemplateBuilder::addEnvironmentVariable);
 
     // Act
-    PipelineLauncher.LaunchInfo jobInfo = pipelineLauncher.launch(project, region, options.build());
+    PipelineLauncher.LaunchInfo jobInfo = flexTemplateBuilder.build().launchJob();
     assertThatPipeline(jobInfo).isRunning();
 
     ConditionCheck[] checks =
@@ -29,17 +29,16 @@
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.it.common.PipelineLauncher;
-import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
 import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
 import org.apache.beam.it.common.TestProperties;
 import org.apache.beam.it.common.utils.IORedirectUtil;
 import org.apache.beam.it.common.utils.ResourceManagerUtils;
 import org.apache.beam.it.gcp.TemplateLoadTestBase;
 import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils;
+import org.apache.beam.it.gcp.dataflow.FlexTemplateDataflowJobResourceManager;
 import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
 import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
 import org.apache.beam.it.gcp.storage.GcsResourceManager;
@@ -197,45 +196,44 @@ public PipelineLauncher.LaunchInfo launchDataflowJob(
       throws IOException {
     // default parameters
 
-    Map<String, String> params =
-        new HashMap<>() {
-          {
-            put(
+    FlexTemplateDataflowJobResourceManager.Builder flexTemplateBuilder =
+        FlexTemplateDataflowJobResourceManager.builder(getClass().getSimpleName())
+            .withTemplateName("Spanner_to_SourceDb")
+            .withTemplateModulePath("v2/spanner-to-sourcedb")
+            .withPipelineLauncher(pipelineLauncher)
+            .addParameter(
                 "sessionFilePath",
-                getGcsPath(artifactBucket, "input/session.json", gcsResourceManager));
-            put("instanceId", spannerResourceManager.getInstanceId());
-            put("databaseId", spannerResourceManager.getDatabaseId());
-            put("spannerProjectId", project);
-            put("metadataDatabase", spannerMetadataResourceManager.getDatabaseId());
-            put("metadataInstance", spannerMetadataResourceManager.getInstanceId());
-            put(
+                getGcsPath(artifactBucket, "input/session.json", gcsResourceManager))
+            .addParameter("instanceId", spannerResourceManager.getInstanceId())
+            .addParameter("databaseId", spannerResourceManager.getDatabaseId())
+            .addParameter("spannerProjectId", project)
+            .addParameter("metadataDatabase", spannerMetadataResourceManager.getDatabaseId())
+            .addParameter("metadataInstance", spannerMetadataResourceManager.getInstanceId())
+            .addParameter(
                 "sourceShardsFilePath",
-                getGcsPath(artifactBucket, shardFileName, gcsResourceManager));
-            put("changeStreamName", "allstream");
-            put("dlqGcsPubSubSubscription", subscriptionName.toString());
-            put("deadLetterQueueDirectory", getGcsPath(artifactBucket, "dlq", gcsResourceManager));
-            put("maxShardConnections", "100");
-            put("sourceType", sourceType);
-            put("workerMachineType", "n2-standard-4");
-          }
-        };
+                getGcsPath(artifactBucket, shardFileName, gcsResourceManager))
+            .addParameter("changeStreamName", "allstream")
+            .addParameter("dlqGcsPubSubSubscription", subscriptionName.toString())
+            .addParameter(
+                "deadLetterQueueDirectory", getGcsPath(artifactBucket, "dlq", gcsResourceManager))
+            .addParameter("maxShardConnections", "100")
+            .addParameter("sourceType", sourceType)
+            .addParameter("workerMachineType", "n2-standard-4");
 
     if (customTransformation != null) {
-      params.put(
+      flexTemplateBuilder.addParameter(
           "transformationJarPath",
           getGcsPath(artifactBucket, customTransformation.jarPath(), gcsResourceManager));
-      params.put("transformationClassName", customTransformation.classPath());
+      flexTemplateBuilder.addParameter("transformationClassName", customTransformation.classPath());
    }
 
-    LaunchConfig.Builder options =
-        LaunchConfig.builder(getClass().getSimpleName(), TEMPLATE_SPEC_PATH);
-    options
-        .addEnvironment("maxWorkers", maxWorkers)
-        .addEnvironment("numWorkers", numWorkers)
-        .addEnvironment("additionalExperiments", Collections.singletonList("use_runner_v2"));
+    flexTemplateBuilder
+        .addEnvironmentVariable("maxWorkers", maxWorkers)
+        .addEnvironmentVariable("numWorkers", numWorkers)
+        .addEnvironmentVariable(
+            "additionalExperiments", Collections.singletonList("use_runner_v2"));
 
-    options.setParameters(params);
-    PipelineLauncher.LaunchInfo jobInfo = pipelineLauncher.launch(project, region, options.build());
+    PipelineLauncher.LaunchInfo jobInfo = flexTemplateBuilder.build().launchJob();
     return jobInfo;
   }
 
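Schematically, all three load-test classes move from the same before shape to the same after shape. A simplified sketch of the equivalence, where the test name is a placeholder and params, numWorkers, SPEC_PATH, project, region, and pipelineLauncher come from the surrounding test code:

  // Before: each test assembled a LaunchConfig by hand and called the launcher directly.
  LaunchConfig.Builder options =
      LaunchConfig.builder("MyLoadTest", SPEC_PATH)
          .addEnvironment("numWorkers", numWorkers)
          .setParameters(params);
  LaunchInfo before = pipelineLauncher.launch(project, region, options.build());

  // After: the resource manager owns template staging, config assembly, and the
  // launch, routed through whatever PipelineLauncher the builder was given.
  LaunchInfo after =
      FlexTemplateDataflowJobResourceManager.builder("MyLoadTest")
          .withTemplateName("Spanner_to_SourceDb")
          .withTemplateModulePath("v2/spanner-to-sourcedb")
          .withPipelineLauncher(pipelineLauncher)
          .addEnvironmentVariable("numWorkers", numWorkers)
          .build()
          .launchJob();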