-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Description
Apache Airflow Provider(s)
Versions of Apache Airflow Providers
apache-airflow-providers-google==18.0.0
Apache Airflow version
3.1.0+astro.1
Operating System
Linux
Deployment
Astronomer
Deployment details
No response
What happened
When a DAG is triggered by Asset event, due to some changes in Airflow 3 the logical_date is not passed through the context anymore with asset triggered dagruns.
Operators BigQueryToGCSOperator and are broken GCSToBigQueryOperator due this issue:
KeyError: 'logical_date'
File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run
File "/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1302 in _execute_task
File "/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 416 in wrapper
File "/usr/local/airflow/plugins/operators/gcs_to_bigquery_operator.py", line 27 in execute
File "/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 416 in wrapper
File "/usr/local/lib/python3.12/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 340 in execute
What you think should happen instead
BigQueryToGCSOperator and GCSToBigQueryOperator should be able to generate a job_id regardless of whether it is triggered in a scheduled / manually triggered DAG or through an asset event.
I don't know much about Airflow models, but possibly changing logical_date by dag_run_id would do the work ?
How to reproduce
Create DAG triggered by Asset that has BigQueryToGCSOperator and GCSToBigQueryOperator
Anything else
this issue was fixed for BigQueryInsertJobOperator in #55092, but we have the other 2 with same issue.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct