Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,12 @@ def _update_container_image_for_dataflow(beam_container_image_url):
# By default Dataflow pipelines use containers hosted in Dataflow GCR
# instead of Docker Hub.
image_suffix = beam_container_image_url.rsplit('/', 1)[1]

# trim "RCX" as release candidate tag exists on Docker Hub but not GCR
check_rc = image_suffix.lower().split('rc')
if len(check_rc) == 2:
image_suffix = image_suffix[:-2 - len(check_rc[1])]

return names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + image_suffix

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,43 @@ def test_dataflow_container_image_override_prime(self):

self._verify_dataflow_container_image_override(pipeline_options)

def _verify_dataflow_container_image_override_rc(self, pipeline_options):
pipeline = Pipeline(options=pipeline_options)
pipeline | Create([1, 2, 3]) | ParDo(DoFn()) # pylint:disable=expression-not-assigned

dummy_env = DockerEnvironment(
container_image='apache/beam_dummy_name:2.00.0RC10')
proto_pipeline, _ = pipeline.to_runner_api(
return_context=True, default_environment=dummy_env)

# Accessing non-public method for testing.
apiclient.DataflowApplicationClient._apply_sdk_environment_overrides(
proto_pipeline, {}, pipeline_options)

from apache_beam.utils import proto_utils
found_override = False
trimed_rc = True
for env in proto_pipeline.components.environments.values():
docker_payload = proto_utils.parse_Bytes(
env.payload, beam_runner_api_pb2.DockerPayload)
if docker_payload.container_image.startswith(
names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY):
found_override = True
if docker_payload.container_image.split(':')[-1] != '2.00.0':
trimed_rc = False

self.assertTrue(found_override)
self.assertTrue(trimed_rc)

def test_dataflow_container_image_override_rc(self):
pipeline_options = PipelineOptions([
'--experiments=use_runner_v2',
'--temp_location',
'gs://any-location/temp'
])

self._verify_dataflow_container_image_override_rc(pipeline_options)

def _verify_non_apache_container_not_overridden(self, pipeline_options):
pipeline = Pipeline(options=pipeline_options)
pipeline | Create([1, 2, 3]) | ParDo(DoFn()) # pylint:disable=expression-not-assigned
Expand Down