BigQueryInsertJobOperator fails when there are templated variables in default args #23129
Comments
The below test case works: I am passing a variable "gcp_project" whose value is rendered and passed properly during the task instance run. Can you please add a full script or a complete DAG example to reproduce this? Dictionaries are handled here, where templates are rendered recursively: airflow/airflow/models/abstractoperator.py, lines 403 to 404 in e9f9d33
```python
@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_execute_params(self, mock_hook, mock_md5, create_task_instance_of_operator):
    Variable.set(key="gcp_project", value="test_gcp_project")
    job_id = "123456"
    hash_ = "hash"
    real_job_id = f"{job_id}_{hash_}"
    mock_md5.return_value.hexdigest.return_value = hash_
    configuration = {
        "query": {
            "query": "SELECT 1",
        },
        "destinationTable": {
            "projectId": '{{var.value.gcp_project}}',
        },
    }
    rendered_configuration = {
        "query": {
            "query": "SELECT 1",
        },
        "destinationTable": {
            "projectId": 'test_gcp_project',
        },
    }
    mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
    ti = create_task_instance_of_operator(
        BigQueryInsertJobOperator,
        dag_id=TEST_DAG_ID,
        task_id=TASK_ID,
        configuration=configuration,
        location=TEST_DATASET_LOCATION,
        project_id=TEST_GCP_PROJECT_ID,
        job_id=job_id,
    )
    ti.run()
    mock_hook.return_value.insert_job.assert_called_once_with(
        configuration=rendered_configuration,
        location=TEST_DATASET_LOCATION,
        job_id=real_job_id,
        project_id=TEST_GCP_PROJECT_ID,
        retry=DEFAULT_RETRY,
        timeout=None,
    )
```
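The recursive rendering referenced above can be sketched, very roughly, like this. This is a toy illustration, not the real `abstractoperator.py` code: a plain string substitution stands in for Jinja, and the `{{var.value.<name>}}` pattern is the only one handled.

```python
# Toy sketch of how template rendering can recurse through nested
# dicts/lists, so a templated string deep inside `configuration`
# still gets rendered. NOT Airflow's actual implementation.

def render_template(value, context):
    if isinstance(value, dict):
        # render every value of the dict, keeping keys as-is
        return {k: render_template(v, context) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        # render every element, preserving the container type
        return type(value)(render_template(v, context) for v in value)
    if isinstance(value, str):
        # stand-in for the Jinja engine: substitute known variables
        for name, val in context.items():
            value = value.replace("{{var.value.%s}}" % name, val)
        return value
    return value  # non-templatable leaf (int, bool, None, ...)

configuration = {
    "query": {"query": "SELECT 1"},
    "destinationTable": {"projectId": "{{var.value.gcp_project}}"},
}
rendered = render_template(configuration, {"gcp_project": "test_gcp_project"})
print(rendered["destinationTable"]["projectId"])  # test_gcp_project
```

This is why a templated value nested several levels down inside `configuration` renders fine in the test above: the recursion reaches it as long as the top-level field is in the operator's template fields.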
Hi, yep! Here's a complete DAG:

```python
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

PROJECT_NAME = '{{var.value.gcp_project}}'
BQ_DATASET_NAME = "bigquery-public-data.ghcn_d.ghcnd_2021"
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"

WEATHER_HOLIDAYS_JOIN_QUERY = f"""
SELECT Holidays.Date, Holiday, id, element, value
FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id LIKE "US%") AS Weather
ON Holidays.Date = Weather.Date;
"""

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': PROJECT_NAME,
    'region': '{{ var.value.gce_region }}',
}

with models.DAG(
        'example_bug',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    bq_join_holidays_weather_data = BigQueryInsertJobOperator(
        task_id="bq_join_holidays_weather_data",
        configuration={
            "query": {
                "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": PROJECT_NAME,
                    "datasetId": BQ_DESTINATION_DATASET_NAME,
                    "tableId": BQ_DESTINATION_TABLE_NAME,
                }
            }
        },
        location="US",
    )

    bq_join_holidays_weather_data
```
UPDATE: It's not the nested dictionary - it's the project name being passed in as the default arg.
Yeah. The default_args should not contain templateable values, I think. Maybe we could improve that in the future, but I think it should not be needed. Maybe a doc update is needed saying that you should not use templates in default_args?
Makes sense. I've been trying to use templated variables everywhere.
It's still the case.
Technically yes, but I think it's not processed by the template engine :). Maybe it could be, but I think it's not, and this is the problem. The template engine processes each parameter with Jinja separately for each task, and the Jinja-templated args are "per task", so processing them just before execute() makes sense. This is a bit different with default args, because they are "per dag", not "per task". While we could agree that this means a copy of each default_arg is processed separately by each task, it's not at all obvious, because the scope of default_args is "per dag" rather than "per task". So one could argue: should a default_arg be processed once per dag, or once per task? Should we process them also in tasks that do not use them? Should we allow different values of a default arg applied to different tasks (which would be possible if we allowed templating)? Also, I believe default args can be applied at the TaskGroup level, which makes the scope even broader :). I think it would be possible to answer all those questions, but currently the answer is "default_args are not processed by Jinja" (and I am guessing here - I am not 100% sure) :)
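The distinction described above can be sketched roughly as follows. This is toy code, not Airflow's actual implementation; the string substitution stands in for Jinja, and the field names are made up for the example. The point is only that default_args are merged in at construction time, while the template engine later touches only the fields a task declares as templatable:

```python
# Hypothetical set of fields the operator declares as templatable.
TEMPLATE_FIELDS = {"configuration", "job_id"}

def build_task(task_kwargs, default_args):
    # default_args only fill in kwargs the task did not set itself;
    # nothing is rendered at construction time.
    return {**default_args, **task_kwargs}

def render_templates(task, context):
    # At runtime, only fields listed in TEMPLATE_FIELDS go through the
    # (toy) template engine; everything else is used verbatim.
    rendered = dict(task)
    for field in TEMPLATE_FIELDS & set(rendered):
        value = rendered[field]
        if isinstance(value, str):
            value = value.replace("{{ var.value.gcp_project }}",
                                  context["gcp_project"])
        rendered[field] = value
    return rendered

task = build_task(
    {"job_id": "{{ var.value.gcp_project }}_job"},   # templated field
    {"project_id": "{{ var.value.gcp_project }}"},   # from default_args
)
rendered = render_templates(task, {"gcp_project": "my-project"})
print(rendered["job_id"])      # my-project_job
print(rendered["project_id"])  # {{ var.value.gcp_project }}  (left raw)
```

So a template placed in default_args survives the merge but is never rendered if the parameter it lands on is not a template field of that operator.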
Also @leahecole, TIL while answering someone's question: I think you COULD use user-defined macros to achieve what you want - user_defined_macros at the DAG level.
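For what it's worth, `user_defined_macros` (a real argument to `models.DAG`) effectively merges a dict of extra names into the Jinja context for every template in the DAG, so `{{ gcp_project }}` would resolve everywhere without touching default_args. A stdlib-only toy illustrating the effect - the macro name and project value here are made up, and the regex is a minimal stand-in for Jinja's variable substitution:

```python
import re

def render(template: str, context: dict) -> str:
    # Minimal stand-in for Jinja variable substitution: replace each
    # {{ name }} with the matching value from the context dict.
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda m: str(context[m.group(1)]), template)

# In Airflow this dict would be passed as
# models.DAG(..., user_defined_macros={...}).
user_defined_macros = {"gcp_project": "my-sandbox-project"}

query = "SELECT 1 FROM `{{ gcp_project }}.holiday_weather.holidays`"
print(render(query, user_defined_macros))
# SELECT 1 FROM `my-sandbox-project.holiday_weather.holidays`
```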
Sorry for the late response!! I will be checking this out with my intern this summer.
Re this ^ that definitely makes sense given the scope. I'll make a docs update about templating + default args just in case anyone else tries this 😁
I don't think this is true. I used this code:
@leahecole in your code example you try to template the project ID: airflow/airflow/providers/google/cloud/operators/bigquery.py, lines 2070 to 2074 in 8e0bdda
Can you please try it with:
Good point @eladkal - it looks like in other BigQuery operators we DO template project ID (for example: airflow/airflow/providers/google/cloud/operators/bigquery.py, lines 790 to 797 in 8e0bdda).
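The fix being discussed boils down to adding `project_id` to the operator's `template_fields` tuple, which subclasses can also do themselves. A hedged, self-contained sketch of that pattern with toy classes (not the real Airflow operators; the renderer is a plain string substitution standing in for Jinja):

```python
class BaseOperator:
    template_fields = ()

    def render_template_fields(self, context):
        # Toy renderer: substitute {{var.value.<name>}} in each
        # attribute listed in template_fields.
        for field in self.template_fields:
            value = getattr(self, field)
            if isinstance(value, str):
                for name, val in context.items():
                    value = value.replace("{{var.value.%s}}" % name, val)
            setattr(self, field, value)

class InsertJobOperator(BaseOperator):
    # Before the change: project_id is NOT a template field,
    # so a template passed there is used verbatim.
    template_fields = ("configuration", "job_id")

    def __init__(self, configuration, job_id, project_id):
        self.configuration = configuration
        self.job_id = job_id
        self.project_id = project_id

class PatchedInsertJobOperator(InsertJobOperator):
    # The proposed change: extend the tuple with project_id.
    template_fields = InsertJobOperator.template_fields + ("project_id",)

op = PatchedInsertJobOperator({"query": "SELECT 1"}, "job",
                              "{{var.value.gcp_project}}")
op.render_template_fields({"gcp_project": "test_gcp_project"})
print(op.project_id)  # test_gcp_project
```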
Sure, we can do that. Will you start a PR?
Will do!
Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
apache-airflow-providers-google==6.8.0
Apache Airflow version
2.2.3
Operating System
n/a
Deployment
Composer
Deployment details
Hi! I'm using composer-2.0.6-airflow-2.2.3 - it's a Public IP environment without any configuration overrides. This is a super basic sandbox environment I use for testing.
What happened
I was experimenting with the BigQueryInsertJobOperator and had a failure when I tried to utilize Airflow variables within a Job configuration.
Error
DAG pseudocode
(I copy pasted the relevant bits of my DAG)
`BQ_DESTINATION_TABLE_NAME` and `BQ_DESTINATION_DATASET_NAME` are strings, not Airflow variables, so they're doing great. `WEATHER_HOLIDAYS_JOIN_QUERY` is a SQL query also defined as a string and, as far as I can tell, is also doing great. `PROJECT_NAME` is using a templated Airflow variable that is defined and is successfully being used in other operators in this and other DAGs.

Some things I tried/researched
I experimented a little bit with adding `"configuration.query.destination_table": "json"` to this line but did not have success. Additionally, I checked out the DataprocSubmitJobOperator to see if I could find some clues, because I know Dataproc configurations also often have many nested dictionaries and I'm like 90% certain I've templated values there. I had to timebox this though, because I do have a workaround (just not using the Airflow variable), and I thought I'd open an issue to see if someone who is more familiar with the underlying template rendering might be able to more easily decipher what's happening.

What you think should happen instead
I think that I should be allowed to use an Airflow variable here 😁
How to reproduce
Run a `query` job using the `BigQueryInsertJobOperator` that writes the query to a destination table with a fully qualified `TableReference` object, and pass in the `projectId` parameter as a templated Airflow variable, also having `project` as a default arg pointing to a templated variable.

Anything else
I am willing to submit a PR, but if someone else also wants to, they might be faster than I will, especially between now and the summit
Also, it's been a while since I submitted an issue, and this form is INCREDIBLE. Well done, friends!
Are you willing to submit PR?
Code of Conduct