BeamRunPythonPipelineOperator doesn't work with Google Application Default Credentials (ADC) #42396

@fpopic

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.3 (the problem occurs in the latest version as well; I will try to download the latest version and post its log too)

What happened?

When manually submitting an Apache Beam Python job to the Google Dataflow runner using BeamRunPythonPipelineOperator

  • without specifying service_account_name in the operator's pipeline_options parameter (BeamRunPythonPipelineOperator.pipeline_options)
  • without the explicit Google auth environment variable GOOGLE_APPLICATION_CREDENTIALS set
  • without the environment variable GCP_PROJECT set
  • after running gcloud auth application-default login --disable-quota-project
  • after running gcloud auth login
  • after running gcloud config set project <project>
  • using the default Airflow GCP connection google_cloud_default with the following content
    {
      "conn_type": "google_cloud_platform", 
      "extra": {
        "scope": "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/drive,https://www.googleapis.com/auth/bigquery", 
        "project": "<project>", 
        "num_retries": 5
      }
    }
  • with project_id specified in the DataflowConfiguration passed to BeamRunPythonPipelineOperator

the task gets stuck in apitools, which uses oauth2client, and tries to initiate an in-browser Google sign-in, which fails. I don't understand why the authentication flow ends up in that execution branch, since credentials of type authorized_user exist in the well-known path ~/.config/gcloud/application_default_credentials.json.
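
To confirm what a given process actually finds at that well-known path, a small stdlib-only check can read the file and report its type field. This is a minimal sketch, not part of Airflow or Beam: adc_file_type is a hypothetical helper name, and the default path is the Linux/macOS location quoted above.

```python
import json
from pathlib import Path
from typing import Optional

# Well-known ADC location on Linux/macOS, as quoted in this report.
DEFAULT_ADC_PATH = (
    Path.home() / ".config" / "gcloud" / "application_default_credentials.json"
)


def adc_file_type(path: Path = DEFAULT_ADC_PATH) -> Optional[str]:
    """Return the "type" field of an ADC JSON file, or None if the file is missing.

    Hypothetical debugging helper; it does not validate the credentials.
    """
    if not path.is_file():
        return None
    with path.open(encoding="utf-8") as fh:
        return json.load(fh).get("type")


if __name__ == "__main__":
    # Prints e.g. the credential type stored by gcloud, or None if no file exists.
    print(adc_file_type())
```

Running it once in the Airflow worker and once at the top of the Beam job file would show whether both processes see the same credentials file.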

[2024-09-21, 21:05:58 UTC] {taskinstance.py:1328} INFO - Executing <Task(BeamRunPythonPipelineOperator): submit_beam_job> on 2022-01-01 00:00:00+00:00
[2024-09-21, 21:05:58 UTC] {standard_task_runner.py:57} INFO - Started process 2105 to run task
[2024-09-21, 21:05:58 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'dag-example-beam-dataflow-python', 'submit_beam_job', 'scheduled__2022-01-01T00:00:00+00:00', '--job-id', '716', '--raw', '--subdir', 'DAGS_FOLDER/dag_example_beam_dataflow_python/dag_example_beam_dataflow_python.py', '--cfg-path', '/tmp/tmprcjz83eb']
[2024-09-21, 21:05:58 UTC] {standard_task_runner.py:85} INFO - Job 716: Subtask submit_beam_job
[2024-09-21, 21:05:58 UTC] {task_command.py:414} INFO - Running <TaskInstance: dag-example-beam-dataflow-python.submit_beam_job scheduled__2022-01-01T00:00:00+00:00 [running]> on host 9743bdb39e14
[2024-09-21, 21:05:58 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dag-example-beam-dataflow-python' AIRFLOW_CTX_TASK_ID='submit_beam_job' AIRFLOW_CTX_EXECUTION_DATE='2022-01-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='3' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2022-01-01T00:00:00+00:00'
[2024-09-21, 21:05:58 UTC] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.

[2024-09-21, 21:05:58 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-09-21, 21:05:58 UTC] {beam.py:198} INFO - {'job_name': 'simple-beam-job-7d346a20', 'project': 'XXX', 'region': 'europe-west1', 'labels': {'airflow-version': 'v2-6-3-composer'}}
[2024-09-21, 21:05:59 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-09-21, 21:05:59 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-09-21, 21:05:59 UTC] {logging_mixin.py:150} WARNING - /opt/python3.8/lib/python3.8/site-packages/google/auth/_default.py:78 UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. See the following page for troubleshooting: https://cloud.google.com/docs/authentication/adc-troubleshooting/user-creds.

[2024-09-21, 21:06:05 UTC] {beam.py:271} INFO - Beam version: 2.50.0
[2024-09-21, 21:06:05 UTC] {beam.py:131} INFO - Running command: python3 /home/airflow/gcs/dags/dag_example_beam_dataflow_python/src/beam/job_example_beam_dataflow_python.py --runner=DataflowRunner --job_name=simple-beam-job-7d346a20 --project=XXX --region=europe-west1 --labels=airflow-version=v2-6-3-composer --worker_machine_type=n1-standard-1 --disk_size_gb=10 --num_workers=1
[2024-09-21, 21:06:05 UTC] {beam.py:142} INFO - Start waiting for Apache Beam process to complete.
[2024-09-21, 21:06:07 UTC] {beam.py:113} INFO - 0

[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - Generating new OAuth credentials ...
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - Your browser has been opened to visit:
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -     https://accounts.google.com/o/oauth2/v2/auth?client_id=XXXX.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8090%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute.readonly+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email&access_type=offline&response_type=code
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - If your browser is on a different machine then exit and re-run this
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - application with the command-line parameter
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -   --noauth_local_webserver
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -

What you think should happen instead?

Airflow should submit a job to Dataflow using Application Default Credentials, the same way standalone Apache Beam Python (without Airflow) submits the job to Dataflow.

Apache Beam already solved this problem in apache/beam#15004, so running Apache Beam Python without Airflow using ADC works.
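
Since the log shows the operator launching the pipeline as a separate python3 process, one thing worth comparing is what a child process sees versus its parent. The following is a diagnostic sketch only and makes no claim about the root cause: probe_child_env is a hypothetical helper, and the values inspected are GOOGLE_APPLICATION_CREDENTIALS from this report plus HOME, which determines where ~/.config/gcloud resolves on Linux/macOS.

```python
import json
import os
import subprocess
import sys
from typing import Dict, Optional

# Code executed in the child interpreter: report auth-related settings as JSON.
_CHILD_CODE = r"""
import json, os
adc = os.path.expanduser("~/.config/gcloud/application_default_credentials.json")
print(json.dumps({
    "HOME": os.environ.get("HOME"),
    "GOOGLE_APPLICATION_CREDENTIALS": os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
    "adc_path": adc,
    "adc_exists": os.path.isfile(adc),
}))
"""


def probe_child_env(env: Optional[Dict[str, str]] = None) -> dict:
    """Run a child Python process and return what it sees (hypothetical helper).

    env=None means the child inherits the parent's environment unchanged.
    """
    result = subprocess.run(
        [sys.executable, "-c", _CHILD_CODE],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(result.stdout)


if __name__ == "__main__":
    print(probe_child_env())
```

If the child reports a different HOME or a missing ADC file compared with an interactive shell where standalone Beam works, that difference would be a natural place to look next.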

How to reproduce

Prepare env. variables:

unset GOOGLE_APPLICATION_CREDENTIALS
unset GCP_PROJECT
gcloud auth application-default login
gcloud config set project <project>

and execute the following DAG

# -*- coding: utf-8 -*-
import os
from datetime import datetime
from airflow.models import DAG

from airflow.providers.apache.beam.operators.beam import (
    BeamRunPythonPipelineOperator,
)
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowConfiguration,
)

current_path = os.path.dirname(__file__)

with DAG(
    dag_id="dag-example-beam-dataflow-python-adc",
    default_args={"owner": "airflow"},
    start_date=datetime(2024, 1, 1),
    schedule_interval="@once",
    catchup=True,
) as dag:
    start_dag = EmptyOperator(task_id="start_dag")
    end_dag = EmptyOperator(task_id="end_dag")

    submit_beam_job = BeamRunPythonPipelineOperator(
        task_id="submit_beam_job_with_dataflow_using_adc",
        py_file=os.path.join(current_path, "job_example_beam_dataflow_python_adc.py"),
        runner="DataflowRunner",
        pipeline_options={
            "temp_location": "<bucket>",
            "staging_location":  "<bucket>",
        },
        dataflow_config=DataflowConfiguration(
            job_name="submit_beam_job_with_dataflow_using_adc",
            project_id="<project_id>",
            location="<location>",
            wait_until_finished=True,
        ),
        do_xcom_push=True,
    )

    start_dag >> submit_beam_job >> end_dag

and the Apache Beam job source code, job_example_beam_dataflow_python_adc.py:

# -*- coding: utf-8 -*-
import argparse
import logging
import apache_beam as beam
from apache_beam import Create
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class PrintElementDoFn(beam.DoFn):
    def process(self, element, *args, **kwargs):
        print(f"Processing element {element}.")


def run(argv=None):
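    parser = argparse.ArgumentParser()
    # known_args.sleep is read below, so the argument must be declared;
    # a default of 0 matches the "0" printed in the task log.
    parser.add_argument("--sleep", default=0)

    known_args, pipeline_args = parser.parse_known_args(argv)
    print(known_args.sleep)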

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    p | "create dummy events" >> Create([1]) | "print dummy elements" >> beam.ParDo(
        PrintElementDoFn()
    )

    p.run()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()

Operating System

macOS 14.7

Versions of Apache Airflow Providers

apache-airflow-providers-apache-beam==5.3.0

Deployment

Docker Airflow / Composer Airflow (doesn't matter, problem occurs in the latest version as well).

Deployment details

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
