Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1040,9 +1040,12 @@ protected RunnerApi.Pipeline applySdkEnvironmentOverrides(
&& !updated
// don't update if the container image is already configured by DataflowRunner
&& !containerImage.equals(getContainerImageForJob(options))) {
String imageAndTag =
normalizeDataflowImageAndTag(
containerImage.substring(containerImage.lastIndexOf("/")));
containerImage =
DataflowRunnerInfo.getDataflowRunnerInfo().getContainerImageBaseRepository()
+ containerImage.substring(containerImage.lastIndexOf("/"));
+ imageAndTag;
}
environmentBuilder.setPayload(
RunnerApi.DockerPayload.newBuilder()
Expand All @@ -1055,6 +1058,23 @@ protected RunnerApi.Pipeline applySdkEnvironmentOverrides(
return pipelineBuilder.build();
}

static String normalizeDataflowImageAndTag(String imageAndTag) {
if (imageAndTag.startsWith("/beam_java")
|| imageAndTag.startsWith("/beam_python")
|| imageAndTag.startsWith("/beam_go_")) {
int tagIdx = imageAndTag.lastIndexOf(":");
if (tagIdx > 0) {
// For release candidates, apache/beam_ images has rc tag while Dataflow does not
String tag = imageAndTag.substring(tagIdx); // e,g, ":2.xx.0rc1"
int mayRc = tag.toLowerCase().lastIndexOf("rc");
if (mayRc > 0) {
imageAndTag = imageAndTag.substring(0, tagIdx) + tag.substring(0, mayRc);
}
}
}
return imageAndTag;
}

@VisibleForTesting
protected RunnerApi.Pipeline resolveArtifacts(RunnerApi.Pipeline pipeline) {
RunnerApi.Pipeline.Builder pipelineBuilder = pipeline.toBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,45 +1224,32 @@ public void testNoStagingLocationAndNoTempLocationFails() {
DataflowRunner.fromOptions(options);
}

private static RunnerApi.Pipeline containerUrlToPipeline(String url) {
return RunnerApi.Pipeline.newBuilder()
.setComponents(
RunnerApi.Components.newBuilder()
.putEnvironments(
"env",
RunnerApi.Environment.newBuilder()
.setUrn(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
.setPayload(
RunnerApi.DockerPayload.newBuilder()
.setContainerImage(url)
.build()
.toByteString())
.build()))
.build();
}

@Test
public void testApplySdkEnvironmentOverrides() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:latest";
String gcrPythonContainerUrl = "gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest";
options.setSdkHarnessContainerImageOverrides(".*python.*," + gcrPythonContainerUrl);
DataflowRunner runner = DataflowRunner.fromOptions(options);
RunnerApi.Pipeline pipeline =
RunnerApi.Pipeline.newBuilder()
.setComponents(
RunnerApi.Components.newBuilder()
.putEnvironments(
"env",
RunnerApi.Environment.newBuilder()
.setUrn(
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
.setPayload(
RunnerApi.DockerPayload.newBuilder()
.setContainerImage(dockerHubPythonContainerUrl)
.build()
.toByteString())
.build()))
.build();
RunnerApi.Pipeline expectedPipeline =
RunnerApi.Pipeline.newBuilder()
.setComponents(
RunnerApi.Components.newBuilder()
.putEnvironments(
"env",
RunnerApi.Environment.newBuilder()
.setUrn(
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
.setPayload(
RunnerApi.DockerPayload.newBuilder()
.setContainerImage(gcrPythonContainerUrl)
.build()
.toByteString())
.build()))
.build();
RunnerApi.Pipeline pipeline = containerUrlToPipeline(dockerHubPythonContainerUrl);
RunnerApi.Pipeline expectedPipeline = containerUrlToPipeline(gcrPythonContainerUrl);
assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), equalTo(expectedPipeline));
}

Expand All @@ -1272,38 +1259,19 @@ public void testApplySdkEnvironmentOverridesByDefault() throws IOException {
String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:latest";
String gcrPythonContainerUrl = "gcr.io/cloud-dataflow/v1beta3/beam_python3.9_sdk:latest";
DataflowRunner runner = DataflowRunner.fromOptions(options);
RunnerApi.Pipeline pipeline =
RunnerApi.Pipeline.newBuilder()
.setComponents(
RunnerApi.Components.newBuilder()
.putEnvironments(
"env",
RunnerApi.Environment.newBuilder()
.setUrn(
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
.setPayload(
RunnerApi.DockerPayload.newBuilder()
.setContainerImage(dockerHubPythonContainerUrl)
.build()
.toByteString())
.build()))
.build();
RunnerApi.Pipeline expectedPipeline =
RunnerApi.Pipeline.newBuilder()
.setComponents(
RunnerApi.Components.newBuilder()
.putEnvironments(
"env",
RunnerApi.Environment.newBuilder()
.setUrn(
BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER))
.setPayload(
RunnerApi.DockerPayload.newBuilder()
.setContainerImage(gcrPythonContainerUrl)
.build()
.toByteString())
.build()))
.build();
RunnerApi.Pipeline pipeline = containerUrlToPipeline(dockerHubPythonContainerUrl);
RunnerApi.Pipeline expectedPipeline = containerUrlToPipeline(gcrPythonContainerUrl);
assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), equalTo(expectedPipeline));
}

@Test
public void testApplySdkEnvironmentOverridesRcByDefault() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
String dockerHubPythonContainerUrl = "apache/beam_python3.9_sdk:2.68.0rc2";
String gcrPythonContainerUrl = "gcr.io/cloud-dataflow/v1beta3/beam_python3.9_sdk:2.68.0";
DataflowRunner runner = DataflowRunner.fromOptions(options);
RunnerApi.Pipeline pipeline = containerUrlToPipeline(dockerHubPythonContainerUrl);
RunnerApi.Pipeline expectedPipeline = containerUrlToPipeline(gcrPythonContainerUrl);
assertThat(runner.applySdkEnvironmentOverrides(pipeline, options), equalTo(expectedPipeline));
}

Expand Down
Loading