BigQueryInsertJobOperator fails when there are templated variables in default args #23129

Closed
leahecole opened this issue Apr 20, 2022 · 12 comments
Labels
area:providers kind:bug This is a clearly a bug provider:google Google (including GCP) related issues

Comments

@leahecole
Contributor

leahecole commented Apr 20, 2022

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==6.8.0

Apache Airflow version

2.2.3

Operating System

n/a

Deployment

Composer

Deployment details

Hi! I'm using composer-2.0.6-airflow-2.2.3 - it's a Public IP environment without any configuration overrides. This is a super basic sandbox environment I use for testing.

What happened

I was experimenting with the BigQueryInsertJobOperator and had a failure when I tried to utilize Airflow variables within a Job configuration.
Error

google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/%7B%7Bvar.value.gcp_project%7D%7D/jobs?prettyPrint=false: Invalid project ID '{{var.value.gcp_project}}'. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some project IDs also include domain name separated by a colon. IDs must start with a letter and may not end with a dash.

DAG pseudocode
(I copy-pasted the relevant bits of my DAG)

  • BQ_DESTINATION_TABLE_NAME and BQ_DESTINATION_DATASET_NAME are strings, not Airflow variables, so they're doing great. WEATHER_HOLIDAYS_JOIN_QUERY is a SQL query also defined as a string and, as far as I can tell, is also doing great. PROJECT_NAME uses a templated Airflow variable that is defined and is successfully used in other operators in this and other DAGs.
PROJECT_NAME = '{{var.value.gcp_project}}'

bq_join_holidays_weather_data = bigquery.BigQueryInsertJobOperator(
    task_id="bq_join_holidays_weather_data",
    configuration={
        "query": {
            "query": WEATHER_HOLIDAYS_JOIN_QUERY,
            "useLegacySql": False,
            "destinationTable": {
                "projectId": PROJECT_NAME,
                "datasetId": BQ_DESTINATION_DATASET_NAME,
                "tableId": BQ_DESTINATION_TABLE_NAME,
            },
        }
    },
    location="US",
)

Some things I tried/researched
I experimented a little bit with adding "configuration.query.destination_table": "json" to this line, but did not have success. I also checked out the DataprocSubmitJobOperator to see if I could find some clues, because Dataproc configurations also often have many nested dictionaries and I'm about 90% certain I've templated values there. I had to timebox this, though, because I do have a workaround (just not using the Airflow variable), and I thought I'd open an issue to see if someone more familiar with the underlying template rendering might be able to more easily decipher what's happening.

What you think should happen instead

I think that I should be allowed to use an Airflow variable here 😁

How to reproduce

Run a query job using the BigQueryInsertJobOperator that writes the query results to a destination table specified as a fully qualified TableReference object, passing the projectId parameter as a templated Airflow variable, while also having project_id in default_args point to a templated variable.

Anything else

I am willing to submit a PR, but if someone else also wants to, they might be faster than I will, especially between now and the summit

Also, it's been a while since I submitted an issue and this form is INCREDIBLE, well done friends

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

@leahecole leahecole added kind:bug This is a clearly a bug area:providers provider:google Google (including GCP) related issues labels Apr 20, 2022
@tirkarthi
Contributor

The below test case works: I pass a variable "gcp_project" whose value is rendered and passed properly during the task instance run. Can you please add a full script or a complete DAG example to reproduce this?

Dictionaries are handled here, where the templates are rendered recursively for each value:

elif isinstance(value, dict):
    return {key: self.render_template(value, context, jinja_env) for key, value in value.items()}
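
For illustration only, here is a standalone sketch of the same recursive idea. This is not Airflow's actual render_template implementation; the render_recursive helper and the context key are made up for the example:

import jinja2

def render_recursive(value, context, env):
    # Strings are rendered through Jinja; dicts are rendered value by value;
    # anything else passes through unchanged.
    if isinstance(value, str):
        return env.from_string(value).render(**context)
    elif isinstance(value, dict):
        return {key: render_recursive(val, context, env) for key, val in value.items()}
    return value

env = jinja2.Environment()
configuration = {"destinationTable": {"projectId": "{{ gcp_project }}"}}
print(render_recursive(configuration, {"gcp_project": "test_gcp_project"}, env))
# -> {'destinationTable': {'projectId': 'test_gcp_project'}}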

@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_execute_params(self, mock_hook, mock_md5, create_task_instance_of_operator):
    Variable.set(key="gcp_project", value="test_gcp_project")
    job_id = "123456"
    hash_ = "hash"
    real_job_id = f"{job_id}_{hash_}"
    mock_md5.return_value.hexdigest.return_value = hash_

    configuration = {
        "query": {
            "query": "SELECT 1",
        },
        "destinationTable": {
            "projectId": '{{var.value.gcp_project}}',
        }
    }

    rendered_configuration = {
        "query": {
            "query": "SELECT 1",
        },
        "destinationTable": {
            "projectId": 'test_gcp_project',
        }
    }

    mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)

    ti = create_task_instance_of_operator(
        BigQueryInsertJobOperator,
        dag_id=TEST_DAG_ID,
        task_id=TASK_ID,
        configuration=configuration,
        location=TEST_DATASET_LOCATION,
        project_id=TEST_GCP_PROJECT_ID,
        job_id=job_id
    )
    ti.run()

    mock_hook.return_value.insert_job.assert_called_once_with(
        configuration=rendered_configuration,
        location=TEST_DATASET_LOCATION,
        job_id=real_job_id,
        project_id=TEST_GCP_PROJECT_ID,
        retry=DEFAULT_RETRY,
        timeout=None,
    )

@leahecole
Contributor Author

Hi, yep! Here's a complete DAG

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator



PROJECT_NAME = '{{var.value.gcp_project}}'

BQ_DATASET_NAME="bigquery-public-data.ghcn_d.ghcnd_2021"
BQ_DESTINATION_DATASET_NAME="holiday_weather"
BQ_DESTINATION_TABLE_NAME="holidays_weather_joined"


WEATHER_HOLIDAYS_JOIN_QUERY = f"""
SELECT Holidays.Date, Holiday, id, element, value
FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id LIKE "US%") AS Weather
ON Holidays.Date = Weather.Date;
"""

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': PROJECT_NAME,
    'region': '{{ var.value.gce_region }}',

}

with models.DAG(
        'example_bug',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    bq_join_holidays_weather_data = BigQueryInsertJobOperator(
        task_id="bq_join_holidays_weather_data",
        configuration={
            "query": {
                "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                        "projectId": PROJECT_NAME,
                        "datasetId": BQ_DESTINATION_DATASET_NAME,
                        "tableId": BQ_DESTINATION_TABLE_NAME
                    }
            }
        },
        location="US", 
    )
    bq_join_holidays_weather_data

@leahecole
Contributor Author

UPDATE: It's not the nested dictionary - it's the project name being passed in as the default arg

@leahecole leahecole changed the title BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries BigQueryInsertJobOperator fails when there are templated variables in default args Apr 29, 2022
@potiuk
Member

potiuk commented May 8, 2022

Yeah. The default_args should not contain templateable values, I think. Maybe we could improve it in the future, but I think it should not be needed. Maybe a doc update is needed saying that you should not use templates in default_args?

@leahecole
Contributor Author

Yeah. The default_args should not contain templateable values, I think. Maybe we could improve it in the future, but I think it should not be needed. Maybe a doc update is needed saying that you should not use templates in default_args?

Makes sense. I've been trying to use templated variables everywhere instead of models.Variable.get because I know that once upon a time (not sure if still true) it was not great to have models.Variable.get outside of the DAG object, because it would make the call to the DB at parse time to populate it. To make code pretty, I just did Jinja templating everywhere. But technically, default_args is part of the DAG object, right? So I don't need to worry about that issue in this specific case?
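
For illustration, a minimal sketch of the two patterns being contrasted here (the variable name is the one from the DAG above):

from airflow.models import Variable

# Parse-time lookup: evaluated every time the scheduler parses the DAG file,
# which means a metadata-DB call during parsing.
project_at_parse_time = Variable.get("gcp_project")

# Jinja-templated lookup: the string is only resolved at task runtime,
# and only inside fields listed in the operator's template_fields.
project_at_runtime = "{{ var.value.gcp_project }}"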

@potiuk
Member

potiuk commented May 9, 2022

I've been trying to use templated variables everywhere instead of models.Variable.get because I know that once upon a time (not sure if still true) it was not great to have models.Variable.get outside of the DAG object, because it would make the call to the DB at parse time to populate it

It's still the case.

technically, default_args is part of the DAG object right? So I don't need to worry about that issue in this specific case?

Technically yes, but I think it's not processed by the template engine :). Maybe it could be, but I think it's not, and this is the problem. The template engine processes the parameters with JINJA separately for each task, and the JINJA-templated args are "per task". So processing them with JINJA just before execute() makes sense.

This is a bit different with default args, because they are "per DAG", not "per task". While we could agree that this means a copy of each default_arg is separately processed by each task, it's not at all obvious, because the scope of the default_args is "per DAG" rather than "per task". So one could argue: should the default_arg be processed once per DAG, or for every task? Should we process them also in the tasks that do not use them? Should we allow different values of a default arg to be applied for different tasks (that would be possible if we allowed templating)?

Also, I believe the default args can be applied at TaskGroup level, which makes the scope even broader :).

I think it would be possible to answer all those questions, but I think currently the answer is "default_args are not processed by JINJA" (and I am guessing here - I am not 100% sure) :)

@potiuk
Member

potiuk commented May 9, 2022

Also @leahecole, TIL while answering someone's question:

I think you COULD use user-defined macros to achieve what you want: user_defined_macros at DAG level

user_defined_macros = { "project_id" : PROJECT_ID }

....


task_1( project_id = '{{ project_id }}')

...
task_2( project_id = '{{ project_id }}')
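
For illustration, a more complete sketch of that suggestion. Names and values here are placeholders, and it relies on configuration being one of BigQueryInsertJobOperator's templated fields:

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

PROJECT_ID = "my-sandbox-project"  # placeholder; resolved once, at DAG definition time

with models.DAG(
    "example_user_defined_macros",
    schedule_interval=datetime.timedelta(days=1),
    start_date=datetime.datetime(2022, 1, 1),
    user_defined_macros={"project_id": PROJECT_ID},
) as dag:
    bq_job = BigQueryInsertJobOperator(
        task_id="bq_job",
        configuration={
            "query": {
                "query": "SELECT 1",
                "useLegacySql": False,
                "destinationTable": {
                    # "configuration" is a templated field, so the macro renders at runtime
                    "projectId": "{{ project_id }}",
                    "datasetId": "my_dataset",
                    "tableId": "my_table",
                },
            }
        },
        location="US",
    )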

@leahecole
Contributor Author

Sorry for the late response!! I will be checking this out with my intern this summer.

I think it would be possible to answer all those questions, but I think currently the answer is "default_args are not processed by JINJA" (and I am guessing here - I am not 100% sure) :)

Re this ^ that definitely makes sense given the scope. I'll make a docs update about templating + default args just in case anyone else tries this 😁

@eladkal
Contributor

eladkal commented Jun 15, 2022

I think it would be possible to answer all those questions, but I think currently the answer is "default_args are not processed by JINJA"

I don't think this is true. default_args are just "unpacked" at the operator constructor - this is done while parsing the DAG, not during run time, so by the time the operator becomes a task, the value from default_args is already assigned, and that value does pass through the Jinja engine. Am I wrong here?

I used this code:


import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "bash_command": "echo {{ ds }}"
}

with DAG(
    dag_id='testing_default_args',
    schedule_interval='0 0 * * *',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args
) as dag:

    run_this = BashOperator(
        task_id='run_after_loop',
    )

and it worked fine:
[Screenshot: Screen Shot 2022-06-15 at 18 50 07, showing the rendered bash_command]

@leahecole in your code example you try to template region and project_id in BigQueryInsertJobOperator, but neither is a templated field:

template_fields: Sequence[str] = (
    "configuration",
    "job_id",
    "impersonation_chain",
)

Can you please try it with:

class MyBigQueryInsertJobOperator(BigQueryInsertJobOperator):
    template_fields: Sequence[str] = (
        "region",
        "project_id",
    ) + BigQueryInsertJobOperator.template_fields
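
A minimal usage sketch under that assumption (the DAG name and query are placeholders, and MyBigQueryInsertJobOperator is the subclass defined just above), with the Jinja value coming in through default_args:

import datetime

from airflow import models

default_dag_args = {"project_id": "{{ var.value.gcp_project }}"}

with models.DAG(
    "example_templated_default_args",
    schedule_interval=datetime.timedelta(days=1),
    start_date=datetime.datetime(2022, 1, 1),
    default_args=default_dag_args,
) as dag:
    bq_job = MyBigQueryInsertJobOperator(
        task_id="bq_job",
        configuration={"query": {"query": "SELECT 1", "useLegacySql": False}},
        location="US",
    )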

@leahecole
Contributor Author

Good point @eladkal - it looks like in other BigQuery operators we DO template project ID, for example:

template_fields: Sequence[str] = (
    'dataset_id',
    'table_id',
    'project_id',
    'gcs_schema_object',
    'labels',
    'view',
    'materialized_view',
)

I tried your suggestion, and it does work (I did location instead of region, which is what BQ expects). Would it make sense to template project_id for all of the BQ operators?
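
For reference, a sketch of the subclass as actually tried, with location rather than region, since location is the parameter BigQueryInsertJobOperator takes:

from typing import Sequence

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class MyBigQueryInsertJobOperator(BigQueryInsertJobOperator):
    # Add "location" and "project_id" to the fields rendered by Jinja.
    template_fields: Sequence[str] = (
        "location",
        "project_id",
    ) + BigQueryInsertJobOperator.template_fields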

@eladkal
Contributor

eladkal commented Jun 30, 2022

Sure, we can do that. Will you start a PR?
In the meantime I'm closing the issue, as there is no bug to fix :)

@eladkal closed this as not planned Jun 30, 2022
@leahecole
Contributor Author

Will do!
