Bugfix/dataflow job location passing #41887
Merged
Summary: this PR fixes the issue of the default location, namely `us-central1`, being used instead of the user-supplied `location` argument. `location` is not passed through in several places in `providers/apache/beam/operators/beam.py` and `providers/google/cloud/operators/dataflow.py`, causing the Dataflow API calls to always use `DEFAULT_DATAFLOW_LOCATION = 'us-central1'`:

1. When a `BeamRunJavaPipelineOperator` job is marked as success/failed: the `location` parameter is missing in the calls of `cancel_job` defined in `providers/google/cloud/hooks/dataflow.py`.
2. When a `BeamRunJavaPipelineOperator` job starts (with the `CheckJobRunning.WaitForRun` setting), it should wait until the previous job with the same name is cancelled/drained. When a non-default `location` such as `europe-west1` is used, it is ignored, so the previous job cannot be found at the same location and the new job starts anyway. The check goes through `is_job_dataflow_running`; however, because of the `@_fallback_to_location_from_variables` annotation, an exception is raised about mutually exclusive parameters being specified. Although `@_fallback_to_location_from_variables` should ensure that a fallback location value is taken from `variables` (the pipeline options) when `location` is not supplied, this never happens, because the default `location: str = DEFAULT_DATAFLOW_LOCATION` set on the function means `us-central1` is always used as `location` (see the sketch after this list).
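A simplified, self-contained rendition of that pitfall (not the hook's actual code; the names and error type are illustrative):

```python
import functools

DEFAULT_DATAFLOW_LOCATION = "us-central1"


def fallback_to_location_from_variables(func):
    """Fall back to variables['region'] when 'location' is not supplied."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        location = kwargs.get("location")
        region = kwargs.get("variables", {}).get("region")
        if location and region:
            # Both sources set: the decorator refuses to guess.
            raise ValueError("'location' and variables['region'] are mutually exclusive")
        if region:
            # The fallback path is only reachable when 'location' was NOT passed.
            kwargs["location"] = region
        return func(*args, **kwargs)

    return wrapper


@fallback_to_location_from_variables
def is_job_running(name, variables, location=DEFAULT_DATAFLOW_LOCATION):
    print(f"checking {name!r} in {location}")
    return False


# Callers hold location=DEFAULT_DATAFLOW_LOCATION from their constructor
# defaults and pass it along, so the kwarg is always present and truthy:
try:
    is_job_running(
        "my-job",
        variables={"region": "europe-west1"},
        location=DEFAULT_DATAFLOW_LOCATION,
    )
except ValueError as exc:
    print(exc)  # 'location' and variables['region'] are mutually exclusive

# The fallback only fires when 'location' is omitted entirely:
is_job_running("my-job", variables={"region": "europe-west1"})  # checks europe-west1
```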
To address this, I removed the `@_fallback_to_location_from_variables` annotation from `is_job_dataflow_running` defined in `providers/google/cloud/hooks/dataflow.py`. I also removed the default `location` value in `is_job_dataflow_running` because, to my mind, it causes more harm than good. In all the calls of `is_job_dataflow_running` I passed the location values available in scope, which default to `location: str = DEFAULT_DATAFLOW_LOCATION` anyway, judging by the class constructors.

How my testing task is configured:
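Roughly along these lines (the DAG id, jar path, project, bucket, and job name are hypothetical placeholders):

```python
import pendulum

from airflow import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import (
    CheckJobRunning,
    DataflowConfiguration,
)

with DAG(
    dag_id="dataflow_location_test",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
):
    BeamRunJavaPipelineOperator(
        task_id="run_java_pipeline",
        jar="gs://my-bucket/pipeline.jar",  # hypothetical artifact
        runner="DataflowRunner",
        pipeline_options={"tempLocation": "gs://my-bucket/tmp/"},
        dataflow_config=DataflowConfiguration(
            job_name="location-test-job",
            project_id="my-gcp-project",  # hypothetical project
            # The non-default region that used to be silently replaced
            # by us-central1 before this fix:
            location="europe-west1",
            check_if_running=CheckJobRunning.WaitForRun,
        ),
    )
```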
I'm running Airflow with these changes and I can confirm that `BeamRunJavaPipelineOperator` behaves as it should, fixing the two points above.

I've used the following versions for testing:
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
`{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in newsfragments.