Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@

_APACHE_BEAM_VERSION_SCRIPT = "import apache_beam; print(apache_beam.__version__)"

# Map defined with option names to flag names for boolean options
# that have a destination(dest) in parser.add_argument() different
# from the flag name and whose default value is `None`.
# or other SDK flags that should allow false value
_FLAG_THAT_SETS_FALSE_VALUE = {
"use_public_ips": "no_use_public_ips",
}

_FLAG_THAT_SETS_FALSE_VALUE_JAVA = {
"usePublicIps": "usePublicIps=false", # Allow False flag value for Java SDK
}


class BeamRunnerType:
"""
Expand All @@ -68,8 +80,10 @@ def beam_options_to_args(options: dict) -> list[str]:
Return a formatted pipeline options from a dictionary of arguments.

The logic of this method should be compatible with Apache Beam:
https://github.com/apache/beam/blob/b56740f0e8cd80c2873412847d0b336837429fb9/sdks/python/
apache_beam/options/pipeline_options.py#L230-L251
https://github.com/apache/beam/blob/77f57d1fc498592089e32701b45505bbdccccd47/sdks/python/
apache_beam/options/pipeline_options.py#L260-L268

WARNING: In case of amending please check the latest main branch implementation!

:param options: Dictionary with options
:return: List of arguments
Expand All @@ -79,12 +93,29 @@ def beam_options_to_args(options: dict) -> list[str]:

args: list[str] = []
for attr, value in options.items():
if value is None or (isinstance(value, bool) and value):
args.append(f"--{attr}")
elif isinstance(value, bool) and not value:
continue
if isinstance(value, bool):
if value:
args.append(f"--{attr}")
elif attr in _FLAG_THAT_SETS_FALSE_VALUE:
# Capture overriding flags, which have a different dest
# from the flag name defined in the parser.add_argument
# Eg: no_use_public_ips, which has the dest=use_public_ips
# different from flag name
flag_that_disables_the_option = _FLAG_THAT_SETS_FALSE_VALUE[attr]
args.append(f"--{flag_that_disables_the_option}")
elif attr in _FLAG_THAT_SETS_FALSE_VALUE_JAVA:
# Capture Java flags that should not be skipped by having
# False value
false_value_flag = _FLAG_THAT_SETS_FALSE_VALUE_JAVA[attr]
args.append(f"--{false_value_flag}")
elif isinstance(value, list):
args.extend([f"--{attr}={v}" for v in value])
elif isinstance(value, dict):
args.append(f"--{attr}={json.dumps(value)}")
elif value is None:
# explicitly skip None values,as later they might be passed as string 'None',
# and override value by default https://github.com/apache/beam/pull/24948
continue
else:
args.append(f"--{attr}={value}")
return args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,14 @@ class TestBeamOptionsToArgs:
"options, expected_args",
[
({"key": "val"}, ["--key=val"]),
({"key": None}, ["--key"]),
({"key": None}, []),
({"key": True}, ["--key"]),
({"key": False}, []),
({"key": ["a", "b", "c"]}, ["--key=a", "--key=b", "--key=c"]),
({"key": {"a_key": "a_val", "b_key": "b_val"}}, ['--key={"a_key": "a_val", "b_key": "b_val"}']),
# Sets False value cases
({"use_public_ips": False}, ["--no_use_public_ips"]),
({"usePublicIps": False}, ["--usePublicIps=false"]),
],
)
def test_beam_options_to_args(self, options, expected_args):
Expand Down