
Conversation

@lukas-mi (Contributor)

Summary: this PR fixes the issue of the default location (us-central1) being used in Dataflow API calls instead of the user-supplied location argument.

The location is not passed through in several places in providers/apache/beam/operators/beam.py and providers/google/cloud/operators/dataflow.py, causing the Dataflow API calls to always use DEFAULT_DATAFLOW_LOCATION = 'us-central1':

  1. During job cancellation, for example when a BeamRunJavaPipelineOperator job is marked as success/failed:
    • This leads to the following exception:
      [2024-08-28, 11:24:29 UTC] {beam.py:657} INFO - Dataflow job with id: `2024-08-28_04_23_24-13750676948168932211` was requested to be cancelled.
      [2024-08-28, 11:24:30 UTC] {taskinstance.py:441} ▼ Post task execution logs
      [2024-08-28, 11:24:30 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
      Traceback (most recent call last):
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
          result = _execute_callable(context=context, **execute_callable_kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
          return execute_callable(context=context, **execute_callable_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
          return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 556, in execute
          return self.execute_sync(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 583, in execute_sync
          self.beam_hook.start_java_pipeline(
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 315, in start_java_pipeline
          self._start_pipeline(
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 206, in _start_pipeline
          run_beam_command(
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 161, in run_beam_command
          process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 115, in process_fd
          for line in iter(fd.readline, b""):
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2612, in signal_handler
          self.task.on_kill()
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 658, in on_kill
          self.dataflow_hook.cancel_job(
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 559, in inner_wrapper
          return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 1112, in cancel_job
          jobs_controller.cancel()
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 511, in cancel
          job for job in self.get_jobs() if job["currentState"] not in DataflowJobStatus.TERMINAL_STATES
                      ^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 479, in get_jobs
          self._refresh_jobs()
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 400, in _refresh_jobs
          self._jobs = self._get_current_jobs()
                      ^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 263, in _get_current_jobs
          return [self.fetch_job_by_id(self._job_id)]
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 290, in fetch_job_by_id
          .execute(num_retries=self._num_retries)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
          return wrapped(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/googleapiclient/http.py", line 938, in execute
          raise HttpError(resp, content, uri=self.uri)
      googleapiclient.errors.HttpError: <HttpError 404 when requesting https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/jobs/2024-08-28_04_23_24-13750676948168932211?alt=json returned "(4f38a283a310abc0): Information about job 2024-08-28_04_23_24-13750676948168932211 could not be found in our system. Please double check that the API being used is projects.locations.jobs.get (https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get). If the API being used is projects.locations.jobs.get, please double check the id (2024-08-28_04_23_24-13750676948168932211) is correct. If it is please contact customer support.". Details: "(4f38a283a310abc0): Information about job 2024-08-28_04_23_24-13750676948168932211 could not be found in our system. Please double check that the API being used is projects.locations.jobs.get (https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/get). If the API being used is projects.locations.jobs.get, please double check the id (2024-08-28_04_23_24-13750676948168932211) is correct. If it is please contact customer support.">
      
    • I've fixed this by passing the location parameter through in the calls to cancel_job defined in providers/google/cloud/hooks/dataflow.py (see the sketch after this list).
  2. When a new job is launched (with the CheckJobRunning.WaitForRun setting), it should wait until the previous job with the same name is cancelled/drained. When a non-default location such as europe-west1 is used, it is ignored, so the previous job cannot be found at that location and the new job starts anyway.
    • I tried explicitly passing the location to is_job_dataflow_running; however, because of the @_fallback_to_location_from_variables annotation, an exception is raised about mutually exclusive parameters being specified:
      [2024-08-28, 11:49:18 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
      Traceback (most recent call last):
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
          result = _execute_callable(context=context, **execute_callable_kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
          return execute_callable(context=context, **execute_callable_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
          return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 556, in execute
          return self.execute_sync(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 574, in execute_sync
          is_running = self.dataflow_hook.is_job_dataflow_running(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/bitnami/airflow/venv/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 125, in inner_wrapper
          raise AirflowException(
      airflow.exceptions.AirflowException: The mutually exclusive parameter `location` and `region` key in `variables` parameter are both present. Please remove one.
      
    • While @_fallback_to_location_from_variables should ensure that a fallback location value from variables (pipeline_options) is used when location is not supplied, this never happens because the function sets a default location: str = DEFAULT_DATAFLOW_LOCATION, so us-central1 is always used as the location.
    • I've fixed this issue by removing the @_fallback_to_location_from_variables annotation from is_job_dataflow_running defined in providers/google/cloud/hooks/dataflow.py. I also removed the default location value in is_job_dataflow_running because, to my mind, it causes more harm than good. In all calls to is_job_dataflow_running I now pass the location value available in scope, which the class constructors default to DEFAULT_DATAFLOW_LOCATION anyway. A simplified sketch of both changes follows this list.
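
To illustrate both points, here is a minimal, runnable sketch of the behavior described above. The decorator body is a simplified stand-in for the real one in providers/google/cloud/hooks/dataflow.py (not the provider's actual code), and the cancel_job call site at the end is a hypothetical, simplified example:

import functools

DEFAULT_DATAFLOW_LOCATION = "us-central1"

def _fallback_to_location_from_variables(func):
    # Simplified stand-in: raise when both `location` and variables["region"]
    # are supplied; otherwise copy variables["region"] into `location`.
    @functools.wraps(func)
    def inner_wrapper(**kwargs):
        location = kwargs.get("location")
        region = kwargs.get("variables", {}).get("region")
        if location and region:
            raise ValueError(
                "The mutually exclusive parameter `location` and `region` key "
                "in `variables` parameter are both present. Please remove one."
            )
        if region:
            kwargs["location"] = region
        return func(**kwargs)
    return inner_wrapper

@_fallback_to_location_from_variables
def is_job_dataflow_running(name, variables, location=DEFAULT_DATAFLOW_LOCATION):
    # A location set via DataflowConfiguration is not a "region" pipeline
    # option, so there is nothing to fall back to; with the `location` kwarg
    # absent, the default above silently wins.
    return f"checking for job {name!r} in {location}"

# The previous job ran in europe-west1, but the check looks in us-central1:
print(is_job_dataflow_running(name="start-beam-pipeline", variables={}))
# -> checking for job 'start-beam-pipeline' in us-central1

# Passing the location explicitly while variables also carries "region"
# raises the mutually exclusive error from the traceback above.
#
# The fix: drop the decorator and the default, and have each call site pass
# the location it already holds, e.g. (hypothetical, simplified):
#   self.dataflow_hook.cancel_job(
#       job_id=self.dataflow_job_id,
#       project_id=self.dataflow_config.project_id,
#       location=self.dataflow_config.location,
#   )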

How my testing task is configured:

from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import (
    CheckJobRunning,
    DataflowConfiguration,
)

# jar_location and job_class are defined elsewhere in the DAG.
start_beam_pipeline = BeamRunJavaPipelineOperator(
    task_id="start-beam-pipeline",
    runner="DataflowRunner",
    jar=jar_location,
    job_class=job_class,
    # https://cloud.google.com/dataflow/docs/reference/pipeline-options#java
    pipeline_options={
        "tempLocation": "gs://my-bucket/tmp",
        "stagingLocation": "gs://my-bucket/staging",
        "enableStreamingEngine": True,
        "workerMachineType": "n2d-standard-4",
        "maxNumWorkers": 3,
    },
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/dataflow/index.html#airflow.providers.google.cloud.operators.dataflow.DataflowConfiguration
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        append_job_name=False,
        project_id="my-project",
        location="europe-west1",
        drain_pipeline=True,
        check_if_running=CheckJobRunning.WaitForRun,
    ),
)
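
Note that the location is set on DataflowConfiguration rather than as a region pipeline option; with the fix, that value is what reaches is_job_dataflow_running and cancel_job instead of the us-central1 default.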

I'm running Airflow with these changes and can confirm that BeamRunJavaPipelineOperator behaves as it should, fixing the two points above.

I've used the following versions for testing:

  • Airflow: 2.10.0
  • Beam provider: 5.8.0
  • Google provider: 10.22.0


boring-cyborg bot added the area:providers, provider:apache-beam, and provider:google (Google (including GCP) related issues) labels on Aug 30, 2024
lukas-mi force-pushed the bugfix/dataflow-job-location-passing branch 2 times, most recently from 7dac639 to e5ae7ce on September 16, 2024 07:16
@lukas-mi (Contributor, Author) commented Sep 17, 2024:

I've tried messaging in #contributors channel last week. Not sure who to ping so @potiuk @jedcunningham @uranusjr 🤔

lukas-mi force-pushed the bugfix/dataflow-job-location-passing branch from e5ae7ce to bfdca60 on September 17, 2024 09:23
eladkal requested a review from shahar1 on September 18, 2024 14:24
lukas-mi force-pushed the bugfix/dataflow-job-location-passing branch 5 times, most recently from c74045e to 8cc8d44 on September 24, 2024 08:09
lukas-mi force-pushed the bugfix/dataflow-job-location-passing branch from 8cc8d44 to ee787fc on September 24, 2024 08:34
shahar1 self-requested a review on September 24, 2024 11:08
shahar1 merged commit eed1d0d into apache:main on Sep 24, 2024
lukas-mi deleted the bugfix/dataflow-job-location-passing branch on September 24, 2024 11:35
joaopamaral pushed a commit to joaopamaral/airflow that referenced this pull request Oct 21, 2024
* pass through user specified location to Dataflow API calls

* update non-db tests with location passing

* fix formatting

* add a deprecation warning for the default location usage

* fix formatting